Skip to content

Commit e4c697e

Browse files
committed
延时队列
1 parent ff15a6a commit e4c697e

1,282 files changed

Lines changed: 201879 additions & 0 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.DS_Store

8 KB
Binary file not shown.

DqAlert.php

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
class DqAlert{
4+
static function send($msg,$topic){
5+
$topicList= DqModule::getRegisterTopic();
6+
if(isset($topicList[$topic])){
7+
$mailList = trim($topicList[$topic]['email']);
8+
if(empty($mailList)){
9+
return;
10+
}
11+
$mailto = array();
12+
$tmp = explode(',',$mailList);
13+
foreach ($tmp as $v){
14+
$v = trim($v);
15+
if(!empty($v)) {
16+
$mailto[] = $v;
17+
}
18+
}
19+
20+
if(!empty($mailto)){
21+
DqMailer::sendMail($mailto,'[延时队列通知]',$msg);
22+
}
23+
}
24+
}
25+
26+
static function send_redis_down_notice($redis,$msg=''){
27+
$mailInfo = DqMysql::select('dq_alert');
28+
if(empty($mailInfo)){
29+
DqLog::writeLog('empty alert mail conf,plear check',DqLog::LOG_TYPE_EXCEPTION);
30+
return false;
31+
}else{
32+
$mailInfo = $mailInfo[0];
33+
$extArr = json_decode($mailInfo['ext'],true);
34+
if(isset($extArr['redis'])){
35+
$tmp = explode(',',$extArr['redis']);
36+
$mailTo = array();
37+
foreach ($tmp as $v){
38+
$v = trim($v);
39+
if(!empty($v)){
40+
$mailTo[] = $v;
41+
}
42+
}
43+
44+
if(!empty($mailTo)){
45+
DqMailer::sendMail($mailTo,'[延时队列通知]-redi连接异常','info='.json_encode($redis).',msg='.$msg);
46+
}
47+
48+
}
49+
}
50+
}
51+
52+
53+
}
54+
55+

DqBench.php

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
include_once 'DqLoader.php';
4+
include_once 'DqClient.php';
5+
include_once 'DqComm.php';
6+
7+
8+
class DqBench extends Thread{
9+
private $name;
10+
public function __construct($name){
11+
$this->name = $name;
12+
}
13+
static $concurrency=10;
14+
static $nums = 2;
15+
16+
function run(){
17+
$server=array(
18+
'10.13.131.116:6789',
19+
);
20+
$time = self::msectime();
21+
$dqClient = new DqClient();
22+
$dqClient->addServer($server);
23+
24+
$topic ='order_openvip_checker'; //topic在后台注册
25+
for($i=0;$i<self::$nums;$i++) {
26+
$id = uniqid();
27+
$data = array(
28+
'id' => $id,
29+
'body' => array(
30+
'a' => 1,
31+
'b' => 2,
32+
'c' => 3,
33+
'ext' => str_repeat('a', 128),
34+
),
35+
//可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定
36+
//'fix_time' => date('Y-m-d 23:50:50'),
37+
);
38+
$boolRet = $dqClient->add($topic, $data);
39+
echo 'add耗时:'.(self::msectime() - $time)."ms\n";
40+
}
41+
}
42+
43+
static function msectime() {
44+
list($msec, $sec) = explode(' ', microtime());
45+
$msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000);
46+
return $msectime;
47+
}
48+
}
49+
50+
DqBench::$concurrency =$_SERVER['argv'][1];
51+
DqBench::$nums = $_SERVER['argv'][2];
52+
53+
54+
55+
56+
for($i=0;$i<DqBench::$concurrency;$i++){
57+
$pool[] = new DqBench("name:".$i);
58+
}
59+
60+
$start = DqBench::msectime();
61+
foreach($pool as $worker){
62+
$worker->start();
63+
}
64+
65+
66+
foreach($pool as $worker) {
67+
$worker->join();//等待执行完成
68+
}
69+
70+
echo "总耗时:".(DqBench::msectime()-$start)."ms\n";
71+
72+
// php DqBench 100 2
73+
74+
75+

