Nacos Ribbon实现服务平滑上下线

79

现象

公司项目中使用k8s进行服务部署,每次发版本时会存在短暂的服务不可用现象。

原因

Nacos 中的服务上线或下线后,Ribbon 的本地服务列表缓存不会立即刷新,所以在一段时间内请求仍可能被转发到已经下线的服务。

Ribbon 默认每 30 秒刷新一次缓存,可以通过配置 ribbon.ServerListRefreshInterval(单位:毫秒)设置缓存刷新间隔。

# ribbon 源码
package com.netflix.loadbalancer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollingServerListUpdater implements ServerListUpdater {
    private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);

    /** Interval between server-list cache refreshes, in milliseconds. */
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30000;

    /**
     * Reads the server-list refresh interval from the client configuration.
     *
     * @param clientConfig configuration queried for {@code ServerListRefreshInterval}
     * @return the configured interval in milliseconds; {@link #LISTOFSERVERS_CACHE_REPEAT_INTERVAL}
     *         is passed to the lookup as the fallback value
     */
    private static long getRefreshIntervalMs(IClientConfig clientConfig) {
        Integer intervalMs = (Integer) clientConfig.get(
                CommonClientConfigKey.ServerListRefreshInterval,
                LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
        // Unbox, then widen to long, exactly as the original double cast did.
        return (long) intervalMs;
    }
}

解决方案

Nacos 在服务实例发生变更时会发送通知,因此可以监听该通知,在收到通知后立即主动刷新 Ribbon 的服务列表缓存,而不必等待下一次定时刷新。

/**
 * Nacos subscriber that reacts to {@link InstancesChangeEvent}s by asking the
 * matching Ribbon load balancer to update its list of servers immediately.
 */
@Slf4j
@Component
public class NacosServiceInstanceChangeNotifier extends Subscriber<InstancesChangeEvent> {

    @Autowired
    private SpringClientFactory springClientFactory;

    /** Registers this subscriber with the Nacos {@code NotifyCenter} once the bean is constructed. */
    @PostConstruct
    public void registerToNotifyClient() {
        NotifyCenter.registerSubscriber(this);
    }

    /**
     * Refreshes the Ribbon server list of the service named in the event.
     *
     * @param instancesChangeEvent the instance-change notification published by Nacos
     */
    @Override
    public void onEvent(InstancesChangeEvent instancesChangeEvent) {
        final String rawName = instancesChangeEvent.getServiceName();
        if (!StringUtils.isNotBlank(rawName)) {
            return;
        }

        // The event carries a group-qualified name such as DEFAULT_GROUP@@member;
        // keep only the part after the last '@'.
        final String clientName = rawName.substring(rawName.lastIndexOf("@") + 1);

        // Only act on services that already have a Ribbon client context.
        if (!springClientFactory.getContextNames().contains(clientName)) {
            return;
        }

        final ILoadBalancer balancer = springClientFactory.getLoadBalancer(clientName);
        // instanceof is false for null, so this also covers a missing load balancer.
        if (!(balancer instanceof DynamicServerListLoadBalancer)) {
            return;
        }

        // Refresh the cached server list.
        ((DynamicServerListLoadBalancer) balancer).updateListOfServers();
        log.info("dynamicServiceListLoadBalancer updateListOfServers success: {}", clientName);
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return InstancesChangeEvent.class;
    }

    /**
     * Always accepts the event. Because of a bug in Nacos 2.1.1, {@code scopeMatches()}
     * has to be overridden to return {@code true}.
     *
     * @see <a href="https://github.com/alibaba/nacos/issues/9227">Nacos issue #9227</a>
     */
    @Override
    public boolean scopeMatches(InstancesChangeEvent event) {
        return true;
    }
}

拓展:Spring Cloud LoadBalancer

如果项目使用的是 Spring Cloud LoadBalancer 而不是 Ribbon,可以在收到 Nacos 服务变更通知时清除其服务实例缓存:


/**
 * Nacos subscriber that evicts a service's entry from the load-balancer
 * service-instance cache whenever Nacos reports an instance change for it.
 */
@Slf4j
@Component
public class ServiceChangeNotifier extends Subscriber<InstancesChangeEvent> {

    /**
     * Several CacheManager beans of different types may exist, so this one is
     * resolved by name; the field name defaultLoadBalancerCacheManager must not be changed.
     */
    @Resource
    private CacheManager defaultLoadBalancerCacheManager;

    /** Registers this custom subscriber so that it receives notifications. */
    @PostConstruct
    public void init() {
        NotifyCenter.registerSubscriber(this);
    }

    /**
     * Evicts the cached instance list of the service named in the event.
     *
     * @param event the instance-change notification published by Nacos
     */
    @Override
    public void onEvent(InstancesChangeEvent event) {
        String serviceName = event.getServiceName();
        // Guard against a missing or blank service name instead of failing with a
        // NullPointerException on the checks below.
        if (serviceName == null || serviceName.trim().isEmpty()) {
            return;
        }
        // With Dubbo, RPC service classes are registered under names starting with
        // "providers:" or "consumers:". They are not regular services, so skip them.
        // This check is unnecessary when Dubbo is not used.
        if (serviceName.contains(":")) {
            return;
        }
        // The service name has the form groupName@@name; keep only the name part.
        String split = Constants.SERVICE_INFO_SPLITER;
        int splitIndex = serviceName.indexOf(split);
        if (splitIndex >= 0) {
            serviceName = serviceName.substring(splitIndex + split.length());
        }
        log.info("服务上下线: {}", serviceName);
        // Manually invalidate the cached service list.
        Cache cache = defaultLoadBalancerCacheManager.getCache(
                CachingServiceInstanceListSupplier.SERVICE_INSTANCE_CACHE_NAME);
        if (cache != null) {
            cache.evictIfPresent(serviceName);
        }
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return InstancesChangeEvent.class;
    }

}

参考

Nacos 实现服务平滑上下线(Ribbon 和 LB)