RabbitMq实现延时队列

浏览:3885 发布日期:2018/12/31 分类:功能实现
在很多场景中,需要做延时处理,比如下单后,多少X时间内没操作订单,需要将订单做废之类的业务场景。
这个时候延时队列可以实现,
最近离职了。写PHP很多年,把自己一些资料整理了下发表在自己的博客,不正确的地方希望大家指正,共同提高。
把RabbitMq写了三个DEMO
1、点对点的
2、发布订阅的
3、延时队列的。

延时入库代码:
<?php
date_default_timezone_set('Asia/Shanghai');
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);

$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);

$conn = new AMQPConnection($conn_args);
$conn->connect();

if (!$conn->isConnected()) {
echo 'rabbit-mq 连接错误:', json_encode($conn_args);
exit();
}

$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
echo 'channel failed:', json_encode($connectConfig);
exit();
}

// 创建交换机
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//持久化
$exchange->setName($params['exchangeName'] ?: '');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();

// 创建队列
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName'] ?: '');
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange', // 死信交换机
'x-dead-letter-routing-key' => 'delay_route', // 死信路由
'x-message-ttl' => 60000, // 当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了
// 在RMQ中想要使用优先级特性需要的版本为3.5+。
//'x-max-priority'=>0,//将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
));

$queue->declareQueue();

//绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey']);


//$num = mt_rand(100, 500);
$num = 1;

//生成消息
echo date("Y-m-d H:i:s", time());
echo "\n";
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
$exchange->publish("这是测试消息", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode' => 2));

延时出库代码,可以看我的博客。

http://123.207.244.195/blog/article.php?id=51
评论( 相关
后面还有条评论,点击查看>>