这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
现有的微服务是使用的Spring Cloud Zookeeper
这一套,实际应用在Kubernetes
中部署并不需要额外的注册中心,本身Kubernetes
自己就支持,所以打算替换到Zookeeper
替换为Spring Cloud Kubernetes
org.springframework.cloud spring-cloud-starter-zookeeper-discovery
org.springframework.cloud spring-cloud-starter-kubernetes-client-all org.springframework.cloud spring-cloud-kubernetes-client-loadbalancer
版本我这里使用的最新版本,2.1.4
由于最新版本有bug,就是service.yaml
中如果没有定义port的name会报错,所以这里我们采用修改源码方式去解决
问题详情可以参考我之前发的博文
直接创建一个包名为:org.springframework.cloud.kubernetes.client.discovery
创建类KubernetesInformerDiscoveryClient
代码如下
package org.springframework.cloud.kubernetes.client.discovery;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;import io.kubernetes.client.extended.wait.Wait;
import io.kubernetes.client.informer.SharedInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.cache.Lister;
import io.kubernetes.client.openapi.models.V1EndpointAddress;
import io.kubernetes.client.openapi.models.V1EndpointPort;
import io.kubernetes.client.openapi.models.V1Endpoints;
import io.kubernetes.client.openapi.models.V1Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesServiceInstance;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;/***@author : wh*@date : 2022/10/27 14:36*@description:*/
public class KubernetesInformerDiscoveryClient implements DiscoveryClient, InitializingBean {private static final Log log = LogFactory.getLog(KubernetesInformerDiscoveryClient.class);private static final String PRIMARY_PORT_NAME_LABEL_KEY = "primary-port-name";private static final String HTTPS_PORT_NAME = "https";private static final String UNSET_PORT_NAME = "";private static final String HTTP_PORT_NAME = "http";private final SharedInformerFactory sharedInformerFactory;private final Lister serviceLister;private final Supplier informersReadyFunc;private final Lister endpointsLister;private final KubernetesDiscoveryProperties properties;private final String namespace;public KubernetesInformerDiscoveryClient(String namespace, SharedInformerFactory sharedInformerFactory,Lister serviceLister, Lister endpointsLister,SharedInformer serviceInformer, SharedInformer endpointsInformer,KubernetesDiscoveryProperties properties) {this.namespace = namespace;this.sharedInformerFactory = sharedInformerFactory;this.serviceLister = serviceLister;this.endpointsLister = endpointsLister;this.informersReadyFunc = () -> serviceInformer.hasSynced() && endpointsInformer.hasSynced();this.properties = properties;}@Overridepublic String description() {return "Kubernetes Client Discovery";}@Overridepublic List getInstances(String serviceId) {Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");if (!StringUtils.hasText(namespace) && !properties.isAllNamespaces()) {log.warn("Namespace is null or empty, this may cause issues looking up services");}V1Service service = properties.isAllNamespaces() ? this.serviceLister.list().stream().filter(svc -> serviceId.equals(svc.getMetadata().getName())).findFirst().orElse(null): this.serviceLister.namespace(this.namespace).get(serviceId);if (service == null || !matchServiceLabels(service)) {// no such service present in the clusterreturn new ArrayList<>();}Map svcMetadata = new HashMap<>();if (this.properties.getMetadata() != null) {if (this.properties.getMetadata().isAddLabels()) {if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {String labelPrefix = this.properties.getMetadata().getLabelsPrefix() != null? this.properties.getMetadata().getLabelsPrefix() : "";service.getMetadata().getLabels().entrySet().stream().filter(e -> e.getKey().startsWith(labelPrefix)).forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));}}if (this.properties.getMetadata().isAddAnnotations()) {if (service.getMetadata() != null && service.getMetadata().getAnnotations() != null) {String annotationPrefix = this.properties.getMetadata().getAnnotationsPrefix() != null? this.properties.getMetadata().getAnnotationsPrefix() : "";service.getMetadata().getAnnotations().entrySet().stream().filter(e -> e.getKey().startsWith(annotationPrefix)).forEach(e -> svcMetadata.put(e.getKey(), e.getValue()));}}}V1Endpoints ep = this.endpointsLister.namespace(service.getMetadata().getNamespace()).get(service.getMetadata().getName());if (ep == null || ep.getSubsets() == null) {// no available endpoints in the clusterreturn new ArrayList<>();}Optional discoveredPrimaryPortName = Optional.empty();if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {discoveredPrimaryPortName = Optional.ofNullable(service.getMetadata().getLabels().get(PRIMARY_PORT_NAME_LABEL_KEY));}final String primaryPortName = discoveredPrimaryPortName.orElse(this.properties.getPrimaryPortName());return ep.getSubsets().stream().filter(subset -> subset.getPorts() != null && subset.getPorts().size() > 0) // safeguard.flatMap(subset -> {Map metadata = new HashMap<>(svcMetadata);List endpointPorts = subset.getPorts();if (this.properties.getMetadata() != null && this.properties.getMetadata().isAddPorts()) {endpointPorts.forEach(p -> metadata.put(StringUtils.hasText(p.getName()) ? p.getName() : UNSET_PORT_NAME,Integer.toString(p.getPort())));}List addresses = subset.getAddresses();if (addresses == null) {addresses = new ArrayList<>();}if (this.properties.isIncludeNotReadyAddresses()&& !CollectionUtils.isEmpty(subset.getNotReadyAddresses())) {addresses.addAll(subset.getNotReadyAddresses());}final int port = findEndpointPort(endpointPorts, primaryPortName, serviceId);return addresses.stream().map(addr -> new KubernetesServiceInstance(addr.getTargetRef() != null ? addr.getTargetRef().getUid() : "", serviceId,addr.getIp(), port, metadata, false, service.getMetadata().getNamespace(),service.getMetadata().getClusterName()));}).collect(Collectors.toList());}private int findEndpointPort(List endpointPorts, String primaryPortName, String serviceId) {if (endpointPorts.size() == 1) {return endpointPorts.get(0).getPort();}else {Map ports = endpointPorts.stream().filter(p -> StringUtils.hasText(p.getName())).collect(Collectors.toMap(V1EndpointPort::getName, V1EndpointPort::getPort));// This oneliner is looking for a port with a name equal to the primary port// name specified in the service label// or in spring.cloud.kubernetes.discovery.primary-port-name, equal to https,// or equal to http.// In case no port has been found return -1 to log a warning and fall back to// the first port in the list.int discoveredPort = ports.getOrDefault(primaryPortName,ports.getOrDefault(HTTPS_PORT_NAME, ports.getOrDefault(HTTP_PORT_NAME, -1)));if (discoveredPort == -1) {if (StringUtils.hasText(primaryPortName)) {log.warn("Could not find a port named '" + primaryPortName + "', 'https', or 'http' for service '"+ serviceId + "'.");}else {log.warn("Could not find a port named 'https' or 'http' for service '" + serviceId + "'.");}log.warn("Make sure that either the primary-port-name label has been added to the service, or that spring.cloud.kubernetes.discovery.primary-port-name has been configured.");log.warn("Alternatively name the primary port 'https' or 'http'");log.warn("An incorrect configuration may result in non-deterministic behaviour.");discoveredPort = endpointPorts.get(0).getPort();}return discoveredPort;}}@Overridepublic List getServices() {List services = this.properties.isAllNamespaces() ? this.serviceLister.list(): this.serviceLister.namespace(this.namespace).list();return services.stream().filter(this::matchServiceLabels).map(s -> s.getMetadata().getName()).collect(Collectors.toList());}@Overridepublic void afterPropertiesSet() throws Exception {this.sharedInformerFactory.startAllRegisteredInformers();if (!Wait.poll(Duration.ofSeconds(1), Duration.ofSeconds(this.properties.getCacheLoadingTimeoutSeconds()),() -> {log.info("Waiting for the cache of informers to be fully loaded..");return this.informersReadyFunc.get();})) {if (this.properties.isWaitCacheReady()) {throw new IllegalStateException("Timeout waiting for informers cache to be ready, is the kubernetes service up?");}else {log.warn("Timeout waiting for informers cache to be ready, ignoring the failure because waitForInformerCacheReady property is false");}}log.info("Cache fully loaded (total " + serviceLister.list().size()+ " services) , discovery client is now available");}private boolean matchServiceLabels(V1Service service) {if (log.isDebugEnabled()) {log.debug("Kubernetes Service Label Properties:");if (this.properties.getServiceLabels() != null) {this.properties.getServiceLabels().forEach((key, value) -> log.debug(key + ":" + value));}log.debug("Service " + service.getMetadata().getName() + " labels:");if (service.getMetadata() != null && service.getMetadata().getLabels() != null) {service.getMetadata().getLabels().forEach((key, value) -> log.debug(key + ":" + value));}}// safeguardif (service.getMetadata() == null) {return false;}if (properties.getServiceLabels() == null || properties.getServiceLabels().isEmpty()) {return true;}return properties.getServiceLabels().keySet().stream().allMatch(k -> service.getMetadata().getLabels() != null&& service.getMetadata().getLabels().containsKey(k)&& service.getMetadata().getLabels().get(k).equals(properties.getServiceLabels().get(k)));}}
builder.routes().route(r -> r.path("/ms/test/**").filters(f -> f.stripPrefix(1)).uri("lb://test-service"))
原先使用的是zookeeper
上的服务名进行转发的,如果pod
上面的服务名和之前zookeeper
上面注册的名字不一致,就需要改一下路由的服务名
然后再k8s中重新部署服务
可以看到这里是pod
没有权限调用k8s相关的api,授权就好了。使用RBAC权限处理
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:namespace: defaultname: pod-reader
rules:- apiGroups: [""]resources: ["pods","configmaps"]verbs: ["get", "watch", "list"]
apiVersion: v1
kind: ServiceAccount
metadata:name: config-readernamespace: default
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:name: pod-readernamespace: default
roleRef:apiGroup: rbac.authorization.k8s.iokind: Rolename: pod-reader
subjects:- kind: ServiceAccountname: config-readernamespace: default
参考博客
重新部署启动就会发现是无缝切换的
总得来说切换比较简单,基本是无缝的!这样就不用依赖外部的注册中心了