RocketMQ(四):生产者消息数据写入实现细节

JAVA烂猪皮

共 57529字,需浏览 116分钟

 ·

2021-02-02 08:28

走过路过不要错过

点击蓝字关注我们


producer 在消息send之后,其实就是调用了 broker 对应的api,要想了解消息的具体写入过程就得细看broker。今天我们就来看看 broker是如何进行消息的存储的!(消息消费另说)

broker启动起来之后,就可以接收客户端的生产消费请求了!

1:broker与客户端的通信: broker 服务端端口的暴露

// org.apache.rocketmq.remoting.netty.NettyRemotingServer#start    @Override    public void start() {        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(            nettyServerConfig.getServerWorkerThreads(),            new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); // 创建共享的 handler, 如 serverHandler prepareSharableHandlers();
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { // 标准 netty 服务接入,注册系列 handler // 编解码器,空闲管理,连接管理,业务处理处理器 ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, // 最重要的是 serverHandler serverHandler ); } });
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); }
try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); }
if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } // 超时定时扫描 this.timer.scheduleAtFixedRate(new TimerTask() {
@Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }

可以看出,最重要的处理器是 serverHandler, 它是直接被 new 出来的一个内部类, 在 serverBootstrap 启动之前创建!

 private void prepareSharableHandlers() {        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);        encoder = new NettyEncoder();        connectionManageHandler = new NettyConnectManageHandler();        serverHandler = new NettyServerHandler();    }    // org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler    @ChannelHandler.Sharable    class NettyServerHandler extends SimpleChannelInboundHandler {
@Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { // 主要业务处理即由 processMessageReceived 处理 processMessageReceived(ctx, msg); } } // /** * Entry of incoming command processing. * *

* Note: * The incoming remoting command may be *

    *
  • An inquiry request from a remote peer component;
  • *
  • A response to a previous request issued by this very participant.
  • *
*

* * @param ctx Channel handler context. * @param msg incoming remoting command. * @throws Exception if there were any error while processing the incoming command. */ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 客户端请求为 REQUEST_COMMAND processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
// org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand /** * Process incoming request command issued by remote peer. * * @param ctx channel handler context. * @param cmd request command. */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; // opaque 相当于是一个请求id, 用于找到对应的请求和响应 final int opaque = cmd.getOpaque();
if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // rpc 钩子处理 doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); // 此处有许多的 processor, 而处理 producer 请求的是 SendMessageProcessor final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else {
} } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString());
if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } };
if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; }
try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); // 将请求提交到 对应的线程池中,然后返回 pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); }
if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }


当接到外部消息后,经过初步判断进行简单封装,更多的处理放入到下游的线程池中进行处理。

标准的 netty 服务处理流程: 编解码器 -> 空闲管理 -> 连接管理 -> 业务处理处理器

2. 消息的具体写入框架逻辑


消息写入由 SendMessageProcessor 进行管理。

// org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest    @Override    public RemotingCommand processRequest(ChannelHandlerContext ctx,                                          RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;        switch (request.getCode()) {            case RequestCode.CONSUMER_SEND_MSG_BACK:                return this.consumerSendMsgBack(ctx, request);            default:                // 获取 header                SendMessageRequestHeader requestHeader = parseRequestHeader(request);                if (requestHeader == null) {                    return null;                }
mqtraceContext = buildMsgContext(ctx, requestHeader); // 写入钩子判定 this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { // 普通写入消息 response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); }
this.executeSendMessageHookAfter(response, mqtraceContext); return response; } } // org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeader requestHeader = null; switch (request.getCode()) { case RequestCode.SEND_BATCH_MESSAGE: case RequestCode.SEND_MESSAGE_V2: requestHeaderV2 = (SendMessageRequestHeaderV2) request .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { requestHeader = (SendMessageRequestHeader) request .decodeCommandCustomHeader(SendMessageRequestHeader.class); } else { requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); } default: break; } return requestHeader; }
// 消息发送逻辑 // org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { // 响应类 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("receive SendMessage request command, {}", request); // 时间检查 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; }
response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; }
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); }
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; }
msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); PutMessageResult putMessageResult = null; Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { // 将消息放入 messagestore 中 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } // 处理写入结果 return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}

如上,由 SendMessageProcessor 进行总控消息的写入。主要分这么几步:

    1. 解析消息头 SendMessageRequestHeader;
    2. 从消息头中判定出处理流程,如是针对单个消息写入还是批量消息的处理,转到处理逻辑;
    3. 针对单条消息的写入,进来先判断是否处理超时,如果超时就不再处理了;
    4. 写入消息,不管成功失败;
    5. 处理写入结果,判定成功或失败;

其中写入消息还是调用内部的逻辑处理,当然只是为了进一步调用 commitLog, 进行真正的存入。

// org.apache.rocketmq.store.DefaultMessageStore#putMessage    public PutMessageResult putMessage(MessageExtBrokerInner msg) {        if (this.shutdown) {            log.warn("message store has shutdown, so putMessage is forbidden");            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);        }        // SLAVE 不可写入数据         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {            long value = this.printTimes.getAndIncrement();            if ((value % 50000) == 0) {                log.warn("message store is slave mode, so putMessage is forbidden ");            }
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); }
if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); }
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); }
if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); }
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); }
if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); }
long beginTime = this.getSystemClock().now(); // 放入 commitLog 中 PutMessageResult result = this.commitLog.putMessage(msg);
long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); }
return result; }

