0%

【SpringCloud】负载均衡:Ribbon

负载均衡的实现,Ribbon使用

1 Ribbon入门

1.1 Ribbon介绍

  • Ribbon是基于Netflix Ribbon实现的一套客户端,负载均衡的工具
  • 主要功能是提供客户端的软件负载均衡(Load Balancer)算法和服务调用

1.2 功能

  • 实现负载均衡(LB)
    • 负载均衡:将用户的请求平摊到多个服务器上,从而达到系统的高可用
  • 集中式LB:在服务端实现负载均衡,代表:Nginx
  • 进程内LB:在客户端实现负载均衡,也就是Ribbon

1.3 总结

  • Ribbon = 负载均衡 + RestTemplate调用

2 Ribbon使用

2.1 依赖引入

1
2
3
4
5
6
7
8
9
10
11
12
<!--一般情况引入-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>


<!--但是引入最新版的eureka的包,里面就包含了ribbon-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

2.2 RestTemplate使用

  • getForObjectgetForEntity
    • getForObject:返回对象为响应中数据转化为的对象,基本上可以理解为Json
    • getForEntity:返回对象为ResponseEntity对象,包含了响应中一些重要信息,比如响应头、响应状态码、响应体
1
2
3
4
5
6
7
8
9
10
11
12
//getForEntity使用例子

@GetMapping("/consumer/payment/getForEntity/{id}")
public CommonResult getPayment2(@PathVariable("id") Long id) {
ResponseEntity<CommonResult> entity = restTemplate.getForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);

if (entity.getStatusCode().is2xxSuccessful()){ //获取状态码,判断状态码是否200左右
return entity.getBody();
} else {
return new CommonResult(400, "请求失败!");
}
}
  • Post请求也是一样道理

2.3 Ribbon自带的负载均衡策略

  • IRule:根据特定算法从服务列表中选取一个要访问的服务
  • 所以负载均衡算法都是IRule的实现类
实现类 说明
com.netflix.loadbalancer.RoundRobinRule 轮询
com.netflix.loadbalancer.RandomRule 随机
com.netflix.loadbalancer.RetryRule 先按照轮询的策略获取服务,如果获取服务失败则在指定时间内进行重试,获取可用服务
WeightedReponseTimeRule 对轮询的拓展,响应速度越快的实例选择权重就越大,越容易被选择
BestAvaliableRule 会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
AvaliablilityFilteringRule 会先过滤故障实例,再选择并发较小的实例
ZoneAvoidanceRule 默认规则,复合判断server所在区域的性能和server的可用性选择服务器

2.4 替换负载均衡策略

  • 注意:自定义配置类,不能放在@ComponentScan所扫描的当前包下以及子包,否则此自定义的配置类就会被所有的Ribbon客户端所共享,达不到特殊化定制的目的
  • 因为@SpringBootApplication此注解包含了@ComponentScan,所以主启动类同级包,以及子包都不能放自定义配置类
  • (1)创建自定义配置类
1
2
3
4
5
6
7
8
@Configuration
public class MySelfRule {

@Bean
public IRule myRule() {
return new RandomRule(); //随机算法
}
}
  • (2)修改主启动类
1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableEurekaClient
@RibbonClient(name = "cloud-payment-service", configuration = MySelfRule.class) //告诉Ribbon不要使用默认策略,要使用我们配置的策略
public class OrderMain80 {
public static void main(String[] args) {
SpringApplication.run(OrderMain80.class, args);
}
}

2.5 轮询算法原理与实现

  • 轮询算法:rest接口第几次请求数 %(求余) 服务器集群总数量 = 实际调用服务器位置下标吗,每次服务重启后rest接口计数从1开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//源码分析
public Server choose(ILoadBalancer lb, Object key) {
//初始化
if (lb == null) {
log.warn("no load balancer");
return null;
} else {
Server server = null;
int count = 0;

while(true) {
if (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers(); //获取可到达的服务(活着的服务器)
List<Server> allServers = lb.getAllServers(); //获取所有的服务器
int upCount = reachableServers.size();
int serverCount = allServers.size();
if (upCount != 0 && serverCount != 0) { //当两种服务数量都不等于0时,才正常执行算法
int nextServerIndex = this.incrementAndGetModulo(serverCount); //通过算法获取下标
server = (Server)allServers.get(nextServerIndex); //通过下标获取需要访问的服务器
if (server == null) {
Thread.yield();
} else {
if (server.isAlive() && server.isReadyToServe()) {
return server; //返回该服务器
}

server = null;
}
continue;
}

log.warn("No up servers available from load balancer: " + lb);
return null;
}

if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: " + lb);
}

return server;
}
}
}

//求余获取下标算法
private int incrementAndGetModulo(int modulo) {
int current; //当前请求数
int next; //下一次下标
do {
current = this.nextServerCyclicCounter.get();
next = (current + 1) % modulo; //求余
} while(!this.nextServerCyclicCounter.compareAndSet(current, next));

return next;
}

  • 手写轮询算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyLB {
//当前网络请求次数
private AtomicInteger atomicInteger = new AtomicInteger(0);

//请求数自增
private final int getAndIncrement() {
int current;
int next;

do {
current = this.atomicInteger.get();
next = current > Integer.MAX_VALUE ? 0 : current + 1; //三元运算
}while (!this.atomicInteger.compareAndSet(current, next));
log.info("**** 当前网络请求数:" + next);
return next;
}

//获取需要访问服务的实例
public ServiceInstance instance(List<ServiceInstance> serviceInstances) {
int index = getAndIncrement() % serviceInstances.size(); //求余
return serviceInstances.get(index);
}
}