think-queue3的http://www.thinkphp.cn/topic/69415.html
队列用法示例think-queue
https://packagist.org/packages/topthink/think-queue
需要php-amqplib
https://packagist.org/packages/php-amqplib/php-amqplib
---------------------------------
消费者模式:php think queue:amqp --queue SaveException2Es --daemon
备注:不带 --daemon 参数时为拉取模式(只拉取并处理一个任务后退出)。
---------------------------------
更新:
更新延迟队列的命名方式,同一个队列支持多个不同的延迟时间。
修复部分问题
---------------------------------
config/queue.php
// Queue configuration returned to the framework (config/queue.php).
return [
// Connector name; presumably resolved to think\queue\connector\Amqp — confirm against think\Queue.
'connector' => 'Amqp',
// NOTE(review): not read by the Amqp connector code shown alongside this config.
'expire' => 60,
// Queue name used when no queue is given; the connector prefixes it with "queues_".
'default' => 'default',
// RabbitMQ host, credentials and port handed to the AMQP stream connection.
'host' => '127.0.0.1',
'username' => 'guest',
'password' => 'guest',
'port' => 5672,
// AMQP virtual host.
'vhost' => '/',
// NOTE(review): 'select', 'timeout' and 'persistent' look like leftovers from the Redis
// connector configuration; the Amqp connector shown here does not read them — confirm.
'select' => 0,
'timeout' => 0,
'persistent' => false, // whether to use a persistent (long-lived) connection
];
vendor\topthink\think-queue\src\common.php
\think\Console::addDefaultCommands([
"think\\queue\\command\\Work",
"think\\queue\\command\\Restart",
"think\\queue\\command\\Listen",
"think\\queue\\command\\Subscribe",
"think\\queue\\command\\Amqp"
]);
if (!function_exists('queue')) {
    /**
     * Enqueue a job, delayed or immediate.
     *
     * A positive $delay routes the job through \think\Queue::later();
     * anything else is pushed straight onto the queue with \think\Queue::push().
     *
     * @param mixed       $job   job identifier handed to the queue
     * @param mixed       $data  payload passed along with the job
     * @param int         $delay delay in seconds; values <= 0 mean "no delay"
     * @param string|null $queue target queue name, null for the default queue
     * @return void
     */
    function queue($job, $data = '', $delay = 0, $queue = null)
    {
        // Guard clause: delayed jobs take the "later" path and we are done.
        if ($delay > 0) {
            \think\Queue::later($delay, $job, $data, $queue);

            return;
        }

        \think\Queue::push($job, $data, $queue);
    }
}
vendor\topthink\think-queue\src\queue\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Exception;
use think\facade\Hook;
use think\Queue;
use think\queue\command\Amqp as AmqpCommand;
/**
 * Worker used by the queue:amqp command.
 *
 * Supports two ways of obtaining jobs:
 *  - pull mode: pop() fetches at most one job per call;
 *  - consumer mode: getStartJob() delegates to Queue::consume(), which
 *    calls back into process() for every delivered message.
 */
class Amqp
{
    /**
     * Start consumer mode for the given queue.
     *
     * @param AmqpCommand $amqcmd   command instance, forwarded to Queue::consume()
     * @param string|null $queue    queue name, null for the default queue
     * @param int         $delay    seconds to delay a failed job before it is retried
     * @param int         $sleep    unused in consumer mode; kept for signature parity with pop()
     * @param int         $maxTries maximum attempts before a job is logged as failed (0 = unlimited)
     * @return mixed whatever Queue::consume() returns
     */
    public function getStartJob(AmqpCommand $amqcmd, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        // The caller assigns the result, so hand it back instead of dropping it.
        return Queue::consume($this, $amqcmd, $queue, $delay, $maxTries);
    }

