Nacos 配置中心 服务端推送变更源码讲解
创始人
2024-05-19 18:28:30
0

目录

1. 配置引起变更的两种方式

1.1 后台管理直接操作

1.2 NacosClient 调用 RPC 接口

2. 变更事件处理 AsyncNotifyService

2.1 HTTP 任务

2.2 RPC任务

2.3 NacosServer 其他节点接收到消息后如何处理

3. 客户端推送实现:DumpService.dump


接着上一篇 Nacos 配置中心源码讲解 继续

当配置发生变更时,NacosServer 会对配置变更的客户端主动推送消息。

那么 Nacos 是如何实现的呢?

1. 配置引起变更的两种方式

配置变更?配置如何才会产生变化呢?有两种方式

1.1 后台管理直接操作

在后台管理修改配置并发布时就发生了变更,而在发布配置接口上可见以下源码:

com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig

ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));

这段代码代表发布了一个事件:ConfigDataChangeEvent 配置数据改变事件。

1.2 NacosClient 调用 RPC 接口

NacosClient 操作发布配置

NacosServer端源码

com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler#handle

可见 RPC Server 端,最终同样会发布 ConfigDataChangeEvent 配置数据改变事件。

2. 变更事件处理 AsyncNotifyService

事件发出了,那就得有人处理

在 Nacos 中,负责处理的类就是 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService

可见在 AsyncNotifyService 类构造器中,注册了专门处理 ConfigDataChangeEvent 的消费者,onEvent 方法将回调所有发送过来的事件。

看看 onEvent 如何处理 ConfigDataChangeEvent 事件的:

@Override
public void onEvent(Event event) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;// 获取到 event 上携带的数据long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;// 统计数据累加MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);// 获取到整个集群中的所有成员节点Collection ipList = memberManager.allMembers();// 任务队列,一个 HTTP 请求任务、一个 RPC 请求任务Queue httpQueue = new LinkedList<>();Queue rpcQueue = new LinkedList<>();// 循环所有成员,决定好每个成员使用什么类型任务for (Member member : ipList) {if (!MemberUtil.isSupportedLongCon(member)) {// 当前集群成员节点是不是支持长连接,不支持放入 HTTP 队列里httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));} else {// 支持长连接放入 RPC 队列里rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));}}// 也就是说,上面根据每个成员的特点创建了他们使用的任务类型的 task// 现在开始使用小程序执行两个队列里的任务if (!httpQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));}
​if (!rpcQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}

任务分为两种类型, HTTP 任务、 RPC 任务

2.1 HTTP 任务

首先先看简单一点的 HTTP 任务, 见上方 onEvent 源码,HTTP 任务传入了新建的 AsyncTask 对象里

看看 AsyncTask 内部做了什么:

