tp自带的queue支持rabbitmq(升级版)

浏览:290 发布日期:2019/07/24 分类:功能实现 关键字: rabbitmq amqp queue
thinkphp【5.1】自带的queue【版本2.0】增加支持rabbitmq,支持消费者模式、拉取模式。
队列用法示例think-queue
https://packagist.org/packages/topthink/think-queue

需要php-amqplib
https://packagist.org/packages/php-amqplib/php-amqplib

---------------------------------
消费者模式:php think queue:amqp --queue SaveException2Es --daemon
备注:不带daemon是拉取模式。
---------------------------------
更新:
更新延迟队列的命名方式,同一个队列支持多个不同的延迟时间。
修复部分问题
---------------------------------

config/queue.phpreturn [
    'connector' => 'Amqp',
    'expire' => 60, 
    'default' => 'default',
    'host' => '127.0.0.1',
    'username' => 'guest',
    'password' => 'guest',
    'port' => 5672,
    'vhost' => '/',
    'select' => 0,
    'timeout' => 0,
    'persistent' => false, // 是否是长连接
];
vendor\topthink\think-queue\src\common.php\think\Console::addDefaultCommands([
    "think\\queue\\command\\Work",
    "think\\queue\\command\\Restart",
    "think\\queue\\command\\Listen",
    "think\\queue\\command\\Subscribe",
    "think\\queue\\command\\Amqp"
]);

if (!function_exists('queue')) {

    /**
     * 添加到队列
     * @param        $job
     * @param string $data
     * @param int    $delay
     * @param null   $queue
     */
    function queue($job, $data = '', $delay = 0, $queue = null)
    {
        if ($delay > 0) {
            \think\Queue::later($delay, $job, $data, $queue);
        } else {
            \think\Queue::push($job, $data, $queue);
        }
    }
}
vendor\topthink\think-queue\src\queue\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue;

use Exception;
use think\facade\Hook;
use think\Queue;
use think\queue\command\Amqp as AmqpCommand;

class Amqp
{

    public function getStartJob(AmqpCommand $amqcmd, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        $job = Queue::consume($this, $amqcmd, $queue, $delay, $maxTries);
    }


    /**
     * 执行下个任务
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return array
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        //处理获取错误问题,防止无限制获取
        try {
            $job = $this->getNextJob($queue);
        } catch (\Exception $e) {
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);
            //重新抛出错误
            throw $e;
            return ['job' => null, 'failed' => false];
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);
            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * 获取下个任务
     * @param  string $queue
     * @return Job
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $queue) {
            if (!is_null($job = Queue::pop($queue))) {
                return $job;
            }
        }
    }

    /**
     * Process a given job from the queue.
     * @param \think\queue\Job $job
     * @param  int             $maxTries
     * @param  int             $delay
     * @return array
     * @throws Exception
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }

            throw $e;
        }
    }

    /**
     * Log a failed job into storage.
     * @param  \Think\Queue\Job $job
     * @return array
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     * @param  int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }

}
vendor\topthink\think-queue\src\queue\command\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;

use Exception;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\exception\Handle;
use think\exception\ThrowableError;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Hook;
use think\queue\Job;
use think\queue\Amqp as AmqpQueue;
use Throwable;

class Amqp extends Command
{

    /**
     * The queue worker instance.
     * @var \think\queue\Amqp
     */
    protected $amqp;
    protected $memory;
    protected $lastRestart;

    protected function initialize(Input $input, Output $output)
    {
        $this->amqp = new AmqpQueue();
    }

