tp自带的queue支持rabbitmq

浏览:6177 发布日期:2019/07/19 分类:功能实现 关键字: rabbitmq amqp queue
thinkphp【5.1】自带的queue【版本2.0】增加支持rabbitmq
----------------------------
更新
修复延时消息投递的问题。
增加消费者模式:http://www.thinkphp.cn/code/7131.html
-----------------------------
队列用法示例think-queue
https://packagist.org/packages/topthink/think-queue

需要php-amqplib
https://packagist.org/packages/php-amqplib/php-amqplib

config/queue.php
return [
    'connector' => 'Amqp',
    'expire' => 60, 
    'default' => 'default',
    'host' => '127.0.0.1',
    'username' => 'guest',
    'password' => 'guest',
    'port' => 5672,
    'vhost' => '/',
    'select' => 0,
    'timeout' => 0,
    'persistent' => false, // 是否是长连接
];
vendor\topthink\think-queue\src\queue\connector\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\connector;

use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Amqp as AmqpJob;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


/**
 * RabbitMQ (AMQP) connector for think-queue, built on php-amqplib.
 *
 * Broker topology used for a logical queue "<name>" (see getQueue()):
 *  - exchange "queues_<name>_topic" bound to queue "queues_<name>" with an
 *    empty routing key; immediate jobs are published to that exchange;
 *  - for delayed jobs, one exchange/queue pair per distinct delay
 *    ("queues_<name>_topic_delay_<ms>" / "queues_<name>_delay_<ms>"). The
 *    delay queue carries a message TTL and dead-letters expired messages to
 *    the main exchange, which is how they reach the worker after the delay.
 *
 * Jobs are fetched in pull mode (basic_get) and acknowledged as soon as they
 * are fetched, so a job that is in flight when the worker dies is not
 * redelivered by the broker.
 */
class Amqp extends Connector
{
    /** @var AMQPStreamConnection|null connection opened in the constructor */
    protected $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null channel used for every broker call */
    protected $channel;

    /**
     * Built-in defaults, overridden key by key by the options passed to the
     * constructor. "default" is the queue name used when a caller passes none.
     *
     * @var array
     */
    protected $options = [
        'expire'   => 60,
        'default'  => 'default',
        'host'     => '127.0.0.1',
        'port'     => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
        'timeout'  => 0,
    ];

    /**
     * Merge the options and open the connection and channel.
     *
     * @param  array $options connector configuration (config/queue.php)
     * @throws Exception when the sockets extension is not loaded
     */
    public function __construct(array $options)
    {
        if (!extension_loaded('sockets')) {
            throw new Exception('sockets扩展未安装');
        }
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        // An empty vhost in the configuration falls back to the broker default.
        $vhost = empty($this->options['vhost']) ? '/' : $this->options['vhost'];

        $this->connection = new AMQPStreamConnection(
            $this->options['host'],
            $this->options['port'],
            $this->options['username'],
            $this->options['password'],
            $vhost
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Close the channel and the connection.
     *
     * Teardown is best effort: an exception raised while closing (for example
     * because the link is already gone) must not escape from a destructor.
     */
    public function __destruct()
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
        } catch (Exception $e) {
            // nothing useful can be done at teardown
        }

        try {
            if ($this->connection) {
                $this->connection->close();
            }
        } catch (Exception $e) {
            // nothing useful can be done at teardown
        }
    }

    /**
     * Publish a job for immediate processing.
     *
     * @param  mixed       $job   job class/handler, passed to createPayload()
     * @param  mixed       $data  job data
     * @param  string|null $queue logical queue name, null for the default one
     * @return mixed the generated job id
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Publish a job that becomes available after a delay.
     *
     * @param  int         $delay delay in seconds; an empty value means 5 seconds
     * @param  mixed       $job   job class/handler, passed to createPayload()
     * @param  mixed       $data  job data
     * @param  string|null $queue logical queue name, null for the default one
     * @return void
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $this->publishDelayed($this->createPayload($job, $data), $queue, $delay);
    }

    /**
     * Fetch the next job, or null when the queue is empty.
     *
     * The message is acknowledged immediately after it is fetched, before the
     * job runs. Push-based consumption (basic_consume) is not implemented here.
     *
     * @param  string|null $queue logical queue name, null for the default one
     * @return AmqpJob|null
     */
    public function pop($queue = null)
    {
        $queueName = $this->getQueue($queue);
        $this->declareQueue($queueName);

        // Pull mode: ask the broker for a single message, manual ack.
        $message = $this->channel->basic_get($queueName, false);
        if (empty($message)) {
            return null;
        }

        $body = $message->body;
        $this->channel->basic_ack($message->delivery_info['delivery_tag']);

        return new AmqpJob($this, $body, $queue);
    }