class AsyncTask implements Runnable {private Queue queue;private NacosAsyncRestTemplate restTemplate;public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue queue) {this.restTemplate = restTemplate;this.queue = queue;}@Overridepublic void run() {executeAsyncInvoke();}private void executeAsyncInvoke() {while (!queue.isEmpty()) {// 从队列中取出一个任务NotifySingleTask task = queue.poll();String targetIp = task.getTargetIP();if (memberManager.hasMember(targetIp)) {// 进来了代表节点还在线boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);if (unHealthNeedDelay) {// 节点不健康了,任务延迟执行ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);// 延迟执行任务asyncTaskExecute(task);} else {// 构建请求头Header header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
​if (task.isBeta) {header.addParam("isBeta", "true");}// 添加 认证 请求头AuthHeaderUtil.addIdentityToHeader(header);// 发送 HTTP 请求restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}}}

2.2 RPC任务

class AsyncRpcTask implements Runnable {private Queue queue;public AsyncRpcTask(Queue queue) {this.queue = queue;}@Overridepublic void run() {while (!queue.isEmpty()) {// 取出一个任务NotifySingleRpcTask task = queue.poll();// 构建一个请求对象ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();syncRequest.setDataId(task.getDataId());syncRequest.setGroup(task.getGroup());syncRequest.setBeta(task.isBeta);syncRequest.setLastModified(task.getLastModified());syncRequest.setTag(task.tag);syncRequest.setTenant(task.getTenant());Member member = task.member;if (memberManager.getSelf().equals(member)) {// 节点如果是 自己if (syncRequest.isBeta()) {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getLastModified(), NetUtils.localIP(), true);} else {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());}continue;}if (memberManager.hasMember(member.getAddress())) {// 节点还没下线boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());if (unHealthNeedDelay) {// 节点不健康了,任务延迟执行ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, member.getAddress());// 延迟执行任务asyncTaskExecute(task);} else {if (!MemberUtil.isSupportedLongCon(member)) {// 如果当前节点不支持长连接,就使用 HTTP 方式// 上面判断过长连接,这里再判断是因为不止上面一种方式创建这个任务asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,task.getLastModified(), member.getAddress(), task.isBeta));} else {try {// 发送 RPC 请求configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));} catch (Exception e) {// 统计数据维护MetricsMonitor.getConfigNotifyException().increment();asyncTaskExecute(task);}}}} else {}}}
}

以上两种方式本质都是发送了一个请求,HTTP 方式发送了HTTP 请求,RPC 长连接方式发送了一个 RPC 请求,并且专门处理了当前节点的逻辑。

2.3 NacosServer 其他节点接收到消息后如何处理

看看 HTTP 方式调用的 接口:

/communication/dataChange

最后调用了 dumpService.dump 方法

RPC 方式发送的请求后,以下是接收端的处理方法:

发送请求时判断了如果是当前节点则进行的处理:

