/**
 * Redis client used as the queue backend; created and connected in the constructor.
 *
 * @var Redis
 */
protected $_server;
/**
 * Name of the Redis list that callback records are pushed onto and popped from.
 *
 * @var string
 */
protected $_key = 'callback';
/**
 * Center_Sdk_Queue constructor.
 *
 * Creates the Redis client and connects it to the host/port obtained from
 * conf('Redis', 'queue').
 */
public function __construct()
{
    $client = new Redis();
    $settings = conf('Redis', 'queue');
    $client->connect($settings['host'], $settings['port']);

    $this->_server = $client;
}
/**
 * Pushes rows from the callback_notify table onto the Redis queue.
 *
 * Every row returned by Model_CallbackNotify::getLessthenNow() (rows whose notify
 * time is less than or equal to the current time, per the original comment) is
 * JSON-encoded and appended to the queue list. Once a row is on the queue its
 * notify_status is set to 1 ("notifying"). The fetched batch is then appended
 * to Log/queueErr.txt.
 */
public function pushList()
{
    $callBack_m = new Model_CallbackNotify();
    $callBack_info = $callBack_m->getLessthenNow();

    foreach ($callBack_info as $value) {
        // Only flag the row as "notifying" when it really is on the queue; a failed
        // push would otherwise leave it marked in-flight with no job behind it.
        if ($this->_server->rPush($this->_key, json_encode($value)) === false) {
            continue;
        }
        $callBack_m->updateDbById($value['id'], ['notify_status' => 1]);
    }

    // file_put_contents() opens, appends and closes in a single call, so no file
    // handle is leaked and a failed open no longer flows into fwrite().
    $logFile = dirname(dirname(__FILE__)) . "/Log/queueErr.txt";
    file_put_contents($logFile, "\r\n" . json_encode($callBack_info), FILE_APPEND);
}
/**
 * Starts consuming the queue.
 *
 * Repeatedly calls blPop on the queue list with a timeout argument of 0 and hands
 * every popped item to dispatch(). The loop stops as soon as blPop yields an
 * empty/falsy result.
 */
public function run()
{
    while (true) {
        $item = $this->_server->blPop($this->_key, 0);
        if (!$item) {
            break;
        }
        $this->dispatch($item);
    }
}
/**
 * Delivers one queued callback notification and records the outcome.
 *
 * A record whose notify_number is greater than 5 is deleted without another
 * attempt. Otherwise the record's data is POSTed to its notify_url:
 *  - if the response decodes to JSON with status "success", the order is updated
 *    with notify_status = 1 and the callback record is deleted;
 *  - anything else reschedules the record (notify_status = 0, notify_number + 1,
 *    notify_time from notifyTime()).
 *
 * @param array $data item popped from the queue; index 1 holds the JSON-encoded callback record
 */
public function dispatch($data)
{
    $data_info = isset($data[1]) ? json_decode($data[1], true) : null;
    // A payload that does not decode to a record with an id cannot be tied to a
    // database row, so there is nothing to deliver or to update.
    if (!is_array($data_info) || !isset($data_info['id'])) {
        return;
    }

    $notify_id = $data_info['id'];
    $notify_number = isset($data_info['notify_number']) ? (int)$data_info['notify_number'] : 0;
    $callBack_m = new Model_CallbackNotify();

    if ($notify_number > 5) {
        // Retry budget exhausted: drop the record.
        $callBack_m->deleteDbById($notify_id);
        return;
    }

    // Computed before the request, as in the original, so the delay counts from dispatch time.
    $time = $this->notifyTime($notify_number);
    $notify_info = isset($data_info['notify_info'])
        ? json_decode($data_info['notify_info'], true)
        : null;

    $delivered = false;
    // An undecodable notify_info is treated as a failed attempt so it still runs
    // through the retry counter and is eventually removed.
    if (is_array($notify_info)
        && isset($notify_info['notify_url'], $notify_info['data'])
        && is_array($notify_info['data'])
    ) {
        $network = new Sys_Lib_Network_Curl(array(
            'method' => 'post',
            'url' => $notify_info['notify_url'],
            // http_build_query() requires an array/object. The original passed the
            // json_encode()d string, which produces no usable body (TypeError on PHP 8).
            // NOTE(review): assumes the receiver expects form-encoded fields - confirm.
            'params' => http_build_query($notify_info['data']),
        ));
        $res = $network->getData();
        $res = json_decode($res, true);
        // Strict comparison: with a loose one a numeric 0 status equals "success" on PHP < 8.
        $delivered = is_array($res) && isset($res['status']) && $res['status'] === "success";
    }

    if (!$delivered) {
        $callBack_m->updateDbById($notify_id, [
            'notify_time' => $time,
            'notify_status' => 0,
            'notify_number' => $notify_number + 1,
        ]);
        return;
    }

    $order_m = new Model_Order();
    $order_id = $notify_info['data']['order_id'];
    // The first four characters of the order id are passed to updateTable() as the table argument.
    $table = substr("$order_id", 0, 4);
    $order_m->updateTable($table, $order_id, ['notify_status' => 1]);
    $callBack_m->deleteDbById($notify_id);
}
/**
 * Returns the Unix timestamp at which the next notification attempt is due.
 *
 * Delay by attempt number: 1 => 5s, 2 => 15s, 3 => 15s, 4 => 180s, 5 => 1800s.
 * Numbers below 1 use the first delay and numbers above 5 use the last one, so
 * an int is always returned (the switch used before returned null for them).
 *
 * @param int $num attempt number
 * @return int
 */
public function notifyTime($num)
{
    // Seconds to wait before the next attempt, keyed by attempt number.
    $delays = [
        1 => 5,
        2 => 15,
        3 => 15,
        4 => 180,
        5 => 1800,
    ];

    $attempt = (int)$num;
    if ($attempt < 1) {
        $attempt = 1;
    } elseif ($attempt > 5) {
        $attempt = 5;
    }

    return time() + $delays[$attempt];
}
}
最佳答案