    /**
     * Re-publish a job after a delay, recording the new attempt count.
     *
     * @param  string $queue    logical queue name
     * @param  string $payload  raw JSON payload of the job
     * @param  int    $delay    delay in seconds; an empty value means 5 seconds
     * @param  int    $attempts attempt count to store in the payload
     * @return void
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $this->publishDelayed($this->setMeta($payload, 'attempts', $attempts), $queue, $delay);
    }

    /**
     * Publish an already encoded payload for immediate processing.
     *
     * @param  string      $payload raw JSON payload
     * @param  string|null $queue   logical queue name, null for the default one
     * @return mixed the "id" field of the payload
     */
    public function pushRaw($payload, $queue = null)
    {
        $exchange = $this->declareQueue($this->getQueue($queue));

        $this->channel->basic_publish($this->createMessage($payload), $exchange);

        return json_decode($payload, true)['id'];
    }

    /**
     * Build the payload and stamp it with a random id and attempts = 1.
     *
     * @param  mixed       $job
     * @param  mixed       $data
     * @param  string|null $queue unused
     * @return string
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = $this->setMeta(
            parent::createPayload($job, $data), 'id', $this->getRandomId()
        );

        return $this->setMeta($payload, 'attempts', 1);
    }

    /**
     * Delete a reserved job.
     *
     * Intentionally empty: pop() has already acknowledged the message, so
     * there is nothing left on the broker to remove.
     *
     * @param  string $queue
     * @param  string $job
     * @return void
     */
    public function deleteReserved($queue, $job)
    {

    }

    /**
     * Generate a random job id.
     *
     * @return string
     */
    protected function getRandomId()
    {
        return Str::random(32);
    }

    /**
     * Map a logical queue name to the broker queue name.
     *
     * @param  string|null $queue
     * @return string
     */
    protected function getQueue($queue)
    {
        return 'queues_' . ($queue ?: $this->options['default']);
    }

    /**
     * Declare the main exchange and queue for a broker queue name and bind them.
     *
     * @param  string $queueName broker queue name as returned by getQueue()
     * @return string name of the exchange to publish to
     */
    private function declareQueue($queueName)
    {
        $exchange = $queueName . '_topic';

        // Arguments: passive = false, durable = true, auto_delete = true.
        $this->channel->exchange_declare($exchange, 'topic', false, true, true);
        // Arguments: passive = false, durable = true, exclusive = false, auto_delete = true.
        $this->channel->queue_declare($queueName, false, true, false, true);
        $this->channel->queue_bind($queueName, $exchange);

        return $exchange;
    }

    /**
     * Publish a payload to a delay queue that dead-letters into the main exchange.
     *
     * The TTL is a queue argument, and re-declaring an existing queue with
     * different arguments is rejected by the broker. The delay is therefore
     * part of the delay queue/exchange names, so each distinct delay gets its
     * own pair instead of clashing with a previously declared one.
     *
     * @param  string      $payload raw JSON payload
     * @param  string|null $queue   logical queue name, null for the default one
     * @param  int         $delay   delay in seconds; an empty value means 5 seconds
     * @return void
     */
    private function publishDelayed($payload, $queue, $delay)
    {
        $queueName = $this->getQueue($queue);

        // x-message-ttl must be an integer number of milliseconds.
        $ttl = empty($delay) ? 5 * 1000 : (int) ($delay * 1000);

        // The dead-letter target has to exist before expired messages arrive.
        $mainExchange = $this->declareQueue($queueName);

        $delayQueue    = $queueName . '_delay_' . $ttl;
        $delayExchange = $queueName . '_topic_delay_' . $ttl;

        $arguments = new AMQPTable();
        $arguments->set('x-message-ttl', $ttl);
        $arguments->set('x-dead-letter-exchange', $mainExchange);

        $this->channel->exchange_declare($delayExchange, 'topic', false, true, true);
        $this->channel->queue_declare($delayQueue, false, true, false, true, false, $arguments);
        $this->channel->queue_bind($delayQueue, $delayExchange);

        $this->channel->basic_publish($this->createMessage($payload), $delayExchange);
    }

