这个时候延时队列可以实现,
最近离职了。写PHP很多年,把自己一些资料整理了下发表在自己的博客,不正确的地方希望大家指正,共同提高。
把RabbitMq写了三个DEMO
1、点对点的
2、发布订阅的
3、延时队列的。
延时入库代码:
<?php
date_default_timezone_set('Asia/Shanghai');
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
$params = array(
'exchangeName' => 'test_cache_exchange',
'queueName' => 'test_cache_queue',
'routeKey' => 'test_cache_route',
);
$conn = new AMQPConnection($conn_args);
$conn->connect();
if (!$conn->isConnected()) {
echo 'rabbit-mq 连接错误:', json_encode($conn_args);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
echo 'channel failed:', json_encode($connectConfig);
exit();
}
// 创建交换机
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//持久化
$exchange->setName($params['exchangeName'] ?: '');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declareExchange();
// 创建队列
$queue = new AMQPQueue($channel);
$queue->setName($params['queueName'] ?: '');
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-dead-letter-exchange' => 'delay_exchange', // 死信交换机
'x-dead-letter-routing-key' => 'delay_route', // 死信路由
'x-message-ttl' => 60000, // 当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了
// 在RMQ中想要使用优先级特性需要的版本为3.5+。
//'x-max-priority'=>0,//将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
));
$queue->declareQueue();
//绑定队列和交换机
$queue->bind($params['exchangeName'], $params['routeKey']);
//$num = mt_rand(100, 500);
$num = 1;
//生成消息
echo date("Y-m-d H:i:s", time());
echo "\n";
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。
$exchange->publish("这是测试消息", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode' => 2));
延时出库代码,可以看我的博客。
http://123.207.244.195/blog/article.php?id=51