老雷编程技术分享之PHPer的kafka快速入门
kafka简介
官网 https://kafka.apache.org/
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
下载kafka
https://mirror-hk.koddos.net/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
启动kafak
Linux
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
Windows
bin\windows\zookeeper-server-start config\zookeeper.properties
bin\windows\kafka-server-start config\server.properties
kafka基础知识
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。类似于缓存Key
生产者
生产者即数据的发布者,该角色将消息发布到Kafka的topic中
消费者
消费者从kafka中读取数据
安装kafka的PhP SDK
Rdkafka C扩展
http://pecl.php.net/package/rdkafka
phpkafka PHP库
https://github.com/swoole/phpkafka
我们使用phpkafka
composer require longlang/phpkafka
简单Demo
生产者
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
require dirname(__DIR__) . '/vendor/autoload.php';
$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$time=date("Y-m-d H:i:s");
$producer->send('test-http', $time);
echo "生产内容".$time;
消费者
use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
require dirname(__DIR__) . '/vendor/autoload.php';
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test-http'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test'); // 客户端ID
$config->setGroupInstanceId('test'); // 分组实例ID
$consumer = new Consumer($config);
$message = $consumer->consume();
if($message){
var_dump($message->getKey() . ':' . $message->getValue());
$consumer->ack($message); // 消费成功反馈
}else{
echo "无消费";
}
评论