    /**
     * Fetch and run the next job (pull mode).
     *
     * @param string|null $queue    queue name, or a comma separated list of names
     * @param int         $delay    seconds to delay a failed job before it is retried
     * @param int         $sleep    seconds to sleep when no job is available or fetching failed
     * @param int         $maxTries maximum attempts before a job is logged as failed (0 = unlimited)
     * @return array ['job' => Job|null, 'failed' => bool]
     * @throws \Exception when fetching the job fails (re-thrown after sleeping)
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        // Sleep before re-throwing so a broken connection cannot turn into a tight retry loop.
        try {
            $job = $this->getNextJob($queue);
        } catch (\Exception $e) {
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);

            throw $e;
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);

            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * Get the next job from the first queue that has one.
     *
     * @param string|null $queue queue name, or a comma separated list of names
     * @return Job|null
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $name) {
            if (!is_null($job = Queue::pop($name))) {
                return $job;
            }
        }

        return null;
    }

    /**
     * Process a given job from the queue.
     *
     * On failure the job is released back to the queue (unless it was
     * already deleted) and the original error is re-thrown.
     *
     * @param \think\queue\Job $job
     * @param int              $maxTries
     * @param int              $delay
     * @return array ['job' => Job, 'failed' => bool]
     * @throws Exception
     * @throws \Throwable
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            $this->releaseFailedJob($job, $delay);

            throw $e;
        } catch (\Throwable $e) {
            // PHP 7 Errors (TypeError, ...) are not Exceptions; release the job for them too.
            $this->releaseFailedJob($job, $delay);

            throw $e;
        }
    }

    /**
     * Release a job whose handler failed, unless it has already been deleted.
     *
     * @param \think\queue\Job $job
     * @param int              $delay
     * @return void
     */
    private function releaseFailedJob(Job $job, $delay)
    {
        if (!$job->isDeleted()) {
            $job->release($delay);
        }
    }

    /**
     * Delete a job that exceeded its attempts and notify listeners.
     *
     * @param \think\queue\Job $job
     * @return array ['job' => Job, 'failed' => true]
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                // Fire the hook even when delete()/failed() throws.
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     *
     * @param int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }
}
vendor\topthink\think-queue\src\queue\command\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\command;
use Exception;
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\exception\Handle;
use think\exception\ThrowableError;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Hook;
use think\queue\Job;
use think\queue\Amqp as AmqpQueue;
use Throwable;
/**
 * Console command "queue:amqp".
 *
 * Without --daemon a single job is pulled and processed; with --daemon the
 * command registers a consumer and handles messages as they are delivered.
 */
class Amqp extends Command
{
    /**
     * The queue worker instance.
     * @var \think\queue\Amqp
     */
    protected $amqp;

    /**
     * Memory limit in megabytes, kept for checkDaemon().
     * @var int
     */
    protected $memory;

    /**
     * Restart marker read from the cache when the daemon started.
     * @var int|null
     */
    protected $lastRestart;

    protected function initialize(Input $input, Output $output)
    {
        $this->amqp = new AmqpQueue();
    }

    protected function configure()
    {
        $this->setName('queue:amqp')
            ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on')
            ->addOption('daemon', null, Option::VALUE_NONE, 'Run the worker in daemon mode')
            ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
            ->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode')
            ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
            ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3)
            ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
            ->setDescription('Process the next job on a queue');
    }

    /**
     * Execute the console command.
     *
     * @param Input  $input
     * @param Output $output
     * @return int|null|void
     */
    public function execute(Input $input, Output $output)
    {
        $queue = $input->getOption('queue');
        $delay = $input->getOption('delay');

        // Stored on the instance because checkDaemon() is called back from the consumer.
        $this->memory = $memory = $input->getOption('memory');

        if ($input->getOption('daemon')) {
            Hook::listen('worker_daemon_start', $queue);
            $this->daemon(
                $queue, $delay, $memory,
                $input->getOption('sleep'), $input->getOption('tries')
            );
        } else {
            $response = $this->amqp->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries'));
            $this->output($response);
        }
    }

    /**
     * Print the outcome of a processed job; prints nothing when no job was handled.
     *
     * @param array $response ['job' => Job|null, 'failed' => bool]
     * @return void
     */
    protected function output($response)
    {
        if (!is_null($response['job'])) {
            /** @var Job $job */
            $job = $response['job'];
            if ($response['failed']) {
                $this->output->writeln('<error>Failed:</error> ' . $job->getName());
            } else {
                $this->output->writeln('<info>Processed:</info> ' . $job->getName());
            }
        }
    }