 最后调用了 dumpService.dump 方法

画个图总结下现在的情况:

为什么配置变更了要通知集群中的全部节点呢?

这里并不是同步什么数据,而是因为客户端可能注册到不同的集群节点上,而如果只推送当前节点上注册的客户端,那会导致其他客户端明明也监听了某个配置,但是配置变化了却无法推送过来。

因为集群中的每个节点可能都接受不同客户端的连接

3. 客户端推送实现:DumpService.dump

DumpService.dump

这个方法就是关键之处了, 因为它完成了向客户端推送功能,看看如何实现:

// com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)
​
private TaskManager dumpTaskMgr;
​
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

最终调用了以下代码:

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {
​protected final ConcurrentHashMap tasks;
​@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}// 将任务放入 ConcurrentHashMap 中tasks.put(key, newTask);} finally {lock.unlock();}}
}

该方法流程结束,最终就是将任务放入 map 中。

放入了任务,那就得有取出来处理的流程。

还是同样的类,看看它的构造初始化方法

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap tasks;public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);// map 初始化tasks = new ConcurrentHashMap<>(initCapacity);// 创建定时线程池processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));// 定时线程池开始执行processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}
}

重点在线程池的执行这里

参数 processInterval 为 100L,也就代表延迟 100毫秒后每隔100毫秒执行一次

接着 看看 ProcessRunnable ,这个定时任务的内容:

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {// 处理任务processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}  // 处理任务protected void processTasks() {Collection keys = getAllTaskKeys();for (Object taskKey : keys) {AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}// 获取到任务的处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// 开始处理任务if (!processor.process(task)) {// 如果任务返回 false 失败,就重试retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}    
} 

上述代码找出当前任务的 processor 后,就继续 process 处理

看看它怎么处理的:

public class DumpChangeProcessor implements NacosTaskProcessor {@Overridepublic boolean process(NacosTask task) {long startUpdateMd5 = System.currentTimeMillis();// 从数据库查询全部配置的 MD5 值List updateMd5List = configInfoPersistService.listAllGroupKeyMd5();for (ConfigInfoWrapper config : updateMd5List) {final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());// 更新不一样的 MD5 值,并且发布 LocalDataChangeEvent 本地数据改变事件ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified(), config.getEncryptedDataKey());}
​// 代码省略return true;}
}

重点看 updateMd5 方法

// com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey){CacheItem cache = makeSure(groupKey, encryptedDataKey, false);if (cache.md5 == null || !cache.md5.equals(md5)) {// 更新本地缓存信息cache.md5 = md5;cache.lastModifiedTs = lastModifiedTs;
​// 发布事件:本地数据改变NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}
}

如果比对出 md5 不一致,代表数据发生了改变,就继续发布 LocalDataChangeEvent 事件。

看看 LocalDataChangeEvent 处理方法:

public class RpcConfigChangeNotifier extends Subscriber {@Overridepublic void onEvent(LocalDataChangeEvent event) {String groupKey = event.groupKey;boolean isBeta = event.isBeta;List betaIps = event.betaIps;String[] strings = GroupKey.parseKey(groupKey);String dataId = strings[0];String group = strings[1];String tenant = strings.length > 2 ? strings[2] : "";String tag = event.tag;// 配置数据改变configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);}// 配置数据改变public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,List betaIps, String tag) {// 获取到当前配置的所有监听器Set listeners = configChangeListenContext.getListeners(groupKey);if (CollectionUtils.isEmpty(listeners)) {return;}int notifyClientCount = 0;// 循环推送for (final String client : listeners) {Connection connection = connectionManager.getConnection(client);if (connection == null) {continue;}ConnectionMeta metaInfo = connection.getMetaInfo();// beta ips check.String clientIp = metaInfo.getClientIp();String clientTag = metaInfo.getTag();if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {continue;}// tag checkif (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {continue;}// 构建一个请求对象ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);// 创建一个 RPC 推送任务RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());// 开始推送push(rpcPushRetryTask);// 推送客户端次数累计notifyClientCount++;}Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);}    
}

重点当前是最后 push 方法了 :

// com.alibaba.nacos.config.server.remote.RpcConfigChangeNotifier#push
​/*** 推送一个任务*/private void push(RpcPushTask retryTask) {// 获取到请求ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;if (retryTask.isOverTimes()) {// 已经超过重试次数Loggers.REMOTE_PUSH.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.", notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(), retryTask.connectionId);connectionManager.unregister(retryTask.connectionId);} else if (connectionManager.getConnection(retryTask.connectionId) != null) {// 连接还存在,客户端还在线// 开始使用线程池执行任务ConfigExecutor.getClientConfigNotifierServiceExecutor().schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);} else {// client is already offline, ignore task.}}

看到 RpcPushTask 任务放到了线程池中执行,那就看看 RpcPushTask 任务内容:

class RpcPushTask implements Runnable {/*** 请求对象*/ConfigChangeNotifyRequest notifyRequest;/*** 最大重试次数*/int maxRetryTimes = -1;/*** 已经尝试次数*/int tryTimes = 0;String connectionId;String clientIp;String appName;public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId,String clientIp, String appName) {this.notifyRequest = notifyRequest;this.maxRetryTimes = maxRetryTimes;this.connectionId = connectionId;this.clientIp = clientIp;this.appName = appName;}/*** 是否超过重试次数 true 超过*/public boolean isOverTimes() {return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes;}@Overridepublic void run() {// 已尝试次数 +1tryTimes++;TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH);if (!tpsControlManager.check(tpsCheckRequest).isSuccess()) {// 检查失败了push(this);} else {// ======================================//  pushWithCallback 推送给客户端 !!// ======================================rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {@Overridepublic void onSuccess() {TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_SUCCESS);tpsControlManager.check(tpsCheckRequest);}@Overridepublic void onFail(Throwable e) {TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_FAIL);tpsControlManager.check(tpsCheckRequest);Loggers.REMOTE_PUSH.warn("Push fail", e);push(RpcPushTask.this);}}, ConfigExecutor.getClientConfigNotifierServiceExecutor());}}}
}

可见,这里完成了 服务端 向客户端推送数据

查看请求字段,可见,并没有推送配置变更的内容,从上一章就可以看出,客户端收到推送的消息后,会主动对变更的数据进行拉取操作

public class ConfigChangeNotifyRequest extends ServerRequest {String dataId;String group;String tenant;
}

下一篇讲解 SpringCloud 和 Nacos 配置中心整合源码

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...