----------------------------
更新:
修复延时消息投递的问题。
增加消费者模式,详见:http://www.thinkphp.cn/code/7131.html
-----------------------------
队列用法示例(基于 think-queue):
https://packagist.org/packages/topthink/think-queue
需要安装 php-amqplib:
https://packagist.org/packages/php-amqplib/php-amqplib
配置文件 config/queue.php:
// Queue configuration returned to think-queue; the values below target a local AMQP broker.
return [
// Name of the queue connector to use.
'connector' => 'Amqp',
'expire' => 60,
// Queue name used when the caller does not pass one (the connector prefixes it with "queues_").
'default' => 'default',
// Broker host, credentials and port handed to the AMQP stream connection.
'host' => '127.0.0.1',
'username' => 'guest',
'password' => 'guest',
'port' => 5672,
// AMQP virtual host.
'vhost' => '/',
// NOTE(review): 'select' is not referenced by the Amqp connector code in this post — confirm whether it is needed.
'select' => 0,
'timeout' => 0,
// NOTE(review): not referenced by the Amqp connector code in this post — confirm whether it is needed.
'persistent' => false, // whether to use a persistent (long-lived) connection
];
vendor\topthink\think-queue\src\queue\connector\Amqp.php:
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Amqp as AmqpJob;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
/**
 * think-queue connector backed by an AMQP broker through php-amqplib.
 *
 * Topology used for a logical queue named X (see getQueue()):
 *   - work queue  "queues_X"        bound to topic exchange "queues_X_topic"
 *   - delay queue "queues_X_delay"  bound to topic exchange "queues_X_topic_delay"
 * The delay queue carries a queue-level message TTL and dead-letters into the
 * work exchange, so a message published to the delay exchange shows up in the
 * work queue once the TTL has elapsed.
 *
 * Jobs are fetched in pull mode (basic_get), one message per pop() call.
 */
class Amqp extends Connector
{
    /** @var AMQPStreamConnection open connection to the broker */
    protected $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel channel used for every operation */
    protected $channel;

    /**
     * Built-in defaults, overridden by the array passed to the constructor.
     * 'default' and 'vhost' are listed here so that getQueue() and the
     * connection always have a value even when the config omits them.
     *
     * @var array
     */
    protected $options = [
        'expire'   => 60,
        'default'  => 'default',
        'host'     => '127.0.0.1',
        'port'     => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
        'timeout'  => 0,
    ];

    /**
     * Merge the options and open the connection and channel.
     *
     * @param array $options connector configuration (config/queue.php)
     * @throws Exception when the sockets extension is not loaded
     */
    public function __construct(array $options)
    {
        if (!extension_loaded('sockets')) {
            throw new Exception('sockets扩展未安装');
        }
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }
        // The fifth constructor argument is the virtual host; previously the
        // configured value was ignored and the library default was used.
        $this->connection = new AMQPStreamConnection(
            $this->options['host'],
            $this->options['port'],
            $this->options['username'],
            $this->options['password'],
            $this->options['vhost']
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Close the channel and the connection if they were opened.
     */
    public function __destruct()
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }

    /**
     * Publish a job for immediate consumption.
     *
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue logical queue name, null for the default
     * @return mixed the generated job id
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Publish a job that becomes available after a delay.
     *
     * @param int         $delay delay in seconds; an empty value means 5 seconds
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue logical queue name, null for the default
     * @return mixed the generated job id (same contract as push())
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload    = $this->createPayload($job, $data);
        $queue_name = $this->getQueue($queue);

        // The work exchange must exist before the delay queue can dead-letter into it.
        $this->declareWorkQueue($queue_name);
        $this->publishToDelayQueue($queue_name, $payload, $delay);

        return json_decode($payload, true)['id'];
    }

    /**
     * Fetch at most one message from the queue (pull mode).
     *
     * The message is acknowledged as soon as it is fetched, before the job
     * runs; retries are handled by publishing a new message through release().
     * A push-based consumer (basic_consume) is not wired in here.
     *
     * @param string|null $queue logical queue name, null for the default
     * @return AmqpJob|null the job, or null when the queue is empty
     */
    public function pop($queue = null)
    {
        $queue_name = $this->getQueue($queue);
        $this->declareWorkQueue($queue_name);

        $msg = $this->channel->basic_get($queue_name, false);
        if (empty($msg)) {
            return null;
        }

        $this->channel->basic_ack($msg->delivery_info['delivery_tag']);

        return new AmqpJob($this, $msg->body, $queue);
    }

