SpringCloud Stream下基于Kafka的消息驱动实践

ProjectDaedalus

共 6454字,需浏览 13分钟

 ·

2021-11-14 19:04

SpringCloud Stream针对消息中间件服务致力于提供统一的编程模型

abstract.jpeg

SpringCloud Stream 模型

针对市面上多种消息中间件并存的局面,Spring Cloud Stream作为一个构建消息驱动的微服务框架。其目标在于对开发者提供统一的消息中间件操作接口,屏蔽底层中间件的实现细节。可以看到Stream本身并不具有消息中间件的能力,其底层依然需要依赖具体的中间件来实现消息服务。现阶段Stream支持RabbitMQ、Kafka两种消息中间件。在Stream的模型中,其针对各消息中间件均提供了相应的Binder,其实现了对具体中间件的封装、屏蔽功能。同时开发者在应用程序中只需通过相应的通道(输出通道output、输入通道input)与Binder进行交互,即可实现消息的生产、消费。其模型架构如下所示

figure 1.png

消息生产者

这里我们以Kafka为例介绍如何通过SpringCloud Stream使用消息服务。首先建立一个消息的生产者order服务,向POM中引入spring-cloud-starter-stream-kafka依赖

<dependencyManagement>
  <dependencies>

    
    <dependency>
      <groupId>org.springframework.bootgroupId>
      <artifactId>spring-boot-dependenciesartifactId>
      <version>2.2.2.RELEASEversion>
      <type>pomtype>
      <scope>importscope>
    dependency>

    
    <dependency>
      <groupId>org.springframework.cloudgroupId>
      <artifactId>spring-cloud-dependenciesartifactId>
      <version>Hoxton.SR1version>
      <type>pomtype>
      <scope>importscope>
    dependency>

  dependencies>
dependencyManagement>

<dependencies>

  
  <dependency>
    <groupId>org.springframework.cloudgroupId>
    <artifactId>spring-cloud-starter-stream-kafkaartifactId>
  dependency>

dependencies>

配置文件如下所示。可以看到,我们在其中定义所使用的Binder类型及相关底层中间件的连接信息。然后定义相关通道,在Kafka中通道的目的地为Topic(RabbitMQ下则为Exchange)

server:
  port: 83

spring:
  application:
    name: order
  cloud:
    stream:
      binders:
        # 定义一个名为 myKafka 的 Kafka Binder
        myKafka:
          type: kafka
          # Kafka的地址信息
          environment:
            spring:
              kafka:
                bootstrap-servers: 127.0.0.1:9092
      bindings:
        # 定义一个名为 alarmOutput 的通道
        alarmOutput:
          # 通道目的地: topic.alarm 主题
          destination: topic.alarm
          # 消息类型定义: json
          content-type: application/json
          # 使用刚刚定义过的Binder
          binder: myKafka
        # 定义一个名为 billOutput 的通道
        billOutput:
          # 通道目的地: topic.bill 主题
          destination: topic.bill
          # 消息类型定义: json
          content-type: application/json
          # 使用刚刚定义过的Binder
          binder: myKafka

通过接口定义生产者的通道。具体地,通过@Output注解来标识Stream模型中的输出通道。其中通道名即为配置文件中所定义的通道名。在Stream的模型中,生产者发布的消息通过输出通道离开应用程序

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 生产者通道,即Stream模型中的输出通道
 */

public interface MySource {

    /**
     * 主题 topic.alarm 的输出通道,名为 alarmOutput
     */

    String alarmOutput = "alarmOutput";

    /**
     * 主题 topic.bill 的输出通道,名为 billOutput
     */

    String billOutput = "billOutput";

    @Output(alarmOutput)
    MessageChannel alarmOutput();

    @Output(billOutput)
    MessageChannel billOutput();

}

现在来实现生产者的消息发送方法。首先向@EnableBinding注解传入刚刚的接口类MySource来使能绑定过程,然后注入MySource实例以调用相关的发送方法,实现如下。由于@EnableBinding注解本身继承了@Configuration注解,故无需开发者自行添加

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(MySource.class)
public class SendService 
{

    @Autowired
    private MySource mySource;

    public void sendAlarm(AlarmMsg alarmMsg) {
        String json = JSON.toJSONString(alarmMsg);
        // 获取相应地输出通道
        MessageChannel messageChannel =  mySource.alarmOutput();
        messageChannel.send(MessageBuilder.withPayload(json).build() );
    }

