如何优雅地开始研究一个新技术

低并发编程

共 17467字,需浏览 35分钟

 · 2021-05-14

低并发编程
战略上藐视技术,战术上重视技术

不讲过多大道理,本文以我开始研究 rocketmq 为例,就最近的事。

大家不要嘲笑我,我对 rocketmq 一无所知,所以写这篇文章刚好合适,正好也记录下我开始学习 rocketmq 这个冷启动阶段,大家看看对自己是否有帮助。

这篇文章是我边看边写的,力求还原一下我最真实的启动过程,不了解 rocketmq 的不用担心,甚至更好,因为本文不会涉及到 rocketmq 的细节,只是突出从一无所知开始研究它的启动阶段。

开始吧。


先把源码跑起来


第一步,先把源码搞到手。

百度出来它的官方网站:

http://rocketmq.apache.org/

傻瓜式的首页,非常友好,直接点击最大的那个 Getting Started 按钮。

然后就进入了 quick-start 文档页。

根据开头给的 4.8.0 版本的 rocketmq 源码地址 

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip 

把它下载了下来。

是个 Java 项目,还是 maven 构建的,那好办了,直接 idea 打开!

紧接着人家就教我如何构建它。

> mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

mvn 构建这一步就受挫了,好像是 maven 默认中央仓库好多 jar 包下载不了,于是我把 maven 配置文件的中央仓库改成阿里的,就好了。

<mirror>
  <id>alimaven</id>
  <name>aliyun maven</name
    <url>
http://maven.aliyun.com/nexus/content/repositories/central/</url>
  <mirrorOf>central</mirrorOf>
</mirror>

阿里云牛逼!(此处应有广告位)


体验一下最简操作


接着往下读文档,现在代码已经到手,也构建成功,接下来讲的是如果体验一下发个消息,再收个消息的过程。

我是 windows 系统,就照着下面 windows 的教程来,就是这么任性。

嗯,配置一下环境变量而已。

接下来是四个启动脚本,很清晰。

## Start Name Server
.\bin\mqnamesrv.cmd
## Start Broker
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
## Send Messages
.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Producer
## Receive Messages
.\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

我看了一下这些 cmd 文件,意思就是执行这些类的 main 方法而已,于是我改在 idea 里分别执行他们,大概长这个样子。

‍‍‍‍‍‍‍

具体看下较为关心的两个 main 方法。

Producer 的 main 方法,很简单,简化版如下。

DefaultMQProducer producer = new DefaultMQProducer("example");
producer.start();
for (int i = 0; i < 1000; i++) {
    Message msg = new Message(...);
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

执行后,会看到一堆消息作为生产者被生产出来。

...
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03DF, offsetMsgId=0AE84A8400002A9F00000000000F7133, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1244]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03E0, offsetMsgId=0AE84A8400002A9F00000000000F71FE, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3], queueOffset=1249]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E1, offsetMsgId=0AE84A8400002A9F00000000000F72C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0], queueOffset=1244]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E2, offsetMsgId=0AE84A8400002A9F00000000000F7394, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1], queueOffset=1251]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E3, offsetMsgId=0AE84A8400002A9F00000000000F745F, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1245]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E4, offsetMsgId=0AE84A8400002A9F00000000000F752A, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3], queueOffset=1250]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E5, offsetMsgId=0AE84A8400002A9F00000000000F75F5, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0], queueOffset=1245]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E6, offsetMsgId=0AE84A8400002A9F00000000000F76C0, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1], queueOffset=1252]
SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E7, offsetMsgId=0AE84A8400002A9F00000000000F778B, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1246]
...

同样,Consumer 的 mian 方法,也非常简单。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest""*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context)
 
{
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

执行后会看到一堆消息被成功消费

...
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1239, sysFlag=0, bornTimestamp=1619580615742, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615742, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F5246, commitLogOffset=1004102, bodyCRC=493758879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83E03B8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575350], transactionId='null'}]] 
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1238, sysFlag=0, bornTimestamp=1619580615740, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615740, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4F1A, commitLogOffset=1003290, bodyCRC=1688269248, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83C03B4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575256], transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1237, sysFlag=0, bornTimestamp=1619580615737, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615737, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4BEE, commitLogOffset=1002478, bodyCRC=1830206955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83903B0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575252], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1236, sysFlag=0, bornTimestamp=1619580615735, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615735, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F48C2, commitLogOffset=1001666, bodyCRC=1786477042, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83703AC, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575248], transactionId='null'}]] 
ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1235, sysFlag=0, bornTimestamp=1619580615733, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615733, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4596, commitLogOffset=1000854, bodyCRC=1280920064, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83503A8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575154], transactionId='null'}]] 
ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1234, sysFlag=0, bornTimestamp=1619580615731, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615731, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F426A, commitLogOffset=1000042, bodyCRC=1261735449, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83303A4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575150], transactionId='null'}]] 
ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1233, sysFlag=0, bornTimestamp=1619580615729, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615729, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3F3E, commitLogOffset=999230, bodyCRC=855266886, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83103A0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575056], transactionId='null'}]] 
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1232, sysFlag=0, bornTimestamp=1619580615727, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615728, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3C12, commitLogOffset=998418, bodyCRC=994843245, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82F039C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575052], transactionId='null'}]] 
ConsumeMessageThread_11 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1231, sysFlag=0, bornTimestamp=1619580615725, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615726, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F38E6, commitLogOffset=997606, bodyCRC=1008852596, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82D0398, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575048], transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1230, sysFlag=0, bornTimestamp=1619580615723, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615724, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F35BA, commitLogOffset=996794, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82B0394, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132574954], transactionId='null'}]] 
...

