【微服务】Nacos服务发现源码分析
创始人
2024-01-20 16:35:05
0

💖Spring家族及微服务系列文章

✨【微服务】SpringBoot监听器机制以及在Nacos中的应用

✨【微服务】Nacos服务端完成微服务注册以及健康检查流程

✨【微服务】Nacos客户端微服务注册原理流程

✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理

✨【微服务】SpringBoot启动流程注册FeignClient

✨【微服务】SpringBoot启动流程初始化OpenFeign的入口

✨Spring Bean的生命周期

✨Spring事务原理

✨SpringBoot自动装配原理机制及过程

✨SpringBoot获取处理器流程

✨SpringBoot中处理器映射关系注册流程

✨Spring5.x中Bean初始化流程

✨Spring中Bean定义的注册流程

✨Spring的处理器映射器与适配器的架构设计

✨SpringMVC执行流程图解及源码

目录

💖Spring家族及微服务系列文章

💖前言

💖Nacos服务发现

✨流程图

✨服务发现的入口

💫SpringCloud原生项目spring-cloud-commons

💫Nacos是如何继承下来的?

💫NacosServiceDiscovery#getInstances()获取服务实例

✨NacosNamingService初始化流程

💖NacosNamingService构造初始化

💫HostReactor构造初始化

💖PushReceiver构造初始化

💫PushReceiver#run

✨从集成的client模块本地服务发现

💫获取服务实例列表

💖从本地缓存/发送http从服务端获取服务信息

💫从本地缓存获取

💫发送HTTP调用从Nacos服务端获取

💫scheduleUpdateIfAbsent()

💫UpdateTask#run()任务逻辑

💫queryList()发送http请求注册中心


💖前言

     这篇文章就介绍下,服务发现的入口是什么?本地缓存数据结构、缓存时机、如果缓存中没有如何处理?使用了定时任务,那定时任务的底层基于什么的、它是干什么的、定时间隔?监听服务端UDP通知、发送ACK?发送http请求到服务端,谁发的、如何接收?服务端如何推送服务实例的、采用什么方式?带着这些问题,下面我们来探究探究。

    注意:Nacos源码版本为1.x

💖Nacos服务发现

✨流程图

✨服务发现的入口

💫SpringCloud原生项目spring-cloud-commons

    你会发现@EnableDiscoveryClient注解也是在spring-cloud-commons项目,还有个discovery文件夹。我们本节注意下DiscoveryClient接口,以及其中声明的接口方法。SpringCloud是由几个关键项目组成的,spring-cloud-commons项目是其中之一。SpringCloud Alibaba也不是完全替代SpringCloud的,一些基本的规范还是继承下来了,做扩展等。

💫Nacos是如何继承下来的?

    Nacos是通过自己的spring-cloud-alibaba-nacos-discovery项目去集成到SpringCloud的以及基于SpringBoot的自动装配机制集成到SpringBoot项目的。而服务发现方面,NacosDiscoveryClient 实现了spring-cloud-commons项目的DiscoveryClient接口,即Nacos中服务发现入口是NacosDiscoveryClient类。

点击方法继续跟进到下面的逻辑

💫NacosServiceDiscovery#getInstances()获取服务实例

