【微服务】SpringCloud微服务注册源码解析
创始人
2024-01-28 17:18:49
0

目录

一、前言

1、简述

2、SpringCloudCommons 项目

二、客户端服务注册

1、流程图

2、入口

2.1、客户端注册引入依赖

3、EurekaServiceRegistry服务注册机

3.1、EurekaServiceRegistry注册逻辑

4、ApplicationInfoManager

4.1、setInstanceStatus(InstanceStatus status)逻辑

5、DiscoveryClient核心组件

5.1、状态变更者

6、InstanceInfoReplicator组件

2)启动复制任务

6.1、onDemandUpdate()按需更新逻辑

6.2、run()任务逻辑注册微服务

7、DiscoveryClient的注册逻辑

8、发送HTTP请求注册中心完成微服务注册

三、服务端服务注册

1、InstanceRegistry

2、缓存到注册中心本地


一、前言

1、简述

    本篇文章要探讨的问题如下:服务注册发现的抽象(这个提供了方向)?Eureka微服务注册的入口?Future模式的使用、调度线程池使用、周期性任务、令牌桶限流、加锁、取消了如何恢复?当然,提出了这些问题,都是下面涉及到的。

2、SpringCloudCommons 项目

    SpringCloudCommons 提供了两个库的特性: SpringCloudContext 和 SpringCloudCommons。Spring Cloud Context 为 Spring Cloud 应用程序的 ApplicationContext 提供实用工具和特殊服务(引导上下文、加密、刷新范围和环境端点)。SpringCloudCommons 是在不同的 SpringCloud 实现中使用的一组抽象公共类(Nacos、Spring Cloud Netflix、Spring Cloud Consul都是基于它实现微服务注册和发现的)。

二、客户端服务注册

1、流程图(自行补充)

2、入口

2.1、客户端注册引入依赖

org.springframework.cloudspring-cloud-starter-netflix-eureka-client

如果想了解SpringCloud项目搭建请看SpringCloud微服务第1章之项目搭建

是不是很熟悉,同样的配方,跟Nacos客户端注册核心相关类前缀不一样,还有SpringBoot自动装配相关的META-INF下的spring.factories文件。关于集成这方面前面的文章都详细分析了,这里就不会过多赘述,感兴趣的读者可以回头巩固一下,这里我们就直奔主题了。

3、EurekaServiceRegistry服务注册机

RegistrationServiceRegistryspring-cloud-commons项目下的,Registration用来维护需要注册的服务信息,ServiceRegistry抽象了微服务注册、发现等,EurekaServiceRegistry是spring-cloud-netflix-eureka-client下的服务注册机。

3.1、EurekaServiceRegistry注册逻辑