    /**
     * Run the worker as a daemon.
     *
     * @param string $queue
     * @param int    $delay
     * @param int    $memory
     * @param int    $sleep
     * @param int    $maxTries
     * @return void
     */
    protected function daemon($queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
    {
        $this->lastRestart = $this->getTimestampOfLastQueueRestart();

        // The body runs exactly once: runNextJobForDaemon() enters the consumer
        // loop and only comes back when that loop ends or an error was reported.
        do {
            $this->runNextJobForDaemon(
                $queue, $delay, $sleep, $maxTries
            );

            if ($this->memoryExceeded($memory)) {
                Hook::listen('worker_memory_exceeded', $queue);
                $this->stop();
            }

            // Compare against the marker captured at start-up, not an undefined local.
            if ($this->queueShouldRestart($this->lastRestart)) {
                Hook::listen('worker_queue_restart', $queue);
                $this->stop();
            }
        } while (false);
    }

    /**
     * Per-message daemon check, called by the consumer after each job:
     * stops the process when memory is exhausted or a restart was requested,
     * otherwise prints the job outcome.
     *
     * @param string|null $queue
     * @param array       $response ['job' => Job|null, 'failed' => bool]
     * @return void
     */
    public function checkDaemon($queue, $response)
    {
        if ($this->memoryExceeded($this->memory)) {
            Hook::listen('worker_memory_exceeded', $queue);
            $this->stop();
        }

        if ($this->queueShouldRestart($this->lastRestart)) {
            Hook::listen('worker_queue_restart', $queue);
            $this->stop();
        }

        $this->output($response);
    }

    /**
     * Start consuming jobs in daemon mode; errors are reported, not propagated.
     *
     * @param string $queue
     * @param int    $delay
     * @param int    $sleep
     * @param int    $maxTries
     * @return void
     */
    protected function runNextJobForDaemon($queue, $delay, $sleep, $maxTries)
    {
        try {
            // Consumer mode; results are printed per message through checkDaemon().
            $this->amqp->getStartJob($this, $queue, $delay, $sleep, $maxTries);
        } catch (Exception $e) {
            $this->getExceptionHandler()->report($e);
        } catch (Throwable $e) {
            $this->getExceptionHandler()->report(new ThrowableError($e));
        }
    }

    /**
     * Get the restart marker stored in the cache.
     *
     * @return int|null
     */
    protected function getTimestampOfLastQueueRestart()
    {
        return Cache::get('think:queue:restart');
    }

    /**
     * Check whether the restart marker changed since $lastRestart was read.
     *
     * @param int|null $lastRestart
     * @return bool
     */
    protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }

    /**
     * Check whether current memory usage reached the limit.
     *
     * @param int $memoryLimit limit in megabytes
     * @return bool
     */
    protected function memoryExceeded($memoryLimit)
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    /**
     * Get the exception handler: the configured 'exception_handle' class when
     * it exists and extends \think\exception\Handle, otherwise the default Handle.
     *
     * @return \think\exception\Handle
     */
    protected function getExceptionHandler()
    {
        static $handle;

        if (!$handle) {
            if ($class = Config::get('exception_handle')) {
                if (class_exists($class) && is_subclass_of($class, "\\think\\exception\\Handle")) {
                    $handle = new $class;
                }
            }
            if (!$handle) {
                $handle = new Handle();
            }
        }

        return $handle;
    }

    /**
     * Stop the daemon by terminating the process.
     *
     * @return void
     */
    public function stop()
    {
        die;
    }
}
vendor\topthink\think-queue\src\queue\connector\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\connector;
use Exception;
use think\helper\Str;
use think\queue\Connector;
use think\queue\job\Amqp as AmqpJob;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\queue\command\Amqp as AmqpCommand;
use think\queue\Amqp as AmqpQueue;
/**
 * AMQP (RabbitMQ) connector for think-queue, built on php-amqplib.
 *
 * Each logical queue "X" maps to a durable queue "queues_X" bound to a topic
 * exchange "queues_X_topic". Delayed delivery uses one extra queue per delay
 * value, declared with a message TTL and the work exchange as its dead-letter
 * exchange, so expired messages are routed back to the work queue.
 */
class Amqp extends Connector
{
    /** @var AMQPStreamConnection|null */
    protected $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null */
    protected $channel;

    protected $options = [
        'expire'   => 60,
        // Fallback queue name read by getQueue(); avoids an undefined index when the config omits it.
        'default'  => 'default',
        'host'     => '127.0.0.1',
        'port'     => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
        'timeout'  => 0,
    ];