    /**
     * Wrap a payload in a persistent message.
     *
     * The queues are declared durable; marking the message persistent as well
     * is what lets it survive a broker restart together with its queue.
     *
     * @param  string $payload
     * @return AMQPMessage
     */
    private function createMessage($payload)
    {
        return new AMQPMessage($payload, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
    }
}
vendor\topthink\think-queue\src\queue\job\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\job;

use think\queue\Job;
use think\queue\connector\Amqp as AmqpQueue;

class Amqp extends Job
{

    /**
     * The AMQP connector this job was fetched through.
     * @var AmqpQueue
     */
    protected $amqp;

    /**
     * The raw JSON payload of the job.
     * @var string
     */
    protected $job;

    /**
     * @param AmqpQueue $amqp  connector that produced the job
     * @param string    $job   raw JSON payload
     * @param string    $queue queue name the job was fetched from
     */
    public function __construct(AmqpQueue $amqp, $job, $queue)
    {
        $this->amqp  = $amqp;
        $this->queue = $queue;
        $this->job   = $job;
    }

    /**
     * Decode the payload and run the job.
     * @return void
     */
    public function fire()
    {
        $decoded = json_decode($this->getRawBody(), true);

        $this->resolveAndFire($decoded);
    }

    /**
     * Read the "attempts" counter stored in the payload.
     * @return int
     */
    public function attempts()
    {
        $decoded = json_decode($this->job, true);

        return $decoded['attempts'];
    }

    /**
     * Return the payload exactly as it was received.
     * @return string
     */
    public function getRawBody()
    {
        return $this->job;
    }

    /**
     * Mark the job as deleted and tell the connector to drop it.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();

        $connector = $this->amqp;
        $connector->deleteReserved($this->queue, $this->job);
    }

    /**
     * Put the job back on the queue with its attempt counter incremented.
     *
     * @param  int $delay delay handed to the connector
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);

        $this->delete();

        // Computed after delete(), as the last step before re-publishing.
        $nextAttempt = $this->attempts() + 1;
        $this->amqp->release($this->queue, $this->job, $delay, $nextAttempt);
    }
}
修改了获取任务时的异常处理:获取失败后先休眠再重新抛出异常,防止无限制地重复获取。
vendor\topthink\think-queue\src\queue\Worker.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue;

use Exception;
use think\facade\Hook;
use think\Queue;

class Worker
{

    /**
     * Fetch and run the next job.
     *
     * When no job is available the worker fires the "worker_before_sleep"
     * hook, sleeps and returns an empty result. When fetching itself fails it
     * does the same back-off and then rethrows, so a caller that retries in a
     * loop does not hammer the queue backend.
     *
     * @param  string|null $queue    comma separated queue names, null for the default queue
     * @param  int         $delay    delay passed to release() when a job fails
     * @param  int         $sleep    seconds to sleep when there is nothing to do
     * @param  int         $maxTries maximum attempts, 0 for unlimited
     * @return array ['job' => Job|null, 'failed' => bool]
     * @throws Exception when fetching or running the job fails
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        try {
            $job = $this->getNextJob($queue);
        } catch (Exception $e) {
            // Back off before surfacing the fetch error.
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);

            throw $e;
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);
            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * Return the first job found on the given queues, in the order listed.
     *
     * @param  string|null $queue comma separated queue names, null for the default queue
     * @return Job|null null when every queue is empty
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $name) {
            $job = Queue::pop($name);
            if (!is_null($job)) {
                return $job;
            }
        }

        return null;
    }

    /**
     * Process a given job from the queue.
     *
     * A job that has already exceeded $maxTries is logged as failed instead of
     * being run. If the job throws and has not been deleted, it is released
     * back to the queue before the exception is rethrown.
     *
     * @param  \think\queue\Job $job
     * @param  int              $maxTries maximum attempts, 0 for unlimited
     * @param  int              $delay    delay passed to release() on failure
     * @return array ['job' => Job, 'failed' => bool]
     * @throws Exception
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }

            throw $e;
        }
    }

    /**
     * Delete a job that ran out of attempts and notify listeners.
     *
     * The "queue_failed" hook fires even if delete() or failed() throws.
     *
     * @param  \think\queue\Job $job
     * @return array ['job' => Job, 'failed' => true]
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     * @param  int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }

}
Worker.php和mq队列没有关系。
使用的是拉取模式,效率上和消费者模式还是差点。
目前还在测试阶段,有问题请留言反馈。

附件 think-queue.zip ( 32.65 KB 下载:52 次 )

评论( 相关
后面还有条评论,点击查看>>