【微服务】Nacos通知客户端服务变更以及重试机制
创始人
2024-01-21 03:04:54
0

💖Spring家族源码解析及微服务系列

✨【微服务】Nacos服务发现源码分析

✨【微服务】SpringBoot监听器机制以及在Nacos中的应用

✨【微服务】Nacos客户端微服务注册原理流程

✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理

✨【微服务】SpringBoot启动流程注册FeignClient

✨【微服务】SpringBoot启动流程初始化OpenFeign的入口

✨Spring Bean的生命周期

✨Spring事务原理

✨SpringBoot自动装配原理机制及过程

✨SpringBoot获取处理器流程

✨SpringBoot中处理器映射关系注册流程

✨Spring5.x中Bean初始化流程

✨Spring中Bean定义的注册流程

目录

💖前言

💖通知客户端服务变更以及重试机制

✨流程图

✨Service的updateIPs

✨UdpPushService组件

💫onApplicationEvent()处理事件

💫UDP推送客户端

✨UDP推送ACK

💫重试任务

✨取消重试

💫初始化Receiver并启动一个线程

💫Receiver接收器

💖总结


💖前言

    本篇文章将讲解如下内容:服务端是如何通知客户端服务变更的、使用了什么协议、推送失败如何处理、推送给谁?如果需要重试,怎么判断的,如何避免多次重试?

💖通知客户端服务变更以及重试机制

✨流程图

 

✨Service的updateIPs

Service的逻辑在之前已经详细解析,这里只是提一下。重点看下面逻辑:

UdpPushService组件

这里在Spring的事件机制中已经详细分析,感兴趣的读者可以回去翻看一下。

