老雷编程技术分享之PHPer的kafka快速入门
kafka简介
官网 https://kafka.apache.org/
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

下载kafka
https://mirror-hk.koddos.net/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
启动kafak
Linux
./bin/zookeeper-server-start.sh config/zookeeper.properties./bin/kafka-server-start.sh config/server.properties
Windows
bin\windows\zookeeper-server-start config\zookeeper.propertiesbin\windows\kafka-server-start config\server.properties
kafka基础知识
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。类似于缓存Key
生产者
生产者即数据的发布者,该角色将消息发布到Kafka的topic中
消费者
消费者从kafka中读取数据
安装kafka的PhP SDK
Rdkafka C扩展
http://pecl.php.net/package/rdkafka
phpkafka PHP库
https://github.com/swoole/phpkafka
我们使用phpkafka
composer require longlang/phpkafka简单Demo
生产者
use longlang\phpkafka\Producer\Producer;use longlang\phpkafka\Producer\ProducerConfig;require dirname(__DIR__) . '/vendor/autoload.php';$config = new ProducerConfig();$config->setBootstrapServer('127.0.0.1:9092');$config->setUpdateBrokers(true);$config->setAcks(-1);$producer = new Producer($config);$time=date("Y-m-d H:i:s");$producer->send('test-http', $time);echo "生产内容".$time;
消费者
use longlang\phpkafka\Consumer\ConsumeMessage;use longlang\phpkafka\Consumer\Consumer;use longlang\phpkafka\Consumer\ConsumerConfig;require dirname(__DIR__) . '/vendor/autoload.php';$config = new ConsumerConfig();$config->setBroker('127.0.0.1:9092');$config->setTopic('test-http'); // 主题名称$config->setGroupId('testGroup'); // 分组ID$config->setClientId('test'); // 客户端ID$config->setGroupInstanceId('test'); // 分组实例ID$consumer = new Consumer($config);$message = $consumer->consume();if($message){var_dump($message->getKey() . ':' . $message->getValue());$consumer->ack($message); // 消费成功反馈}else{echo "无消费";}
评论
