php-nsqNSQ 的 PHP 客户端
php-nsq
php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。
安装 :
请提前安装libevent
Dependencies: libevent (apt-get install libevent-dev ,yum install libevent-devel) 1. sudo phpize 2. ./configure 3. make 4. make install add in your php.ini: extension = nsq.so;
pub例子:
$nsqdAddr = array( "127.0.0.1:4150", "127.0.0.1:4154" ); $nsq = new Nsq(); $isTrue = $nsq->connectNsqd($nsqdAddr); for($i = 0; $i < 10000; $i++){ $nsq->publish("test", "nihao"); } $nsq->closeNsqdConnection(); // Deferred publish //function : deferredPublish(string topic,string message, int millisecond); //millisecond default : [0 < millisecond < 3600000] $deferred = new Nsq(); $isTrue = $deferred->connectNsqd($nsqdAddr); for($i = 0; $i < 20; $i++){ $deferred->deferredPublish("test", "message daly", 3000); } $deferred->closeNsqdConnection();
sub例子:
<?php //sub $nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr $nsq = new Nsq(); $config = array( "topic" => "test", "channel" => "struggle", "rdy" => 2, //optional , default 1 "connect_num" => 1, //optional , default 1 "retry_delay_time" => 5000, //optional, default 0 , if run callback failed, after 5000 msec, message will be retried "auto_finish" => true, //default true ); $nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp; });
Nsq 类方法:
connectNsqd($nsqdAddrArr)
pub的时候连接nsq,你也可以利用此函数做健康检查closeNsqdConnection()
关闭nsq的连接publish($topic,$msg)
消息发送deferredPublish($topic,$msg,$msec)
延迟消息发送subscribe($nsq_lookupd,$config,$callback)
消息订阅
Message 类方法与属性:
timestamp
消息时间戳attempts
消息的重试次数,(从1开始)message_id
消息idpayload
消息内容finish($bev,$msg->message_id)
主动的 ack消息方法touch($bev,$msg->message_id)
如果你消息执行太长,可以利用次函数告知nsq 你还活着,一般用于执行频率比较规律的场景。
Tips :
1.如果callback内需要外部变量,可以采用以下use的写法:
$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev) use ($you_variable){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp; });
2.消息重试,只要抛异常就可以,切记不要陷入死循环,超过自己觉得可以的次数 要return:
subscribe($nsq_lookupd, $config, function($msg){ try{ echo $msg->payload . " " . "attempts:".$msg->attempts."\n"; //do something }catch(Exception $e){ if($msg->attempts < 3){ //the message will be retried after you configure retry_delay_time throw new Exception(""); }else{ echo $e->getMessage(); return; } } });
3.如果你想增加 客户端的心跳时间与消息的超时时间 :
第一步 在nsqd启动时要加入相关参数,这个参数是最大的限制,比如--max-heartbeat-interval=1m30s 心跳时间最大不能超过1分30秒: nsqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-interval=1m30s --msg-timeout=10m30s 第二步 因为第一步是指定最大时间,所以还需要第二步在客户端指定所需要的值 具体请看 example目录中的identify开头的文件例子。
4.如果你想增强消费能力,可以加大rdy参数
5.你可以用supervisor管理,但是因为是多进程消费,你需要在supervisor job的配置文件 添加:
stopasgroup=true killasgroup=true
Changes
3.0
修复因libevent 超过4096消息被截断问题
增加identify指令功能,可以增加客户端心跳时间 与 消息超时时间
2.4.0
修复 pub bug
修复 sub coredump
修覆盖 touch bug
增加等待,当刚初始化的topic没消息时
2.3.1
pub支持域名
修复 pub coredump