RocketMQ快速入门,看这篇就够了~

Java技术迷

共 16066字,需浏览 33分钟

 · 2021-04-02

汪伟俊 作者

Java技术迷 | 出品

MQ,是消息队列(Message Queue)的简称,它是一种跨进程的通信机制,作用是传递消息。

MQ最常见的场景是用户注册,当用户在某个网站成功注册后,需要发送注册邮件和短信通知,以告知用户注册成功;传统的做法是在用户注册成功后便立马发送邮件和短信,当发送完成后才返回结果给客户端,用户才能够登录。这种做法的弊端是,用户在注册完成之后其实已经可以登录了,没有必要再等待邮件和短信的发送,所以我们可以使用MQ来异步地实现邮件和短信的发送。异步解耦是MQ的主要特点,主要目的是减少请求响应时间和解耦,主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列,同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响。

MQ也常常被用于流量削峰,比如在秒杀、团购等活动时,由于用户访问量非常大,会导致服务无法承受这么大的流量,甚至导致系统崩溃,为此,可在服务调用之间加入MQ,使得巨大的流量按照顺序依次调用服务,精准控制服务的最大请求数,保证服务的可用性。

这里我们介绍阿里巴巴的开源中间件,RocketMQ,底层采用Java语言实现,性能非常好,能够撑住淘宝双十一的巨大流量就足见其能力了。

01

环境搭建

首先下载RocketMQ的压缩包:

wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

然后将其解压:

unzip rocketmq-all-4.5.1-bin-release.zip

解压完成后切换到bin目录,启动NameServer:

nohup sh mqnamesrv &

查看日志文件判断是否启动成功:

tail -f ~/logs/rocketmqlogs/namesrv.log

这样则说明启动成功。接下来启动Broker,同样在bin目录下执行指令:

nohup sh mqbroker -n localhost:9876 &

Broker需要指定NameServer的地址用于连接,NameServer默认端口号为9876,启动后查看日志文件:并没有发现Broker的日志文件,我们猜测应该是启动失败了,查看bin目录下的nohup.out文件:可以看到是因为内存的问题,事实上,RocketMQ默认配置的内存比较大,而我们的虚拟机并没有这么大的内存,因此启动失败了,所以我们需要修改一下RocketMQ的默认内存配置:

vim runbroker.sh
image.png
vim runserver.sh

此时重新执行指令启动Broker,然后查看日志:

tail ~/logs/rocketmqlogs/broker.log

启动成功。这里我们启动了两个东西,分别是NameServer和Broker,先不用管这两个东西是什么,后续会对它们进行分析。

02

RocketMQ初体验

环境搭建完成后,我们来体验一下RocketMQ的功能,测试一下发送消息和接收消息。

这里我们借助RocketMQ的一个Demo来发送消息,首先需要修改脚本的内容:

vim tools.sh

在文件中添加这样一段内容:然后执行脚本发送消息:

# 使用安装包的Demo发送消息,该Demo脚本在bin目录下
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

这样就表示发送成功了,消息发送完成后,我们来接收一下消息:

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

成功接收到了消息,而且该窗口会一直处于监听状态,当有新消息发送时,便会立马接收到。

03

RocketMQ核心架构和概念

RocketMQ架构中分为消息生产者和消息消费者,生产者生产消息放入RocketMQ,消费者从RocketMQ中取出消息进行消费。RocketMQ中的Broker是其核心组件,用于消息的接收、存储和投递,而NameServer是用来管理Broker的。

首先消息生产者会向NameServer中获取Broker,所以在这之前我们就要将Broker注册到NameServer中,获取到Broker以后,就可以生产消息并交由Broker存储,Broker内部由MessageQueue存储消息,而MessageQueue又会被逻辑划分为一个一个的topic,结构如下:消息消费者也需要从NameServer中获取Broker,获取到Broker后,取出消息进行消费。

我们可以对这一架构进行一个形象的比喻,把消息生产者看作是寄件人,消息消费者看作收件人,Broker就是邮递员,NameServer就是邮局,MessageQueue是具体的邮件,而topic是地区的划分,其结构如下:下面是官方对一些概念的定义:

  • Broker:Broker是RocketMQ的核心,负责消息的接收、存储、投递等功能
  • NameServer:消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息
  • Producer:消息的生产者,需要从NameServer中获取Broker信息,然后与Broker建立连接,向Broker发送消息
  • Consumer:消息的消费者,需要从NameServer中获取Broker信息,然后与Broker建立连接,从Broker中获取消息
  • Topic:用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
  • Message Queue:为了提高系统的吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行地往各个Message Queue发送消息,消费者也可以并行地从Message Queue中读取消息
  • Message:消息的载体
  • Producer Group:生产者组,多个发送同一类消息的生产者称之为一个生产者组
  • Consumer Group:消费者组,多个消费同一类消息的消费者称之为一个消费者组