一个生产,一个消费,清晰明了,体验很好。

嗯,至此为止,quick-start 就基本通了,也就知道这玩意的最简单的用法了。


看看整体架构


此时已经上手体验了一把用法,来总结下。

1. 我启动了一个 namesrv,暴露的端口是 9876。
2. 我又启动了一个 broker,启动时加了个参数 -n localhost:9876,很明显,就是指向了刚刚那个 namesrv。
3. 然后我启动 produer 发了一堆消息。
4. 最后我启动了 consumer,就神奇地收到了这个消息。

启动 producer 和 consumer 时都必须使用一个环境变量叫 NAMESRV_ADDR=localhost:9876

OK 了,此时我已经有了合理的猜测,producer 和 consumer 都是通过环境变量连接了 9876 这个端口,也就是 namesrv,然后通过这个 namesrv 能找到 broker,那我可以简单画出这样的架构图(这是我猜的,不一定对)。

嗯,有了这个自己的猜测,我去看了官方文档中的架构部分:

https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md

上来就是一张图。

哇塞,大差不差,只是官方文档的图是考虑到了集群情况,其实我反倒觉得开始的图不应该整合太多内容。

紧接着,下面就解释了这四个东西都是啥,我简写下。

Producer:消息发布的角色,选择相应的 Broker 集群队列进行消息投递。
Consumer:消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。
NameServer:一个非常简单的 Topic 路由注册中心,支持 Broker 的动态注册与发现。
BrokerServer:Broker 主要负责消息的存储、投递和查询,包括以下几个子模块:

Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。

Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息。

Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。

HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。

Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

嗯,又有了些新概念,BrokerServer 还分成五个子模块,看着有些蒙,先不管。

再往下看,到了部署架构,有了部署步骤。

嗯,跟我们刚刚 quick-start 部分的部署顺序一样,只不过步骤三的创建 Topic,可以提前创建,也可以在发送消息时自动创建,我们刚刚用的应该就是发消息时自动创建啦!这里再拿个小本本记下来,如何提前创建 Topic,OK 不去管它。

再往下,就是最最最最重要的部分了,也就是设计原理

地址是这个:

https://github.com/apache/rocketmq/blob/master/docs/cn/design.md

这也就是我们研究 rocketmq 的原理,阅读源码,最需要看的部分,当然,也包括面试,滋滋滋。

设计中包含六个部分,分别是消息存储、通信机制、消息过滤、负载均衡、事务消息、消息查询

比如消息存储,是整个设计部分的第一个版块,上来就是一张劝退图。

下面配上了文字讲解。

之前的 quick-start 和整体架构的描述,可以被快速地理解,到这就不行了,就真得花时间开始细琢磨了。

但实际上呀,仔细看消息存储版块里面的内容,包含三个部分:

1.1 消息存储整体架构
1.2 页缓存与内存映射
1.3 消息刷盘

这里面包括 IO 模型,内存映射,磁盘顺序读写,PageCache,同步异步刷盘等通用的底层知识,如果这些都统统掌握,整个这一部分就跟拼积木一样,很顺利拼起来了。

这回知道底层知识有啥用了吧?起码能让你更深入和快速理解一个由它们拼起来的上层技术。

这块就没法完全展开啦,不然就成一篇讲 rocketmq 原理的文章了,我拿出这里的一句话。

另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种 Mmap 的方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

这段话如果你对零拷贝这个底层概念了解,其实整段话一秒钟就看完了,而且会觉得它是废话,就像是在凑字数一样。(当然这个我做不到哈,我对底层还没有了解很透彻,所以用了 10 秒,理解到了一个马马虎虎的程度)


见仁见智


再接下来就是见仁见智的部分啦,如果是已经有了非常多技术深入学习经验的大牛,直接根据设计文档,再对照源码即可,对大牛来说是最快的方式。
如果不是的话,可以先找一些市面上比较经典的入门书籍,或者质量高的视频教程,过一遍,然后再配合设计文档和源码,基本就能把这个技术吃透了。
学多了之后你会发现,好多底层的技术或中间件,和我们应用层一样,也是有套路的,也是拼积木拼起来的。所以今后在你想学一门新技术时,别想着学它有没有用,面试会不会考,起码你看多了之后会发现,越新技术越学越快,学到最后你会发现根本就没有新技术,一切都是在拼积木。
好了不装逼了,我是第二类人群,已经卡在设计文档那里,所以我现在去找视频看了,有推荐的可以留言区评论下。
同时欢迎加我好友看我朋友圈的一惊一乍,就在公众号菜单栏里可以找到哦。
浏览 35
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报