💫onApplicationEvent()处理事件

    @Overridepublic void onApplicationEvent(ServiceChangeEvent event) {// If upgrade to 2.0.X, do not push for v1.如果升级到2.0.X,不要推动 v1。if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {return;}// 从事件获取服务Service service = event.getService();// 服务名String serviceName = service.getName();// 命名空间String namespaceId = service.getNamespaceId();//merge some change events to reduce the push frequency:// 合并一些更改事件以减少推送频率:if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {return;}Future future = GlobalExecutor.scheduleUdpSender(() -> {try {// 服务已变更,把它添加到推送队列Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");// nacos服务端给每个客户端实例推送udp包时,该实例就是一个udp客户端,// clientMap中存放的就是这些udp客户端信息ConcurrentMap clients = subscriberServiceV1.getClientMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map cache = new HashMap<>(16);long lastRefTime = System.nanoTime();for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie: " + client);clients.remove(client.toString());Loggers.PUSH.debug("client is zombie: " + client);continue;}AckEntry ackEntry;Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client);String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null;Map data = null;// switchDomain.getDefaultPushCacheMillis()默认是10秒,// 即10000毫秒,不会进入这个分支,所以compressData = nullif (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);compressData = (byte[]) (pair.getValue0());data = (Map) pair.getValue1();Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}// 组装ack数据if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);if (ackEntry != null) {cache.put(key,new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));}}Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.getKey()));// UDP推送客户端udpPush(ackEntry);}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);// 合并一些更改事件以减少推送频率,上面代码判断futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}

主要逻辑:

  1. 从事件获取服务,然后获取服务名、命名空间
  2. 更改事件是否已经合并,已经推送不再推送
  3. 获取订阅客户端,没有订阅者则return。nacos服务端给每个客户端实例推送udp包时,该实例就是一个udp客户端,clientMap中存放的就是这些udp客户端信息
  4. 组装ack数据,UDP通知订阅的客户端
  5. 合并一些更改事件以减少推送频率,上面代码判断

💫UDP推送客户端

    private static AckEntry udpPush(AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");return null;}// 初始化是0,如果重试次数大于MAX_RETRY_TIMES=1次,就不再发送udp包了if (ackEntry.getRetryTimes() > Constants.UDP_MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.getRetryTimes(),ackEntry.getKey());// 清理数据ackMap.remove(ackEntry.getKey());udpSendTimeMap.remove(ackEntry.getKey());MetricsMonitor.incrementFailPush();return ackEntry;}try {if (!ackMap.containsKey(ackEntry.getKey())) {MetricsMonitor.incrementPush();}// 填充数据预备重试ackMap.put(ackEntry.getKey(), ackEntry);udpSendTimeMap.put(ackEntry.getKey(), System.currentTimeMillis());Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());// UDP推送客户端服务发生变更udpSocket.send(ackEntry.getOrigin());// 重试次数原子加1,以防多次推送ackEntry.increaseRetryTime();// 10秒内未接收到客户端UDP推送ack则重试GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);return ackEntry;} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.getData(),ackEntry.getOrigin().getAddress().getHostAddress(), e);ackMap.remove(ackEntry.getKey());udpSendTimeMap.remove(ackEntry.getKey());MetricsMonitor.incrementFailPush();return null;}}

主要逻辑:

  1. 初始化是0,如果重试次数大于MAX_RETRY_TIMES=1次,就不再发送udp包了
  2. ackMap填充数据预备重试
  3. UDP推送通知客户端服务发生变更
  4. 重试次数原子加1,以防多次推送
  5. 启动延迟任务10秒内未接收到客户端UDP推送ack重试

✨UDP推送ACK

    这里是client模块的PushReceiver#run()逻辑,本质上就是我们微服务自己,也就是客户端。这里只是提取一段重要逻辑要保持连贯性,上一篇已具体分析。

    @Overridepublic void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 监听Nacos服务端服务实例信息变更后的通知udpSocket.receive(packet);String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {serviceInfoHolder.processServiceInfo(pushPacket.data);// send ack to server发送ack到服务器ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"\"}";} else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {// dump data to server将数据转储到服务器ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))+ "\"}";} else {// do nothing send ack only仅仅发送ackack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime+ "\", \"data\":" + "\"\"}";}// 发送ack到服务端udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,packet.getSocketAddress()));} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}}}

主要逻辑:

  1. 监听Nacos服务端服务实例信息变更后的通知
  2. 解析注册中心推送的结果,组装回调ack报文,将注册中心推送的变更服务信息缓存到本地serviceInfoMap
  3. UDP推送ack到注册中心,以便注册中心决定是否需要重试

💫重试任务

    public static class Retransmitter implements Runnable {AckEntry ackEntry;public Retransmitter(AckEntry ackEntry) {this.ackEntry = ackEntry;}@Overridepublic void run() {// 重试推送if (ackMap.containsKey(ackEntry.getKey())) {Loggers.PUSH.info("retry to push data, key: " + ackEntry.getKey());udpPush(ackEntry);}}}

    这里逻辑很简单,如果在10秒收到来自客户端的ACK通知,那么这里的ackMap就匹配不到ackKey,就不会重试了;否则重试UDP推送客户端。

✨取消重试

💫初始化Receiver并启动一个线程

    static {try {// 初始化套接字udpSocket = new DatagramSocket();// 初始化接收客户端UDP通知的UDP接收器Receiver receiver = new Receiver();// 启动一个线程Thread inThread = new Thread(receiver);inThread.setDaemon(true);inThread.setName("com.alibaba.nacos.naming.push.receiver");inThread.start();} catch (SocketException e) {Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");}}

    UdpPushService在构造初始化时就初始化接收客户端UDP通知的UDP接收器了。Receiver 实现了Runnable接口,所以启动一个线程执行该任务。任务逻辑如下面所示:

💫Receiver接收器

public static class Receiver implements Runnable {@Overridepublic void run() {while (true) {byte[] buffer = new byte[1024 * 64];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);try {// 监听客户端ACK确认udpSocket.receive(packet);String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();String ip = socketAddress.getAddress().getHostAddress();int port = socketAddress.getPort();// 接受到ACK响应的时间距离上次接受到的时间之差如果大于10秒// ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);}String ackKey = AckEntry.getAckKey(ip, port, ackPacket.lastRefTime);// 返回的是删除健的值// 这一删除就意味着不需要重试了即Retransmitter的run()中匹配不到该key// 如果超过10秒客户端未UDP推送ack还是可能重试的AckEntry ackEntry = ackMap.remove(ackKey);if (ackEntry == null) {throw new IllegalStateException("unable to find ackEntry for key: " + ackKey + ", ack json: " + json);}// 每个数据包的耗时long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,port, pushCost, ackMap.size(), MetricsMonitor.getTotalPushMonitor().get());MetricsMonitor.incrementPushCost(pushCost);udpSendTimeMap.remove(ackKey);} catch (Throwable e) {Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);}}}

主要逻辑:

  1. 监听客户端ACK确认
  2. 接受到ACK响应的时间距离上次接受到的时间之差如果大于10秒,打印警告
  3. 拼接ackKey,然后根据ackKey删除健的值。这一删除就意味着不需要重试了即Retransmitterrun()中匹配不到该ackKey,如果超过10秒客户端未UDP推送ack还是可能重试的
  4. 每个数据包的耗时,打印接收到ack日志,清理数据等

💖总结

采用UDP通知客户端,客户端使用PushReceiver监听服务端UDP通知,监听到则缓存到本地,组装ACK数据UDP通知服务端;启动延迟任务,10秒没有接收到ACK则重试;Receiver接收器监听客户端ACK推送,接收到则移除ackKey及其value。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...