    public void sendBill(Bill bill) {
        String json = JSON.toJSONString(bill);
        // 获取相应地输出通道
        MessageChannel messageChannel =  mySource.billOutput();
        messageChannel.send(MessageBuilder.withPayload(json).build() );
    }

}

现在,我们添加一个Controller来便于我们发送消息进行测试

@RestController
@RequestMapping("order")
public class MqController {

    @Autowired
    private SendService sendService;

    @GetMapping("/sendAlarm")
    public void sendAlarm(@RequestParam String msg) {
        AlarmMsg alarmMsg = AlarmMsg.builder()
            .msg(msg)
            .level(3)
            .type("ERROR")
            .build();
        sendService.sendAlarm(alarmMsg);
    }

    @GetMapping("/sendBill")
    public void sendBill(@RequestParam Double money ) {
        Bill bill = Bill.builder()
            .money(money)
            .remark("消费")
            .build();
        sendService.sendBill(bill);
    }

}

消息消费者

现在来实现消息的消费者payment服务,同样需要向POM引入spring-cloud-starter-stream-kafka依赖,此处不再赘述。消费者相关配置如下所示,可以看到生产者、消费者虽然使用不同的通道,但其是通过相同的目的地(即主题)实现对接

server:
  port: 8008

spring:
  application:
    name: payment
  cloud:
    stream:
      binders:
        # 定义一个名为myKafka的Kafka Binder
        myKafka:
          type: kafka
          # Kafka的地址信息
          environment:
            spring:
              kafka:
                bootstrap-servers: 127.0.0.1:9092
      bindings:
        # 定义一个名为 alarmInput 的通道
        alarmInput:
          # 通道目的地: topic.alarm 主题
          destination: topic.alarm
          # 消息类型定义: json
          content-type: application/json
          # 使用刚刚定义过的Binder
          binder: myKafka
        # 定义一个名为 billInput 的通道
        billInput:
          # 通道目的地: topic.bill 主题
          destination: topic.bill
          # 消息类型定义: json
          content-type: application/json
          # 使用刚刚定义过的Binder
          binder: myKafka

通过接口定义消费者的通道。具体地,通过@Input注解来标识Stream模型中的输入通道。其中通道名即为配置文件中所定义的通道名。在Stream的模型中,消费者通过输入通道接收、消费进入应用程序的消息

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * 消费者通道,即Stream模型中的输入通道
 */

public interface MySink {

    /**
     * 主题 topic.alarm 的输入通道,名为 alarmInput
     */

    String alarmInput = "alarmInput";

    /**
     * 主题 topic.bill 的输入通道,名为 billInput
     */

    String billInput = "billInput";

    @Input(alarmInput)
    SubscribableChannel alarmInput();

    @Input(billInput)
    SubscribableChannel billInput();

}

现在来实现消息的监听。首先向@EnableBinding注解传入刚刚的接口类MySink来使能绑定过程,然后向@StreamListener注解传入相关输入通道的名称,实现消息的监听消费。实现如下所示

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(MySink.class)
public class ReceiveService 
{

    @Value("${server.port}")
    private Integer serverPort;

    @StreamListener(MySink.alarmInput)
    public void receiveAlarm(String msg) {
        AlarmMsg alarmMsg = JSON.parseObject(msg, AlarmMsg.class);
        String info =  "[ Payment:"+serverPort+" ]: " + alarmMsg;
        System.out.println(info);
    }

    @StreamListener(MySink.billInput)
    public void receiveBill(String msg) {
        Bill bill = JSON.parseObject(msg, Bill.class);
        String info =  "[ Payment:"+serverPort+" ]: " + bill;
        System.out.println(info);
    }

}

验证

启动消息生产者order、消费者payment服务,并向order服务发送HTTP请求以触发消息的发送。如下所示

figure 2.png

消费者payment服务收到的消息,如下所示,符合预期

figure 3.jpeg

Note

对于同一消费者服务的多个实例,未避免消息的重复消费,需将各实例的消费者群组的名称设置为相同的。具体地,可在配置文件通过 spring.cloud.stream.bindings..group 配置项设置消费者群组的名称

参考文献

  1. Spring微服务实战 John Carnell著
浏览 63
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报