3. CommitLog 如何写入数据?

CommitLog 会执行真正的写入数据逻辑,主要借助 MappedFileQueue 和 MappedFile。

  // org.apache.rocketmq.store.CommitLog#putMessage    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {        // Set the storage time        msg.setStoreTimestamp(System.currentTimeMillis());        // Set the message body BODY CRC (consider the most appropriate setting        // on the client)        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));        // Back to Results        AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic(); int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); }
topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic); msg.setQueueId(queueId); } }
long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; // 获取最后一个 mappedFile, 写入数据 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 上锁写入 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly // global msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } // 向 mappedFile 中添加数据 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); }
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); }
if (elapsedTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); }
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); }
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); // 通知进行刷盘操作 handleDiskFlush(result, putMessageResult, msg); // HA 处理,在要求同步刷盘时,要求 SLAVE 也写入数据,才算成功 handleHA(result, putMessageResult, msg);
return putMessageResult; }
// org.apache.rocketmq.store.MappedFile#appendMessage(org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.AppendMessageCallback) public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { return appendMessagesInner(msg, cb); } // org.apache.rocketmq.store.MappedFile#appendMessagesInner public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; // 写入位置 int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } // org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET

// PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
// Record ConsumeQueue information keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); }
// Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; }
/** * Serialize message */ final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); }
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); }
// Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); }
// 依次写入协议数据 // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(hostHolder, 8); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; } // org.apache.rocketmq.store.CommitLog#handleDiskFlush public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 唤醒刷盘服务,进行异步刷盘 flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
// 处理写入数据的结果 // org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); return response; } boolean sendOK = false;
switch (putMessageResult.getPutMessageStatus()) { // Success case PUT_OK: sendOK = true; response.setCode(ResponseCode.SUCCESS); break; case FLUSH_DISK_TIMEOUT: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); sendOK = true; break; case FLUSH_SLAVE_TIMEOUT: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); sendOK = true; break; case SLAVE_NOT_AVAILABLE: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); sendOK = true; break;
// Failed case CREATE_MAPEDFILE_FAILED: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("create mapped file failed, server is busy or broken."); break; case MESSAGE_ILLEGAL: case PROPERTIES_SIZE_EXCEEDED: response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark( "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark( "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); break; case UNKNOWN_ERROR: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR"); break; default: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR DEFAULT"); break; }
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); // 写入成功,则直接响应 if (sendOK) { // 状态统计 this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); // 直接向客户端输出结果 doResponse(ctx, request, response);
if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); sendMessageContext.setQueueId(responseHeader.getQueueId()); sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } return null; } else { if (hasSendMessageHook()) { int wroteSize = request.getBody().length; int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } } return response; }

4. 刷盘的实现

前面的put操作只是将数据写入到mappedByteBuffer中,还没有进行真正的磁盘写入,所以需要进行刷盘。

刷盘动作分为同步刷盘和异常刷盘,同步可以保证写入的及时性及可靠性,但是性能会有比较大的影响。异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

同步刷盘由 GroupCommitRequest 进行处理,而异步刷盘则是由 CommitLog$FlushRealTimeService/CommitLog$CommitRealTimeService/CommitLog$GroupCommitService 进行处理。源码解释如下:

 // org.apache.rocketmq.store.CommitLog#handleDiskFlush    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {        // Synchronization flush        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;            if (messageExt.isWaitStoreMsgOK()) {                // 初始化要写入的数据偏移                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());                // 将要刷盘的数据放入队列中                service.putRequest(request);                // 等待刷盘结果                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());                if (!flushOK) {                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                        + " client address: " + messageExt.getBornHostString());                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);                }            } else {                service.wakeup();            }        }        // Asynchronous flush        else {            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {                flushCommitLogService.wakeup();            } else {                commitLogService.wakeup();            }        }    }