DqClient.php

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
<?php
2+
include_once 'DqLoader.php';
3+
4+
class DqClient{
5+
6+
7+
private $serverList = array();
8+
private $fd = NULL;
9+
10+
public function addServer($server){
11+
if(is_string($server)){
12+
$this->serverList[] = $server;
13+
}
14+
if(is_array($server) && !empty($server)){
15+
$this->serverList = array_merge($this->serverList,$server);
16+
}
17+
$this->serverList = array_unique($this->serverList);
18+
}
19+
20+
21+
22+
public function connect(){
23+
if(!empty($this->fd)){
24+
return $this->fd;
25+
}
26+
if (empty($this->serverList)){
27+
DqLog::writeLog('empty server list');
28+
return false;
29+
}
30+
$serverList=$this->serverList;
31+
32+
while(count($serverList)){
33+
try {
34+
$idx = rand(0,count($serverList)-1);
35+
$server = $serverList[$idx];
36+
list($host,$port)= explode(':',$server);
37+
38+
$fd = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
39+
//1s内没处理完直接返回
40+
socket_set_option($fd,SOL_SOCKET,SO_RCVTIMEO,array("sec"=>1, "usec"=>0));
41+
if (!is_resource($fd)) {
42+
$strMsg = 'socket_create error:' . socket_strerror(socket_last_error());
43+
throw new DqException($strMsg);
44+
}
45+
if (!socket_connect($fd, $host, $port)) {
46+
$strMsg = 'socket_create error:' . socket_strerror(socket_last_error().' ip='.$host.' port='.$port);
47+
throw new DqException($strMsg);
48+
}
49+
$this->fd = $fd;
50+
return $fd;
51+
} catch (DqException $e) {
52+
unset($this->serverList[$idx]); //删除无用的配置
53+
54+
unset($serverList[$idx]);
55+
$serverList = array_values($serverList);
56+
57+
DqLog::writeLog($e->getDqMessage(), DqLog::LOG_TYPE_EXCEPTION);
58+
}
59+
60+
}
61+
return false;
62+
}
63+
64+
public function parse_result($ret){
65+
if($ret['code']==1){
66+
return true;
67+
}else{
68+
return false;
69+
}
70+
}
71+
72+
public function add($topic,$data){
73+
try{
74+
$fd = $this->connect();
75+
if($fd===false){
76+
throw new DqException(' connect server faild');
77+
}
78+
$data['cmd']='add';
79+
$data['topic'] = $topic;
80+
if(DqComm::socket_wirte($fd,$data)){
81+
$ret = DqComm::socket_read($fd);
82+
return $this->parse_result($ret);
83+
}else{
84+
throw new DqException('add error,data='.json_encode($data));
85+
}
86+
}catch (DqException $e){
87+
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
88+
}
89+
return false;
90+
}
91+
92+
93+
public function del($topic,$id){
94+
try{
95+
$fd = $this->connect();
96+
if($fd===false){
97+
throw new DqException(' connect server faild');
98+
}
99+
$data['cmd']='del';
100+
$data['topic'] = $topic;
101+
$data['id'] = $id;
102+
if(DqComm::socket_wirte($fd,$data)){
103+
$ret = DqComm::socket_read($fd);
104+
return $this->parse_result($ret);
105+
}else{
106+
throw new DqException('add error,data='.json_encode($data));
107+
}
108+
}catch (DqException $e){
109+
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
110+
}
111+
return false;
112+
}
113+
114+
public function get($topic,$id){
115+
try{
116+
$fd = $this->connect();
117+
if($fd===false){
118+
throw new DqException(' connect server faild');
119+
}
120+
$data['cmd']='get';
121+
$data['topic'] = $topic;
122+
$data['id'] = $id;
123+
if(DqComm::socket_wirte($fd,$data)){
124+
$ret = DqComm::socket_read($fd);
125+
return $ret;
126+
}else{
127+
throw new DqException('add error,data='.json_encode($data));
128+
}
129+
}catch (DqException $e){
130+
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
131+
return false;
132+
}
133+
}
134+
135+
136+
137+
}
138+
139+
//$server=array('10.210.234.203:6879');
140+
//
141+
//$topic ='order_comment';
142+
//$id = rand(1,10000);
143+
//$data=array(
144+
// 'id'=>$id, //topic+业务唯一id的组合
145+
// 'body'=>array('a'=>1,'b'=>2,'c'=>3),
146+
//);
147+
//
148+
//$dqClient = new DqClient();
149+
//$dqClient->addServer($server);
150+
//
151+
//var_dump($dqClient->add($topic,$data));
152+
//var_dump($dqClient->get($topic,$id));
153+
//var_dump($dqClient->del($topic,$id));

DqComm.php

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
include_once 'DqLog.php';
3+
4+
class DqComm{
5+
static $max_package_size=4096;
6+
//获取消息
7+
public static function socket_read($cfd){
8+
try {
9+
if (is_resource($cfd)) {
10+
$msg_len = socket_read($cfd, 4); //读取数据前四个字节,表示本条消息的长度
11+
if($msg_len===false){
12+
$strMsg = socket_strerror(socket_last_error($cfd));
13+
throw new DqException('read error,msg='.$strMsg);
14+
}
15+
$msg_len = intval($msg_len);
16+
$body = socket_read($cfd, $msg_len);
17+
if ($body===false) {
18+
return false;
19+
}
20+
$body_len = strlen($body);
21+
if ($body_len!=$msg_len) {
22+
throw new DqException(' body len is not match,body='.$body.' needed_len='.$msg_len);
23+
}
24+
$arr = json_decode($body,true);
25+
if(!is_array($arr)){
26+
throw new DqException(' parse body error,body='.$body);
27+
}
28+
return $arr;
29+
} else {
30+
throw new DqException('given fd not a resource');
31+
}
32+
}catch (DqException $e){
33+
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
34+
return false;
35+
}
36+
}
37+
38+
public static function format_data($params){
39+
$str = json_encode($params);
40+
$len = strlen($str);
41+
$packageData=sprintf('%04d%s',$len,$str);
42+
return array($len+4,$packageData);
43+
}
44+
45+
public static function socket_wirte($fd,$data){
46+
list($len,$data) = self::format_data($data);
47+
if($len>=self::$max_package_size){
48+
throw new DqException('data too long,str='.$data.' max_package_size='.self::$max_package_size);
49+
}
50+
while($len){
51+
$nwrite = socket_write($fd, $data, $len); //可能一次性写不完,需要多次写入
52+
if($nwrite===false){ /*数据写入失败*/
53+
throw new DqException('socker error: data='.$data);
54+
}else if($nwrite>0){
55+
$len -= $nwrite;
56+
$data = substr($data,$nwrite);
57+
}
58+
}
59+
return true;
60+
}
61+
62+
public static function msectime() {
63+
list($msec, $sec) = explode(' ', microtime());
64+
$msectime = (float)sprintf('%.0f', (floatval($msec) + floatval($sec)) * 1000);
65+
return $msectime;
66+
}
67+
68+
}

0 commit comments

Comments
 (0)