    /**
     * Re-publish a job through the delay queue with an updated attempt count.
     *
     * @param string $queue    logical queue name
     * @param string $payload  raw JSON payload of the job
     * @param int    $delay    delay in seconds; an empty value means 5 seconds
     * @param int    $attempts attempt number to store in the payload
     * @return void
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $payload = $this->setMeta($payload, 'attempts', $attempts);
        $this->publishToDelayQueue($this->getQueue($queue), $payload, $delay);
    }

    /**
     * Publish an already-encoded payload to the work exchange.
     *
     * @param string      $payload raw JSON payload
     * @param string|null $queue   logical queue name, null for the default
     * @return mixed the "id" field of the payload
     */
    public function pushRaw($payload, $queue = null)
    {
        $topic_name = $this->declareWorkQueue($this->getQueue($queue));
        $this->channel->basic_publish(new AMQPMessage($payload), $topic_name);

        return json_decode($payload, true)['id'];
    }

    /**
     * Build the payload and stamp it with a random id and attempts = 1.
     *
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue unused here
     * @return string
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = $this->setMeta(
            parent::createPayload($job, $data), 'id', $this->getRandomId()
        );

        return $this->setMeta($payload, 'attempts', 1);
    }

    /**
     * Delete a reserved job. Intentionally empty: pop() has already
     * acknowledged the message, so there is nothing left to remove.
     *
     * @param string $queue
     * @param string $job
     * @return void
     */
    public function deleteReserved($queue, $job)
    {
    }

    /**
     * Declare the durable work queue and its topic exchange, and bind them.
     *
     * @param string $queue_name broker-side queue name (from getQueue())
     * @return string name of the work exchange
     */
    protected function declareWorkQueue($queue_name)
    {
        $topic_name = $queue_name . '_topic';

        $this->channel->exchange_declare($topic_name, 'topic', false, true, true);
        $this->channel->queue_declare($queue_name, false, true, false, true);
        $this->channel->queue_bind($queue_name, $topic_name);

        return $topic_name;
    }

    /**
     * Declare the delay queue/exchange for a work queue and publish into it.
     *
     * NOTE(review): the TTL is a queue argument, so declaring the same delay
     * queue again with a different $delay may be rejected by the broker as an
     * inequivalent redeclaration — confirm against the broker in use.
     *
     * @param string $queue_name broker-side work queue name (from getQueue())
     * @param string $payload    raw JSON payload
     * @param int    $delay      delay in seconds; an empty value means 5 seconds
     * @return void
     */
    protected function publishToDelayQueue($queue_name, $payload, $delay)
    {
        // Seconds to milliseconds, forced to an integer for the queue argument.
        $ttl = empty($delay) ? 5 * 1000 : (int) ($delay * 1000);

        $queue_name_delay = $queue_name . '_delay';
        $topic_name_delay = $queue_name . '_topic_delay';

        $option = new AMQPTable();
        $option->set('x-message-ttl', $ttl);
        $option->set('x-dead-letter-exchange', $queue_name . '_topic');

        $this->channel->exchange_declare($topic_name_delay, 'topic', false, true, true);
        $this->channel->queue_declare($queue_name_delay, false, true, false, true, false, $option);
        $this->channel->queue_bind($queue_name_delay, $topic_name_delay);
        $this->channel->basic_publish(new AMQPMessage($payload), $topic_name_delay);
    }

    /**
     * Generate a random 32-character job id.
     *
     * @return string
     */
    protected function getRandomId()
    {
        return Str::random(32);
    }

    /**
     * Map a logical queue name to the broker-side queue name.
     *
     * @param string|null $queue
     * @return string "queues_" followed by $queue, or by the configured default
     */
    protected function getQueue($queue)
    {
        return 'queues_' . ($queue ?: $this->options['default']);
    }
}
vendor\topthink\think-queue\src\queue\job\Amqp.php:
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\job;
use think\queue\Job;
use think\queue\connector\Amqp as AmqpQueue;
/**
 * Job wrapper around one message body fetched by the Amqp connector.
 */
