Ribbon – 负载均衡流程

Ribbon – 初始化中提到了, @LoadBalanced 注解的 RestTemplate 会注入拦截器 LoadBalancerInterceptor ,我们看看 LoadBalancerInterceptor 是怎么做的。

LoadBalancerInterceptor#intercept

这里主要是通过URL把serviceId取出来,然后调用 LoadBalancerClientexecute 方法。

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// 这个就是我们注册到注册中心的服务名称
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}

RibbonLoadBalancerClient#execute

主要是获取 ILoadBalancerServer ,通过Server进行远程调用。这个server已经是有对应的IP和端口了。

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
// 获取ILoadBalancer
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 通过ILoadBalancer获取Server
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
// 远程调用
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}

RibbonLoadBalancerClient#getLoadBalancer会调用SpringClientFactory#getLoadBalancer,SpringClientFactory的注入可以看上一章。SpringClientFactory#getLoadBalancer调用父类的NamedContextFactory#getInstance方法,NamedContextFactory#getInstance会判断之前是否创建了serviceId对应的AnnotationConfigApplicationContext,如果没有则创建。

NamedContextFactory#getContext

这里关注是createContext方法,他注册了RibbonEurekaAutoConfiguration和RibbonClientConfiguration,注册后就调用refresh方法。这两个值是怎么来的,可以参考上一章SpringClientFactory的创建。这两个有先后顺序,所以会先加载RibbonEurekaAutoConfiguration后加载RibbonEurekaAutoConfiguration,而IPing、ServerList都有 @ConditionalOnMissingBean 注解,所以优先实例化RibbonEurekaAutoConfiguration的IPing、ServerList。

protected AnnotationConfigApplicationContext getContext(String name) {
// 如果没有,则调用createContext创建
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
//返回
return this.contexts.get(name);
}
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
// 注册org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
// 注册org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object>singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
// jdk11 issue
// https://github.com/spring-cloud/spring-cloud-netflix/issues/3101
context.setClassLoader(this.parent.getClassLoader());
}
context.setDisplayName(generateDisplayName(name));
context.refresh();
return context;
}

refresh加载比较重要的有以下几个,代码就不贴了,这几个类就是在上面注册的RibbonEurekaAutoConfiguration和RibbonClientConfiguration里面。

  1. 加载IPing,检查服务存活(NIWSDiscoveryPing)
  2. 加载ServerList<Server>:服务实例列表,(DomainExtractingServerList)
  3. 加载IClientConfig(DefaultClientConfigImpl)
  4. 加载IRule:负载均衡规则,(ZoneAvoidanceRule)
  5. 加载ServerListUpdater,服务实例更新列表,(PollingServerListUpdater)
  6. 加载ServerListFilter<Server>,服务实例过滤列表,(ZonePreferenceServerListFilter)
  7. 加载ILoadBalancer:用于选择server,(ZoneAwareLoadBalancer)

这几个bean加载的时候,都有类似以下的判断,这个主要是判断是否有自定义配置,如果没有,则取默认,有就取自定义的。

if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}

下面主要看ZoneAwareLoadBalancer的构造函数,他会调用父类的BaseLoadBalancer#initWithConfig方法和DynamicServerListLoadBalancer#restOfInit方法。

BaseLoadBalancer#initWithConfig

这个方法主要是开启定时任务ping,也就是检查其他服务是否是存活的。

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
// 其他略
// 设置ping的时间,默认30秒,也就是说服务挂了最多30秒就发现了
setPingInterval(pingIntervalTime);
// 设置最大次数
setMaxTotalPingTime(maxTotalPingTime);
// 设置IRule,这里会把当前的ILoadBalancer赋值给IRule
setRule(rule);
// 开启定时任务ping
setPing(ping);
// 其他略
}

定时任务的调用过程简略为:PingTask#run–>BaseLoadBalance.Pinger#runPinger–>BaseLoadBalance.SerialPingStrategy#pingServers–>NIWSDiscoveryPing#isAlive,NIWSDiscoveryPing#isAlive是判断。

public void runPinger() throws Exception {
// 其他略
// 通过BaseLoadBalance.SerialPingStrategy#pingServers调用NIWSDiscoveryPing#isAlive,
// 主要是判断服务的状态是不是UP,如果是UP,就是存活。
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
// 后面代码主要是把存活的存入到newUpList再到upServerList,把变化的存入changedServers,并调用监听通知
// 其他略
}

DynamicServerListLoadBalancer#restOfInit

这个方法主要有两个功能,一个是调用DynamicServerListLoadBalancer#enableAndInitLearnNewServersFeature开启定时任务获取Eureka的注册表,一个是调用DynamicServerListLoadBalancer#updateAllServerList方法获取Eureka的注册表。所以主要看看DynamicServerListLoadBalancer#updateListOfServers。

public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 获取Eureka的注册表
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
// 过滤zone为defaultZone
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
// 赋值allServerList
updateAllServerList(servers);
}

RibbonLoadBalancerClient#getServer

我们回到RibbonLoadBalancerClient#execute来,getLoadBalancer方法已经把该加载的加载了,获取到ILoadBalancer后,我们就已经获取到被调用的服务列表了。现在就要获取某一个服务来做远程调用了,由于注入的是ZoneAvoidanceRule,所以默认的就是轮询来获取Server。获取到Server后,就可以进行远程调用了。

private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}

总结

ribbon在调用之前,会获取到Eureka的注册信息,并开启定时任务去更新Eureka的注册信息,以及检测是否存活,默认都是30秒。调用的时候,通过serviceId,获取服务列表(此时已转为IP+端口),再通过负载均衡策略,获取某个服务进行远程调用。

SegmentFault博客
我还没有学会写个人说明!
下一篇

我一戴起来就斯文的……金丝眼镜框,是怎么做出来的?

你也可能喜欢

评论已经被关闭。

插入图片