SpringCloud Stream下基于Kafka的消息驱动实践
SpringCloud Stream针对消息中间件服务致力于提供统一的编程模型
SpringCloud Stream 模型
针对市面上多种消息中间件并存的局面,Spring Cloud Stream作为一个构建消息驱动的微服务框架。其目标在于对开发者提供统一的消息中间件操作接口,屏蔽底层中间件的实现细节。可以看到Stream本身并不具有消息中间件的能力,其底层依然需要依赖具体的中间件来实现消息服务。现阶段Stream支持RabbitMQ、Kafka两种消息中间件。在Stream的模型中,其针对各消息中间件均提供了相应的Binder,其实现了对具体中间件的封装、屏蔽功能。同时开发者在应用程序中只需通过相应的通道(输出通道output、输入通道input)与Binder进行交互,即可实现消息的生产、消费。其模型架构如下所示
消息生产者
这里我们以Kafka为例介绍如何通过SpringCloud Stream使用消息服务。首先建立一个消息的生产者order服务,向POM中引入spring-cloud-starter-stream-kafka依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-dependenciesartifactId>
<version>2.2.2.RELEASEversion>
<type>pomtype>
<scope>importscope>
dependency>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-dependenciesartifactId>
<version>Hoxton.SR1version>
<type>pomtype>
<scope>importscope>
dependency>
dependencies>
dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-kafkaartifactId>
dependency>
dependencies>
配置文件如下所示。可以看到,我们在其中定义所使用的Binder类型及相关底层中间件的连接信息。然后定义相关通道,在Kafka中通道的目的地为Topic(RabbitMQ下则为Exchange)
server:
port: 83
spring:
application:
name: order
cloud:
stream:
binders:
# 定义一个名为 myKafka 的 Kafka Binder
myKafka:
type: kafka
# Kafka的地址信息
environment:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
bindings:
# 定义一个名为 alarmOutput 的通道
alarmOutput:
# 通道目的地: topic.alarm 主题
destination: topic.alarm
# 消息类型定义: json
content-type: application/json
# 使用刚刚定义过的Binder
binder: myKafka
# 定义一个名为 billOutput 的通道
billOutput:
# 通道目的地: topic.bill 主题
destination: topic.bill
# 消息类型定义: json
content-type: application/json
# 使用刚刚定义过的Binder
binder: myKafka
通过接口定义生产者的通道。具体地,通过@Output注解来标识Stream模型中的输出通道。其中通道名即为配置文件中所定义的通道名。在Stream的模型中,生产者发布的消息通过输出通道离开应用程序
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 生产者通道,即Stream模型中的输出通道
*/
public interface MySource {
/**
* 主题 topic.alarm 的输出通道,名为 alarmOutput
*/
String alarmOutput = "alarmOutput";
/**
* 主题 topic.bill 的输出通道,名为 billOutput
*/
String billOutput = "billOutput";
@Output(alarmOutput)
MessageChannel alarmOutput();
@Output(billOutput)
MessageChannel billOutput();
}
现在来实现生产者的消息发送方法。首先向@EnableBinding注解传入刚刚的接口类MySource来使能绑定过程,然后注入MySource实例以调用相关的发送方法,实现如下。由于@EnableBinding注解本身继承了@Configuration注解,故无需开发者自行添加
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(MySource.class)
public class SendService {
@Autowired
private MySource mySource;
public void sendAlarm(AlarmMsg alarmMsg) {
String json = JSON.toJSONString(alarmMsg);
// 获取相应地输出通道
MessageChannel messageChannel = mySource.alarmOutput();
messageChannel.send(MessageBuilder.withPayload(json).build() );
}
public void sendBill(Bill bill) {
String json = JSON.toJSONString(bill);
// 获取相应地输出通道
MessageChannel messageChannel = mySource.billOutput();
messageChannel.send(MessageBuilder.withPayload(json).build() );
}
}
现在,我们添加一个Controller来便于我们发送消息进行测试
@RestController
@RequestMapping("order")
public class MqController {
@Autowired
private SendService sendService;
@GetMapping("/sendAlarm")
public void sendAlarm(@RequestParam String msg) {
AlarmMsg alarmMsg = AlarmMsg.builder()
.msg(msg)
.level(3)
.type("ERROR")
.build();
sendService.sendAlarm(alarmMsg);
}
@GetMapping("/sendBill")
public void sendBill(@RequestParam Double money ) {
Bill bill = Bill.builder()
.money(money)
.remark("消费")
.build();
sendService.sendBill(bill);
}
}
消息消费者
现在来实现消息的消费者payment服务,同样需要向POM引入spring-cloud-starter-stream-kafka依赖,此处不再赘述。消费者相关配置如下所示,可以看到生产者、消费者虽然使用不同的通道,但其是通过相同的目的地(即主题)实现对接
server:
port: 8008
spring:
application:
name: payment
cloud:
stream:
binders:
# 定义一个名为myKafka的Kafka Binder
myKafka:
type: kafka
# Kafka的地址信息
environment:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
bindings:
# 定义一个名为 alarmInput 的通道
alarmInput:
# 通道目的地: topic.alarm 主题
destination: topic.alarm
# 消息类型定义: json
content-type: application/json
# 使用刚刚定义过的Binder
binder: myKafka
# 定义一个名为 billInput 的通道
billInput:
# 通道目的地: topic.bill 主题
destination: topic.bill
# 消息类型定义: json
content-type: application/json
# 使用刚刚定义过的Binder
binder: myKafka
通过接口定义消费者的通道。具体地,通过@Input注解来标识Stream模型中的输入通道。其中通道名即为配置文件中所定义的通道名。在Stream的模型中,消费者通过输入通道接收、消费进入应用程序的消息
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 消费者通道,即Stream模型中的输入通道
*/
public interface MySink {
/**
* 主题 topic.alarm 的输入通道,名为 alarmInput
*/
String alarmInput = "alarmInput";
/**
* 主题 topic.bill 的输入通道,名为 billInput
*/
String billInput = "billInput";
@Input(alarmInput)
SubscribableChannel alarmInput();
@Input(billInput)
SubscribableChannel billInput();
}
现在来实现消息的监听。首先向@EnableBinding注解传入刚刚的接口类MySink来使能绑定过程,然后向@StreamListener注解传入相关输入通道的名称,实现消息的监听消费。实现如下所示
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@EnableBinding(MySink.class)
public class ReceiveService {
@Value("${server.port}")
private Integer serverPort;
@StreamListener(MySink.alarmInput)
public void receiveAlarm(String msg) {
AlarmMsg alarmMsg = JSON.parseObject(msg, AlarmMsg.class);
String info = "[ Payment:"+serverPort+" ]: " + alarmMsg;
System.out.println(info);
}
@StreamListener(MySink.billInput)
public void receiveBill(String msg) {
Bill bill = JSON.parseObject(msg, Bill.class);
String info = "[ Payment:"+serverPort+" ]: " + bill;
System.out.println(info);
}
}
验证
启动消息生产者order、消费者payment服务,并向order服务发送HTTP请求以触发消息的发送。如下所示
消费者payment服务收到的消息,如下所示,符合预期
Note
对于同一消费者服务的多个实例,未避免消息的重复消费,需将各实例的消费者群组的名称设置为相同的。具体地,可在配置文件通过 spring.cloud.stream.bindings.
参考文献
Spring微服务实战 John Carnell著