    protected function configure()
    {
        $this->setName('queue:amqp')
            ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on')
            ->addOption('daemon', null, Option::VALUE_NONE, 'Run the worker in daemon mode')
            ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
            ->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode')
            ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
            ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3)
            ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
            ->setDescription('Process the next job on a queue');
    }

    /**
     * Execute the console command.
     * @param Input  $input
     * @param Output $output
     * @return int|null|void
     */
    public function execute(Input $input, Output $output)
    {
        $queue = $input->getOption('queue');

        $delay = $input->getOption('delay');

        $this->memory = $memory = $input->getOption('memory');

        if ($input->getOption('daemon')) {
            Hook::listen('worker_daemon_start', $queue);
            $this->daemon(
                $queue, $delay, $memory,
                $input->getOption('sleep'), $input->getOption('tries')
            );
        } else {
            $response = $this->amqp->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries'));
            $this->output($response);
        }
    }

    protected function output($response)
    {
        if (!is_null($response['job'])) {
            /** @var Job $job */
            $job = $response['job'];
            if ($response['failed']) {
                $this->output->writeln('<error>Failed:</error> ' . $job->getName());
            } else {
                $this->output->writeln('<info>Processed:</info> ' . $job->getName());
            }
        }
    }

    /**
     * 启动一个守护进程执行任务.
     *
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $memory
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return array
     */
    protected function daemon($queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
    {
        $this->lastRestart = $this->getTimestampOfLastQueueRestart();

         do {
             $this->runNextJobForDaemon(
                 $queue, $delay, $sleep, $maxTries
             );

             if ($this->memoryExceeded($memory)) {
                 Hook::listen('worker_memory_exceeded', $queue);
                 $this->stop();
             }

             if ($this->queueShouldRestart($lastRestart)) {
                 Hook::listen('worker_queue_restart', $queue);
                 $this->stop();
             }
         } while (false);
    }

    public function checkDaemon($queue, $response)
    {
        if ($this->memoryExceeded($this->memory)) {
            Hook::listen('worker_memory_exceeded', $queue);
            $this->stop();
        }

        if ($this->queueShouldRestart($this->lastRestart)) {
            Hook::listen('worker_queue_restart', $queue);
            $this->stop();
        }

        $this->output($response);
    }

    /**
     * 以守护进程的方式执行下个任务.
     *
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return void
     */
    protected function runNextJobForDaemon($queue, $delay, $sleep, $maxTries)
    {
        try {
            $response = $this->amqp->getStartJob($this, $queue, $delay, $sleep, $maxTries);
            //$response = $this->amqp->pop($queue, $delay, $sleep, $maxTries);
            //$this->output($response);
        } catch (Exception $e) {
            $this->getExceptionHandler()->report($e);
        } catch (Throwable $e) {
            $this->getExceptionHandler()->report(new ThrowableError($e));
        }
    }

    /**
     * 获取上次重启守护进程的时间
     *
     * @return int|null
     */
    protected function getTimestampOfLastQueueRestart()
    {
        return Cache::get('think:queue:restart');
    }

    /**
     * 检查是否要重启守护进程
     *
     * @param  int|null $lastRestart
     * @return bool
     */
    protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }

    /**
     * 检查内存是否超出
     * @param  int $memoryLimit
     * @return bool
     */
    protected function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * 获取异常处理实例
     *
     * @return \think\exception\Handle
     */
    protected function getExceptionHandler()
    {
        static $handle;

        if (!$handle) {

            if ($class = Config::get('exception_handle')) {
                if (class_exists($class) && is_subclass_of($class, "\\think\\exception\\Handle")) {
                    $handle = new $class;
                }
            }
            if (!$handle) {
                $handle = new Handle();
            }
        }

        return $handle;
    }

    /**
     * 停止执行任务的守护进程.
     * @return void
     */
    public function stop()
    {
        die;
    }

}
vendor\topthink\think-queue\src\queue\connector\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\connector;

use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Amqp as AmqpJob;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\queue\command\Amqp as AmqpCommand;
use think\queue\Amqp as AmqpQueue;


class Amqp extends Connector
{
    protected $connection;
    protected $channel;


    protected $options = [
        'expire'     => 60,
        'host'       => '127.0.0.1',
        'port'       => 5672,
        'username'   => 'guest',
        'password'   => 'guest',
        'timeout'    => 0
    ];

    public function __construct(array $options)
    {
        if (!extension_loaded('sockets')) {
            throw new Exception('sockets扩展未安装');
        }
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        $this->connection = new AMQPStreamConnection($this->options['host'], $this->options['port'], $this->options['username'], $this->options['password']);
        $this->channel = $this->connection->channel();
    }