    /**
     * @param array $options connection options, merged over the defaults above
     * @throws Exception when the sockets extension is missing
     */
    public function __construct(array $options)
    {
        if (!extension_loaded('sockets')) {
            throw new Exception('sockets扩展未安装');
        }

        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        // Pass the configured virtual host; it used to be ignored, so every
        // connection silently ended up on "/".
        $this->connection = new AMQPStreamConnection(
            $this->options['host'],
            $this->options['port'],
            $this->options['username'],
            $this->options['password'],
            $this->options['vhost']
        );
        $this->channel = $this->connection->channel();
    }

    public function __destruct()
    {
        // Either handle may be null when the constructor failed part-way through.
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }

    /**
     * Push a job for immediate delivery.
     *
     * @return string id of the created job
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Push a job that becomes available after $delay seconds.
     * An empty delay falls back to 5 seconds.
     *
     * @param int         $delay delay in seconds
     * @param mixed       $job
     * @param mixed       $data
     * @param string|null $queue
     * @return string id of the created job (same contract as push())
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);

        // The work queue must exist before expired messages are dead-lettered into it.
        $this->declareQueue($this->getQueue($queue));
        $this->publishDelayed($queue, $payload, $delay);

        return json_decode($payload, true)['id'];
    }

    /**
     * Pull mode (the default): fetch at most one message.
     *
     * The message is acknowledged as soon as it is fetched, before the job runs.
     *
     * @param string|null $queue
     * @return AmqpJob|null
     */
    public function pop($queue = null)
    {
        $queueName = $this->getQueue($queue);
        $this->declareQueue($queueName);

        $msg = $this->channel->basic_get($queueName, false);
        if (empty($msg)) {
            return null;
        }

        $this->channel->basic_ack($msg->delivery_info['delivery_tag']);

        return new AmqpJob($this, $msg->body, $queue);
    }

    /**
     * Consumer mode (queue:amqp --daemon): register a consumer and block
     * while messages are delivered.
     *
     * @param AmqpQueue   $work     worker that runs each job
     * @param AmqpCommand $amqcmd   command notified after each job (may stop the process)
     * @param string|null $queue
     * @param int         $delay    seconds to delay a failed job
     * @param int         $maxTries maximum attempts (0 = unlimited)
     * @return void
     */
    public function consume(AmqpQueue $work, AmqpCommand $amqcmd, $queue = null, $delay = 0, $maxTries = 0)
    {
        echo date('Y-m-d H:i:s')." [x] Received-","consume",PHP_EOL;

        $queueName = $this->getQueue($queue);
        $this->declareQueue($queueName);

        $connector = $this;
        $callback  = function ($msg) use ($connector, $work, $amqcmd, $queue, $delay, $maxTries) {
            $amqpJob  = new AmqpJob($connector, $msg->body, $queue);
            $finished = false;
            $result   = null;

            try {
                $result   = $work->process($amqpJob, $maxTries, $delay);
                $finished = true;
            } finally {
                // Ack when the job finished, or when it failed but is flagged deleted.
                // AmqpJob::release() calls delete() before republishing, so (assuming
                // the base Job's delete() sets that flag) a released job counts here too.
                // Leaving such a message unacked would redeliver a duplicate.
                if ($finished || $amqpJob->isDeleted()) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            }

            // Runs after the ack: checkDaemon() may terminate the process, which
            // would otherwise requeue a job that has already been handled.
            $amqcmd->checkDaemon($queue, $result);
        };

        // Only dispatch a new message once the previous one has been acknowledged.
        $this->channel->basic_qos(0, 1, false);
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * Re-publish a job with an updated attempt counter.
     * An empty delay falls back to 5 seconds.
     *
     * @param string $queue
     * @param string $payload
     * @param int    $delay    delay in seconds
     * @param int    $attempts
     * @return void
     */
    public function release($queue, $payload, $delay, $attempts)
    {
        $payload = $this->setMeta($payload, 'attempts', $attempts);
        $this->publishDelayed($queue, $payload, $delay);
    }

    /**
     * Publish an already encoded payload to the work queue.
     *
     * @param string      $payload JSON payload
     * @param string|null $queue
     * @return string id stored in the payload
     */
    public function pushRaw($payload, $queue = null)
    {
        $topicName = $this->declareQueue($this->getQueue($queue));
        $this->channel->basic_publish($this->createMessage($payload), $topicName);

        return json_decode($payload, true)['id'];
    }

    /**
     * Build the payload and stamp it with a random id and attempts = 1.
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = $this->setMeta(
            parent::createPayload($job, $data), 'id', $this->getRandomId()
        );

        return $this->setMeta($payload, 'attempts', 1);
    }

    /**
     * Delete a reserved job. Intentionally a no-op in this connector.
     *
     * @param string $queue
     * @param string $job
     * @return void
     */
    public function deleteReserved($queue, $job)
    {
    }

    /**
     * Declare the durable work queue and its topic exchange, and bind them.
     *
     * @param string $queueName full queue name as returned by getQueue()
     * @return string name of the topic exchange
     */
    protected function declareQueue($queueName)
    {
        $topicName = $queueName . '_topic';

        $this->channel->exchange_declare($topicName, 'topic', false, true, false);
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, $topicName);

        return $topicName;
    }

