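Contents
💖Preface
💖Notifying clients of service changes, and the retry mechanism
✨Flowchart
✨Service's updateIPs
✨The UdpPushService component
💫onApplicationEvent(): handling the event
💫Pushing to clients over UDP
✨Sending the ACK over UDP
💫The retry task
✨Cancelling the retry
💫Initializing the Receiver and starting a thread
💫The Receiver
💖Summary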
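This article covers the following: how the server notifies clients of service changes, which protocol it uses, how failed pushes are handled, and which clients receive the push. When a retry is needed, how is that decided, and how are repeated retries avoided?
The Service logic was analyzed in detail in an earlier article, so it is only mentioned briefly here. The focus is on the logic below:
This was covered in detail in the article on Spring's event mechanism; interested readers can refer back to it.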
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    // If upgrade to 2.0.X, do not push for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    // Get the service from the event
    Service service = event.getService();
    // Service name
    String serviceName = service.getName();
    // Namespace
    String namespaceId = service.getNamespaceId();
    //merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
        return;
    }
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            // The service has changed; add it to the push queue
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            // When the Nacos server pushes UDP packets to each client instance, that instance acts as a UDP client;
            // clientMap holds these UDP clients
            ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client);
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client);
                    continue;
                }
                AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client);
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                // switchDomain.getDefaultPushCacheMillis() defaults to 10 seconds (10000 ms),
                // so this branch is not taken and compressData stays null
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the ack entry
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.getKey()));
                // Push to the client over UDP
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    // Record the pending push; the containsKey() check above relies on it to merge change events
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
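Main logic:
- Get the service from the event, then its service name and namespace.
- Check whether a push for this service is already pending (its key is in futureMap). If so, the event is merged into that pending push and no new push is scheduled.
- Get the subscribed clients and return if there are none. When the Nacos server pushes UDP packets to each client instance, that instance acts as a UDP client, and clientMap stores these UDP clients.
- Build the ack entry for each client and notify the subscribed client over UDP.
- Put the scheduled Future into futureMap. The merge check above relies on this to reduce the push frequency, and the task's finally block removes the key once the delayed (1000 ms) push has run.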
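The merging behaviour described above can be sketched in plain Java. A pending Future in futureMap blocks new pushes for the same service until the delayed task finishes. This is a minimal sketch, not Nacos code: the class PushDebouncer, its methods, and the key strings in the usage check are hypothetical. ConcurrentHashMap.computeIfAbsent is swapped in for the containsKey()/put() pair so that the check and the insert happen atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: coalesces change events per service key, like futureMap above. */
class PushDebouncer {
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;
    private final Consumer<String> pushAction;

    PushDebouncer(long delayMillis, Consumer<String> pushAction) {
        this.delayMillis = delayMillis;
        this.pushAction = pushAction;
    }

    /** Returns true if a new push was scheduled, false if the event was merged into a pending one. */
    boolean submit(String serviceKey) {
        boolean[] scheduled = {false};
        // Atomic check-and-insert: only the first event for a key schedules a push
        pending.computeIfAbsent(serviceKey, key -> {
            scheduled[0] = true;
            return scheduler.schedule(() -> {
                try {
                    pushAction.accept(key);
                } finally {
                    // Like the finally block above: let the next event schedule a new push
                    pending.remove(key);
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        });
        return scheduled[0];
    }

    /** Test helper: lets already-scheduled pushes run, then stops the scheduler. */
    void drain() {
        scheduler.shutdown(); // delayed tasks still run after shutdown by default
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling submit twice for the same key within the delay window schedules only one push. The design trades a little latency (the delay) for fewer UDP bursts when a service changes several times in quick succession.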
private static AckEntry udpPush(AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // retryTimes starts at 0; once it exceeds UDP_MAX_RETRY_TIMES (1), stop sending UDP packets
    if (ackEntry.getRetryTimes() > Constants.UDP_MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.getRetryTimes(),
                ackEntry.getKey());
        // Clean up
        ackMap.remove(ackEntry.getKey());
        udpSendTimeMap.remove(ackEntry.getKey());
        MetricsMonitor.incrementFailPush();
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.getKey())) {
            MetricsMonitor.incrementPush();
        }
        // Record the entry so that it can be retried
        ackMap.put(ackEntry.getKey(), ackEntry);
        udpSendTimeMap.put(ackEntry.getKey(), System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());
        // Notify the client over UDP that the service has changed
        udpSocket.send(ackEntry.getOrigin());
        // Atomically increment the retry count so that the number of pushes stays bounded
        ackEntry.increaseRetryTime();
        // Retry if no UDP ack arrives from the client within 10 seconds
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.getData(),
                ackEntry.getOrigin().getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.getKey());
        udpSendTimeMap.remove(ackEntry.getKey());
        MetricsMonitor.incrementFailPush();
        return null;
    }
}
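Main logic:
- The retry count starts at 0. Once it exceeds UDP_MAX_RETRY_TIMES (1), no more UDP packets are sent, the entry is cleaned up, and the push is counted as failed. Because the count is checked before each send and incremented after it, an entry is sent at most twice: the initial push plus one retry.
- Put the entry into ackMap (and record the send time in udpSendTimeMap) in preparation for a possible retry.
- Notify the client over UDP that the service has changed.
- Atomically increment the retry count so that the number of pushes stays bounded.
- Schedule a delayed task (Retransmitter) that retries if no UDP ack arrives from the client within 10 seconds (ACK_TIMEOUT_NANOS).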
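The retry bound can be checked with a small, socket-free model of udpPush(). This sketch assumes UDP_MAX_RETRY_TIMES = 1 as stated above. BoundedUdpPush, Entry and the sender callback are hypothetical stand-ins for UdpPushService, AckEntry and udpSocket.send(...), and the scheduling of the Retransmitter is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model of the retry-count check in udpPush(). */
class BoundedUdpPush {
    // Assumed to match Constants.UDP_MAX_RETRY_TIMES as described in the article
    static final int MAX_RETRY_TIMES = 1;

    /** Minimal stand-in for AckEntry. */
    static final class Entry {
        final String key;
        final AtomicInteger retryTimes = new AtomicInteger(0);

        Entry(String key) {
            this.key = key;
        }
    }

    final Map<String, Entry> ackMap = new ConcurrentHashMap<>();
    private final Consumer<Entry> sender; // stands in for udpSocket.send(...)

    BoundedUdpPush(Consumer<Entry> sender) {
        this.sender = sender;
    }

    /** Returns true if a packet was sent, false if the entry was given up. */
    boolean push(Entry entry) {
        if (entry.retryTimes.get() > MAX_RETRY_TIMES) {
            ackMap.remove(entry.key); // give up and clean up
            return false;
        }
        ackMap.put(entry.key, entry); // keep it so a missing ACK can trigger a retry
        sender.accept(entry);
        entry.retryTimes.incrementAndGet(); // incremented only after the send
        return true;
    }
}
```

With the `retryTimes > 1` check done before each send and the increment done after it, the third call gives up, so exactly two packets go out.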
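This is the PushReceiver#run() logic from the client module. It runs in our own microservice, that is, on the client. Only the key part is excerpted here for continuity; it was analyzed in detail in the previous article.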
@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Wait for the Nacos server's notification that service instance info has changed
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
                serviceInfoHolder.processServiceInfo(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            // Send the ack back to the server
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
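Main logic:
- Listen for the Nacos server's notifications that service instances have changed.
- Parse the data pushed by the registry. For a PUSH_PACKAGE_TYPE_DOM or PUSH_PACKAGE_TYPE_SERVICE packet, cache the changed service info in the local serviceInfoMap, then build the ACK message.
- Send the ACK back to the registry over UDP so that the registry can decide whether a retry is needed.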
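The client's reply path can be tried out over the loopback interface. This is an illustration under stated assumptions, not Nacos's wire protocol: the push is a plain "type|lastRefTime" string instead of a JSON PushPacket, the strings dom/service/dump are assumed values of the PUSH_PACKAGE_TYPE_* constants, and UdpAckRoundTrip is a hypothetical class. It does show the essential point: the client echoes lastRefTime and replies to the sender's address, so the server receives the ACK on the same socket it pushed from.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical loopback sketch of one push followed by one ACK. */
class UdpAckRoundTrip {

    /** Client side: pick the ACK type from the push type and echo lastRefTime back. */
    static String ackFor(String pushType, long lastRefTime) {
        String ackType;
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            ackType = "push-ack";
        } else if ("dump".equals(pushType)) {
            ackType = "dump-ack"; // the real client also puts its serviceInfoMap JSON into "data"
        } else {
            ackType = "unknown-ack";
        }
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\": \"" + lastRefTime + "\", \"data\": \"\"}";
    }

    /** Pushes once from a "server" socket to a "client" socket; returns the ACK the server receives. */
    static String exchange(String pushType, long lastRefTime) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            client.setSoTimeout(2000);

            // Server: push "type|lastRefTime" (stand-in for the JSON PushPacket)
            byte[] push = (pushType + "|" + lastRefTime).getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, client.getLocalSocketAddress()));

            // Client: receive the push, then reply to the sender's address (cf. packet.getSocketAddress())
            byte[] buffer = new byte[64 * 1024];
            DatagramPacket in = new DatagramPacket(buffer, buffer.length);
            client.receive(in);
            String[] parts = new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8).split("\\|");
            byte[] ack = ackFor(parts[0], Long.parseLong(parts[1])).getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(ack, ack.length, in.getSocketAddress()));

            // Server: read the ACK on the socket it pushed from
            DatagramPacket ackPacket = new DatagramPacket(buffer, buffer.length);
            server.receive(ackPacket);
            return new String(ackPacket.getData(), 0, ackPacket.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```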
public static class Retransmitter implements Runnable {

    AckEntry ackEntry;

    public Retransmitter(AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }

    @Override
    public void run() {
        // Retry the push if the ACK has not arrived yet
        if (ackMap.containsKey(ackEntry.getKey())) {
            Loggers.PUSH.info("retry to push data, key: " + ackEntry.getKey());
            udpPush(ackEntry);
        }
    }
}
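The logic is simple. If the client's ACK arrives within 10 seconds, the Receiver has already removed the ackKey from ackMap, so the check finds nothing and no retry happens. Otherwise the UDP push to the client is retried.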
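The check in Retransmitter.run() boils down to "schedule a check, and re-send only if the key is still in ackMap". Below is a minimal sketch with a ScheduledExecutorService. It assumes a shortened timeout instead of the 10-second ACK_TIMEOUT_NANOS. RetransmitDemo and the ackKey strings are hypothetical, and the re-send is only recorded, not performed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the Retransmitter timeout check. */
class RetransmitDemo {
    final Map<String, String> ackMap = new ConcurrentHashMap<>(); // ackKey -> payload
    final Set<String> resent = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long ackTimeoutMillis;

    RetransmitDemo(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    /** Records the pending push and schedules the timeout check. */
    void send(String ackKey, String payload) {
        ackMap.put(ackKey, payload);
        timer.schedule(() -> {
            if (ackMap.containsKey(ackKey)) { // no ACK arrived in time
                resent.add(ackKey);           // the real code calls udpPush(ackEntry) here
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** What the Receiver does on an ACK: removing the key cancels the retry. */
    void onAck(String ackKey) {
        ackMap.remove(ackKey);
    }

    /** Test helper: runs pending checks, then stops the timer. */
    void drain() {
        timer.shutdown();
        try {
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

No explicit cancel() on the scheduled future is needed: the map lookup at timeout is the cancellation mechanism, which keeps the ACK path to a single remove().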
static {
    try {
        // Initialize the socket
        udpSocket = new DatagramSocket();
        // Initialize the UDP receiver that listens for clients' UDP ACKs
        Receiver receiver = new Receiver();
        // Start a thread
        Thread inThread = new Thread(receiver);
        inThread.setDaemon(true);
        inThread.setName("com.alibaba.nacos.naming.push.receiver");
        inThread.start();
    } catch (SocketException e) {
        Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
    }
}
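When the UdpPushService class is loaded, its static initializer creates the UDP socket and the UDP receiver for client ACKs. Receiver implements the Runnable interface, so a daemon thread is started to run it. The task logic is as follows: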
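The static-initializer pattern can be reproduced in a few lines. This sketch is hypothetical: the class and thread names are made up, and it binds to an ephemeral loopback port so it can run anywhere, whereas the code above uses new DatagramSocket(). In the code above the same udpSocket is used by udpPush() to send and by the Receiver to receive, so a client that replies to packet.getSocketAddress() reaches this receiver.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;

/** Hypothetical sketch: socket and receiver thread created once, when the class is initialized. */
class AckReceiverBootstrap {
    static DatagramSocket udpSocket;
    static Thread receiverThread;

    static {
        try {
            udpSocket = new DatagramSocket(0, InetAddress.getLoopbackAddress());
            receiverThread = new Thread(new Receiver());
            receiverThread.setDaemon(true); // must not keep the JVM alive on shutdown
            receiverThread.setName("demo.push.receiver");
            receiverThread.start();
        } catch (SocketException e) {
            System.err.println("failed to init push service: " + e);
        }
    }

    static class Receiver implements Runnable {
        @Override
        public void run() {
            byte[] buffer = new byte[64 * 1024];
            while (!udpSocket.isClosed()) {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                try {
                    udpSocket.receive(packet); // blocks until an ACK arrives
                    // parsing the ACK and removing its key from ackMap would go here
                } catch (Exception e) {
                    // keep looping, like the catch (Throwable) in the Receiver above
                }
            }
        }
    }
}
```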
public static class Receiver implements Runnable {

    @Override
    public void run() {
        while (true) {
            byte[] buffer = new byte[1024 * 64];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            try {
                // Wait for an ACK from a client
                udpSocket.receive(packet);
                String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
                AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);
                InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                int port = socketAddress.getPort();
                // Warn if more than 10 seconds have passed since lastRefTime (recorded when the push was prepared)
                // ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)
                if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {
                    Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
                }
                String ackKey = AckEntry.getAckKey(ip, port, ackPacket.lastRefTime);
                // remove() returns the removed value.
                // Once the key is removed no retry is needed, because Retransmitter.run() will not find it.
                // If the client's ACK takes longer than 10 seconds, a retry may still be sent.
                AckEntry ackEntry = ackMap.remove(ackKey);
                if (ackEntry == null) {
                    throw new IllegalStateException(
                            "unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
                }
                // Time taken by this packet
                long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
                Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,
                        port, pushCost, ackMap.size(), MetricsMonitor.getTotalPushMonitor().get());
                MetricsMonitor.incrementPushCost(pushCost);
                udpSendTimeMap.remove(ackKey);
            } catch (Throwable e) {
                Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
            }
        }
    }
}
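Main logic:
- Listen for ACKs from clients.
- If more than 10 seconds (ACK_TIMEOUT_NANOS) have elapsed between lastRefTime and the arrival of the ACK, log a warning. lastRefTime is the System.nanoTime() value the server recorded when preparing the push, which the client echoes back.
- Build the ackKey and remove its entry from ackMap. Once it is removed no retry is needed, because Retransmitter.run() will no longer find the ackKey. If the client does not send its ACK within 10 seconds, however, a retry may still happen. If no entry is found, an IllegalStateException is thrown and logged by the catch block.
- Compute the push cost of the packet, log the received ACK, record the metric, and clean up udpSendTimeMap.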
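The ACK-handling steps above can be modeled without sockets. This sketch assumes the ackKey format is "ip,port,lastRefTime" (the real implementation of AckEntry.getAckKey is not shown here), and AckHandler is a hypothetical class. Unlike the code above, it returns -1 for an unmatched ACK instead of throwing IllegalStateException, and it null-checks udpSendTimeMap to avoid an unboxing NullPointerException. Times are passed in as parameters so the result is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the Receiver's ACK bookkeeping. */
class AckHandler {
    static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);

    final Map<String, Object> ackMap = new ConcurrentHashMap<>();
    final Map<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();

    /** Assumed key format: "ip,port,lastRefTime". */
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip.trim() + "," + port + "," + lastRefTime;
    }

    /** Returns the push cost in ms, or -1 if no pending push matches (late or duplicate ACK). */
    long onAck(String ip, int port, long lastRefTime, long nowNanos, long nowMillis) {
        // lastRefTime is the server's own nanoTime echoed back, so both values come from one clock
        if (nowNanos - lastRefTime > ACK_TIMEOUT_NANOS) {
            System.err.println("ack takes too long from " + ip + ":" + port);
        }
        String key = ackKey(ip, port, lastRefTime);
        // Removing the entry is what stops the Retransmitter from re-sending
        if (ackMap.remove(key) == null) {
            return -1;
        }
        Long sentAt = udpSendTimeMap.remove(key);
        return sentAt == null ? -1 : nowMillis - sentAt;
    }
}
```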
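The server notifies clients over UDP. On the client, PushReceiver listens for these UDP notifications, caches the received service data locally, and sends an ACK back to the server over UDP. On the server, a delayed task is started after each push, and if no ACK arrives within 10 seconds the push is retried (at most once more). The Receiver listens for client ACKs and, on receipt, removes the ackKey and its value from ackMap so that no retry is triggered.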