4.1 同步刷盘服务

 // org.apache.rocketmq.store.CommitLog.GroupCommitRequest#GroupCommitRequest        public GroupCommitRequest(long nextOffset) {            this.nextOffset = nextOffset;        }        // org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest        public synchronized void putRequest(final GroupCommitRequest request) {            synchronized (this.requestsWrite) {                this.requestsWrite.add(request);            }            if (hasNotified.compareAndSet(false, true)) {                waitPoint.countDown(); // notify            }        }        // org.apache.rocketmq.store.CommitLog.GroupCommitRequest#waitForFlush        public boolean waitForFlush(long timeout) {            try {                // 提交request后,就一直在此处等待                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);                return this.flushOK;            } catch (InterruptedException e) {                log.error("Interrupted", e);                return false;            }        }        //后台会有一个线程一直扫描 request 队列!        // org.apache.rocketmq.store.CommitLog.GroupCommitService#run        public void run() {            CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) { try { // 调用父类 ServiceThread 模板方法,覆写 onWaitEnd() 方法 // 此处理休眠等待的同时,也进行了队列的转换,如从 requestsWrite 队列转换数据到 requestsRead 中 this.waitForRunning(10); // 提交 requestsRead 队列 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 以下为异常处理流程 // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); }
synchronized (this) { this.swapRequests(); }
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end"); } // org.apache.rocketmq.common.ServiceThread#waitForRunning protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { // GroupCommitService 覆写,进行队列交换 this.onWaitEnd(); return; }
//entry to wait waitPoint.reset();
try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { hasNotified.set(false); this.onWaitEnd(); } } // org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd @Override protected void onWaitEnd() { this.swapRequests(); } private void swapRequests() { // 通过 requestsWrite, 减少 requestsRead 队列上锁竞争机会 // 只有在交换的短瞬间可能存在竞争,它会保证 放入写队列操作不会被 刷写操作阻塞,从而提高性能 List tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } // doCommit 进行真正的 数据刷盘操作 // org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // 同步操作,保证线程安全,由于 requestsWrite 与 requestsRead 经常进行交换操作,所以,此处的锁也相当于分段锁,并不会锁全局 synchronized (this.requestsRead) { // 队列为空,则无需刷盘 if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush // 为防止消息写入后,还有一段数据是被写入到第二个 mapfile中,所以,会再尝试刷写入第二次 boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { // 此处有两种情况 // 1. 消息未刷入file, !flushOK // 2. 消息写入了file, 但是被分到了两个文件中, 从而 flushedWhere变小, 需要再刷一次 !flushOK // 具体每次刷写多少数据,且看后续分解 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } }
req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); }
this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } // org.apache.rocketmq.store.MappedFileQueue#flush public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 调用 mappedFile 进行写数据 int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } }
return result; } // org.apache.rocketmq.store.MappedFile#flush /** * @return The current flushed position */ public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition();
try { //We only append data to fileChannel or mappedByteBuffer, never both. // fileChannel 即是最终的文件通道, 调用 force() 方法进行刷盘 if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); }
this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }

4.2 异步刷盘

CommitRealTimeService, 服务线程会一直进行commit..


// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; }
try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); }
if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } }
boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } } commit操作流程如下: // org.apache.rocketmq.store.MappedFileQueue#commit public boolean commit(final int commitLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.committedWhere; this.committedWhere = where; }
return result; } // org.apache.rocketmq.store.MappedFile#commit public int commit(final int commitLeastPages) { if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. return this.wrotePosition.get(); } if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } }
// All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; }
return this.committedPosition.get(); } // org.apache.rocketmq.store.MappedFile#commit0 protected void commit0(final int commitLeastPages) { int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } } flush 操作流程如下: // org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) { boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; }
try { // 等待时间间隔 if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); }
if (printFlushProgress) { this.printFlushProgress(); }
long begin = System.currentTimeMillis(); // 时间到, flush CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } }
// Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); }
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end"); } // org.apache.rocketmq.store.MappedFileQueue#flush public boolean flush(final int flushLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 同样调用 mappedFile.flush() 方法进行刷盘 int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } }
return result; }


以上刷盘流程,都是调用 mappedFileQueue 的 commit 或者 flush 方法进行。

  由三个线程完成:

    GroupCommitService, 进行同步请求处理;
    CommitRealTimeService, 进行异步刷盘commit;
    FlushRealTimeService, 同步刷盘服务, 此服务依赖于 broker 配置;

  

整个处理流程时序图可大致归结如下:

  1. 网络接入

2. 数据存储

3. 客户端响应

处理过程还是相对容易理解的。



往期精彩推荐



腾讯、阿里、滴滴后台面试题汇总总结 — (含答案)

面试:史上最全多线程面试题 !

最新阿里内推Java后端面试题

JVM难学?那是因为你没认真看完这篇文章


END


关注作者微信公众号 —《JAVA烂猪皮》


了解更多java后端架构知识以及最新面试宝典


你点的每个好看,我都认真当成了


看完本文记得给作者点赞+在看哦~~~大家的支持,是作者源源不断出文的动力


作者:等你归去来

出处:https://www.cnblogs.com/yougewe/p/12133260.html

浏览 49
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报