    /**
     * Publish a payload through the delay queue matching the given delay.
     *
     * The delay in milliseconds is part of the delay queue's name, so one work
     * queue can have several delay queues with different TTLs.
     *
     * @param string|null $queue   logical queue name
     * @param string      $payload JSON payload
     * @param int         $delay   delay in seconds; empty means 5 seconds
     * @return void
     */
    protected function publishDelayed($queue, $payload, $delay)
    {
        $queueName = $this->getQueue($queue);

        // Integer milliseconds: the value is used both as the TTL and in the names.
        $ttl = (int) ((empty($delay) ? 5 : $delay) * 1000);

        $delayQueue = $queueName . $ttl . '_delay';
        $delayTopic = $queueName . $ttl . '_topic_delay';

        $arguments = new AMQPTable();
        $arguments->set('x-message-ttl', $ttl);
        $arguments->set('x-dead-letter-exchange', $queueName . '_topic');

        $this->channel->exchange_declare($delayTopic, 'topic', false, true, false);
        $this->channel->queue_declare($delayQueue, false, true, false, false, false, $arguments);
        $this->channel->queue_bind($delayQueue, $delayTopic);

        $this->channel->basic_publish($this->createMessage($payload), $delayTopic);
    }

    /**
     * Wrap a payload in a persistent message; the queues are declared durable,
     * but durability alone does not keep transient messages across a broker restart.
     *
     * @param string $payload
     * @return AMQPMessage
     */
    protected function createMessage($payload)
    {
        return new AMQPMessage($payload, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    }

    /**
     * Generate a random job id.
     *
     * @return string
     */
    protected function getRandomId()
    {
        return Str::random(32);
    }

    /**
     * Get the full queue name ("queues_" prefix + name or configured default).
     *
     * @param string|null $queue
     * @return string
     */
    protected function getQueue($queue)
    {
        return 'queues_' . ($queue ?: $this->options['default']);
    }
}
vendor\topthink\think-queue\src\queue\job\Amqp.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue\job;
use think\queue\Job;
use think\queue\connector\Amqp as AmqpQueue;
/**
 * Job wrapper around a single AMQP message body.
 */
class Amqp extends Job
{
    /**
     * Connector this job was received from.
     * @var AmqpQueue
     */
    protected $amqp;

    /**
     * Raw job payload; decoded as JSON by fire() and attempts().
     * @var string
     */
    protected $job;

    /**
     * @param AmqpQueue   $amqp  connector that delivered the job
     * @param string      $job   raw payload
     * @param string|null $queue queue name the job belongs to
     */
    public function __construct(AmqpQueue $amqp, $job, $queue)
    {
        $this->amqp  = $amqp;
        $this->queue = $queue;
        $this->job   = $job;
    }

    /**
     * Decode the payload and run the job handler.
     *
     * @return void
     */
    public function fire()
    {
        $decoded = json_decode($this->getRawBody(), true);

        $this->resolveAndFire($decoded);
    }

    /**
     * Number of attempts recorded in the payload.
     *
     * @return int
     */
    public function attempts()
    {
        $decoded = json_decode($this->job, true);

        return $decoded['attempts'];
    }

