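Manually taking a node offline in Spring Cloud
Background
The project uses Nacos as its service registry. During a release, traffic is often still routed to an instance while that instance is restarting, which affects the business.
Goal
Restart services gracefully, so that a restarting instance receives no traffic.
Approach
Current state
The service currently runs on virtual machines and is restarted with the systemctl command: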
systemctl restart xxxx
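During a restart, systemd first sends SIGTERM (15) to the service, waits for a while, and then sends SIGKILL (9) if the process is still running.
A Java program can react to SIGTERM by registering a shutdown hook through the Runtime class and do cleanup work before the process exits: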
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        doSomeThing();
    } catch (Exception ex) {
        log.error("doSomeThing error. ", ex);
    }
}));
In fact, Spring Boot already takes care of this; see org.springframework.boot.SpringApplicationShutdownHook:
class SpringApplicationShutdownHook implements Runnable {
    // ...
    void registerApplicationContext(ConfigurableApplicationContext context) {
        addRuntimeShutdownHookIfNecessary();
        synchronized (SpringApplicationShutdownHook.class) {
            assertNotInProgress();
            context.addApplicationListener(this.contextCloseListener);
            this.contexts.add(context);
        }
    }

    private void addRuntimeShutdownHookIfNecessary() {
        if (this.shutdownHookAdded.compareAndSet(false, true)) {
            addRuntimeShutdownHook();
        }
    }

    void addRuntimeShutdownHook() {
        try {
            Runtime.getRuntime().addShutdownHook(new Thread(this, "SpringApplicationShutdownHook"));
        }
        catch (AccessControlException ex) {
            // Not allowed in some environments
        }
    }

    @Override
    public void run() {
        Set<ConfigurableApplicationContext> contexts;
        Set<ConfigurableApplicationContext> closedContexts;
        Set<Runnable> actions;
        synchronized (SpringApplicationShutdownHook.class) {
            this.inProgress = true;
            contexts = new LinkedHashSet<>(this.contexts);
            closedContexts = new LinkedHashSet<>(this.closedContexts);
            actions = new LinkedHashSet<>(this.handlers.getActions());
        }
        contexts.forEach(this::closeAndWait);
        closedContexts.forEach(this::closeAndWait);
        actions.forEach(Runnable::run);
    }
    //...
}
Spring Cloud service registration
See org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#destroy:
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    // ...
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }
    // ...
    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
    // ...
    protected void register() {
        this.serviceRegistry.register(getRegistration());
    }
    // ...
    @PreDestroy
    public void destroy() {
        stop();
    }
    // ...
    public void stop() {
        if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
            deregister();
            if (shouldRegisterManagement()) {
                deregisterManagement();
            }
            this.serviceRegistry.close();
        }
    }
    // ...
}
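The service is registered by listening for Spring's WebServerInitializedEvent (see onApplicationEvent). It is taken offline through a bean destruction callback: destroy is annotated with @PreDestroy, so it is called before the bean is destroyed.
Nacos extends this class in com.alibaba.cloud.nacos.registry.NacosAutoServiceRegistration, and the actual register and deregister calls are made by com.alibaba.cloud.nacos.registry.NacosServiceRegistry. In other words, the instance removes itself from Nacos before the service is destroyed.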
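The register-on-start, deregister-on-destroy lifecycle can be modeled in plain Java without Spring. This is only a simplified sketch: `ToyRegistry` and `ToyAutoRegistration` are hypothetical stand-ins, not Spring Cloud or Nacos classes, and only the `compareAndSet` guard on the running flag is taken from the quoted `stop()` logic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory stand-in for a registry such as Nacos.
class ToyRegistry {
    private final Set<String> instances = ConcurrentHashMap.newKeySet();
    int deregisterCalls = 0;

    void register(String instance) {
        instances.add(instance);
    }

    void deregister(String instance) {
        deregisterCalls++;
        instances.remove(instance);
    }

    boolean contains(String instance) {
        return instances.contains(instance);
    }
}

// Hypothetical model of the start()/stop() pair guarded by a running flag.
class ToyAutoRegistration {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ToyRegistry registry;
    private final String instance;

    ToyAutoRegistration(ToyRegistry registry, String instance) {
        this.registry = registry;
        this.instance = instance;
    }

    // Would be triggered by the "web server is ready" event.
    void start() {
        if (running.compareAndSet(false, true)) {
            registry.register(instance);
        }
    }

    // Would be triggered by the bean destruction callback.
    void stop() {
        if (running.compareAndSet(true, false)) {
            registry.deregister(instance);
        }
    }
}
```

Because of the guard, calling `stop()` more than once deregisters the instance only once.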
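Why is there still traffic during the restart?
First possibility: requests that arrived from upstream before shutdown and have not finished processing.
Spring Boot is already configured for graceful shutdown in this project: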
server:
  port: 8801
  shutdown: graceful
spring:
  lifecycle:
    timeout-per-shutdown-phase: 30s
In theory, in-flight requests are completed before the service is destroyed, so this possibility can be ruled out.
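As a rough illustration of what graceful shutdown means, the plain-Java sketch below stops accepting new work and waits up to a timeout for in-flight work to finish. It is not Spring Boot's implementation; `GracefulDemo` and `drain` are hypothetical names, and the sleeping tasks merely stand in for request handling.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulDemo {
    // Returns how many in-flight tasks completed before the pool was stopped.
    static int drain(int inFlight, long taskMillis, long timeoutMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(inFlight);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < inFlight; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(taskMillis); // simulated request handling
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cut off by the forced stop
                }
            });
        }
        pool.shutdown(); // stop accepting new tasks; running tasks continue
        if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
            pool.shutdownNow(); // timeout elapsed: interrupt whatever is left
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
        return completed.get();
    }
}
```

With a timeout longer than the longest request, every in-flight task completes; with a shorter one, the remaining tasks are interrupted, which is the case the 30s timeout is meant to make rare.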
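Second possibility: after the instance is removed from Nacos, the upstream caller's local cache has not been refreshed yet, so the caller does not notice the provider change in time.
For the Nacos discovery client, see com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient: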
public class NacosDiscoveryClient implements DiscoveryClient {
    // ...
    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        try {
            return Optional.of(serviceDiscovery.getInstances(serviceId)).map(instances -> {
                ServiceCache.setInstances(serviceId, instances);
                return instances;
            }).get();
        }
        catch (Exception e) {
            if (failureToleranceEnabled) {
                return ServiceCache.getInstances(serviceId);
            }
            throw new RuntimeException(
                    "Can not get hosts from nacos server. serviceId: " + serviceId, e);
        }
    }
    // ...
}
Next, look at com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery:
public class NacosServiceDiscovery {
    // ...
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = discoveryProperties.getGroup();
        List<Instance> instances = namingService().selectInstances(serviceId, group,
                true);
        return hostToServiceInstanceList(instances, serviceId);
    }
    // ...
    private NamingService namingService() {
        return nacosServiceManager
                .getNamingService(discoveryProperties.getNacosProperties());
    }
}
Then com.alibaba.nacos.client.naming.NacosNamingService:
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {
    // ...
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }
    // ...
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
            boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        if (subscribe) { // this is the branch taken here
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                    StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor
                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                            StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    // ...
}
Then com.alibaba.nacos.client.naming.core.HostReactor:
public class HostReactor implements Closeable {
    // ...
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        scheduleUpdateIfAbsent(serviceName, clusters); // schedule the periodic update
        return serviceInfoMap.get(serviceObj.getKey());
    }
    // ...
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        return serviceInfoMap.get(key);
    }
    // ...
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
    // ...
    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
        this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
            boolean pushEmptyProtection, int pollingThreadCount) {
        // init executorService
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        // ...
    }
}
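This shows that the caller's list of provider instances is refreshed asynchronously, so there is a window during which it is out of date. This possibility is therefore real.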
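The stale window can be reproduced with a toy model. In the sketch below, `ToyNamingServer` and `CachingConsumer` are hypothetical classes, not Nacos APIs: the consumer answers lookups from a local snapshot that only changes when `refresh()` runs, which is roughly the role the scheduled update task plays in HostReactor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the registry server's view of one service.
class ToyNamingServer {
    private final List<String> instances = new CopyOnWriteArrayList<>();

    void register(String addr) {
        instances.add(addr);
    }

    void deregister(String addr) {
        instances.remove(addr);
    }

    List<String> query() {
        return List.copyOf(instances);
    }
}

// Hypothetical consumer that serves lookups from a local snapshot.
class CachingConsumer {
    private final ToyNamingServer server;
    private volatile List<String> snapshot = List.of();

    CachingConsumer(ToyNamingServer server) {
        this.server = server;
        refresh(); // initial load
    }

    // What the periodic update task does: replace the snapshot.
    void refresh() {
        snapshot = server.query();
    }

    // What callers see: the snapshot, never a live query.
    List<String> getInstances() {
        return snapshot;
    }

    // Polls the server on a daemon thread at a fixed delay.
    ScheduledExecutorService startPolling(long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "toy-naming-updater");
            t.setDaemon(true);
            return t;
        });
        ses.scheduleWithFixedDelay(this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

Between a `deregister` on the server and the next `refresh()` on the consumer, `getInstances()` still returns the removed address, and any request sent to it during that window hits an instance that may already be shutting down.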
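Solution
As analyzed above, the traffic is most likely caused by the delay before the Nacos client refreshes its cache. To fix this, remove the instance from Nacos ahead of time, wait long enough for callers to refresh their instance lists, and only then restart it.
So the application can expose an endpoint that takes the current instance offline. Before a release, call it with curl, wait for a while, and then restart the application.
The endpoint is simple to implement with components that Spring Cloud already provides.
Reference code: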
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DeregisterController {

    // Registration of the current instance
    @Autowired
    private Registration registration;

    // Registry client (Nacos in this project)
    @Autowired
    private ServiceRegistry serviceRegistry;

    // Removes the current instance from the registry; call it before restarting
    @RequestMapping("/deregister")
    public String deregister() {
        serviceRegistry.deregister(registration);
        return "ok";
    }
}
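The release step described above (call the endpoint, wait, then restart) could be scripted with curl and sleep, or sketched in Java as follows. This is a minimal sketch using the JDK's java.net.http client (Java 11+). `PreRestartDrain` is a hypothetical helper, and the endpoint address and wait duration are assumptions to be replaced with real values.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class PreRestartDrain {
    // Builds the GET request for the deregister endpoint.
    static HttpRequest deregisterRequest(URI endpoint) {
        return HttpRequest.newBuilder(endpoint)
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    // Deregisters the instance, then blocks for the drain window.
    // The caller restarts the service only after this method returns.
    static void drain(URI endpoint, Duration wait) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(deregisterRequest(endpoint), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("deregister failed: HTTP " + response.statusCode());
        }
        Thread.sleep(wait.toMillis()); // give callers time to refresh their instance lists
    }
}
```

A release script might call `PreRestartDrain.drain(URI.create("http://127.0.0.1:8801/deregister"), Duration.ofSeconds(30))` and then run `systemctl restart xxxx`. Port 8801 comes from the configuration shown earlier; the 30-second window is only a placeholder and should exceed the refresh delay of the callers, including any load-balancer cache on their side.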