    public function __destruct() 
    {
        $this->channel->close();
        $this->connection->close();
    }

    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        $queue_name = $this->getQueue($queue);

        if (empty($delay)) {
            $delay = 5 * 1000;
        }else{
            $delay = $delay * 1000;
        }

        $topic_name = $queue_name.'_topic';
        $queue_name_delay = $queue_name.$delay.'_delay';
        $topic_name_delay = $queue_name.$delay.'_topic_delay';
        

        $this->channel->exchange_declare($topic_name, 'topic', false, true, false);
        $this->channel->queue_declare($queue_name, false, true, false, false);
        $this->channel->queue_bind($queue_name, $topic_name);

        $option = new AMQPTable();
        $option->set('x-message-ttl', $delay);
        $option->set('x-dead-letter-exchange', $topic_name);
        //$option->set('x-dead-letter-routing-key','routing_key');

        $this->channel->exchange_declare($topic_name_delay, 'topic', false, true, false);
        $this->channel->queue_declare($queue_name_delay, false, true, false, false, false, $option);
        $this->channel->queue_bind($queue_name_delay, $topic_name_delay);

        $msg = new AMQPMessage($payload);
        $this->channel->basic_publish($msg, $topic_name_delay);
    }

    //拉取模式,默认模式
    public function pop($queue = null)
    {
        $queue_name = $this->getQueue($queue);
        $topic_name = $queue_name.'_topic';
        $this->channel->exchange_declare($topic_name, 'topic', false, true, false);
        $this->channel->queue_declare($queue_name, false, true, false, false);
        $this->channel->queue_bind($queue_name, $topic_name);

        //拉取模式
        $msg = $this->channel->basic_get($queue_name,false);
        if (!empty($msg)) {
            $job = $msg->body;
            $this->channel->basic_ack($msg->delivery_info['delivery_tag']);

            return new AmqpJob($this, $job, $queue);
        }

    }

    //消费者模式,amqp模式下
    //echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
    public function consume(AmqpQueue $work, AmqpCommand $amqcmd, $queue = null, $delay = 0, $maxTries = 0)
    {    echo date('Y-m-d H:i:s')." [x] Received-","consume",PHP_EOL;
         $queue_name = $this->getQueue($queue);
         $topic_name = $queue_name.'_topic';
         $this->channel->exchange_declare($topic_name, 'topic', false, true, false);
         $this->channel->queue_declare($queue_name, false, true, false, false);
         $this->channel->queue_bind($queue_name, $topic_name);

         $_this = $this;
         //消费者模式
         $callback = function($msg) use ($_this, $work, $amqcmd, $queue, $delay, $maxTries){
             //echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;

             $amqpJob = new AmqpJob($_this, $msg->body, $queue);
             $result = $work->process($amqpJob, $maxTries, $delay);
             $amqcmd->checkDaemon($queue, $result);

             $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
         };

         //只有consumer已经处理并确认了上一条message时queue才分派新的message给它
         $this->channel->basic_qos(0, 1, false);
         $this->channel->basic_consume($queue_name,'',false,false,false,false, $callback);
         while (count($this->channel->callbacks)) {
             $this->channel->wait();
         }
    }

    /**
     * 重新发布任务
     *
     * @param  string $queue
     * @param  string $payload
     * @param  int    $delay
     * @param  int    $attempts
     * @return void
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $payload = $this->setMeta($payload, 'attempts', $attempts);

        $queue_name = $this->getQueue($queue);

        if (empty($delay)) {
            $delay = 5 * 1000;
        }else{
            $delay = $delay * 1000;
        }

        $queue_name_delay = $queue_name.$delay.'_delay';
        $topic_name_delay = $queue_name.$delay.'_topic_delay';

        

        $option = new AMQPTable();
        $option->set('x-message-ttl', $delay);
        $option->set('x-dead-letter-exchange', $queue_name.'_topic');
        //$option->set('x-dead-letter-routing-key','routing_key');

        $this->channel->exchange_declare($topic_name_delay, 'topic', false, true, false);
        $this->channel->queue_declare($queue_name_delay, false, true, false, false, false, $option);
        $this->channel->queue_bind($queue_name_delay, $topic_name_delay);

        $msg = new AMQPMessage($payload);
        $this->channel->basic_publish($msg, $topic_name_delay);

    }

    public function pushRaw($payload, $queue = null)
    {
        $queue_name = $this->getQueue($queue);
        $topic_name = $queue_name.'_topic';

        $this->channel->exchange_declare($topic_name, 'topic', false, true, false);
        $this->channel->queue_declare($queue_name, false, true, false, false);
        $this->channel->queue_bind($queue_name, $topic_name);
        
        $msg = new AMQPMessage($payload);
        $this->channel->basic_publish($msg, $topic_name);

        //测试延迟队列
        //$this->release($queue, $payload, 0, 2);

        return json_decode($payload, true)['id'];
    }

    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = $this->setMeta(
            parent::createPayload($job, $data), 'id', $this->getRandomId()
        );

        return $this->setMeta($payload, 'attempts', 1);
    }

    /**
     * 删除任务
     *
     * @param  string $queue
     * @param  string $job
     * @return void
     */
    public function deleteReserved($queue, $job)
    {
        
    }


    /**
     * 随机id
     *
     * @return string
     */
    protected function getRandomId()
    {
        return Str::random(32);
    }

    /**
     * 获取队列名
     *
     * @param  string|null $queue
     * @return string
     */
    protected function getQueue($queue)
    {
        return 'queues_' . ($queue ?: $this->options['default']);
    }
}
vendor\topthink\think-queue\src\queue\job\Amqp.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\job;

