RocketMQ-namesrv
Founder
2024-05-30 18:20:57

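Continuing from the previous post, let's look at namesrv. As RocketMQ's registry, namesrv stores the routing information of the brokers.

Given that role, what questions come to mind? What do you think the key points of this system are? Here is what I care about; additions are welcome.

  1. For a registry, the most important thing is registration: how is the registered information stored, and where? I also want to know how concurrent updates are handled.

  2. What happens if the registry goes down? In other words, how is high availability guaranteed? Is it one master with several replicas that sync data among themselves, with a replica taking over when the master fails? It is not: every namesrv machine is a peer, and a broker sends its registration to all of them.

  3. When a broker goes down, how is that detected promptly?

  4. How do consumers and producers query routing information?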
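Point 2 can be made concrete with a small sketch. This is not RocketMQ code: the `Node` class and both helper methods are hypothetical stand-ins. It assumes that name server nodes never talk to each other and that a broker sends the same registration to every node. As a further assumption, since the client side is not covered in this post, the client simply tries the nodes in order until one answers.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RegisterToAllSketch {

    /** Hypothetical stand-in for one namesrv node; it only knows what brokers sent it. */
    static final class Node {
        private final Map<String, String> brokerAddrByName = new HashMap<>();
        private boolean up = true;

        void setUp(boolean up) {
            this.up = up;
        }

        boolean register(String brokerName, String brokerAddr) {
            if (!up) {
                return false; // this node misses the registration, the others are unaffected
            }
            brokerAddrByName.put(brokerName, brokerAddr);
            return true;
        }

        String lookup(String brokerName) {
            return up ? brokerAddrByName.get(brokerName) : null;
        }
    }

    /** Broker side: send the same registration to every node, count the acknowledgements. */
    static int registerToAll(List<Node> nodes, String brokerName, String brokerAddr) {
        int acked = 0;
        for (Node node : nodes) {
            if (node.register(brokerName, brokerAddr)) {
                acked++;
            }
        }
        return acked;
    }

    /** Client side (assumed behaviour): try nodes in order until one returns an address. */
    static String lookupFromAny(List<Node> nodes, String brokerName) {
        for (Node node : nodes) {
            String addr = node.lookup(brokerName);
            if (addr != null) {
                return addr;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node(), new Node(), new Node());
        System.out.println("acks: " + registerToAll(nodes, "broker-a", "10.0.0.1:10911"));
        nodes.get(0).setUp(false); // one node dies after the registration
        System.out.println("lookup: " + lookupFromAny(nodes, "broker-a"));
    }
}
```

Because every node holds its own full copy, no replication or leader election is needed; the cost is that each broker has to repeat its registration once per node.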

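Those are the questions I came up with. Let's read the code with them in mind. Note that my version is 5.0.0.

Registration process

The registration data is all kept in in-memory fields.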

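    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/* brokerName */, TopicQueueMappingInfo>> topicQueueMappingInfoTable;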

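As you can see, they are all maps, and they are declared as plain Map rather than ConcurrentHashMap. (The cast in registerBroker below shows that at least clusterAddrTable is backed by a ConcurrentHashMap at runtime, but the code does not rely on that for correctness.) Concurrency is controlled by a lock.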
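To illustrate the idea of guarding several plain maps with one lock, here is a minimal sketch. It is not the RocketMQ implementation: the class, the simplified value types, and the method names are made up for this example. Writers take the write lock so that the tables change together; readers take the read lock and receive a copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedRouteTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Plain HashMaps: they are only touched while the lock is held.
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    private final Map<String, Long> brokerLiveTable = new HashMap<>();

    /** Updates both tables under the write lock so readers never see a half-applied change. */
    void register(String brokerName, long brokerId, String brokerAddr) {
        lock.writeLock().lock();
        try {
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
            brokerLiveTable.put(brokerAddr, System.currentTimeMillis());
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Many readers may hold the read lock at once; the result is a defensive copy. */
    Map<Long, String> lookup(String brokerName) {
        lock.readLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? null : new HashMap<>(addrs);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedRouteTableSketch table = new LockedRouteTableSketch();
        table.register("broker-a", 0L, "10.0.0.1:10911");
        table.register("broker-a", 1L, "10.0.0.2:10911");
        System.out.println(table.lookup("broker-a"));
    }
}
```

A read-write lock suits this workload: lookups are frequent and can run in parallel, while registrations are rare and can afford to be exclusive.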

Now for the registration itself. It is rather tedious: it just updates the fields above.

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final String zoneName,
        final Long timeoutMillis,
        final Boolean enableActingMaster,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            // Take the write lock
            this.lock.writeLock().lockInterruptibly();

            //init or update the cluster info
            Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent(
                (ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // First registration for this broker name
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }

            boolean isOldVersionBroker = enableActingMaster == null;
            brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
            brokerData.setZoneName(zoneName);

            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

            boolean isMinBrokerIdChanged = false;
            long prevMinBrokerId = 0;
            // Check whether the minimum broker id changes
            if (!brokerAddrsMap.isEmpty()) {
                prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
            }

            if (brokerId < prevMinBrokerId) {
                isMinBrokerIdChanged = true;
            }

            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

            //If Local brokerId stateVersion bigger than the registering one,
            String oldBrokerAddr = brokerAddrsMap.get(brokerId);
            if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
                BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

                if (null != oldBrokerInfo) {
                    long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                    long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                    if (oldStateVersion > newStateVersion) {
                        log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                                "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                            clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                        //Remove the rejected brokerAddr from brokerLiveTable.
                        brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                        return result;
                    }
                }
            }

            if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
                log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                    topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
                return null;
            }

            String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
            registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

            boolean isMaster = MixAll.MASTER_ID == brokerId;
            boolean isPrimeSlave = !isOldVersionBroker && !isMaster
                && brokerId == Collections.min(brokerAddrsMap.keySet());

            // A master, or the prime slave when the master is gone, creates or updates QueueData
            if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

                ConcurrentMap<String, TopicConfig> tcTable =
                    topicConfigWrapper.getTopicConfigTable();
                if (tcTable != null) {
                    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                        if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                            topicConfigWrapper.getDataVersion(), brokerName,
                            entry.getValue().getTopicName())) {
                            final TopicConfig topicConfig = entry.getValue();
                            if (isPrimeSlave) {
                                // Wipe write perm for prime slave
                                topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                            }
                            // Update topicQueueTable
                            this.createAndUpdateQueueData(brokerName, topicConfig);
                        }
                    }
                }

                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                    TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                    Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
                    //the topicQueueMappingInfoMap should never be null, but can be empty
                    for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                        if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                            topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                        }
                        //Note asset brokerName equal entry.getValue().getBname()
                        //here use the mappingDetail.bname
                        topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                    }
                }
            }

            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
            // Refresh this broker's liveness timestamp
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                    topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddrInfo);
                } else {
                    this.filterServerTable.put(brokerAddrInfo, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                    BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                    if (masterLiveInfo != null) {
                        result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }

            // If the minimum broker id changed (a master/slave switch), notify the brokers in this group
            if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                notifyMinBrokerIdChanged(brokerAddrsMap, null,
                    this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        } finally {
            this.lock.writeLock().unlock();
        }

        return result;
    }

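The first step is to take the write lock. This also answers the earlier question: concurrency is controlled by a single write lock at the outermost level. I had expected fine-grained concurrency control, per topic or something similar. Thinking about it, though, brokers change rarely. We usually deploy a cluster one machine at a time, and even if several machines go out together, concurrency is low; they will be at least a few seconds apart. So the simplest lock is enough, and the simplest approach is the most effective one.

I will leave the rest of the code for you to read. I don't want to go deeper; it doesn't seem necessary.

That settles the first question. As for the second, how high availability is guaranteed, I think the answer lies in the places that query namesrv, so I will leave it for later.

The third question: when a broker goes down, how is that detected promptly? Through a heartbeat mechanism.

Heartbeat mechanism

When namesrv starts, it schedules a periodic task: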

    this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);

    public void scanNotActiveBroker() {
        try {
            log.info("start scanNotActiveBroker");
            for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
                long last = next.getValue().getLastUpdateTimestamp();
                long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
                if ((last + timeoutMillis) < System.currentTimeMillis()) {
                    RemotingUtil.closeChannel(next.getValue().getChannel());
                    log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                    this.onChannelDestroy(next.getKey());
                }
            }
        } catch (Exception e) {
            log.error("scanNotActiveBroker exception", e);
        }
    }

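By default it scans brokerLiveTable every 5s and checks whether each entry has timed out, that is, whether the last update time plus the heartbeat timeout is already in the past. An entry that has timed out goes through the unregistration logic: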

    public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
        UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
        boolean needUnRegister = false;
        if (brokerAddrInfo != null) {
            try {
                try {
                    // Take the read lock (building the request only reads the tables)
                    this.lock.readLock().lockInterruptibly();
                    needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (needUnRegister) {
            // The request is submitted to a queue here
            boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
            log.info("the broker's channel destroyed, submit the unregister request at once, " +
                "broker info: {}, submit result: {}", unRegisterRequest, result);
        }
    }

In the end a request is submitted to a queue:

    public boolean submitUnRegisterBrokerRequest(UnRegisterBrokerRequestHeader unRegisterRequest) {
        return this.unRegisterService.submit(unRegisterRequest);
    }

At startup, an unregistration thread is started as well:

    public void start() {
        this.unRegisterService.start();
    }

This thread is a BatchUnregistrationService. Its run method takes requests from the queue and handles the actual unregistration logic.

    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
                Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
                unregistrationQueue.drainTo(unregistrationRequests);

                // Add polled request
                unregistrationRequests.add(request);

                this.routeInfoManager.unRegisterBroker(unregistrationRequests);
            } catch (Throwable e) {
                log.error("Handle unregister broker request failed", e);
            }
        }
    }
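The take-then-drainTo pattern in run() is worth isolating: `take()` blocks until at least one request exists, and `drainTo()` then grabs whatever else has piled up without blocking, so one write-lock acquisition can serve a whole batch. The following is only a sketch of that pattern with plain strings as requests; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchDrainSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Producer side: enqueue one unregistration request (here just a broker address). */
    boolean submit(String brokerAddr) {
        return queue.offer(brokerAddr);
    }

    int pending() {
        return queue.size();
    }

    /** Consumer side: wait for one request, then collect everything else already queued. */
    Set<String> takeBatch() {
        Set<String> batch = new HashSet<>();
        try {
            String first = queue.take(); // blocks while the queue is empty
            queue.drainTo(batch);        // non-blocking: moves the rest of the backlog
            batch.add(first);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchDrainSketch service = new BatchDrainSketch();
        service.submit("10.0.0.1:10911");
        service.submit("10.0.0.2:10911");
        service.submit("10.0.0.1:10911"); // duplicate, collapses in the set
        System.out.println(service.takeBatch());
    }
}
```

Collecting into a set also removes duplicates, provided the request type defines equality; the sketch gets that for free from String.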

The run method pulls the requests off the queue and then calls the unregister method:

    public void unRegisterBroker(Set<UnRegisterBrokerRequestHeader> unRegisterRequests) {
        try {
            Set<String> removedBroker = new HashSet<>();
            Set<String> reducedBroker = new HashSet<>();
            Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap = new HashMap<>();

            this.lock.writeLock().lockInterruptibly();
            for (final UnRegisterBrokerRequestHeader unRegisterRequest : unRegisterRequests) {
                final String brokerName = unRegisterRequest.getBrokerName();
                final String clusterName = unRegisterRequest.getClusterName();

                BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, unRegisterRequest.getBrokerAddr());

                // Remove from the heartbeat table
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                    brokerLiveInfo != null ? "OK" : "Failed",
                    brokerAddrInfo
                );

                this.filterServerTable.remove(brokerAddrInfo);

                boolean removeBrokerName = false;
                boolean isMinBrokerIdChanged = false;
                // Update the broker information
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null != brokerData) {
                    if (!brokerData.getBrokerAddrs().isEmpty() &&
                        unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
                        isMinBrokerIdChanged = true;
                    }
                    String addr = brokerData.getBrokerAddrs().remove(unRegisterRequest.getBrokerId());
                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                        addr != null ? "OK" : "Failed",
                        brokerAddrInfo
                    );
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        this.brokerAddrTable.remove(brokerName);
                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                            brokerName
                        );

                        removeBrokerName = true;
                    } else if (isMinBrokerIdChanged) {
                        needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                            brokerData.getBrokerAddrs(), addr, null));
                    }
                }

                if (removeBrokerName) {
                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                    if (nameSet != null) {
                        boolean removed = nameSet.remove(brokerName);
                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                            removed ? "OK" : "Failed",
                            brokerName);

                        if (nameSet.isEmpty()) {
                            this.clusterAddrTable.remove(clusterName);
                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                                clusterName
                            );
                        }
                    }
                    removedBroker.add(brokerName);
                } else {
                    reducedBroker.add(brokerName);
                }
            }

            // Clean up topics for broker names that went offline entirely or lost a member
            cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);

            // Announce that a master/slave switch happened
            if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                notifyMinBrokerIdChanged(needNotifyBrokerMap);
            }
        } catch (Exception e) {
            log.error("unregisterBroker Exception", e);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

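This method also takes a lock, the write lock again, and then updates the related data. As with registration, if the removal triggers a master/slave switch, notifyMinBrokerIdChanged is called with the remaining addresses of the affected broker group.

That answers the third question: when a broker goes down, how is it detected promptly?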
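The expiry check at the heart of the heartbeat mechanism can be sketched on its own. This is a simplified model rather than RocketMQ code: brokers are identified by a plain address string, `LiveInfo` is a cut-down stand-in for BrokerLiveInfo, and the current time is passed in as a parameter so that the behaviour is deterministic. In a real service the scan would be driven by something like `ScheduledExecutorService.scheduleAtFixedRate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatScanSketch {

    /** Cut-down stand-in for BrokerLiveInfo: last heartbeat time plus allowed silence. */
    static final class LiveInfo {
        final long lastUpdateTimestamp;
        final long heartbeatTimeoutMillis;

        LiveInfo(long lastUpdateTimestamp, long heartbeatTimeoutMillis) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
            this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        }
    }

    private final Map<String, LiveInfo> brokerLiveTable = new ConcurrentHashMap<>();

    /** Every registration doubles as a heartbeat: it overwrites the previous timestamp. */
    void heartbeat(String brokerAddr, long nowMillis, long timeoutMillis) {
        brokerLiveTable.put(brokerAddr, new LiveInfo(nowMillis, timeoutMillis));
    }

    /** Removes and returns every broker whose last heartbeat is older than its timeout. */
    List<String> scanNotActive(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, LiveInfo> entry : brokerLiveTable.entrySet()) {
            LiveInfo info = entry.getValue();
            if (info.lastUpdateTimestamp + info.heartbeatTimeoutMillis < nowMillis) {
                expired.add(entry.getKey());
            }
        }
        for (String brokerAddr : expired) {
            brokerLiveTable.remove(brokerAddr);
        }
        return expired;
    }

    public static void main(String[] args) {
        HeartbeatScanSketch scanner = new HeartbeatScanSketch();
        scanner.heartbeat("10.0.0.1:10911", 0L, 1000L);
        scanner.heartbeat("10.0.0.2:10911", 500L, 1000L);
        System.out.println("expired at t=1200: " + scanner.scanNotActive(1200L));
    }
}
```

Detection latency in this model is bounded by the heartbeat timeout plus one scan interval, which is the trade-off behind "promptly" in the question.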

The last question: how do consumers and producers query routing information?

Route lookup

org.apache.rocketmq.namesrv.NamesrvController#registerProcessor registers two processors:

    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(
                new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);
        } else {
            // Support get route info only temporarily
            ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);

            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
        }
    }

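One is the client processor, the other the default processor. Note that the two use different thread pools, isolated from each other. Lookups from clients (consumers or producers) matter the most, so they get a thread pool of their own.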
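The isolation idea can be shown with a tiny dispatcher. This is a sketch under assumptions, not the remoting server's real API: the registry class, the use of `Function<String, String>` as a processor, the request code `1`, and the `namedPool` helper are all invented for illustration. Each request code is bound to a processor together with the executor it runs on, and unknown codes fall back to the default pair.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ProcessorRegistrySketch {

    /** A processor together with the thread pool it is allowed to use. */
    static final class Binding {
        final Function<String, String> processor;
        final ExecutorService executor;

        Binding(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Binding> bindings = new HashMap<>();
    private Binding defaultBinding;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        bindings.put(requestCode, new Binding(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        defaultBinding = new Binding(processor, executor);
    }

    /** Runs the request on the pool bound to its code, or on the default pool. */
    CompletableFuture<String> dispatch(int requestCode, String request) {
        Binding binding = bindings.getOrDefault(requestCode, defaultBinding);
        if (binding == null) {
            throw new IllegalStateException("no processor registered for code " + requestCode);
        }
        return CompletableFuture.supplyAsync(() -> binding.processor.apply(request), binding.executor);
    }

    /** Helper: fixed pool of daemon threads whose names start with the given prefix. */
    static ExecutorService namedPool(String prefix, int size) {
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-worker");
            thread.setDaemon(true);
            return thread;
        });
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        int getRoute = 1; // arbitrary code for this demo, not the real RequestCode value
        registry.registerProcessor(getRoute, req -> "route for " + req, namedPool("client", 2));
        registry.registerDefaultProcessor(req -> "handled " + req, namedPool("default", 2));
        System.out.println(registry.dispatch(getRoute, "TopicA").join());
        System.out.println(registry.dispatch(42, "registerBroker").join());
    }
}
```

With separate pools, a burst of slow broker registrations can exhaust only the default pool; route lookups keep their own threads.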

The main request handling is in ClientRequestProcessor#getRouteInfoByTopic, which calls org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData:

    public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        List<BrokerData> brokerDataList = new LinkedList<>();
        topicRouteData.setBrokerDatas(brokerDataList);

        HashMap<String, List<String>> filterServerMap = new HashMap<>();
        topicRouteData.setFilterServerTable(filterServerMap);

        try {
            this.lock.readLock().lockInterruptibly();
            Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
            if (queueDataMap != null) {
                // The queueDatas found for this topic
                topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
                foundQueueData = true;

                Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());

                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null == brokerData) {
                        continue;
                    }
                    BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
                        brokerData.getBrokerName(),
                        (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
                        brokerData.isEnableActingMaster(), brokerData.getZoneName());

                    // Save into brokerDatas
                    brokerDataList.add(brokerDataClone);
                    foundBrokerData = true;
                    if (filterServerTable.isEmpty()) {
                        continue;
                    }
                    // Look up the filter servers
                    for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                        List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                        filterServerMap.put(brokerAddr, filterServerList);
                    }
                }
            }
        } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
        } finally {
            this.lock.readLock().unlock();
        }

        log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

        // The second half mainly decides whether an acting master is needed
        if (foundBrokerData && foundQueueData) {
            if (topicRouteData == null) {
                return null;
            }

            topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));

            if (!namesrvConfig.isSupportActingMaster()) {
                return topicRouteData;
            }

            if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
                return topicRouteData;
            }

            if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
                return topicRouteData;
            }

            boolean needActingMaster = false;

            for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                if (brokerData.getBrokerAddrs().size() != 0
                    && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    needActingMaster = true;
                    break;
                }
            }

            if (!needActingMaster) {
                return topicRouteData;
            }

            for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
                if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                    continue;
                }

                // No master
                for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                    if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                        if (!PermName.isWriteable(queueData.getPerm())) {
                            final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                            final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                            brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                        }
                        break;
                    }
                }
            }

            return topicRouteData;
        }

        return null;
    }

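The important part is the first half. It takes a read lock (not a write lock, since nothing is modified), then reads the three in-memory tables topicQueueTable, brokerAddrTable, and filterServerTable and fills the response with the data it finds.

The second half checks whether each broker group has a master. If a group has none, has acting master enabled, and its queue is not writable, the broker with the smallest id in the returned data is promoted: its address is re-keyed under MixAll.MASTER_ID. Only the returned copy is changed, not the stored tables.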
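The promotion step boils down to a small map transformation, sketched here in isolation. The method name and signature are hypothetical; the three conditions are reduced to two booleans and a map, and `MASTER_ID = 0` is assumed to match `MixAll.MASTER_ID`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ActingMasterSketch {
    // Assumed to mirror MixAll.MASTER_ID: broker id 0 denotes the master.
    static final long MASTER_ID = 0L;

    /**
     * Returns a copy of brokerAddrs in which the broker with the smallest id is
     * re-keyed as the master, but only if the group has no master, acting master
     * is enabled, and the queue is not writable. The input map is never modified.
     */
    static Map<Long, String> promote(Map<Long, String> brokerAddrs,
                                     boolean enableActingMaster,
                                     boolean queueWritable) {
        Map<Long, String> result = new HashMap<>(brokerAddrs);
        if (result.isEmpty() || result.containsKey(MASTER_ID) || !enableActingMaster) {
            return result;
        }
        if (!queueWritable) {
            Long minBrokerId = Collections.min(result.keySet());
            String actingMasterAddr = result.remove(minBrokerId);
            result.put(MASTER_ID, actingMasterAddr);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> slavesOnly = new HashMap<>();
        slavesOnly.put(1L, "10.0.0.2:10911");
        slavesOnly.put(2L, "10.0.0.3:10911");
        System.out.println(promote(slavesOnly, true, false));
    }
}
```

Working on a copy matters: clients see an acting master in the response, while the registry keeps the real broker ids, so nothing needs undoing once a true master registers again.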

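That's it for this post. As you can see, namesrv is fairly simple. RocketMQ used to rely on zk for this role, and it turned out that a simple self-written module does the job, with no need to bring in zk and the complexity that comes with it. Hence today's namesrv.

Coming up next: the broker.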