class Amqp extends Job
{
    /**
     * Connector this job came from; used to delete or re-publish the job.
     * @var AmqpQueue
     */
    protected $amqp;

    /**
     * Raw message body; decoded as JSON by fire() and attempts().
     * @var string
     */
    protected $job;

    /**
     * @param AmqpQueue $amqp  connector that fetched the message
     * @param string    $job   raw message body
     * @param string    $queue logical queue name the message was read from
     */
    public function __construct(AmqpQueue $amqp, $job, $queue)
    {
        $this->job   = $job;
        $this->queue = $queue;
        $this->amqp  = $amqp;
    }

    /**
     * Fire the job: decode the body and dispatch it to its handler.
     * @return void
     */
    public function fire()
    {
        $this->resolveAndFire(json_decode($this->getRawBody(), true));
    }

    /**
     * Get the number of times the job has been attempted.
     *
     * Returns 0 when the body is not valid JSON or carries no "attempts"
     * field, instead of indexing into a non-array value.
     *
     * @return int
     */
    public function attempts()
    {
        $payload = json_decode($this->job, true);

        return isset($payload['attempts']) ? (int) $payload['attempts'] : 0;
    }

    /**
     * Get the raw body string for the job.
     * @return string
     */
    public function getRawBody()
    {
        return $this->job;
    }

    /**
     * Delete the job: mark it deleted, then notify the connector.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();
        $this->amqp->deleteReserved($this->queue, $this->job);
    }

    /**
     * Release the job back onto the queue with attempts incremented by one.
     *
     * @param int $delay delay in seconds before the job is available again
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);
        $this->delete();
        $this->amqp->release($this->queue, $this->job, $delay, $this->attempts() + 1);
    }
}
修改了获取任务时的异常处理,防止出错后无限制地重复获取:
vendor\topthink\think-queue\src\queue\Worker.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Exception;
use think\facade\Hook;
use think\Queue;
/**
 * Queue worker: fetches the next job, runs it, and handles retry/failure.
 */
class Worker
{
    /**
     * Run the next job, or sleep when there is none.
     *
     * If fetching the job throws, the worker fires the before-sleep hook and
     * sleeps before re-throwing, so a failing fetch cannot be retried in a
     * tight loop by the caller.
     *
     * @param  string $queue    comma-separated queue names, or null for the default
     * @param  int    $delay    seconds to delay a job released after a failure
     * @param  int    $sleep    seconds to sleep when idle or after a fetch error
     * @param  int    $maxTries maximum attempts; 0 disables the limit
     * @return array  ['job' => Job|null, 'failed' => bool]
     * @throws Exception when fetching or running the job throws
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        try {
            $job = $this->getNextJob($queue);
        } catch (\Exception $e) {
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);
            throw $e;
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);
            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * Get the next job, trying each listed queue in order.
     *
     * @param  string $queue comma-separated queue names, or null for the default
     * @return Job|null the first job found, or null when every queue is empty
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $name) {
            $job = Queue::pop($name);
            if (!is_null($job)) {
                return $job;
            }
        }

        return null;
    }

    /**
     * Process a given job from the queue.
     *
     * A job whose attempt count exceeds $maxTries is logged as failed without
     * being run. If the job throws and was not deleted, it is released with
     * $delay before the exception is re-thrown.
     *
     * @param \think\queue\Job $job
     * @param int              $maxTries
     * @param int              $delay
     * @return array ['job' => Job, 'failed' => bool]
     * @throws Exception
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }
            throw $e;
        }
    }

    /**
     * Delete a job that ran out of attempts and notify failure listeners.
     *
     * @param \think\queue\Job $job
     * @return array ['job' => Job, 'failed' => true]
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                // Fire the hook even when delete()/failed() throws.
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     * @param int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }
}
Worker.php 的修改与 MQ 队列本身没有关系。目前使用的是拉取模式(basic_get),效率上比消费者模式还是差一些。
目前还在测试阶段,如有问题请留言反馈。
think-queue.zip
( 32.65 KB 下载:52 次 )