use think\queue\Job;
use think\queue\connector\Amqp as AmqpQueue;

class Amqp extends Job
{

    /**
     * The redis queue instance.
     * @var RedisQueue
     */
    protected $amqp;

    /**
     * The database job payload.
     * @var Object
     */
    protected $job;

    public function __construct(AmqpQueue $amqp, $job, $queue)
    {   
        $this->job   = $job;
        $this->queue = $queue;
        $this->amqp = $amqp;
    }

    /**
     * Fire the job.
     * @return void
     */
    public function fire()
    {
        $this->resolveAndFire(json_decode($this->getRawBody(), true));
    }

    /**
     * Get the number of times the job has been attempted.
     * @return int
     */
    public function attempts()
    {
        return json_decode($this->job, true)['attempts'];
    }

    /**
     * Get the raw body string for the job.
     * @return string
     */
    public function getRawBody()
    {
        return $this->job;
    }

    /**
     * 删除任务
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();

        $this->amqp->deleteReserved($this->queue, $this->job);
    }

    /**
     * 重新发布任务
     *
     * @param  int $delay
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);

        $this->delete();

        $this->amqp->release($this->queue, $this->job, $delay, $this->attempts() + 1);
    }
}
修改了获取异常,防止无限制获取的问题
vendor\topthink\think-queue\src\queue\Worker.php<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue;

use Exception;
use think\facade\Hook;
use think\Queue;

class Worker
{

    /**
     * 执行下个任务
     * @param  string $queue
     * @param  int    $delay
     * @param  int    $sleep
     * @param  int    $maxTries
     * @return array
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        //处理获取错误问题,防止无限制获取
        try {
            $job = $this->getNextJob($queue);
        } catch (\Exception $e) {
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);
            //重新抛出错误
            throw $e;
            return ['job' => null, 'failed' => false];
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);
            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * 获取下个任务
     * @param  string $queue
     * @return Job
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $queue) {
            if (!is_null($job = Queue::pop($queue))) {
                return $job;
            }
        }
    }

    /**
     * Process a given job from the queue.
     * @param \think\queue\Job $job
     * @param  int             $maxTries
     * @param  int             $delay
     * @return array
     * @throws Exception
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }

            throw $e;
        }
    }

    /**
     * Log a failed job into storage.
     * @param  \Think\Queue\Job $job
     * @return array
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     * @param  int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }

}
Worker.php和mq队列没有关系。
还在测试阶段有问题留言反馈。

附件 think-queue.zip ( 37.14 KB 下载:5 次 )

评论( 相关
后面还有条评论,点击查看>>