RocketMQ 消费端如何监听消息?
创始人
2024-02-22 13:46:32
0

前言

RocketMQ消息消费者是如何启动的,有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,

那具体怎么做,让我们我们跟着源码来学习一下~

流程地图

源码跟踪

这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK

核心模块(消息拉取)

入口:this.pullMessageService.start();

  1. 执行线程池run方法,轮流从pullRequestQueue中获取PullRequest

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

声明一个阻塞队列用来存放 PullRequest 对象

PullRequest 用于消息拉取任务,如果 pullRequestQueue 为空则会阻塞,直到拉取任务被放入

private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();
复制代码

将 stopped 用volatile来修饰,每次执行的时候都检测stopped的状态,线程只要修改了这个状态,其余线程就会马上知道

protected volatile boolean stopped = false;
@Override
public void run() {log.info(this.getServiceName() + " service started");// 判断启动状态while (!this.isStopped()) {try {// 取出一个PullRequest对象PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}
复制代码
  1. 获取消费队列快照,判断状态是否正常,同时更新最后一次拉取时间

PullMessageService 从消息服务器默认拉取32条消息,按消息的偏移量顺序存放在 ProcessQueue 队列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
复制代码

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 获取消费队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;
}// 设置最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
复制代码
  1. 校验客户端运行状态
// 校验状态
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {if (this.serviceState != ServiceState.RUNNING) {throw new MQClientException("The consumer service state not OK, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}
}
复制代码

如果消费者状态不正确,则抛出异常,启动定时线程池过段时间回收 PullRequest 对象,以便pullMessageService能及时唤醒并再次执行消息拉取,这个逻辑在多个地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {if (!isStopped()) {this.scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {PullMessageService.this.executePullRequestImmediately(pullRequest);}}, timeDelay, TimeUnit.MILLISECONDS);} else {log.warn("PullMessageServiceScheduledThread has shutdown");}
}
复制代码
public void executePullRequestImmediately(final PullRequest pullRequest) {try {// 最后将pullRequest放入pullRequestQueue中this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}
}
复制代码
  1. 校验消费队列中的消息数量和大小是否符合设置

如果触发流量控制,则延迟拉取消息,先将 PullRequest 对象进行回收,以便pullMessageService能及时唤醒并再次执行消息拉取

// 缓存消息条数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 当队列中的消息跳过,超过设置 则延迟拉取消息
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
复制代码
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
复制代码
  1. 根据主题获取配置的订阅关系

这里通过查询 subscriptionInner Map容器,利用主题来获取对应的订阅关系,如果没有找到对应的订阅关系,则延迟拉取消息,先将 PullRequest 对象进行回收以便 pullMessageService 能及时唤醒并再次执行消息拉取

protected final ConcurrentMap subscriptionInner =new ConcurrentHashMap();
复制代码
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;
}
复制代码
  1. 如果为集群模式,则从内存中读取位置

通过消费者启动的模块中,我们知道RocketMQ是根据不同模式,将消息进度存储在不同的地方

广播模式:消息进度存储在本地文件

集群模式:消息进度存储在Broker 服务器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {// 从内存中读取位置commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {commitOffsetEnable = true;}
}
复制代码
  1. 内核中拉取消息(最重要的模块)

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
复制代码

我们看到他有非常多的参数

拉取流程

  1. 通过BrokerName找到对应的Broker
// step 1 通过BrokerName找到对应的Broker
FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);
复制代码
  1. 如果没有找到对应的,则更新路由信息
// step 2 如果没有找到对应的,则更新路由信息
if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);
}
复制代码
  1. 检查Broker版本和Tag信息
// check version
if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
复制代码
  1. 设置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
复制代码
  1. 调用pullMessage方法拉取消息,返回拉取结果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);
复制代码

因为 CommunicationMode 传递的是ASYNC,我们着重来看一下这个方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

调用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

这里我们就先不细看了

拉取消息处理

  1. 如果PullCallback回调成功,则对结果进行处理
// 处理pullResult数据
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);
复制代码

主要做了三件事,转换消息格式、设置消息信息、放入msgFoundList

将pullResult 转成 PullResultExt,转换消息格式为List

PullResultExt pullResultExt = (PullResultExt) pullResult;// 转换消息格式为List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List msgList = MessageDecoder.decodes(byteBuffer);
复制代码

执行消息过滤,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}
}
复制代码

设置消息的transactionId、扩展属性、BrokerName名称,放入List中

for (MessageExt msg : msgListFilterAgain) {String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(traFlag)) {msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));}MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));msg.setBrokerName(mq.getBrokerName());
}pullResultExt.setMsgFoundList(msgListFilterAgain);
复制代码

当pullStatus为FOUND,消息进行提交消费的请求

  1. 获取第一条消息的offset(偏移量)
// 获取第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
复制代码
  1. 将读取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
复制代码

主要做了两件事,循环读取消息list,存入msgTreeMap和计算此次读取信息偏移量

public boolean putMessage(final List msgs) {boolean dispatchToConsume = false;try {// 上锁this.treeMapLock.writeLock().lockInterruptibly();try {int validMsgCnt = 0;// 循环读取消息list,存入msgTreeMapfor (MessageExt msg : msgs) {MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);if (null == old) {validMsgCnt++;this.queueOffsetMax = msg.getQueueOffset();msgSize.addAndGet(msg.getBody().length);}}msgCount.addAndGet(validMsgCnt);if (!msgTreeMap.isEmpty() && !this.consuming) {dispatchToConsume = true;this.consuming = true;}if (!msgs.isEmpty()) {// 获取最后一条消息MessageExt messageExt = msgs.get(msgs.size() - 1);// 获取最大偏移量String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);...}} finally {this.treeMapLock.writeLock().unlock();}}...
}
复制代码
  1. 提交消费请求,消息提交到内部的线程池
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
复制代码

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

获取 ConsumeRequest对象,拿到当前主题的监听器

这里拿到的监听器,就是我们在启动消费者的时候所注册的,监听到消息后执行相关的业务逻辑

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs,...return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});
复制代码

在这里触发我们在一开始重写的consumeMessage方法,这里msgs用Collections.unmodifiableList进行包装,意思就是不可以修改的,是一个只读的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
复制代码
  1. ProcessQueue中移除已经处理的消息,同时更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
复制代码
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;switch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;...}switch (this.defaultMQPushConsumer.getMessageModel()) {...case CLUSTERING:List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}// 如果存在失败消息,则过5秒在定时执行if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;...}long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());// 更新Offset位置  if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
复制代码
  1. 最后pullRequest放入pullRequestQueue中

入口:org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
复制代码

消息消费进度提交

  1. 成功消费一条消息后,更新本地缓存表
  2. 每5s向Broker提交消息消费进度
  3. Broker每5s将进度持久化到consumerOffset.json

总结

目前只是将整体的一个消费端监听消息的流程了解清楚,里面还有许多细节需要去推敲~

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...