EurekaServiceRegistry的初始化是SpringBoot的自动装配完成的,触发注册逻辑也是这个时候开始的。

	@Overridepublic void register(EurekaRegistration reg) {maybeInitializeClient(reg);if (log.isInfoEnabled()) {log.info("Registering application "+ reg.getApplicationInfoManager().getInfo().getAppName()+ " with eureka with status "+ reg.getInstanceConfig().getInitialStatus());}// 设置实例状态,开始注册reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());// 注册健康检查reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));}

initialStatus为向远程 Eureka 服务器注册的初始状态,默认InstanceStatus.UP,在EurekaInstanceConfigBean中维护。setInstanceStatus()设置实例的状态,里面会触发注册逻辑。点击跟进到下面的处理逻辑:

4、ApplicationInfoManager

下面的一些组件都是在原生的Eureka项目。

4.1、setInstanceStatus(InstanceStatus status)逻辑

    public synchronized void setInstanceStatus(InstanceStatus status) {// UPInstanceStatus next = instanceStatusMapper.map(status);if (next == null) {return;}// STARTINGInstanceStatus prev = instanceInfo.setStatus(next);if (prev != null) {// 自动装配流程已经初始化for (StatusChangeListener listener : listeners.values()) {try {// 通知状态变更listener.notify(new StatusChangeEvent(prev, next));} catch (Exception e) {logger.warn("failed to notify listener: {}", listener.getId(), e);}}}}

启动注册中心再启动一个微服务,

会发现StatusChangeEvent的两个状态值,说明执行了这里的逻辑;而这个日志是下面的方法打印的,即这里调用了下面的方法,这就意味着:next为UP,prev为STARTING。调用监听器的通知方法,发布状态变更事件。

1)setStatus(InstanceStatus status)

    public synchronized InstanceStatus setStatus(InstanceStatus status) {if (this.status != status) {InstanceStatus prev = this.status;this.status = status;setIsDirty();return prev;}return null;}

该方法在InstanceInfo维护,方法逻辑:如果设置了与当前状态不同的状态,则返回 prev 状态,否则返回 null。

2)InstanceStatus服务实例状态枚举类

    public enum InstanceStatus {UP, // Ready to receive traffic准备接收通讯DOWN, // Do not send traffic- healthcheck callback failed不要发送交通健康检查回调失败STARTING, // Just about starting- initializations to be done - do not send traffic// 刚刚开始-初始化要做-不要发送流量OUT_OF_SERVICE, // Intentionally shutdown for traffic故意关闭交通UNKNOWN;public static InstanceStatus toEnum(String s) {if (s != null) {try {return InstanceStatus.valueOf(s.toUpperCase());} catch (IllegalArgumentException e) {// ignore and fall through to unknownlogger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);}}return UNKNOWN;}}

该类同样是在InstanceInfo维护:

  • UP, // 运行中状态,准备接收通讯
  • DOWN, // 下线状态,不要发送交通健康检查回调失败
  • STARTING, // 初始化状态,刚刚开始-初始化不要发送流量
  • OUT_OF_SERVICE, // 故意关闭交通
  • UNKNOWN; // 未知状态

5、DiscoveryClient核心组件

5.1、状态变更者

它是在DiscoveryClient初始化时初始化的。

            // 状态变更监听者statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {// Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING]logger.info("Saw local status change event {}", statusChangeEvent);instanceInfoReplicator.onDemandUpdate();}};// 初始化状态变更监听者if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}

这个是Java匿名内部类的,ApplicationInfoManager维护了监听器的缓存注册等,故在4.1中调用监听器的方法来到这里的处理逻辑。这里打印日志,如果你开始了日志打印可以在控制台看到。我们点击继续跟踪到下面逻辑:

6、InstanceInfoReplicator组件

InstanceInfoReplicator实现了Runnable接口,在DiscoveryClient初始化时初始化,职责:更新本地instanceInfo并将其复制到远程服务器的任务:

  • 配置为单个更新线程,以保证对远程服务器的顺序更新
  • 更新任务可以通过onDemandUpdate()按需调度
  • 任务处理的速率受到BurstSize(默认2)的限制
  • 新的更新任务总是在早期更新任务之后自动安排。但是,如果启动了 按需任务,计划的自动更新任务将被丢弃(并且将在 新的按需 更新之后安排新的任务)。

1)初始化

            // InstanceInfo replicator实例信息复制任务instanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize// 状态变更监听者.....省略.......// 定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

构建实例信息复制任务,定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册。