​public List getInstances(String serviceId) throws NacosException {// 获取配置文件组信息String group = this.discoveryProperties.getGroup();// 调用API模块中NamingService的selectInstances()方法,// 引用是NacosNamingService的反射获取,之前文章已分析List instances = this.namingService().selectInstances(serviceId, group, true);// 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例return hostToServiceInstanceList(instances, serviceId);}​

主要逻辑:

  1. 获取配置文件组信息
  2. 调用API模块中NamingService接口的selectInstances()方法。引用是NacosNamingService的,通过反射获取,之前文章已详细分析。NacosNamingService是Nacos的client模块里面的一个组件,下面分析。
  3. 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例

✨NacosNamingService初始化流程

    它的构造方法是在NamingFactory通过反射方式调用的,上面也提到了。因为这个流程也是不小的,故在获取服务实例前先讲解。

💖NacosNamingService构造初始化

    public NacosNamingService(Properties properties) throws NacosException {init(properties);}private void init(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);this.namespace = InitUtils.initNamespaceForNaming(properties);InitUtils.initSerialization();initServerAddr(properties);InitUtils.initWebRootContext();initCacheDir();initLogName(properties);this.eventDispatcher = new EventDispatcher();// 初始化服务代理this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);// 初始化心跳组件this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));// 初始化hostReactor this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,isLoadCacheAtStart(properties), initPollingThreadCount(properties));}

初始化服务代理、心跳发送组件以及hostReactor,重点看hostReactor的构造初始化

💫HostReactor构造初始化

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {// init executorServicethis.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.client.naming.updater");return thread;}});this.eventDispatcher = eventDispatcher;this.beatReactor = beatReactor;this.serverProxy = serverProxy;this.cacheDir = cacheDir;// 初始化本地缓存if (loadCacheAtStart) {this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));} else {this.serviceInfoMap = new ConcurrentHashMap(16);}this.updatingMap = new ConcurrentHashMap();this.failoverReactor = new FailoverReactor(this, cacheDir);// 初始化pushReceiverthis.pushReceiver = new PushReceiver(this);}

初始化本地缓存、pushReceiver,重点关注PushReceiver的构造方法

💖PushReceiver构造初始化

    public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 初始化udp套接字this.udpSocket = new DatagramSocket();// 启动一个线程this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 执行任务,下面的run()this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}}

初始化udp套接字用于监听注册中心变更服务推送以及发送ack确认、启动一个线程死循环用于监听注册中心udp推送服务变更、执行任务,this就是PushReceiver的引用即任务,所以执行下面的run()逻辑。

💫PushReceiver#run

    @Overridepublic void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 监听Nacos服务端服务实例信息变更后的通知udpSocket.receive(packet);String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 将数据缓存到本地hostReactor.processServiceJson(pushPacket.data);// send ack to serverack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"\"}";} else if ("dump".equals(pushPacket.type)) {// dump data to serverack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))+ "\"}";} else {// do nothing send ack onlyack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime+ "\", \"data\":" + "\"\"}";}// 发送ack到服务端udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,packet.getSocketAddress()));} catch (Exception e) {NAMING_LOGGER.error("[NA] error while receiving push data", e);}}}

主要逻辑:

  1. 监听Nacos服务端服务实例信息变更后的通知
  2. 解析注册中心推送的结果,组装回调ack报文,将注册中心推送的变更服务信息缓存到本地
  3. 发送ack到注册中心,以便注册中心决定是否需要重试。

✨从集成的client模块本地服务发现

    本节点讲解的就是客户端服务发现,之所以这样说是因为SpringBoot的自动装配将Nacos的client模块集成进来了,想了解更多去看前面的文章分析。

💫获取服务实例列表

调用重载的selectInstances()方法,healthy默认true即健康,subscribe默认true即订阅

    @Overridepublic List selectInstances(String serviceName, String groupName, List clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 默认订阅模式if (subscribe) {// 委托hostReactor处理serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}// 选取健康实例return selectInstances(serviceInfo, healthy);}

    默认使用订阅模式,但是委托hostReactor去获取服务信息,以服务名、分组拼接作为入参即Nacos可识别的服务名。

💖从本地缓存/发送http从服务端获取服务信息

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {// failover-mode:默认falseNAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 从本地缓存serviceInfoMap获取ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 如果本地缓存中没有,则发送HTTP调用从Nacos服务端获取if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());// 更新服务updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish等待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 开启一个定时任务,每隔1秒从Nacos服务端获取最新的服务实例信息,// 更新到本地缓存seriveInfoMap中scheduleUpdateIfAbsent(serviceName, clusters);// 从本地缓存serviceInfoMap中获取服务实例信息return serviceInfoMap.get(serviceObj.getKey());}

主要逻辑:

  1. 从本地缓存serviceInfoMap获取
  2. 如果本地缓存中没有,则发送HTTP调用从Nacos服务端获取
  3. 开启一个定时任务,每隔1秒从Nacos服务端获取最新的服务实例信息, 更新到本地缓存seriveInfoMap中
  4.  从本地缓存serviceInfoMap中获取服务实例信息

💫从本地缓存获取

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {String key = ServiceInfo.getKey(serviceName, clusters);// 本地缓存serviceInfoMap获取return serviceInfoMap.get(key);}

就单纯地从本地缓存serviceInfoMap获取

💫发送HTTP调用从Nacos服务端获取

    public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 通过NamingProxy发送HTTP调用,获取服务信息String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);if (StringUtils.isNotEmpty(result)) {// 更新本地缓存serviceInfoMapprocessServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}}

    通过NamingProxy发送HTTP调用,获取服务信息;响应结果不为空更新本地缓存serviceInfoMap

💫scheduleUpdateIfAbsent()

​public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}// 启动定时任务ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}​

DEFAULT_DELAY默认1000在这里即1秒。启动定时任务,每隔1秒执行一次,任务逻辑如下:

💫UpdateTask#run()任务逻辑

        @Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {// 尝试从本地获取ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));if (serviceObj == null) {// 本地还是没有则发送http从服务端获取,并缓存到本地updateService(serviceName, clusters);return;}// 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询if (serviceObj.getLastRefTime() <= lastRefTime) {updateService(serviceName, clusters);serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force push// 如果 serviceName 已经通过 push 更新,我们不应该覆盖它,// 因为 push 数据可能与 pull through force push 不同refreshOnly(serviceName, clusters);}// 刷新更新时间lastRefTime = serviceObj.getLastRefTime();if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {// abort the update taskNAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}delayTime = serviceObj.getCacheMillis();// 重置失败数量为0resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount有关// failCount=0,则下次调度时间为6秒,最长为1分钟,即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}}

主要逻辑:

  1. 尝试从本地缓存获取
  2. 本地还是没有则发送http从服务端获取,并缓存到本地
  3. 过期服务,从注册中心重新查询;否则如果 serviceName 已经通过 push 更新,不应该覆盖它,因为 push 数据可能与 pull through force push 不同
  4. 刷新更新时间、重置失败数量为0等
  5. 下次调度刷新时间,下次执行的时间与failCount有关failCount=0,则下次调度时间为6秒,最长为1分钟,即当无异常情况下缓存实例的刷新时间是6秒

💫queryList()发送http请求注册中心

里面会调用重载的reqApi()方法,调用前组装入参、拼接URL等。

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {final Map params = new HashMap(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);}

这篇文章是基于Nacos地1.x版本地,Nacos已经发布了新的2.x版本,官方也推荐使用新的。故还会出新的关于服务发现的文章。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...