SpringCloud踩坑记(四)SpringCloud负载均衡器Ribbo

前言

先前我们已经了解Spring Cloud使用Eureka作为注册中心进行注册服务,
那我们注册完的服务如何进行消费呢?Spring Cloud本身结合了俩种方式进行
服务消费。

  1. 负载均衡器Ribbon+RestTemplate
  2. 声明式的HTTP客户端Feign

今天我们就来学习负载均衡器Ribbon如何使用及源码分析。

Ribbon 介绍

Ribbon是Netflix公司开源的一个负载均衡的组件,是将负载
均衡逻辑封装在客户端中且内部提供多种负载均衡策略可供选择,
并且运行在客户端的进程里。
Ribbon是客户端IPC库,在云端经过了实战测试。
它提供以下功能

  • 负载均衡
  • 容错能力
  • 异步和响应模型中的多种协议(HTTP,TCP,UDP)支持
  • 缓存和批处理

Ribbon的开源项目多个子模块包,但是生产中可能会用到子模块如下:

  • ribbon:集成了负载平衡、容错、缓存/批处理等功能的api
  • ribbon-core:核心代码,客户端配置api和其他共享api
  • ribbon-eureka:使用Eureka客户端为云提供动态服务器列表的api
  • ribbon-loadbalancer:可以独立使用或与其他模块一起使用的负载平衡器api
  • ribbon-transport:使用具有负载平衡功能的RxNetty传输支持HTTP、TCP和UDP协议的客户端

使用负载均衡器Ribbon+RestTemplate来消费服务

服务端注册编写见先前《Spring Cloud Eureka学习》一文内。

EurekaClientProducer1和EurekaClientProducer2模块下增加测试MessageController类,
内容如下:

@RestController
public class MessageController {@Value("${server.port}")String port;@GetMapping("/get")public String getMessage(@RequestParam("name")String name){return "Hi " + name + " ,I am from port:" + port;}}

启动EurekaServer、EurekaClientProducer1、EurekaClientProducer2服务。

编写消费客户端工程

新建EurekaClientConsumer模块
pom.xml文件如下:


<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>EurekaDemoartifactId><groupId>com.smallstepgroupId><version>1.0-SNAPSHOTversion>parent><modelVersion>4.0.0modelVersion><artifactId>EurekaClientConsumerartifactId><dependencies><dependency><groupId>org.springframework.cloudgroupId><artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>dependency><dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-webartifactId>dependency><dependency><groupId>org.springframework.cloudgroupId><artifactId>spring-cloud-starter-netflix-ribbonartifactId>dependency>dependencies>
project>

编写EurekaClientConsumer启动类并注入一个RestTemplate类型并增加@LoadBalanced注解。
具体代码如下:

@SpringBootApplication
public class EurekaClientConsumerApplication {public static void main(String[] args) {SpringApplication.run(EurekaClientConsumerApplication.class, args);}@LoadBalanced //使用负载均衡机制@Beanpublic RestTemplate restTemplate(){return new RestTemplate();}
}

并增加测试MessageController类,代码如下:

@RestController
public class MessageController {@AutowiredRestTemplate restTemplate;@GetMapping("/show")public String showMessage(@RequestParam String name){//producer 为提供的服务注入到eureka的名称return restTemplate.getForObject("http://producer/get?name="+name, String.class);}
}

其中producer为注册到Eureka的serviceId

application.yml文件内容如下:

server:port: 8600spring:application:name: consumereureka:client:serviceUrl:defaultZone: http://localhost:9000/eureka/

启动服务,访问http://127.0.0.1:8600/show?name=IT_LiGe ,显示如下

Hi IT_LiGe ,I am from port:8001

并刷新界面,端口就会变成8002,再刷新端口又会变成8001,这样就实现负载均衡消费服务。

源码分析

为什么在RestTemplate上加入个@LoadBalanced注解
就可以实现负载均衡消费Eureka上注册的服务?

因此首先查看spring-cloud-netflix-ribbon.jar下META-INF/spring.factories,
内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration

根据内容得知增加了自动配置类RibbonAutoConfiguration,自动配置类内
又引入了LoadBalancerAutoConfiguration配置类型。配置类关键代码如下:

@Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {@LoadBalanced@Autowired(required = false)private List<RestTemplate> restTemplates = Collections.emptyList();...省略其他代码@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {return () -> {restTemplateCustomizers.ifAvailable((customizers) -> {Iterator var2 = this.restTemplates.iterator();while(var2.hasNext()) {RestTemplate restTemplate = (RestTemplate)var2.next();Iterator var4 = customizers.iterator();while(var4.hasNext()) {RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();customizer.customize(restTemplate);}}});};}...省略其他代码@Configuration@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})static class LoadBalancerInterceptorConfig {LoadBalancerInterceptorConfig() {}@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return (restTemplate) -> {List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);};}}
}

其中定义了一个RestTemplate的集合,只要有RestTemplate类型的对象实例
并且实例上增加LoadBalanced就会自动注入到这里。如果RestTemplate的对象
实例上未增加LoadBalanced注解是不会增加到这。原因是因为LoadBalanced内有增加了
Qualifier注解进行了限定。

接着在SmartInitializingSingleton的对象实例化时对RestTemplate的列表内的
每个实例增加LoadBalancerInterceptor拦截器。LoadBalancerInterceptor拦截器内容如下:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;private LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));}
}

最终请求交给LoadBalancerClient进行处理。

再来看看LoadBalancerClient如何处理?