官方还为我们提供了一个可视化的操作页面,能够轻松监控消息的生产消费等等功能,我们只需将其项目下载下来然后运行即可,来到项目地址 https://github.com/apache/rocketmq-externals/releases :若是下载速度过慢也可以使用下面的加速链接:

https://github.91chifun.workers.dev/https://github.com//apache/rocketmq-externals/archive/refs/tags/rocketmq-console-1.0.0.zip

下载好后将其解压出来:这是一个SpringBoot应用,我们需要修改一下应用的配置:首先是端口号,随意填写,不要被其它程序占用了即可,然后配置NameServer的地址,我们需要将该应用也注册到NameServer中才能够查询Broker的信息。然后使用Maven进行打包:

mvn clean package -Dmaven.test.skip=true

打包完成后会生成target目录,在该目录下有打包好的jar文件,执行它:

java -jar rocketmq-console-ng-1.0.0.jar

此时访问 http://localhost:5555/ :右上角可以将语言切换为中文。若是启动控制台报错:需要修改一下Broker配置,在RocketMQ安装目录下的conf文件夹中,修改一下broker.conf:添加这样一项配置:

brokerIP1=192.168.190.134     # 你的虚拟机IP
nameSrvAddr=192.168.190.134:9876

然后重启一下Broker:

sh mqshutdown broker # 关闭broker
nohup sh mqbroker -n localhost:9876 -c conf/broker.conf &

-c conf/broker.conf 用于重新加载配置文件,此时访问控制台将不会产生错误。

04

SpringBoot整合RocketMQ

我们已经简单地了解到RocketMQ的相关概念和一些操作,接下来将其与SpringBoot进行整合,引入依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

然后就可以发送消息了:

public static void main(String[] args) throws Exception {
    // 创建消息生产者,并指定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
    // 设置NameServer地址
    producer.setNamesrvAddr("192.168.190.134:9876");
    // 启动生产者
    producer.start();
    // 创建消息对象
    Message message = new Message("myTopic""myTag", ("myMessage").getBytes());
    // 发送消息,设置超时时间
    SendResult result = producer.send(message, 10000);
    System.out.println(result);
    // 关闭生产者
    producer.shutdown();
}

若是出现 No route info of this topic 错误,则重新启动一下Broker,带上 autoCreateTopicEnable=true 参数:

nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ../conf/broker.conf

重新运行发送消息的代码,输出如下:

SendResult [sendStatus=SEND_OK, msgId=AC1221A12EB418B4AAC28B1A483F0000, offsetMsgId=C0A8BE8600002A9F000000000002BEB2, messageQueue=MessageQueue [topic=myTopic, brokerName=broker-a, queueId=0], queueOffset=0]

这样就发送成功了,然后我们来接收一下消息,编写代码:

public static void main(String[] args) throws Exception {
    // 创建消费者,并指定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
    // 设置NameServer地址
    consumer.setNamesrvAddr("192.168.190.134:9876");
    // 指定消费者订阅的主题和标签
    consumer.subscribe("myTopic""*");
    // 设置回调函数
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        // 处理消息
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("接收到的消息:" + list);
            // 消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者
    consumer.start();
}

输出如下:

