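Continuing from the previous post, let's look at nameserv. As RocketMQ's registry, nameserv holds the routing information for brokers.
Given that role, what questions come to mind? What do you think the key points of this system are? Here is what I focused on; feel free to add your own.
For a registry, the most important thing is registration: how is the registered information stored, and where? I also want to see how concurrency is handled when the data is updated.
What happens if the registry goes down? In other words, how is high availability guaranteed? Is it one master with several slaves that sync data among themselves, with a slave taking over when the master dies? Actually, no. Every nameserv node is a master, and the nodes do not sync with each other. Instead, when a broker registers, it sends its registration to all of the nameservs.
If a broker goes down, how is that detected promptly?
How do consumers and producers query routing information?
Those are the questions I came up with. Let's read the code with them in mind. One more note: the version I'm looking at is 5.0.0.
All of the registration information is simply stored in a few in-memory fields of `RouteInfoManager`: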
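```java
private final Map<String, Map<String, QueueData>> topicQueueTable;
private final Map<String, BrokerData> brokerAddrTable;
private final Map<String, Set<String>> clusterAddrTable;
private final Map<BrokerAddrInfo, BrokerLiveInfo> brokerLiveTable;
private final Map<BrokerAddrInfo, List<String>/* Filter Server */> filterServerTable;
private final Map<String, Map<String, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
```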
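As you can see, they are all just maps. They are declared with the plain `Map` interface rather than as `ConcurrentHashMap`, because concurrency is controlled with a read-write lock. (The concrete instances in 5.0.0 appear to be `ConcurrentHashMap`s, since `registerBroker` below casts `clusterAddrTable` to one. However, what keeps the tables consistent with each other is the lock.)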
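The locking idea can be shown on its own. Below is a minimal, hypothetical sketch in which an ordinary `HashMap` is only touched while holding one `ReentrantReadWriteLock`. The class and method names are mine, not RocketMQ's. Writers take the exclusive write lock, and readers share the read lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a plain HashMap guarded by a single read-write lock,
// in the spirit of RouteInfoManager (not its actual code).
class LockedRouteTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerName -> (brokerId -> brokerAddr); never accessed without holding the lock
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    void putBrokerAddr(String brokerName, long brokerId, String addr) {
        lock.writeLock().lock(); // exclusive: no reader or writer runs concurrently
        try {
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and receive a copy, so callers never hold the live map.
    Map<Long, String> getBrokerAddrs(String brokerName) {
        lock.readLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? new HashMap<>() : new HashMap<>(addrs);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A single lock across all tables makes an update that touches several tables atomic as a whole (for example, `brokerAddrTable` and `clusterAddrTable` changing together). Thread-safe maps alone would not provide that.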
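Now let's look at the registration process itself. It is fairly tedious, because all it does is update each of the variables above: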
```java
public RegisterBrokerResult registerBroker(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final String zoneName,
    final Long timeoutMillis,
    final Boolean enableActingMaster,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        // Take the write lock
        this.lock.writeLock().lockInterruptibly();

        //init or update the cluster info
        Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
        brokerNames.add(brokerName);

        boolean registerFirst = false;

        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        // First registration of this broker name
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }

        boolean isOldVersionBroker = enableActingMaster == null;
        brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);
        brokerData.setZoneName(zoneName);

        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();

        boolean isMinBrokerIdChanged = false;
        long prevMinBrokerId = 0;
        // Check whether the smallest broker id changes
        if (!brokerAddrsMap.isEmpty()) {
            prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
        }

        if (brokerId < prevMinBrokerId) {
            isMinBrokerIdChanged = true;
        }

        //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
        //The same IP:PORT must only have one record in brokerAddrTable
        brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());

        //If Local brokerId stateVersion bigger than the registering one,
        String oldBrokerAddr = brokerAddrsMap.get(brokerId);
        if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
            BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));

            if (null != oldBrokerInfo) {
                long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();
                long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                if (oldStateVersion > newStateVersion) {
                    log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
                            "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
                        clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                    //Remove the rejected brokerAddr from brokerLiveTable.
                    brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                    return result;
                }
            }
        }

        if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
            log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
                topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
            return null;
        }

        String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
        registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

        boolean isMaster = MixAll.MASTER_ID == brokerId;
        boolean isPrimeSlave = !isOldVersionBroker && !isMaster
            && brokerId == Collections.min(brokerAddrsMap.keySet());

        // QueueData is created only by the master or, when there is no master,
        // by the prime slave (the slave with the smallest id)
        if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {

            ConcurrentMap<String, TopicConfig> tcTable =
                topicConfigWrapper.getTopicConfigTable();
            if (tcTable != null) {
                for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                    if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
                        topicConfigWrapper.getDataVersion(), brokerName,
                        entry.getValue().getTopicName())) {
                        final TopicConfig topicConfig = entry.getValue();
                        if (isPrimeSlave) {
                            // Wipe write perm for prime slave
                            topicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));
                        }
                        // Update topicQueueTable
                        this.createAndUpdateQueueData(brokerName, topicConfig);
                    }
                }
            }

            if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
                TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
                Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();
                //the topicQueueMappingInfoMap should never be null, but can be empty
                for (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {
                    if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
                        topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());
                    }
                    //Note asset brokerName equal entry.getValue().getBname()
                    //here use the mappingDetail.bname
                    topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
                }
            }
        }

        BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
        // Refresh this broker's liveness entry (its heartbeat timestamp) directly
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
                topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
        }

        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddrInfo);
            } else {
                this.filterServerTable.put(brokerAddrInfo, filterServerList);
            }
        }

        if (MixAll.MASTER_ID != brokerId) {
            String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (masterAddr != null) {
                BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);
                BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);
                if (masterLiveInfo != null) {
                    result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
                    result.setMasterAddr(masterAddr);
                }
            }
        }

        // Master/slave role changed: push a notification to the addresses in brokerAddrsMap
        if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(brokerAddrsMap, null,
                this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }

    return result;
}
```
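The very first step is to take the write lock. This answers the earlier question: concurrency is controlled by a single write lock around the whole operation. I had expected finer-grained control, for example per topic. On reflection, though, broker changes are rare. Our clusters are usually released one machine at a time. Even when several machines go out together, concurrency is very low, because the registrations arrive at least a few seconds apart. So the simplest lock is enough, and the simplest approach turns out to be the most effective one.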
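The core of the register path is easier to follow in a heavily simplified form. The sketch below is hypothetical and is not the actual RocketMQ code, and the names and table shapes are my assumptions. It keeps only three ideas from `registerBroker`: all updates happen under one write lock, a given `IP:PORT` may appear under only one brokerId, and a new smallest brokerId is detected. Topic configs, state versions and filter servers are left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified register path: one global write lock around all table updates.
class SimpleRegistry {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (id -> addr)
    private final Map<String, Long> brokerLiveTable = new HashMap<>();              // addr -> last heartbeat ms

    /** Returns true if this registration lowers the smallest brokerId of the group. */
    boolean register(String cluster, String brokerName, long brokerId, String addr, long nowMillis) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            boolean minIdChanged = !addrs.isEmpty() && brokerId < Collections.min(addrs.keySet());
            // One IP:PORT lives under a single brokerId (e.g. a slave re-registering as master 0)
            addrs.entrySet().removeIf(e -> addr.equals(e.getValue()) && e.getKey() != brokerId);
            addrs.put(brokerId, addr);
            brokerLiveTable.put(addr, nowMillis); // every registration doubles as a heartbeat
            return minIdChanged;
        } finally {
            lock.writeLock().unlock();
        }
    }

    Map<Long, String> brokerAddrs(String brokerName) {
        lock.readLock().lock();
        try {
            return new HashMap<>(brokerAddrTable.getOrDefault(brokerName, Collections.emptyMap()));
        } finally {
            lock.readLock().unlock();
        }
    }

    Long lastHeartbeat(String addr) {
        lock.readLock().lock();
        try {
            return brokerLiveTable.get(addr);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because registrations are rare and seconds apart, holding one coarse lock for the whole method costs almost nothing, and it keeps the three tables consistent with each other.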
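I'll leave the rest of the code for you to read; I don't think it's worth going deeper.
So the first question is answered. The second one, how high availability is guaranteed, should become clear from the places that query nameserv, so I'll come back to it later.
Third question: when a broker goes down, how is that detected promptly? Through a heartbeat mechanism. As the `registerBroker` code shows, every registration refreshes the broker's entry in `brokerLiveTable`, including its timestamp.
When nameserv starts, it schedules a periodic task: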
```java
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
    5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
```

```java
public void scanNotActiveBroker() {
    try {
        log.info("start scanNotActiveBroker");
        for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
            long last = next.getValue().getLastUpdateTimestamp();
            long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
                this.onChannelDestroy(next.getKey());
            }
        }
    } catch (Exception e) {
        log.error("scanNotActiveBroker exception", e);
    }
}
```
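By default, every 5 s it scans `brokerLiveTable` and checks whether each entry has timed out, that is, whether `lastUpdateTimestamp + heartbeatTimeoutMillis` is already in the past. If an entry has timed out, the scan closes its channel and runs the unregistration logic: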
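The expiry rule itself is small enough to sketch. The hypothetical class below is not RocketMQ's API. For each broker address it stores the last heartbeat time and a per-entry timeout, and it treats an entry as dead once `lastUpdateTimestamp + heartbeatTimeoutMillis` is earlier than now. Unlike `scanNotActiveBroker`, it takes `now` as a parameter so it can be tested. It also removes expired entries directly instead of closing the channel and submitting an unregister request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the heartbeat-timeout scan.
class LivenessScanner {
    static final class LiveInfo {
        final long lastUpdateTimestamp;
        final long heartbeatTimeoutMillis;

        LiveInfo(long lastUpdateTimestamp, long heartbeatTimeoutMillis) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
            this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        }
    }

    private final Map<String, LiveInfo> brokerLiveTable = new HashMap<>();

    // Called on every registration: overwrites the previous timestamp.
    void heartbeat(String addr, long nowMillis, long timeoutMillis) {
        brokerLiveTable.put(addr, new LiveInfo(nowMillis, timeoutMillis));
    }

    /** Removes and returns every broker whose heartbeat has expired at nowMillis. */
    List<String> scanNotActive(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, LiveInfo>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LiveInfo> e = it.next();
            LiveInfo info = e.getValue();
            if (info.lastUpdateTimestamp + info.heartbeatTimeoutMillis < nowMillis) {
                expired.add(e.getKey());
                it.remove(); // the real code closes the channel and queues an unregister instead
            }
        }
        return expired;
    }
}
```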
```java
public void onChannelDestroy(BrokerAddrInfo brokerAddrInfo) {
    UnRegisterBrokerRequestHeader unRegisterRequest = new UnRegisterBrokerRequestHeader();
    boolean needUnRegister = false;
    if (brokerAddrInfo != null) {
        try {
            try {
                // Take the read lock: this step only reads the tables to build the request
                this.lock.readLock().lockInterruptibly();
                needUnRegister = setupUnRegisterRequest(unRegisterRequest, brokerAddrInfo);
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (needUnRegister) {
        // Submit the request to a queue
        boolean result = this.submitUnRegisterBrokerRequest(unRegisterRequest);
        log.info("the broker's channel destroyed, submit the unregister request at once, " +
            "broker info: {}, submit result: {}", unRegisterRequest, result);
    }
}
```
In the end, it just submits a request to a queue:

```java
public boolean submitUnRegisterBrokerRequest(UnRegisterBrokerRequestHeader unRegisterRequest) {
    return this.unRegisterService.submit(unRegisterRequest);
}
```
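This post does not show the body of `unRegisterService.submit`. My assumption is that it does a non-blocking `offer` on a bounded `BlockingQueue`. Under that assumption, it behaves like the hypothetical sketch below: when the queue is full, `submit` returns `false` instead of blocking the caller. That would fit the boolean `result` that `onChannelDestroy` logs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch (assumed, not copied from RocketMQ): a bounded queue with non-blocking submit.
class UnregisterSubmitter {
    private final BlockingQueue<String> queue;

    UnregisterSubmitter(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity); // bounded: offer() fails when full
    }

    // offer() never blocks; it reports failure with false instead.
    boolean submit(String brokerAddr) {
        return queue.offer(brokerAddr);
    }

    int pending() {
        return queue.size();
    }
}
```

With this design, the scan thread only needs a read lock and a queue insert. The write-locked work is left to the single consumer thread.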
At startup, an unregistration thread is also started:

```java
public void start() {
    this.unRegisterService.start();
}
```

This thread is `BatchUnregistrationService`. Its `run` method takes requests off the queue and carries out the actual unregistration:

```java
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            final UnRegisterBrokerRequestHeader request = unregistrationQueue.take();
            Set<UnRegisterBrokerRequestHeader> unregistrationRequests = new HashSet<>();
            unregistrationQueue.drainTo(unregistrationRequests);

            // Add polled request
            unregistrationRequests.add(request);

            this.routeInfoManager.unRegisterBroker(unregistrationRequests);
        } catch (Throwable e) {
            log.error("Handle unregister broker request failed", e);
        }
    }
}
```
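Here, one request is taken from the queue (blocking until one arrives), any other pending requests are drained into the same batch, and then the unregister method runs on the whole batch: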
```java
public void unRegisterBroker(Set<UnRegisterBrokerRequestHeader> unRegisterRequests) {
    try {
        Set<String> removedBroker = new HashSet<>();
        Set<String> reducedBroker = new HashSet<>();
        Map<String, BrokerStatusChangeInfo> needNotifyBrokerMap = new HashMap<>();

        this.lock.writeLock().lockInterruptibly();
        for (final UnRegisterBrokerRequestHeader unRegisterRequest : unRegisterRequests) {
            final String brokerName = unRegisterRequest.getBrokerName();
            final String clusterName = unRegisterRequest.getClusterName();

            BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, unRegisterRequest.getBrokerAddr());

            // Remove from the heartbeat (liveness) table
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddrInfo);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddrInfo
            );

            this.filterServerTable.remove(brokerAddrInfo);

            boolean removeBrokerName = false;
            boolean isMinBrokerIdChanged = false;
            // Update the broker info
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                if (!brokerData.getBrokerAddrs().isEmpty() &&
                    unRegisterRequest.getBrokerId().equals(Collections.min(brokerData.getBrokerAddrs().keySet()))) {
                    isMinBrokerIdChanged = true;
                }
                String addr = brokerData.getBrokerAddrs().remove(unRegisterRequest.getBrokerId());
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed",
                    brokerAddrInfo
                );
                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );

                    removeBrokerName = true;
                } else if (isMinBrokerIdChanged) {
                    needNotifyBrokerMap.put(brokerName, new BrokerStatusChangeInfo(
                        brokerData.getBrokerAddrs(), addr, null));
                }
            }

            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);

                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                removedBroker.add(brokerName);
            } else {
                reducedBroker.add(brokerName);
            }
        }

        // Clean up topic data for brokers that went fully offline or lost members
        cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);

        // Notify that a master/slave switch happened
        if (!needNotifyBrokerMap.isEmpty() && namesrvConfig.isNotifyMinBrokerIdChanged()) {
            notifyMinBrokerIdChanged(needNotifyBrokerMap);
        }
    } catch (Exception e) {
        log.error("unregisterBroker Exception", e);
    } finally {
        this.lock.writeLock().unlock();
    }
}
```
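This method also takes the write lock and then updates the related data. As with registration, if the smallest broker id changes (a master/slave switch), nameserv sends a notification through `notifyMinBrokerIdChanged` to the remaining brokers of that group, provided `isNotifyMinBrokerIdChanged` is enabled.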
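The cascading cleanup in `unRegisterBroker` can be reduced to a few lines. This is a hypothetical sketch with simplified tables and no locking; the real method holds the write lock the whole time. Removing the last address of a broker name drops the name, and removing the last name of a cluster drops the cluster. A notification is needed only when the removed id was the smallest one and the group still has members.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of unregister bookkeeping (locking omitted for brevity).
class UnregisterSketch {
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (id -> addr)

    void add(String cluster, String brokerName, long brokerId, String addr) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    /** Returns true when the remaining members should be told that the min id changed. */
    boolean unregister(String cluster, String brokerName, long brokerId) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        if (addrs == null) {
            return false;
        }
        boolean wasMinId = !addrs.isEmpty() && brokerId == Collections.min(addrs.keySet());
        addrs.remove(brokerId);
        if (addrs.isEmpty()) {
            brokerAddrTable.remove(brokerName); // last address gone: drop the broker name
            Set<String> names = clusterAddrTable.get(cluster);
            if (names != null) {
                names.remove(brokerName);
                if (names.isEmpty()) {
                    clusterAddrTable.remove(cluster); // last broker name gone: drop the cluster
                }
            }
            return false; // nobody left in the group to notify
        }
        return wasMinId;
    }
}
```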
That answers the third question: how a dead broker is detected promptly.
Last question: how do consumers and producers query routing information?
`org.apache.rocketmq.namesrv.NamesrvController#registerProcessor` registers two processors:
```java
private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.defaultExecutor);
    } else {
        // Support get route info only temporarily
        ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);

        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
    }
}
```
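One is the client request processor, and the other is the default processor. Note that the two use different thread pools, so they are isolated from each other. Client (consumer or producer) route queries are important, so they get a thread pool of their own.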
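The thread-pool isolation in `registerProcessor` comes down to routing by request code. Here is a minimal, hypothetical sketch built on plain `java.util.concurrent` executors. `ProcessorRouter`, `ROUTE_QUERY` and the pool names are made up, and this is not how RocketMQ's remoting layer is implemented. One code is bound to its own pool, and every other code falls back to the default pool, so a backlog in the default pool cannot delay route queries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of per-request-code executor isolation.
class ProcessorRouter {
    static final int ROUTE_QUERY = 1; // hypothetical stand-in for RequestCode.GET_ROUTEINFO_BY_TOPIC

    private final Map<Integer, ExecutorService> executors = new HashMap<>();
    private final ExecutorService defaultExecutor;

    ProcessorRouter(ExecutorService defaultExecutor) {
        this.defaultExecutor = defaultExecutor;
    }

    // Bind one request code to its own pool.
    void register(int requestCode, ExecutorService executor) {
        executors.put(requestCode, executor);
    }

    // Unbound codes fall back to the default pool.
    <T> Future<T> dispatch(int requestCode, Callable<T> handler) {
        return executors.getOrDefault(requestCode, defaultExecutor).submit(handler);
    }

    // Runs a trivial handler and reports which thread executed it.
    String threadNameFor(int requestCode) {
        try {
            return dispatch(requestCode, () -> Thread.currentThread().getName()).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    static ExecutorService namedPool(String name, int threads) {
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // lets the JVM exit without an explicit shutdown in this demo
            return t;
        });
    }
}
```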
The main logic is in `ClientRequestProcessor#getRouteInfoByTopic`, which calls `org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData`:
```java
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    List<BrokerData> brokerDataList = new LinkedList<>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        this.lock.readLock().lockInterruptibly();
        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
        if (queueDataMap != null) {
            // The queueDatas found for this topic
            topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
            foundQueueData = true;

            Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());

            for (String brokerName : brokerNameSet) {
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    continue;
                }
                BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
                    brokerData.getBrokerName(),
                    (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
                    brokerData.isEnableActingMaster(), brokerData.getZoneName());

                // Save the brokerDatas
                brokerDataList.add(brokerDataClone);
                foundBrokerData = true;
                if (filterServerTable.isEmpty()) {
                    continue;
                }
                // Look up the filter servers
                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                    List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                    filterServerMap.put(brokerAddr, filterServerList);
                }
            }
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    } finally {
        this.lock.readLock().unlock();
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    // The second half mainly decides whether an acting master is needed
    if (foundBrokerData && foundQueueData) {

        if (topicRouteData == null) {
            return null;
        }

        topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));

        if (!namesrvConfig.isSupportActingMaster()) {
            return topicRouteData;
        }

        if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
            return topicRouteData;
        }

        if (topicRouteData.getBrokerDatas().size() == 0 || topicRouteData.getQueueDatas().size() == 0) {
            return topicRouteData;
        }

        boolean needActingMaster = false;

        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            if (brokerData.getBrokerAddrs().size() != 0
                && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                needActingMaster = true;
                break;
            }
        }

        if (!needActingMaster) {
            return topicRouteData;
        }

        for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
            if (brokerAddrs.size() == 0 || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
                continue;
            }

            // No master
            for (final QueueData queueData : topicRouteData.getQueueDatas()) {
                if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                    if (!PermName.isWriteable(queueData.getPerm())) {
                        final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                        final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                        brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                    }
                    break;
                }
            }
        }

        return topicRouteData;
    }

    return null;
}
```
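Most of the work is in the first half. It takes the read lock, queries the three in-memory tables `topicQueueTable`, `brokerAddrTable` and `filterServerTable`, and fills the results into the response.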
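Here is a simplified, hypothetical sketch of that first half. The tables are reduced to sets and maps of strings, and filter servers are omitted. It reads under the read lock and copies each broker's address map, as the `clone()` in `pickupTopicRouteData` does. The copy matters because the acting-master step that follows runs after the lock is released and mutates the returned address maps, so it must never touch the shared tables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the route read path with defensive copies.
class RouteReader {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final Map<String, Set<String>> topicQueueTable = new HashMap<>();            // topic -> brokerNames (simplified)
    final Map<String, HashMap<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (id -> addr)

    /** Returns brokerName -> copied address map, or null if nothing is found for the topic. */
    Map<String, HashMap<Long, String>> pickupRoute(String topic) {
        lock.readLock().lock();
        try {
            Set<String> brokerNames = topicQueueTable.get(topic);
            if (brokerNames == null) {
                return null;
            }
            Map<String, HashMap<Long, String>> result = new HashMap<>();
            for (String name : brokerNames) {
                HashMap<Long, String> addrs = brokerAddrTable.get(name);
                if (addrs == null) {
                    continue; // queue data without broker data is skipped
                }
                result.put(name, new HashMap<>(addrs)); // copy, like brokerAddrs.clone()
            }
            return result.isEmpty() ? null : result;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```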
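The second half checks whether each broker group still has a master. If a group has none, acting master is enabled for it, and its queue is not writable, the broker with the smallest id in the returned data is promoted to master.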
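The promotion rule can be sketched on a single copied address map. This hypothetical helper keeps only the per-group checks from the second half; `isSupportActingMaster`, the sync-topic prefix and the empty-data checks are omitted. If the group has no id `0`, acting master is enabled, and the group's queue is not writable, the entry with the smallest id is re-keyed as `0`. `MASTER_ID = 0` mirrors the value of `MixAll.MASTER_ID`.

```java
import java.util.Collections;
import java.util.HashMap;

// Hypothetical sketch of the acting-master rewrite on a copied brokerAddrs map.
class ActingMaster {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    static void promoteIfNeeded(HashMap<Long, String> brokerAddrs, boolean enableActingMaster, boolean queueWritable) {
        if (brokerAddrs.isEmpty() || brokerAddrs.containsKey(MASTER_ID) || !enableActingMaster) {
            return; // empty group, real master present, or feature disabled
        }
        if (!queueWritable) {
            Long minId = Collections.min(brokerAddrs.keySet());
            String addr = brokerAddrs.remove(minId); // take the smallest-id slave...
            brokerAddrs.put(MASTER_ID, addr);        // ...and expose it under the master id
        }
    }
}
```

The prime slave registers with its write permission wiped (see `registerBroker`), so the promoted node still serves that topic as read-only.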
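That's it for this post. As you can see, nameserv is fairly simple. RocketMQ used ZooKeeper for this role in the past, but it turned out that a simple module of its own could do the job, with no need to bring in ZooKeeper and the complexity that comes with it. Hence today's nameserv.
Coming up next: the broker.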