LoadBalancerClient接口定义了三个方法,内容如下:

public interface LoadBalancerClient extends ServiceInstanceChooser {<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;URI reconstructURI(ServiceInstance instance, URI original);
}

其中2个execute()的方法为执行方法,reconstructURI()方法为重构url。
所继承的ServiceInstanceChooser接口有一个方法为choose()是用来选择服务。

最终实现类为RibbonLoadBalancerClient。其中执行execute()方法内容如下:

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);Server server = this.getServer(loadBalancer, hint);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);} else {RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);}}

其中通过getServer来获取服务实例,再跟踪源码可以看到最终交给ILoadBalancer类来选择服务实例,然后处理。

ILoadBalancer接口源码如下:

package com.netflix.loadbalancer;import java.util.List;public interface ILoadBalancer {void addServers(List<Server> var1);Server chooseServer(Object var1);void markServerDown(Server var1);/** @deprecated */@DeprecatedList<Server> getServerList(boolean var1);List<Server> getReachableServers();List<Server> getAllServers();
}

其中addServers方法为添加Server集合,chooseServer方法是根据key值去获取Server,
markServerDown是标记服务下线,getReachableServers获取可用服务集合,getAllServers获取所有服务集合。
getServerList方法已弃用,参数是true为获取可用服务集合否则获取所有服务集合。

ILoadBalancer的实现类关系如下:

2019122001

通过查看RibbonClientConfiguration类内,默认实现的ILoadBalancer为ZoneAwareLoadBalancer,具体代码如下:

@Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));}

接着查看ZoneAwareLoadBalancer实现的chooseServer方法最终默认是调用super.chooseServer,具体实现是在
BaseLoadBalancer类内,最终是交给IRule去选择对应的服务。

Ribbo内默认IRule实现类主要有以下几种:

  • RoundRobinRule: (默认)轮询,依次执行每个执行一次
  • RandomRule: 随机选择一个
  • BestAvailableRule: 选择最小请求数
  • WeightedResponseTirneRule: 根据响应时间去分配weight,weight越低,被选
    择的可能性就越低
  • RetryRule: 对选定的负载均衡策略机上重试机制,在一个配置时间段内当选择server不成功,则一直尝试使用subRule的方式选择一个可用的server
  • ZoneAvoidanceRule: 复合判断server所在区域的性能和server的可用性选择server
  • ResponseTimeWeightedRule: 作用同WeightedResponseTimeRule,二者作用是一样的,ResponseTimeWeightedRule后来改名为WeightedResponseTimeRule

选择具体的Server之后,再包装成RibbonServer对象,之前返回的server是该对象中的一个字段,除此之外,还有服务名serviceId,是否需要使用https等信息。最后,通过LoadBalancerRequest的apply方法,向具体的server发请求,从而实现了负载均衡。

在请求时,传入的ribbonServer对象,被当成ServiceInstance类型的对象进行接收。ServiceInstance是一个接口,定义了服务治理系统中,每个实例需要提供的信息,比如serviceId,host,port等。
LoadBalancerRequest是一个接口,最终会通过实现类的apply方法去执行,实现类是在LoadBalancerInterceptor中调用RibbonLoadBalancerClient的execute方法时,传进来的一个匿名类,可以通过查看LoadBalancerInterceptor的代码看到。
创建LoadBalancerRequest匿名类的时候,就重写了apply方法,apply方法中,还新建了一个ServiceRequestWrapper的内部类,这个类中,就重写了getURI方法,getURI方法会调用loadBalancer的reconstructURI方法来构建uri

public class LoadBalancerRequestFactory {public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {return (instance) -> {HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);LoadBalancerRequestTransformer transformer;if (this.transformers != null) {for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {transformer = (LoadBalancerRequestTransformer)var6.next();}}return execution.execute((HttpRequest)serviceRequest, body);};}
}public class ServiceRequestWrapper extends HttpRequestWrapper {private final ServiceInstance instance;private final LoadBalancerClient loadBalancer;public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {super(request);this.instance = instance;this.loadBalancer = loadBalancer;}public URI getURI() {URI uri = this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());return uri;}
}

自定义负载均衡策略

编写自定义规则继承AbstractLoadBalancerRule,代码如下:

@Component
public class MyRule extends AbstractLoadBalancerRule {private int times = 0;public MyRule(){}@Overridepublic void initWithNiwsConfig(IClientConfig iClientConfig) {}@Overridepublic Server choose(Object key) {return this.choose(this.getLoadBalancer(), key);}public Server choose(ILoadBalancer lb, Object key) {List<Server> upList = lb.getReachableServers();if (times <= 5) {times ++;return upList.get(0);} else {return upList.get(1);}}
}

修改启动类,增加@RibbonClient注解,修改后如下:

@SpringBootApplication
@RibbonClient(name = "producer", configuration = MyRule.class)
public class EurekaClientConsumerApplication {public static void main(String[] args) {SpringApplication.run(EurekaClientConsumerApplication.class, args);}@LoadBalanced //使用负载均衡机制@Beanpublic RestTemplate restTemplate(){return new RestTemplate();}
}

其中name为对应的serverId。配置完成,启动服务。
访问 http://127.0.0.1:8600/show?name=IT_LiGe ,刷新访问6次后显示的端口会发生变化。

附录

源代码地址:https://gitee.com/LeeJunProject/spring_cloud_learning/tree/master/eureka/EurekaDemo

END

欢迎扫描下图关注公众号 IT李哥,公众号经常推送一些优质的技术文章

在这里插入图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部