    /**
     * Raw payload string of the job.
     *
     * @return string
     */
    public function getRawBody()
    {
        return $this->job;
    }

    /**
     * Mark the job deleted and let the connector drop its reservation.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();

        $connector = $this->amqp;
        $connector->deleteReserved($this->queue, $this->job);
    }

    /**
     * Put the job back on the queue with its attempt counter increased by one.
     *
     * @param int $delay delay in seconds before the job is available again
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);
        $this->delete();

        $nextAttempt = $this->attempts() + 1;
        $this->amqp->release($this->queue, $this->job, $delay, $nextAttempt);
    }
}
修改了获取任务时的异常处理,防止出错后无限制地重复获取:
vendor\topthink\think-queue\src\queue\Worker.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\queue;
use Exception;
use think\facade\Hook;
use think\Queue;
/**
 * Generic queue worker: fetches one job per pop() call and runs it.
 */
class Worker
{
    /**
     * Fetch and run the next job.
     *
     * @param string|null $queue    queue name, or a comma separated list of names
     * @param int         $delay    seconds to delay a failed job before it is retried
     * @param int         $sleep    seconds to sleep when no job is available or fetching failed
     * @param int         $maxTries maximum attempts before a job is logged as failed (0 = unlimited)
     * @return array ['job' => Job|null, 'failed' => bool]
     * @throws \Exception when fetching the job fails (re-thrown after sleeping)
     */
    public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
    {
        // Sleep before re-throwing so a failing backend cannot turn into a tight retry loop.
        try {
            $job = $this->getNextJob($queue);
        } catch (\Exception $e) {
            Hook::listen('worker_before_sleep', $queue);
            $this->sleep($sleep);

            throw $e;
        }

        if (!is_null($job)) {
            Hook::listen('worker_before_process', $queue);

            return $this->process($job, $maxTries, $delay);
        }

        Hook::listen('worker_before_sleep', $queue);
        $this->sleep($sleep);

        return ['job' => null, 'failed' => false];
    }

    /**
     * Get the next job from the first queue that has one.
     *
     * @param string|null $queue queue name, or a comma separated list of names
     * @return Job|null
     */
    protected function getNextJob($queue)
    {
        if (is_null($queue)) {
            return Queue::pop();
        }

        foreach (explode(',', $queue) as $name) {
            if (!is_null($job = Queue::pop($name))) {
                return $job;
            }
        }

        return null;
    }

    /**
     * Process a given job from the queue.
     *
     * On failure the job is released back to the queue (unless it was
     * already deleted) and the original error is re-thrown.
     *
     * @param \think\queue\Job $job
     * @param int              $maxTries
     * @param int              $delay
     * @return array ['job' => Job, 'failed' => bool]
     * @throws Exception
     * @throws \Throwable
     */
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }

        try {
            $job->fire();

            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            $this->releaseFailedJob($job, $delay);

            throw $e;
        } catch (\Throwable $e) {
            // PHP 7 Errors (TypeError, ...) are not Exceptions; release the job for them too.
            $this->releaseFailedJob($job, $delay);

            throw $e;
        }
    }

    /**
     * Release a job whose handler failed, unless it has already been deleted.
     *
     * @param \think\queue\Job $job
     * @param int              $delay
     * @return void
     */
    private function releaseFailedJob(Job $job, $delay)
    {
        if (!$job->isDeleted()) {
            $job->release($delay);
        }
    }

    /**
     * Delete a job that exceeded its attempts and notify listeners.
     *
     * @param \think\queue\Job $job
     * @return array ['job' => Job, 'failed' => true]
     */
    protected function logFailedJob(Job $job)
    {
        if (!$job->isDeleted()) {
            try {
                $job->delete();
                $job->failed();
            } finally {
                // Fire the hook even when delete()/failed() throws.
                Hook::listen('queue_failed', $job);
            }
        }

        return ['job' => $job, 'failed' => true];
    }

    /**
     * Sleep the script for a given number of seconds.
     *
     * @param int $seconds
     * @return void
     */
    public function sleep($seconds)
    {
        sleep($seconds);
    }
}
Worker.php 和 mq 队列没有关系。目前还在测试阶段,有问题请留言反馈。
think-queue.zip
( 37.14 KB 下载:69 次 )