接收到的消息:[MessageExt [queueId=0, storeSize=170, queueOffset=0, sysFlag=0, bornTimestamp=1616861755457, bornHost=/192.168.190.1:65330, storeTimestamp=1616890518525, storeHost=/192.168.190.134:10911, msgId=C0A8BE8600002A9F000000000002BEB2, commitLogOffset=179890, bodyCRC=475410043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='myTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1616862486130, UNIQ_KEY=AC1221A12EB418B4AAC28B1A483F0000, WAIT=true, TAGS=myTag}, body=[1091217710111511597103101], transactionId='null'}]]

消费成功。

05

消息类型

在RocketMQ中,消息被分为了三种类型:

  1. 普通消息
  2. 顺序消息
  3. 事务消息

下面分别来看看RocketMQ中如何处理这三种消息。

06

普通消息

RocketMQ提供了三种方式来发送普通消息:

  • 可靠同步发送:指消息发送方发出数据后,在收到接收方发回响应之后才进行下一个数据包的发送
  • 可靠异步发送:指消息发送发发出数据后,不等接收方发回响应就进行下一个数据包的发送,发送方通过回调接口接收服务器响应,并对响应结果进行处理
  • 单向发送:指发送方只负责发送消息,不等待接收方响应也没有回调方法触发

发送同步消息非常简单,代码如下:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SendMessageTypeTest 
{

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testSyncSend(){
        // 参数一:topic
        // 参数二:消息内容
        // 参数三:超时时间
        SendResult result = rocketMQTemplate.syncSend("myTopic1""testSyncSend"10000);
        System.out.println(result);
    }
}

若是发布异步消息,只需修改一下调用方法即可:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SendMessageTypeTest 
{

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testAsyncSend() throws InterruptedException {
        // 参数一:topic
        // 参数二:消息内容
        // 参数三:回调
        rocketMQTemplate.asyncSend("myTopic2""testAsyncSend"new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        Thread.sleep(300000);
    }
}

发送异步消息的时候要注意,因为它是基于回调的形式处理消息结果的,所以我们需要对主线程进行休眠,让其一直等待回调方法被执行,否则主线程会立马执行结束,回调方法也就无法被调用了。

最后是单向发送消息:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SendMessageTypeTest 
{

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testOneWay() {
        rocketMQTemplate.sendOneWay("myTopic3","testOneWay");
    }
}

07

顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型,我们知道,RocketMQ中的Broker将消息存储在MessageQueue中,若是现在发送三条消息,这三条消息分别被存储在三个不同的MessageQueue中,该如何保证在消费这些消息时的顺序与发布时的顺序一致呢?这就需要借助顺序消息了。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SendMessageTypeTest 
{

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void testOneWayOrderly() {
        rocketMQTemplate.sendOneWayOrderly("myTopic","testOneWayOrderly","a");
    }
}

只需要调用sendOneWayOrderly方法即可,该方法需要接收一个特殊的参数,即hashKey,RocketMQ正是通过它来决定消息该被发送到哪个MessageQueue中,值可以任意填写,只需要有所区分即可。

08

事务消息

事务消息是支持事务的消息,通过事务消息,我们就能够实现分布式事务的最终一致性。

事务消息的流程如下:

首先消息生产者发送事务消息给RocketMQ,RocketMQ会进行响应,当接收到响应后,生产者会调用本地的事务方法,调用本地事务方法有两种结果,成功和失败,成功返回COMMIT,失败返回ROLLBACK。若是遇到网络等问题导致事务消息的二次确认丢失,RocketMQ会进行回查,即查询本地事务的状态,然后继续进行相关操作。

这里我们模拟一个下单业务,首先编写发送事务消息代码:

public void createOrderBefore(Order order) {
    String txId = UUID.randomUUID().toString();
    // 发送事务消息
    rocketMQTemplate.sendMessageInTransaction(
        "tx-producer-group",
        "tx-topic",
        MessageBuilder.withPayload(order).setHeader("txId", txId).build(),
        order
    );
}

然后需要编写一个接口实现RocketMQLocalTransactionListener:

@Service
@RocketMQTransactionListener(txProducerGroup = "tx-producer-group")
public class OrderServiceImplListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;
    @Autowired
    private TxLogMapper txLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String txId = (String) message.getHeaders().get("txId");
        try {
            // 执行本地事务
            Order order = (Order) o;
            orderService.createOrder(order, txId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 消息回查
        String txId = (String) message.getHeaders().get("txId");
        TxLog txLog = txLogMapper.selectById(txId);
        if (txLog != null) {
            // 本地事务执行成功
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

该接口需要实现两个方法,分别是对事务方法的处理和消息回查的逻辑,在事务处理方法中,我们调用本地事务即可,本地事务代码如下:

@Transactional
@Override
public void createOrder(Order order, String txId) {
    orderMapper.insert(order);
    TxLog txLog = new TxLog();
    txLog.setTxId(txId);
    txLog.setDate(new Date());
    txLogMapper.insert(txLog);
}

这里的消息回查是基于一张数据表实现的,所以创建Bean:

@Data
@TableName("shop_txlog")
public class TxLog {

    @TableId(type = IdType.AUTO)
    private String txId;
    private Date date;
}

因为订单和该数据是在同一个事务方法中,它们俩会同时成功或失败,所以可以通过查询指定订单id是否在该表中存在数据来判断事务方法是否执行成功,最后调用 createOrderBefore 方法进行测试即可。

以上便是关于RocketMQ的全部内容了,本篇文章比较基础,适合初学者学习。

本文作者:汪伟俊 为Java技术迷专栏作者 投稿,未经允许请勿转载。

1、网曝IDEA2020.3.2,自动注释类和方法注释模板配置

2、牛逼!IntelliJ IDEA居然支持视频聊天了~速来尝鲜!快来冲一波

3、微信这些表情包,我可能再也不敢用了!你还用吗?

4、知名国产网盘翻车?清空免费用户文件后,又开始清理付费用户资源

5、Chrome新功能曝光:你访问的敏感网站可以自动隐藏起来

6、万万没想到,“红孩儿”竟然做了程序员,还是CTO!

7、徒手撸一个Spring Boot中的starter,解密自动化配置,超级棒!

点分享

点收藏

点点赞

点在看

浏览 49
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报