目录
1. 配置引起变更的两种方式
1.1 后台管理直接操作
1.2 NacosClient 调用 RPC 接口
2. 变更事件处理 AsyncNotifyService
2.1 HTTP 任务
2.2 RPC任务
2.3 NacosServer 其他节点接收到消息后如何处理
3. 客户端推送实现:DumpService.dump
接着上一篇 Nacos 配置中心源码讲解 继续
当配置发生变更时,NacosServer 会对配置变更的客户端主动推送消息。
那么 Nacos 是如何实现的呢?
配置变更?配置如何才会产生变化呢?有两种方式
在后台管理修改配置并发布时就发生了变更,而在发布配置接口上可见以下源码:
com.alibaba.nacos.config.server.service.ConfigOperationService#publishConfig
ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),configForm.getNamespaceId(), time.getTime()));
这段代码代表发布了一个事件:ConfigDataChangeEvent 配置数据改变事件。
NacosClient 操作发布配置
NacosServer端源码
com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler#handle
可见 RPC Server 端,最终同样会发布 ConfigDataChangeEvent 配置数据改变事件。
事件发出了,那就得有人处理
在 Nacos 中,负责处理的类就是 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService
可见在 AsyncNotifyService 类构造器中,注册了专门处理 ConfigDataChangeEvent 的消费者,onEvent 方法将回调所有发送过来的事件。
看看 onEvent 如何处理 ConfigDataChangeEvent 事件的:
@Override
public void onEvent(Event event) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;// 获取到 event 上携带的数据long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;// 统计数据累加MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);// 获取到整个集群中的所有成员节点Collection ipList = memberManager.allMembers();// 任务队列,一个 HTTP 请求任务、一个 RPC 请求任务Queue httpQueue = new LinkedList<>();Queue rpcQueue = new LinkedList<>();// 循环所有成员,决定好每个成员使用什么类型任务for (Member member : ipList) {if (!MemberUtil.isSupportedLongCon(member)) {// 当前集群成员节点是不是支持长连接,不支持放入 HTTP 队列里httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));} else {// 支持长连接放入 RPC 队列里rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));}}// 也就是说,上面根据每个成员的特点创建了他们使用的任务类型的 task// 现在开始使用小程序执行两个队列里的任务if (!httpQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));}
if (!rpcQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}
任务分为两种类型, HTTP 任务、 RPC 任务
首先先看简单一点的 HTTP 任务, 见上方 onEvent 源码,HTTP 任务传入了新建的 AsyncTask 对象里
看看 AsyncTask 内部做了什么:
class AsyncTask implements Runnable {private Queue queue;private NacosAsyncRestTemplate restTemplate;public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue queue) {this.restTemplate = restTemplate;this.queue = queue;}@Overridepublic void run() {executeAsyncInvoke();}private void executeAsyncInvoke() {while (!queue.isEmpty()) {// 从队列中取出一个任务NotifySingleTask task = queue.poll();String targetIp = task.getTargetIP();if (memberManager.hasMember(targetIp)) {// 进来了代表节点还在线boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);if (unHealthNeedDelay) {// 节点不健康了,任务延迟执行ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);// 延迟执行任务asyncTaskExecute(task);} else {// 构建请求头Header header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {header.addParam("isBeta", "true");}// 添加 认证 请求头AuthHeaderUtil.addIdentityToHeader(header);// 发送 HTTP 请求restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}}}
class AsyncRpcTask implements Runnable {private Queue queue;public AsyncRpcTask(Queue queue) {this.queue = queue;}@Overridepublic void run() {while (!queue.isEmpty()) {// 取出一个任务NotifySingleRpcTask task = queue.poll();// 构建一个请求对象ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();syncRequest.setDataId(task.getDataId());syncRequest.setGroup(task.getGroup());syncRequest.setBeta(task.isBeta);syncRequest.setLastModified(task.getLastModified());syncRequest.setTag(task.tag);syncRequest.setTenant(task.getTenant());Member member = task.member;if (memberManager.getSelf().equals(member)) {// 节点如果是 自己if (syncRequest.isBeta()) {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getLastModified(), NetUtils.localIP(), true);} else {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());}continue;}if (memberManager.hasMember(member.getAddress())) {// 节点还没下线boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());if (unHealthNeedDelay) {// 节点不健康了,任务延迟执行ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, member.getAddress());// 延迟执行任务asyncTaskExecute(task);} else {if (!MemberUtil.isSupportedLongCon(member)) {// 如果当前节点不支持长连接,就使用 HTTP 方式// 上面判断过长连接,这里再判断是因为不止上面一种方式创建这个任务asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,task.getLastModified(), member.getAddress(), task.isBeta));} else {try {// 发送 RPC 请求configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));} catch (Exception e) {// 统计数据维护MetricsMonitor.getConfigNotifyException().increment();asyncTaskExecute(task);}}}} else {}}}
}
以上两种方式本质都是发送了一个请求,HTTP 方式发送了HTTP 请求,RPC 长连接方式发送了一个 RPC 请求,并且专门处理了当前节点的逻辑。
看看 HTTP 方式调用的 接口:
/communication/dataChange
最后调用了 dumpService.dump 方法
RPC 方式发送的请求后,以下是接收端的处理方法:
发送请求时判断了如果是当前节点则进行的处理:
最后调用了 dumpService.dump 方法
画个图总结下现在的情况:
为什么配置变更了要通知集群中的全部节点呢?
这里并不是同步什么数据,而是因为客户端可能注册到不同的集群节点上,而如果只推送当前节点上注册的客户端,那会导致其他客户端明明也监听了某个配置,但是配置变化了却无法推送过来。
因为集群中的每个节点可能都接受不同客户端的连接
DumpService.dump
这个方法就是关键之处了, 因为它完成了向客户端推送功能,看看如何实现:
// com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)
private TaskManager dumpTaskMgr;
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
最终调用了以下代码:
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {
protected final ConcurrentHashMap
该方法流程结束,最终就是将任务放入 map 中。
放入了任务,那就得有取出来处理的流程。
还是同样的类,看看它的构造初始化方法
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap
重点在线程池的执行这里
参数 processInterval 为 100L,也就代表延迟 100毫秒后每隔100毫秒执行一次
接着 看看 ProcessRunnable ,这个定时任务的内容:
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {// 处理任务processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}} // 处理任务protected void processTasks() {Collection
上述代码找出当前任务的 processor 后,就继续 process 处理
看看它怎么处理的:
public class DumpChangeProcessor implements NacosTaskProcessor {@Overridepublic boolean process(NacosTask task) {long startUpdateMd5 = System.currentTimeMillis();// 从数据库查询全部配置的 MD5 值List updateMd5List = configInfoPersistService.listAllGroupKeyMd5();for (ConfigInfoWrapper config : updateMd5List) {final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());// 更新不一样的 MD5 值,并且发布 LocalDataChangeEvent 本地数据改变事件ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified(), config.getEncryptedDataKey());}
// 代码省略return true;}
}
重点看 updateMd5 方法
// com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey){CacheItem cache = makeSure(groupKey, encryptedDataKey, false);if (cache.md5 == null || !cache.md5.equals(md5)) {// 更新本地缓存信息cache.md5 = md5;cache.lastModifiedTs = lastModifiedTs;
// 发布事件:本地数据改变NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}
}
如果比对出 md5 不一致,代表数据发生了改变,就继续发布 LocalDataChangeEvent 事件。
看看 LocalDataChangeEvent 处理方法:
public class RpcConfigChangeNotifier extends Subscriber {@Overridepublic void onEvent(LocalDataChangeEvent event) {String groupKey = event.groupKey;boolean isBeta = event.isBeta;List betaIps = event.betaIps;String[] strings = GroupKey.parseKey(groupKey);String dataId = strings[0];String group = strings[1];String tenant = strings.length > 2 ? strings[2] : "";String tag = event.tag;// 配置数据改变configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);}// 配置数据改变public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,List betaIps, String tag) {// 获取到当前配置的所有监听器Set listeners = configChangeListenContext.getListeners(groupKey);if (CollectionUtils.isEmpty(listeners)) {return;}int notifyClientCount = 0;// 循环推送for (final String client : listeners) {Connection connection = connectionManager.getConnection(client);if (connection == null) {continue;}ConnectionMeta metaInfo = connection.getMetaInfo();// beta ips check.String clientIp = metaInfo.getClientIp();String clientTag = metaInfo.getTag();if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {continue;}// tag checkif (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) {continue;}// 构建一个请求对象ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);// 创建一个 RPC 推送任务RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());// 开始推送push(rpcPushRetryTask);// 推送客户端次数累计notifyClientCount++;}Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey);}
}
重点当前是最后 push 方法了 :
// com.alibaba.nacos.config.server.remote.RpcConfigChangeNotifier#push
/*** 推送一个任务*/private void push(RpcPushTask retryTask) {// 获取到请求ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;if (retryTask.isOverTimes()) {// 已经超过重试次数Loggers.REMOTE_PUSH.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.", notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(), retryTask.connectionId);connectionManager.unregister(retryTask.connectionId);} else if (connectionManager.getConnection(retryTask.connectionId) != null) {// 连接还存在,客户端还在线// 开始使用线程池执行任务ConfigExecutor.getClientConfigNotifierServiceExecutor().schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);} else {// client is already offline, ignore task.}}
看到 RpcPushTask 任务放到了线程池中执行,那就看看 RpcPushTask 任务内容:
class RpcPushTask implements Runnable {/*** 请求对象*/ConfigChangeNotifyRequest notifyRequest;/*** 最大重试次数*/int maxRetryTimes = -1;/*** 已经尝试次数*/int tryTimes = 0;String connectionId;String clientIp;String appName;public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId,String clientIp, String appName) {this.notifyRequest = notifyRequest;this.maxRetryTimes = maxRetryTimes;this.connectionId = connectionId;this.clientIp = clientIp;this.appName = appName;}/*** 是否超过重试次数 true 超过*/public boolean isOverTimes() {return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes;}@Overridepublic void run() {// 已尝试次数 +1tryTimes++;TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH);if (!tpsControlManager.check(tpsCheckRequest).isSuccess()) {// 检查失败了push(this);} else {// ======================================// pushWithCallback 推送给客户端 !!// ======================================rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {@Overridepublic void onSuccess() {TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_SUCCESS);tpsControlManager.check(tpsCheckRequest);}@Overridepublic void onFail(Throwable e) {TpsCheckRequest tpsCheckRequest = new TpsCheckRequest();tpsCheckRequest.setPointName(POINT_CONFIG_PUSH_FAIL);tpsControlManager.check(tpsCheckRequest);Loggers.REMOTE_PUSH.warn("Push fail", e);push(RpcPushTask.this);}}, ConfigExecutor.getClientConfigNotifierServiceExecutor());}}}
}
可见,这里完成了 服务端 向客户端推送数据
查看请求字段,可见,并没有推送配置变更的内容,从上一章就可以看出,客户端收到推送的消息后,会主动对变更的数据进行拉取操作
public class ConfigChangeNotifyRequest extends ServerRequest {String dataId;String group;String tenant;
}
下一篇讲解 SpringCloud 和 Nacos 配置中心整合源码