2)启动复制任务

    public void start(int initialDelayMs) {// 上面默认false,可见CAS成功if (started.compareAndSet(false, true)) {instanceInfo.setIsDirty();  // for initial register// 调度执行任务(自己的run()逻辑)Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

新的更新任务总是在早期更新任务之后自动安排。但是,如果启动了 按需任务,计划的自动更新任务将被丢弃(并且将在 新的按需 更新之后安排新的任务)。如下面:

6.1、onDemandUpdate()按需更新逻辑

    public boolean onDemandUpdate() {// 令牌桶算法进行限流if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {// 执行器未关闭if (!scheduler.isShutdown()) {// 提交任务scheduler.submit(new Runnable() {@Overridepublic void run() {logger.debug("Executing on-demand update of local InstanceInfo");Future latestPeriodic = scheduledPeriodicRef.get();// isDone()如果此任务完成,则返回 true。观察控制台初始化时可见是falseif (latestPeriodic != null && !latestPeriodic.isDone()) {// 取消最新的预定更新,将在onDemandUpdate()结束时重新调度// 由于scheduler只有一个核心线程,而start(int initialDelayMs)中使用了,故这里取消logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");// 取消但是不中断latestPeriodic.cancel(false);}// 执行自己的run逻辑InstanceInfoReplicator.this.run();}});return true;} else {logger.warn("Ignoring onDemand update due to stopped scheduler");return false;}} else {logger.warn("Ignoring onDemand update due to rate limiter");return false;}}

主要逻辑:

  1. RateLimiter基于令牌桶算法进行限流(令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务)。
  2. 执行器未关闭,提交任务
  3. 取消最新的预定更新,将在onDemandUpdate()结束时重新调度。由于scheduler只有一个核心线程,而start(int initialDelayMs)中使用了,故这里取消
  4. 执行自己的run逻辑,下面分析

6.2、run()任务逻辑注册微服务

    public void run() {try {// 刷新本地服务实例信息discoveryClient.refreshInstanceInfo();// start中初始化时设置了isInstanceInfoDirty为true,所以dirtyTimestamp不为空Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 注册discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {// 实例信息复制器出了点问题logger.warn("There was a problem with the instance info replicator", t);} finally {// 恢复复制,默认实例信息复制器随需应变允许的每分钟更新率为4Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

主要逻辑:

  1. 刷新本地服务实例信息
  2. 请求注册中心注册
  3. 恢复复制,默认实例信息复制器随需应变允许的每分钟更新率为4。如果onDemandUpdate()又被执行,还是可能被丢弃的。

7、DiscoveryClient的注册逻辑

    boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);EurekaHttpResponse httpResponse;try {// 请求注册httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());}// 返回注册是否成功return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}

兜个圈又回来了!!!调用与注册中心通信类的注册方法,返回是否注册成功。

8、发送HTTP请求注册中心完成微服务注册

    @Overridepublic EurekaHttpResponse register(InstanceInfo info) {// 拼接URLString urlPath = "apps/" + info.getAppName();ClientResponse response = null;try {// 获取resourceBuilderBuilder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);// 发送post请求注册中心response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}

主要逻辑:

  1. 拼接请求注册中心URL
  2. 获取resourceBuilder,这个是sun公司提供的
  3. 发送post请求注册中心,返回响应结果

三、服务端服务注册

1、InstanceRegistry

	@Overridepublic void register(InstanceInfo info, int leaseDuration, boolean isReplication) {handleRegistration(info, leaseDuration, isReplication);super.register(info, leaseDuration, isReplication);}@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);super.register(info, isReplication);}

这里的逻辑不多,两个注册方法,通过super委托PeerAwareInstanceRegistryImpl父类AbstractInstanceRegistry处理。

AbstractInstanceRegistry抽象服务实例注册机,我们下面看看它的注册逻辑:

2、缓存到注册中心本地

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {// 加锁read.lock();try {// 尝试从本地获取Map> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);// 本地没有则缓存起来if (gMap == null) {final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}Lease existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a lease// 如果已经有租约,则保留最后一个脏时间戳而不覆盖它if (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// The lease does not exist and hence it is a new registration// 租约不存在,因此是一个新的注册synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renews// 因为客户端想要注册它,所以增加发送更新的客户端的数量this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}}logger.debug("No previous lease information found; it is new registration");}Lease lease = new Lease<>(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}gMap.put(registrant.getId(), lease);// 修改数据保存到最近队列 用于客户端增量更新recentRegisteredQueue.add(new Pair(System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));// This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// If the lease is registered with UP status, set lease service up timestampif (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}registrant.setActionType(ActionType.ADDED);// 修改数据保存到最近队列 用于客户端增量更新recentlyChangedQueue.add(new RecentlyChangedItem(lease));registrant.setLastUpdatedTimestamp();invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);} finally {// 释放锁read.unlock();}}

主要逻辑:

  1. 加锁,尝试从本地获取看下有没有注册了
  2. 本地registry(ConcurrentHashMap>>)没有则重新缓存起来
  3. 如果已经有租约,则保留最后一个脏时间戳而不覆盖它;否则租约不存在,因此是一个新的注册,因为客户端想要注册它,所以增加发送更新的客户端的数量。
  4. 修改数据保存到最近队列 用于客户端增量更新

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...