celery-mq-assistantMQ 助手

联合创作 · 2023-10-01 04:03

MQ助手 - 是一个基于pulsar包自研实现的Spring Boot Stater。通过简单注解配置即可进行MQ消息生产与消费。


主要特性



  • 基于成熟pulsar包扩展,没有任何框架变动,只为简化开发使用

  • 配置简单灵活,无需复杂的配置文件:开发者可以快速注解类或者方法实现消息的生产与消费

  • Apache Pulsar 云原生分布式消息流平台,当下最佳解决方案


使用指引


引入依赖



implementation("cool.doudou:mq-assistant:latest")

Pulsar配置



pulsar:
service-url: pulsar://127.0.0.1:6650
subscription-name: sub-celery
subscription-type: Shared

使用方式



消息订阅


  • 生产者与topic进行关联绑定



/**
* 生产者主题绑定
*/
@MqProducer(topics = {"celery"})
@Component
public class MqComponent {
}


  • 消费者与topic进行关联绑定,注意:每个消费者须绑定一个subscription-name后才能进行消费



/**
* 消费者主题绑定
*/
@Component
public class MqComponent {
@MqConsumer(topics = {"celery"})
public void receive(String topic, byte[] msg) {
System.out.println("consumer: topic[" + topic + "] => " + new String(msg));
}
}


消息发送


  • send():发送

  • sendAsync():异步发送



/**
* 消息发送
*/
@AllArgsConstructor
@Service
public class MqServiceImpl {
private MqHelper mqHelper;

public void test() {
// 同步
String msgId = mqHelper.send("celery", "hello");
System.out.println("send: " + msgId);

// 异步
mqHelper.sendAsync("celery", "您好Async", System.out::println);

// 同步
String msgId = mqHelper.send("celery", new byte[]{0x01, 0x02, 0x03, 0x04});
System.out.println("send: " + msgId);

// 异步
mqHelper.sendAsync("celery", new byte[]{0x01, 0x02, 0x03, 0x04}, System.out::println);
}
}
浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报