spring cloud自定义loaderbalancer实现流量就近访问
背景
文旅项目处于稳定需要,目前每个服务部署了多个实例,分布在不同的结点;同时为了应对甲方的稳定性需要,接下来会上云
这样,每个服务的多个节点会跨网络,甚至是不同的运营商,这样会带来一些问题
基于现有的负载均衡策略,会出现跨网络节点互相访问,这样带来的超时会比较明显,因此需要解决这个问题
目标
实现流量的就近访问
方案
基于spring cloud loadbalancer
代码分析
先看spring-cloud-loadbalancer下spring.factories
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration,\
org.springframework.cloud.loadbalancer.security.OAuth2LoadBalancerClientAutoConfiguration,\
org.springframework.cloud.loadbalancer.config.LoadBalancerStatsAutoConfiguration
重点关注org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
@Configuration(
proxyBeanMethods = false
)
@LoadBalancerClients // 重点看这里
@EnableConfigurationProperties({LoadBalancerClientsProperties.class})
@AutoConfigureBefore({ReactorLoadBalancerClientAutoConfiguration.class, LoadBalancerBeanPostProcessorAutoConfiguration.class})
@ConditionalOnProperty(
value = {"spring.cloud.loadbalancer.enabled"},
havingValue = "true",
matchIfMissing = true
)
public class LoadBalancerAutoConfiguration {
}
接着看org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients
@Configuration(
proxyBeanMethods = false
)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({LoadBalancerClientConfigurationRegistrar.class})
public @interface LoadBalancerClients {
LoadBalancerClient[] value() default {};
Class<?>[] defaultConfiguration() default {};
}
通过defaultConfiguration可以指定默认配置,如果不指定,会用到org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
public LoadBalancerClientConfiguration() {
}
@Bean
@ConditionalOnMissingBean // 默认走这里,实际走的是轮询
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty("loadbalancer.client.name");
return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
为什么默认会走到org.springframework.cloud.loadbalancer.annotation.LoadBalancerClientConfiguration
看org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification> implements Factory<ServiceInstance> {
private static final Log log = LogFactory.getLog(LoadBalancerClientFactory.class);
public static final String NAMESPACE = "loadbalancer";
public static final String PROPERTY_NAME = "loadbalancer.client.name";
private final LoadBalancerClientsProperties properties;
/** @deprecated */
@Deprecated
public LoadBalancerClientFactory() {
this((LoadBalancerClientsProperties)null);
}
public LoadBalancerClientFactory(LoadBalancerClientsProperties properties) {
super(LoadBalancerClientConfiguration.class, "loadbalancer", "loadbalancer.client.name"); // 主要是这里
this.properties = properties;
}
// ...
}
如果还想继续往下跟,可以看org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
}
URI requestUri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
String serviceId = requestUri.getHost();
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); // 重点在这里
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(new RequestData(exchange.getRequest()), this.getHint(serviceId)));
// ...
}
// ...
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId, Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class); // 以及这里
if (loadBalancer == null) {
throw new NotFoundException("No loadbalancer available for " + serviceId);
} else {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
return loadBalancer.choose(lbRequest);
}
}
// ...
}
具体实施
替换默认配置
@Slf4j
@ConditionalOnDiscoveryEnabled
public class NearbyLoadBalancerConfig {
public NearbyLoadBalancerConfig() {
}
@PostConstruct
public void init() {
log.info("NearbyLoadBalancerConfig inited");
}
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(LoadBalancerClientFactory loadBalancerClientFactory, Environment environment, InetUtils inetUtils) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new NearbyServiceLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name, environment, inetUtils);
}
}
应用端通过@LoadBalancerClients 注解修改默认配置
@LoadBalancerClients(defaultConfiguration = NearbyLoadBalancerConfig.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
log.info("......服务启动成功!");
}
}
负载策略
1、优先访问通ip节点,也可以访问同网段节点(此处未做实现)
2、如果没有通ip节点,轮询选择一个
3、开关控制,关闭之后继续走轮询
代码如下
@Slf4j
public class NearbyServiceLoadBalancer implements ReactorServiceInstanceLoadBalancer {
public static final String SWITCH_KEY = "discovery.nearby.enable"; // 开关配置key
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private final String serviceId;
private Environment environment;
private final AtomicInteger position;
private final String nearbyIp; // 本机ip
private RoundRobinLoadBalancer roundRobinLoadBalancer;
public NearbyServiceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, Environment environment, InetUtils inetUtils) {
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.serviceId = serviceId;
this.environment = environment;
this.position = new AtomicInteger(1000);
this.nearbyIp = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
this.roundRobinLoadBalancer = new RoundRobinLoadBalancer(serviceInstanceListSupplierProvider, serviceId);
log.info("NearbyServiceLoadBalancer.construct, serviceId={}, nearbyIp={}", this.serviceId, this.nearbyIp);
}
/**
* 判断是否走就近访问策略
**/
private boolean disableNearby() {
return StringUtils.isBlank(this.nearbyIp)
|| !Boolean.TRUE.toString().equals(environment.getProperty(SWITCH_KEY));
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
if (disableNearby()) {
return this.roundRobinLoadBalancer.choose(request);
}
ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next().map((serviceInstances) -> this.processInstanceResponse(supplier, serviceInstances));
}
protected Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
ServiceInstance instance = selectInstance(instances);
return instance == null ? new EmptyResponse() : new DefaultResponse(instance);
}
/**
* 选择最合适的节点
**/
protected ServiceInstance selectInstance(List<ServiceInstance> instances) {
if (log.isDebugEnabled()) {
log.debug("NearbyServiceLoadBalancer.selectInstance, instances={}", formatServiceInstances(instances));
}
if (CollectionUtils.isEmpty(instances)) {
return null;
}
// 只有一个候选节点,不用走策略,直接使用
if (instances.size() == 1) {
return instances.get(0);
}
ServiceInstance instance = instances.stream()
.filter(o -> this.nearbyIp.equals(o.getHost()))
.findFirst()
.orElse(null);
if (instance == null) {
int pos = Math.abs(this.position.incrementAndGet());
instance = instances.get(pos % instances.size());
}
if (log.isDebugEnabled()) {
log.debug("NearbyServiceLoadBalancer.selectInstance, instance={}", formatServiceInstance(instance));
}
return instance;
}
private String formatServiceInstances(List<ServiceInstance> serviceInstances) {
if (serviceInstances == null) {
return null;
}
if (serviceInstances.size() == 0) {
return "[]";
}
return "[" + serviceInstances.stream().map(this::formatServiceInstance).reduce((s1, s2) -> s1 + "," + s2).get() + "]";
}
private String formatServiceInstance(ServiceInstance serviceInstance) {
if (serviceInstance == null) {
return null;
}
return "{instanceId=" + serviceInstance.getInstanceId() + "}";
}
}
功能开启
discovery:
nearby:
enable: true