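The earlier articles "Nacos Source Code Analysis: Service Registration (Client)" and "Nacos Source Code Analysis: Service Registration (Server)" covered the service registration flow. This chapter covers the heartbeat mechanism.
We already touched on heartbeats when looking at client-side registration. The service registration flow is:
The Nacos client starts the heartbeat as part of service registration. Here again is the source of NacosNamingService#registerInstance: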
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // Add the heartbeat (ephemeral instances only)
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
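The flow is clear here: for an ephemeral instance, the service's IP, port, service name and so on are wrapped in a BeatInfo object, beatReactor.addBeatInfo adds the instance to the heartbeat mechanism (lease renewal), and serverProxy.registerService then performs the registration.
So the renewal itself is set up in BeatReactor#addBeatInfo. Let's see how the heartbeat works. Below is the source of beatReactor.addBeatInfo, together with the BeatTask it schedules: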
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    if ((existBeat = (BeatInfo) this.dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    this.dom2Beat.put(key, beatInfo);
    // Thread pool: schedule one beat task. beatInfo.getPeriod() is the delay (5000 ms by default)
    this.executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set((double) this.dom2Beat.size());
}

// The heartbeat task
class BeatTask implements Runnable {
    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    public void run() {
        if (!this.beatInfo.isStopped()) {
            long nextTime = this.beatInfo.getPeriod();
            try {
                // Send the heartbeat request and read the result
                JSONObject result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = (long) result.getIntValue("clientBeatInterval");
                boolean lightBeatEnabled = false;
                if (result.containsKey("lightBeatEnabled")) {
                    lightBeatEnabled = result.getBooleanValue("lightBeatEnabled");
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0L) {
                    nextTime = interval;
                }
                int code = 10200;
                if (result.containsKey("code")) {
                    code = result.getIntValue("code");
                }
                if (code == 20404) {
                    // The instance does not exist on the server: create it again
                    Instance instance = new Instance();
                    instance.setPort(this.beatInfo.getPort());
                    instance.setIp(this.beatInfo.getIp());
                    instance.setWeight(this.beatInfo.getWeight());
                    instance.setMetadata(this.beatInfo.getMetadata());
                    instance.setClusterName(this.beatInfo.getCluster());
                    instance.setServiceName(this.beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        // Register the service
                        BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
                    } catch (Exception var10) {
                    }
                }
            } catch (NacosException var11) {
                LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JSON.toJSONString(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
            }
            // Schedule the next beat after nextTime (5 s by default)
            BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
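As in Eureka, the heartbeat is driven by a ScheduledExecutorService thread pool, with a default interval of 5 seconds. Note that schedule() is a one-shot call: each BeatTask schedules the next one at the end of run(), which lets the delay follow the clientBeatInterval returned by the server.
BeatTask is the Runnable that performs the lease renewal. Its run method sends the heartbeat through BeatReactor.this.serverProxy.sendBeat; if the server answers that the instance is not registered (code 20404), the task registers it again through BeatReactor.this.serverProxy.registerService.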
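The self-rescheduling pattern can be sketched in isolation. This is a minimal illustration, not Nacos code: the class SelfReschedulingBeat, the method nextDelayMillis and the LongSupplier standing in for the sendBeat call are all hypothetical names chosen for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the BeatReactor/BeatTask pair. */
final class SelfReschedulingBeat {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long periodMillis;
    private final LongSupplier sendBeat; // returns the server-suggested interval, 0 if none
    private volatile boolean stopped;

    SelfReschedulingBeat(long periodMillis, LongSupplier sendBeat) {
        this.periodMillis = periodMillis;
        this.sendBeat = sendBeat;
    }

    /** A positive server-suggested interval overrides the default period. */
    static long nextDelayMillis(long defaultPeriod, long serverInterval) {
        return serverInterval > 0 ? serverInterval : defaultPeriod;
    }

    void start() {
        scheduleNext(periodMillis);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    private void scheduleNext(long delayMillis) {
        if (stopped) {
            return;
        }
        try {
            // One-shot schedule: the task itself decides when the next run happens
            executor.schedule(this::beatOnce, delayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // stop() raced with us; nothing left to schedule
        }
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        long next = periodMillis;
        try {
            next = nextDelayMillis(periodMillis, sendBeat.getAsLong());
        } catch (RuntimeException e) {
            // A failed beat keeps the default period, as the catch block in BeatTask does
        }
        scheduleNext(next);
    }
}
```

Compared with scheduleAtFixedRate, rescheduling by hand costs a few extra lines but allows the delay to change on every round.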
Below is com.alibaba.nacos.client.naming.net.NamingProxy#sendBeat, the method that sends the heartbeat:
public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
        LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    String body = "";
    // A light beat sends no body; a full beat carries the serialized BeatInfo
    if (!lightBeatEnabled) {
        try {
            body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
        } catch (UnsupportedEncodingException var6) {
            throw new NacosException(500, "encode beatInfo error", var6);
        }
    }
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", beatInfo.getServiceName());
    params.put("clusterName", beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = this.reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, "PUT");
    return JSON.parseObject(result);
}
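This assembles the heartbeat URL, 127.0.0.1:8848/nacos/v1/ns/instance/beat, with the following parameters: namespaceId (namespace ID), serviceName (service name), clusterName (cluster name), ip (the instance's IP) and port. When light beat is not enabled, the serialized BeatInfo is also sent as the request body. The request is a PUT. As with registration, the lower layer picks one of the configured Nacos servers at random and hands the request to the HTTP client.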
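The shape of the request can be sketched with the JDK alone. This is an illustration under assumptions: BeatRequestSketch and its two methods are hypothetical names, and the beat JSON is passed in as a plain string instead of being produced by a JSON library.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mirroring how sendBeat builds its parameters and body. */
final class BeatRequestSketch {

    /** Query parameters that accompany every beat, light or full. */
    static Map<String, String> params(String namespaceId, String serviceName,
                                      String clusterName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", clusterName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        return params;
    }

    /** Form body: empty for a light beat, otherwise the URL-encoded beat JSON. */
    static String body(String beatJson, boolean lightBeatEnabled) {
        if (lightBeatEnabled) {
            return "";
        }
        try {
            return "beat=" + URLEncoder.encode(beatJson, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every JVM must support
            throw new IllegalStateException(e);
        }
    }
}
```

The light beat matters on the server side: without a body the server cannot rebuild a missing instance, which is why it answers with a "not found" code instead.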
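On the server side the entry point is again InstanceController, which provides a beat method. Besides how it handles a heartbeat request, we also need to look at how heartbeat expiry is checked. The source: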
/**
 * Create a beat for instance (handles a client heartbeat).
 *
 * @param request http request
 * @return detail information of instance
 * @throws Exception any error during handle
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    // Client beat interval: every 5 s
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // Read the beat data from the request and convert it to a clientBeat object
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    // Cluster name
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    // Client IP and port
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    // Namespace ID and service name
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Validate the service name
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    // Look up the instance in the service registry
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // Not found: the instance is not registered (or has been removed)
    if (instance == null) {
        if (clientBeat == null) {
            // No beat body in the request (light beat): report "resource not found"
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        // Re-create the instance from the beat data
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        // Register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Fetch the service
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        // No such service
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Process the heartbeat
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
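The method's logic is roughly as follows:
1. Read beat, clusterName, ip, port, namespaceId and serviceName from the request; if a beat body is present, parse it into an RsInfo.
2. Look up the instance in the registry. If it is missing and the request carried no beat body, return RESOURCE_NOT_FOUND; if a beat body is present, rebuild the instance from it and register it.
3. Fetch the Service and call service.processClientBeat(clientBeat) to process the heartbeat.
4. Return OK together with clientBeatInterval and lightBeatEnabled.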
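The branch that decides between "not found" and re-registration can be reduced to a few lines. The sketch below is a simplified model, not the controller itself: BeatHandlerSketch and its in-memory set are hypothetical, and the two codes are the values that appear in the client's BeatTask earlier in this article.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, heavily simplified model of the server-side beat decision. */
final class BeatHandlerSketch {
    static final int OK = 10200;                 // default code in the client's BeatTask
    static final int RESOURCE_NOT_FOUND = 20404; // code that makes the client re-register

    // "ip:port" keys standing in for the real service registry
    private final Set<String> registry = new HashSet<>();

    /**
     * @param hasBeatBody true for a full beat, false for a light beat without body
     * @return the response code the client would receive
     */
    int handleBeat(String ip, int port, boolean hasBeatBody) {
        String key = ip + ":" + port;
        if (!registry.contains(key)) {
            if (!hasBeatBody) {
                // Nothing to rebuild the instance from: let the client register again
                return RESOURCE_NOT_FOUND;
            }
            // Full beat: compensate by registering the instance from the beat data
            registry.add(key);
        }
        return OK;
    }
}
```

For example, a light beat for an unknown instance yields 20404, while the same call succeeds once a full beat has registered the instance.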
Below is the source of Service#processClientBeat, followed by HealthCheckReactor#scheduleNow:
public void processClientBeat(final RsInfo rsInfo) {
    // The heartbeat processor, a Runnable
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    // Submit a task with no delay; in effect the renewal logic runs asynchronously on another thread
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

/**
 * Schedule client beat check task without a delay.
 *
 * @param task health check task
 * @return scheduled future
 */
public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
As you can see, the heartbeat is handled by ClientBeatProcessor, a Runnable that is submitted to the scheduler with zero delay:
public class ClientBeatProcessor implements Runnable {

    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

    private RsInfo rsInfo;

    private Service service;

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    public RsInfo getRsInfo() {
        return rsInfo;
    }

    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    public Service getService() {
        return service;
    }

    public void setService(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        // The service being renewed
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        // IP, cluster name and port
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        // The Cluster object inside the service
        Cluster cluster = service.getClusterMap().get(clusterName);
        // All ephemeral instances of the cluster
        List<Instance> instances = cluster.allIPs(true);
        for (Instance instance : instances) {
            // Find the instance that sent this beat by comparing IP and port
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // Record the time of the last beat (important)
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked() && !instance.isHealthy()) {
                    // Mark the instance healthy again
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(),
                            UtilsAndCommons.LOCALHOST_SITE);
                    // PushService publishes a ServiceChangeEvent and pushes the change to clients over UDP
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
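The method looks up the instance that sent the heartbeat inside its service, records the time of the last beat, and switches the instance back to healthy if it had been marked unhealthy.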
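The renewal step itself is small enough to model directly. In this sketch RenewSketch and Inst are hypothetical names; the boolean result stands in for the decision whether a service-changed push is needed.

```java
/** Hypothetical model of what a processed beat does to one instance. */
final class RenewSketch {

    /** Minimal stand-in for the fields of Instance that renewal touches. */
    static final class Inst {
        long lastBeat;
        boolean healthy;
        boolean marked;
    }

    /**
     * Refreshes the last-beat time.
     *
     * @return true when the instance flipped from unhealthy to healthy,
     *         i.e. when a change notification would be pushed to clients
     */
    static boolean renew(Inst inst, long nowMillis) {
        inst.lastBeat = nowMillis;
        if (!inst.marked && !inst.healthy) {
            inst.healthy = true;
            return true;
        }
        return false;
    }
}
```

A beat from an already healthy instance only moves lastBeat forward, so the common case produces no push traffic.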
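Nacos instances are either ephemeral (temporary) or persistent (permanent). An ephemeral instance is removed from the registry once its heartbeat renewal times out; a persistent instance is not. For non-ephemeral instances (ephemeral=false), Nacos performs active health checks: it periodically sends a request to the instance and judges its health from the response.
The above is only the renewal path. Heartbeat expiry checking starts from ServiceManager#registerInstance: when the service does not exist yet, registration calls ServiceManager#putServiceAndInit(service) to initialize it, and that method calls Service#init to start the heartbeat check. The check is therefore running as soon as the service has been registered.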
// ServiceManager#putServiceAndInit: service initialization
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    // Initialize the service; this is the entry point of the heartbeat check
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
Below is Service#init(), followed by HealthCheckReactor#scheduleCheck:
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    public void init() {
        // Heartbeat check: initialization for ephemeral instances
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        // Walk the cluster map and initialize each cluster
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            // Initialization for persistent instances: calls Cluster.init()
            entry.getValue().init();
        }
    }
}

// HealthCheckReactor#scheduleCheck: periodic heartbeat-timeout check, every 5 s
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
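The computeIfAbsent call is what keeps a service from getting two check tasks. Here is a minimal sketch of that idea using only the JDK; CheckSchedulerSketch is a hypothetical class, and scheduleWithFixedDelay is an assumption about what GlobalExecutor.scheduleNamingHealth does with its two time arguments.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler that runs at most one periodic check per task key. */
final class CheckSchedulerSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    /** Schedules the task once per key; a repeated call returns the existing future. */
    ScheduledFuture<?> scheduleCheck(String taskKey, Runnable task, long periodMillis) {
        return futureMap.computeIfAbsent(taskKey,
                k -> executor.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
    }

    /** Cancels and forgets the check for a key, so it can be scheduled again later. */
    void cancelCheck(String taskKey) {
        ScheduledFuture<?> future = futureMap.remove(taskKey);
        if (future != null) {
            future.cancel(true);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Keeping the future in the map also gives the scheduler a handle for cancelling the check when the service goes away.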
For ephemeral instances, the heartbeat check runs as a scheduled task every 5 seconds and is carried out by the ClientBeatCheckTask Runnable:
// Client heartbeat check
public class ClientBeatCheckTask implements Runnable {

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            // All ephemeral instances of the service
            List<Instance> instances = service.allIPs(true);
            // first set health status of instances:
            for (Instance instance : instances) {
                // Timed out if: current time - last beat time > heartbeat timeout
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        // If it is currently healthy, mark it unhealthy
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                    instance.getIp(), instance.getPort(), instance.getClusterName(),
                                    service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                    instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // Publish the service-changed event
                            getPushService().serviceChanged(service);
                            // Publish the heartbeat-timeout event
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            // then remove obsolete instances:
            for (Instance instance : instances) {
                // Skip marked instances
                if (instance.isMarked()) {
                    continue;
                }
                // No beat for longer than the delete timeout (30 s): remove the instance
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    // Remove the instance
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}
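The method does the following:
1. It returns early if this node is not responsible for the service or if health checking is switched off.
2. For every ephemeral instance whose last beat is older than its heartbeat timeout, it marks the instance unhealthy (if it is unmarked and currently healthy), publishes a service-changed notification through PushService, and publishes an InstanceHeartbeatTimeoutEvent.
3. If instance expiry is enabled, it removes every unmarked instance whose last beat is older than its delete timeout (30 s) by calling deleteIp.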
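The two thresholds can be captured in a small pure function. This is a sketch under assumptions: BeatTimeoutSketch and Action are hypothetical names, the 15 s value is taken from CLIENT_BEAT_TIMEOUT shown earlier and the 30 s value from the comment in the task, and the real task treats marking and removal as two separate passes while the sketch reports only the strongest outcome.

```java
/** Hypothetical classification of an instance by the age of its last beat. */
final class BeatTimeoutSketch {

    enum Action { KEEP, MARK_UNHEALTHY, REMOVE }

    // Assumed defaults, see the lead-in; per-instance metadata may override them in Nacos
    static final long HEART_BEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    /** Decides what the periodic check would do with an instance last heard from at lastBeatMillis. */
    static Action check(long nowMillis, long lastBeatMillis) {
        long silentFor = nowMillis - lastBeatMillis;
        if (silentFor > IP_DELETE_TIMEOUT_MS) {
            return Action.REMOVE;
        }
        if (silentFor > HEART_BEAT_TIMEOUT_MS) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.KEEP;
    }
}
```

With a 5 s beat interval, an instance has to miss about three beats before it is marked unhealthy and about six before it is removed.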
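The service-changed notification is published through PushService#serviceChanged, which pushes the current state of the service to all subscribed clients over UDP.
Below is the source of com.alibaba.nacos.naming.core.Cluster#init, followed by the HealthCheckReactor#scheduleCheck overload it calls: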
public synchronized void init() {
    if (inited) {
        return;
    }
    checkTask = new HealthCheckTask(this);
    // Start the periodic health check for persistent instances
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}

public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
    task.setStartTime(System.currentTimeMillis());
    // Schedule the health check task
    return GlobalExecutor.scheduleNamingHealth(task, task.getCheckRtNormalized(), TimeUnit.MILLISECONDS);
}
HealthCheckTask handles the health check for persistent instances and runs as a scheduled task. Below is part of its source:
// Computes the delay of the scheduled task
private void initCheckRT() {
    // first check time delay
    // delay = 2000 ms + a random value below the configured maximum (5000 ms)
    checkRtNormalized =
            2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
    checkRtBest = Long.MAX_VALUE;
    checkRtWorst = 0L;
}

@Override
public void run() {
    try {
        if (distroMapper.responsible(cluster.getService().getName())
                && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
            // Run the check; TcpSuperSenseProcessor does the work, based on TCP
            healthCheckProcessor.process(this);
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
            }
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
                cluster.getService().getName(), cluster.getName(), e);
    } finally {
        ...
    }
}
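The nested random call in initCheckRT is easy to misread, so here is a small stand-alone version. It is only a sketch: CheckDelaySketch is a hypothetical class, and it uses ThreadLocalRandom from the JDK in place of the RandomUtils helper, which requires guarding the case where the inner draw is 0.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical re-creation of the first-check delay: 2000 ms plus a nested random offset. */
final class CheckDelaySketch {

    /**
     * @param maxMillis upper bound of the random part, must be positive (5000 in the text above)
     * @return a delay in the range [2000, 2000 + maxMillis)
     */
    static long firstCheckDelayMillis(int maxMillis) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // Inner draw: upper bound for the outer draw
        int inner = random.nextInt(0, maxMillis);
        // ThreadLocalRandom rejects an empty range, so treat an inner draw of 0 as offset 0
        int offset = inner == 0 ? 0 : random.nextInt(0, inner);
        return 2000L + offset;
    }
}
```

Drawing the bound at random before drawing the offset skews the result toward small offsets, so most clusters get their first check close to the 2 s mark while the start times are still spread out.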
The call healthCheckProcessor.process(this) performs the health check. The implementation used here is TcpSuperSenseProcessor, which is itself a Runnable. Its source:
@Override
public void process(HealthCheckTask task) {
    // All instances of the cluster that are not ephemeral (ephemeral=false)
    List<Instance> ips = task.getCluster().allIPs(false);
    if (CollectionUtils.isEmpty(ips)) {
        return;
    }
    for (Instance ip : ips) {
        ...
        Beat beat = new Beat(ip, task);
        // Add to the LinkedBlockingQueue: every health check task goes into one blocking queue
        taskQueue.add(beat);
        MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
    }
}

// Process queued tasks
private void processTask() throws Exception {
    Collection<Callable<Void>> tasks = new LinkedList<>();
    do {
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }
        // Wrap the task in a TaskProcessor
        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
    // Run all collected tasks as one batch
    for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
        f.get();
    }
}

@Override
public void run() {
    // Loop forever, taking beat tasks from the queue and running them
    while (true) {
        try {
            // Run the queued tasks
            processTask();
            int readyCount = selector.selectNow();
            if (readyCount <= 0) {
                continue;
            }
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
            }
        } catch (Throwable e) {
            SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
        }
    }
}
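The batching in processTask follows a common queue pattern: block briefly for the first item, then take whatever else is already waiting, up to a limit. A minimal sketch follows; BatchDrainSketch is a hypothetical class, and it uses BlockingQueue.drainTo for the second step where the original polls in a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that pulls one bounded batch from a blocking queue. */
final class BatchDrainSketch {

    /**
     * Waits up to waitMillis for a first item, then adds already queued items
     * without waiting, until maxBatch items have been collected.
     *
     * @return the batch, empty if nothing arrived in time
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, long waitMillis, int maxBatch) {
        List<T> batch = new ArrayList<>();
        try {
            T first = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
            if (first == null) {
                return batch;
            }
            batch.add(first);
            // drainTo never blocks; it moves at most the given number of elements
            queue.drainTo(batch, maxBatch - 1);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and return what we have
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

The short wait on the first item keeps the worker loop from spinning when the queue is empty, and the batch limit keeps one round from monopolizing the thread pool.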
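At this point the picture is fairly clear: healthCheckProcessor probes the instances over TCP, and the check tasks (Beat objects) are stored in a LinkedBlockingQueue. TcpSuperSenseProcessor is itself a Runnable that loops continuously, taking Beat tasks from the queue, wrapping them in TaskProcessor objects and running them in batches. Below is the source of TaskProcessor: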
private class TaskProcessor implements Callable<Void> {

    private static final int MAX_WAIT_TIME_MILLISECONDS = 500;

    Beat beat;

    public TaskProcessor(Beat beat) {
        this.beat = beat;
    }

    @Override
    public Void call() {
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {
            Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
        }
        SocketChannel channel = null;
        try {
            Instance instance = beat.getIp();
            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            Cluster cluster = beat.getTask().getCluster();
            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));
            beat.setStartTime(System.currentTimeMillis());
            GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                    "tcp:error:" + e.getMessage());
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception ignore) {
                }
            }
        }
        return null;
    }
}
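As you can see, it is a Callable that opens a non-blocking TCP connection to the instance through NIO. Here is a short summary.
Nacos health checking comes in two forms, one for ephemeral and one for persistent instances:
1. Ephemeral instances: the client sends a heartbeat every 5 seconds by default. On the server, ClientBeatCheckTask runs every 5 seconds, marks an instance unhealthy when its last beat is older than the heartbeat timeout (15 seconds by default), and removes it when the last beat is older than the delete timeout (30 seconds).
2. Persistent instances: the server probes them actively. HealthCheckTask periodically hands the instances to TcpSuperSenseProcessor, which tries a TCP connection to each one; an instance that fails the check is marked unhealthy but is not removed.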
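The TCP probe used for persistent instances boils down to a non-blocking connect with a timeout. The sketch below shows that core idea for a single target; TcpProbeSketch is a hypothetical class, and unlike TcpSuperSenseProcessor it creates a selector per call instead of sharing one across many probes.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/** Hypothetical single-target TCP reachability probe built on NIO. */
final class TcpProbeSketch {

    /**
     * @param timeoutMillis how long to wait for the connection, must be positive
     * @return true if a TCP connection to host:port completes within the timeout
     */
    static boolean isReachable(String host, int port, long timeoutMillis) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // A non-blocking connect usually returns false and completes later
            if (channel.connect(new InetSocketAddress(host, port))) {
                return true;
            }
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMillis) == 0) {
                return false; // no connect event before the timeout
            }
            // Throws an IOException if the connection was refused
            return channel.finishConnect();
        } catch (IOException e) {
            return false;
        }
    }
}
```

A call such as TcpProbeSketch.isReachable("127.0.0.1", 8848, 500) would report whether something is listening on that port; it says nothing about whether the application behind it is working correctly, which is the usual limitation of a pure TCP check.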
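That wraps up this article. A diagram summarizes service registration and the heartbeat:
If this article helped you, please give it a thumbs-up. Your support is my biggest motivation.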