文章目录
前言
负载均衡在微服务层次主要解决的是服务器压力的问题,通过负载均衡算法可以将请求分摊到不同的微服务上,从而解决某台微服务因请求压力过大导致服务奔溃现象。本文就dubbo中的几种负载均衡算法进行分析,通过了解dubbo中负载均衡算法的实现,从而熟悉负载均衡算法的核心思想。
dubbo对负载均衡提供了抽象接口loadbalance,总共有五种实现,分别是:
- 一致性hash算法:consistenthashloadbalance。
- 最小活跃度算法:leastactiveloadbalance。
- 加权随机,dubbo中的默认负载均衡算法:randomloadbalance。
- 轮询算法:roundrobinloadbalance。
- 最短响应时间算法:shortestresponseloadbalance。
如果我们要在dubbo中使用自己的负载均衡算法,只需要实现该接口,重写其方法,然后再根据dubbo spi 配置好实现,在使用的时候在服务调用者端配置loadbalance属性告诉dubbo使用哪种负载均衡算法即可。
- loadbalance:负载均衡的抽象接口。
@spi(randomloadbalance.name)
public interface loadbalance {
@adaptive("loadbalance")
<t> invoker<t> select(list<invoker<t>> invokers, url url, invocation invocation) throws rpcexception;
}
- abstractloadbalance:负载均衡的抽象接口loadbalance的抽象实现。
public abstract class abstractloadbalance implements loadbalance {
static int calculatewarmupweight(int uptime, int warmup, int weight) {
int ww = (int) (math.round(math.pow((uptime / (double) warmup), 2) * weight));
return ww < 1 ? 1 : (math.min(ww, weight));
}
@override
public <t> invoker<t> select(list<invoker<t>> invokers, url url, invocation invocation) {
if (collectionutils.isempty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doselect(invokers, url, invocation);
}
protected abstract <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation);
int getweight(invoker<?> invoker, invocation invocation) {
int weight;
url url = invoker.geturl();
if (invoker instanceof clusterinvoker) {
url = ((clusterinvoker<?>) invoker).getregistryurl();
}
// multiple registry scenario, load balance among multiple registries.
if (registry_service_reference_path.equals(url.getserviceinterface())) {
weight = url.getparameter(weight_key, default_weight);
} else {
weight = url.getmethodparameter(invocation.getmethodname(), weight_key, default_weight);
if (weight > 0) {
long timestamp = invoker.geturl().getparameter(timestamp_key, 0l);
if (timestamp > 0l) {
long uptime = system.currenttimemillis() - timestamp;
if (uptime < 0) {
return 1;
}
int warmup = invoker.geturl().getparameter(warmup_key, default_warmup);
if (uptime > 0 && uptime < warmup) {
weight = calculatewarmupweight((int)uptime, warmup, weight);
}
}
}
}
return math.max(weight, 0);
}
}
一致性hash算法:consistenthashloadbalance
在分布式系统中,一致性hash算法可以做到将一部分请求固定打到指定的机器上,走到分在均衡的效果。一致性hash算法中有一个hash环的概念,在dubbo中,使用treemap来实现hash环,treemap特点是将key按照从小到大的方式进行存储。
源码分析:
protected <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation) {
string methodname = rpcutils.getmethodname(invocation);
string key = invokers.get(0).geturl().getservicekey() + "." + methodname;
// 计算整个节点集合list的hashcode,目的在于判断集合中的元素是否发生了变化。
int invokershashcode = getcorrespondinghashcode(invokers);
consistenthashselector<t> selector = (consistenthashselector<t>) selectors.get(key);
// 如果根据key取出来的selector是空的,或者hashcode已经发生了变化,就说明节点信息已经发生了变化,那么就需要重新构造hash环。
if (selector == null || selector.identityhashcode != invokershashcode) {
selectors.put(key, new consistenthashselector<t>(invokers, methodname, invokershashcode));
selector = (consistenthashselector<t>) selectors.get(key);
}
// 选择节点
return selector.select(invocation);
}
// 构造hash环
consistenthashselector(list<invoker<t>> invokers, string methodname, int identityhashcode) {
this.virtualinvokers = new treemap<long, invoker<t>>();
this.identityhashcode = identityhashcode;
url url = invokers.get(0).geturl();
this.replicanumber = url.getmethodparameter(methodname, hash_nodes, 160);
string[] index = comma_split_pattern.split(url.getmethodparameter(methodname, hash_arguments, "0"));
argumentindex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentindex[i] = integer.parseint(index[i]);
}
// 循环所有的节点信息,构造hash环。
for (invoker<t> invoker : invokers) {
string address = invoker.geturl().getaddress();
// replicanumber默认为160,这里会根据每一个节点的ip生成40个虚拟节点。
for (int i = 0; i < replicanumber / 4; i++) {
byte[] digest = bytes.getmd5(address + i);
// 为每一个虚拟节点生成四个存储位置
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
// key就是当前的节点的ip计算出来的hash值,value就是当前循环的节点。
virtualinvokers.put(m, invoker);
}
}
totalrequestcount = new atomiclong(0);
servercount = invokers.size();
errequestcountmap.clear();
}
// 最终调用这个方法进行选择节点
private invoker<t> selectforkey(long hash) {
// 参数hash就是根据hash参数计算出来的下标,这里根据下标获取大于等于该hash值的最小节点信息
map.entry<long, invoker<t>> entry = virtualinvokers.ceilingentry(hash);
// 如果节点信息不存在,那么说明当前hash值就是hash环上最大的那么节点信息,
if (entry == null) {
// 那么接下来的那个下标就是hash环上的第一个元素了。
entry = virtualinvokers.firstentry();
}
// 取出节点信息的地址
string serveraddress = entry.getvalue().geturl().getaddress();
// 这里设定了一个请求线程数阈值
double overloadthread = ((double) totalrequestcount.get() / (double) servercount) * overload_ratio_thread;
// 如果当前选定的这个节点已经接收过请求并且接收的请求数超过了设定的这个线程数阈值,说明这个节点不可用
while (serverrequestcountmap.containskey(serveraddress)
&& serverrequestcountmap.get(serveraddress).get() >= overloadthread) {
// 如果这个节点不可用,就需要获取大于给定的key的最小节点信息
entry = getnextinvokernode(virtualinvokers, entry);
// 重新获取节点信息的地址
serveraddress = entry.getvalue().geturl().getaddress();
}
// 当前选定的这个节点没有接收过请求,那么就说明可用,将当前节点信息假如到serverrequestcountmap中
if (!serverrequestcountmap.containskey(serveraddress)) {
serverrequestcountmap.put(serveraddress, new atomiclong(1));
} else {
// 接收的请求数未超过设定的这个线程数阈值,那么就说明可用,将当前节点信息已经接收的请求数加1
serverrequestcountmap.get(serveraddress).incrementandget();
}
totalrequestcount.incrementandget();
return entry.getvalue();
}
总结:
- 首先计算整个服务列表的hashcode值,根据这个服务列表的hashcode值判断,是否需要重新构建hash环,构造hash环的时候,根据每一个节点的ip生成40个虚拟节点,每一个虚拟节点会存储在四个位置,虚拟节点的引入是为了解决实际节点数据较少而导致的数据倾斜问题。
- 根据请求中的指定的参数值计算出hash值,从hash环获取节点信息。
- 根据获取的节点信息,计算当前节点是否之前已经接收过请求。
- 如果接收过请求且接收过的请求数已经超过了设置的请求数阈值,那么就认当前选中的这个节点不可用,会继续选择下一个节点信息。
- 如果当前选定的这个节点没有接收过请求,那么就说明可用,将当前节点信息假如到serverrequestcountmap中,或者接收的请求数未超过设定的这个线程数阈值,那么就说明可用,将当前节点信息已经接收的请求数加1。
- 最终返回选定的这个节点。
最小活跃度:leastactiveloadbalance
根据服务处理请求的频率进行选择,频率越低越符合条件。
源码分析
protected <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation) {
// 记录服务数量
int length = invokers.size();
// 用来记录所有服务中,最小活跃度最低的那个服务的最小活跃数
int leastactive = -1;
// 具有相同最小活跃数的服务个数
int leastcount = 0;
// 具有最小活跃数的服务集合
int[] leastindexes = new int[length];
// 每一个服务的权重集合
int[] weights = new int[length];
// 所有服务总权重和
int totalweight = 0;
// 第一个最小活跃数的服务的权重,类似于选中了一个标准,之后用这个标准和每一个服务的权重进行比较,用来判断是否所有的服务的具有相同的权重。
int firstweight = 0;
// 标志是否所有的服务的具有相同的权重,默认为true
boolean sameweight = true;
// 循环所有的服务
for (int i = 0; i < length; i++) {
invoker<t> invoker = invokers.get(i);
// 获取当前服务的活跃数
int active = rpcstatus.getstatus(invoker.geturl(), invocation.getmethodname()).getactive();
// 获取当前服务的权重,默认为100
int afterwarmup = getweight(invoker, invocation);
// 将当前服务的权重按照序号加入到weights数组中。
weights[i] = afterwarmup;
// 如果当前服务是第一个服务或者当前服务的活跃数比之前记录的最小活跃数还小
if (leastactive == -1 || active < leastactive) {
// 当前服务的活跃数为最小活跃数
leastactive = active;
// 具有相同最小活跃数的服务个数,也就是当前服务器的个数
leastcount = 1;
// 将当前服务按照序号加入到具有相同最小活跃数的服务集合中
leastindexes[0] = i;
// 当前服务的权重就是权重之和
totalweight = afterwarmup;
// 当前服务的权重就是第一个最小活跃数的服务的权重
firstweight = afterwarmup;
// 这种情况下,所有的服务的权重都是一样的。
sameweight = true;
// 如果当前服务的活跃数和之前记录的最小活跃数是一样的
} else if (active == leastactive) {
// 当前服务加入到具有最小活跃数的服务集合中,具有相同最小活跃数的服务个数加1
leastindexes[leastcount++] = i;
// 总权重加上当前服务的权重
totalweight += afterwarmup;
// 如果所有的服务的权重都是一样的,并且当前服务的权重和第一个最小活跃数的服务的权重不相等
if (sameweight && afterwarmup != firstweight) {
// 说明所有的服务的权重不一样的
sameweight = false;
}
}
}
// 如果上边的所有的流程完了之后,只有一个最小活跃数的服务
if (leastcount == 1) {
// 那么就直接将这个服务返回即可,这个服务就是目前最小活跃数的服务
return invokers.get(leastindexes[0]);
}
// 如果所有的服务的权重不一样,并且总权重之和大于0
if (!sameweight && totalweight > 0) {
// 随机从总权重中获取一个数
int offsetweight = threadlocalrandom.current().nextint(totalweight);
// 循环具有相同最小活跃数的服务集合
for (int i = 0; i < leastcount; i++) {
int leastindex = leastindexes[i];
// 从权重集合中获取当前服务的权重,并用随机权重减去这个服务的权重
offsetweight -= weights[leastindex];
// 如果随机权重减去这个服务的权重之后小于0,那么就表明这个服务就是符合条件的服务。
if (offsetweight < 0) {
return invokers.get(leastindex);
}
}
}
// 如果所有服务的权重都是一样的,并且总权重为0,那么随机选取一个服务即可
return invokers.get(leastindexes[threadlocalrandom.current().nextint(leastcount)]);
}
总结:
- 从所有的服务中找最小活跃数相同的服务,记录这些服务的权重集合,服务个数,并判断出是否所有的服务都具有相同的权重。
- 如果所有的服务都具有相同的权重,那么直接从找出来的这些最小活跃数相同的服务中随机返回一个服务即可。
- 如果所有的服务权重不相同,并且最小活跃数相同的服务只有一个,那么直接返回这个服务即可。
- 如果所有的服务权重不相同,并且最小活跃数相同的服务有多个,那么根据总权重获取一个随机数,之后和找出来的每一个最小活跃数相同的服务的权重相减,得到的值小于0,那么当前这个服务就是符合条件的服务,返回即可。
加权随机:randomloadbalance(dubbo中的默认负载均衡算法)
加权随机算法是dubbo中默认的负载均衡算法,使用权重随机从服务列表中获取服务。
源码分析
protected <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation) {
// 获取远程服务的个数
int length = invokers.size();
// 这是一个标识,标识所有的远程服务的权重是否都是一样的,后续再遍历所有的远程服务的时候,只要有一个远程服务的权重和其他的不一致,该标志就会被改为false,默认为true。
boolean sameweight = true;
// 用来存放各个远程服务的权重
int[] weights = new int[length];
// 总权重,各个远程服务的权重之和。
int totalweight = 0;
// 循环所有的远程服务
for (int i = 0; i < length; i++) {
// 获取当前远程服务的权重
int weight = getweight(invokers.get(i), invocation);
// 计算所有的远程服务权重之和
totalweight += weight;
// 保存当前远程服务的权重
weights[i] = totalweight;
// 旧版本的是注释的这种写法,容易理解点:就是当前远程服务的权重和上一个远程服务的权重进行比较,如果不一样,就将sameweight改为false,新版的这种写法不容易理解。
/**
if (sameweight && i > 0
&& weight != getweight(invokers.get(i - 1), invocation)) {
sameweight = false;
}
**/
if (sameweight && totalweight != weight * (i + 1)) {
sameweight = false;
}
}
// 如果总权重 > 0 并且所有的远程服务的权重都不一样
if (totalweight > 0 && !sameweight) {
// 随机从总权重中获取一个数字。
int offset = threadlocalrandom.current().nextint(totalweight);
// 再次循环所有的远程服务集合
for (int i = 0; i < length; i++) {
// 如果随机数小于当前远程服务的权重,那么随机数刚好落在当前远程服务所占有的权重区间。
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// 如果总权重 <= 0 或者所有的远程服务的权重都一样,那么随机选取一个远程服务
return invokers.get(threadlocalrandom.current().nextint(length));
}
总结:
- 计算所有的服务节点数,并按照这个数循环遍历获取每一个节点的权重,并检查所有的节点的权重是否都一样。
- 如果所有的节点的权重都是一样的,那么直接随机返回一个节点即可。
- 如果每一个节点的权重都不一样,那么计算节点的权重之和,并按照循环节点的顺序将各个节点的权重和缓存。
- 随机产生一个权重之和之内的随机数据,并循环权重和缓存,随机数刚好小于的权重和的那个下标就是当前选中的节点。
轮询算法:roundrobinloadbalance
轮询算法的思想就是每次都选取一个服务,这个服务必须和上次选择的不一致。
源码分析:
protected <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation) {
// key就是接口全限定名.方法名
string key = invokers.get(0).geturl().getservicekey() + "." + invocation.getmethodname();
// 处理roundrobinloadbalance自身的缓存,key为接口全限定名.方法名,value为mao<每一个服务的标识identifystring,封装的weightedroundrobin>
concurrentmap<string, weightedroundrobin> map = methodweightmap.computeifabsent(key, k -> new concurrenthashmap<>());
// 总权重
int totalweight = 0;
// 设置的最大值,默认为long的最小值,为负数
long maxcurrent = long.min_value;
long now = system.currenttimemillis();
// 选中的服务
invoker<t> selectedinvoker = null;
// 选中服务对应的weightedroundrobin信息
weightedroundrobin selectedwrr = null;
// 开始循环所有服务
for (invoker<t> invoker : invokers) {
// 获取当前服务的标识
string identifystring = invoker.geturl().toidentitystring();
// 获取当前服务的权重,默认为100
int weight = getweight(invoker, invocation);
// 处理缓存,如果存在当前服务的weightedroundrobin信息,就返回当前服务的weightedroundrobin,如果没有,就构造一个weightedroundrobin,并将当前服务的权重设置进去。
weightedroundrobin weightedroundrobin = map.computeifabsent(identifystring, k -> {
weightedroundrobin wrr = new weightedroundrobin();
wrr.setweight(weight);
return wrr;
});
// 判断下,是不是权重发生了变化,如果发生了变化,需要重新设置
if (weight != weightedroundrobin.getweight()) {
weightedroundrobin.setweight(weight);
}
// 实际上这里获取的就是当前这个服务的对应的权重,但是这个值在后续还会处理,这个值会直接影响服务能不能被选中。
long cur = weightedroundrobin.increasecurrent();
// 设置对应的服务信息最后更新时间为当前时间
weightedroundrobin.setlastupdate(now);
// 如果cur大于maxcurrent,当前服务被选中,相当于在找最大权重的那个服务
if (cur > maxcurrent) {
// 设置cur为maxcurrent
maxcurrent = cur;
// 记录选中的服务
selectedinvoker = invoker;
// 记录选中的服务的其他信息
selectedwrr = weightedroundrobin;
}
// 记录总权重
totalweight += weight;
}
// 处理roundrobinloadbalance中自身的缓存,如果缓存中的服务信息和服务列表中的信息不一致,那么就需要从缓存中将超过1分钟还没有被更新的服务移除掉。
if (invokers.size() != map.size()) {
map.entryset().removeif(item -> now - item.getvalue().getlastupdate() > recycle_period);
}
// 如果选中的服务不为空
if (selectedinvoker != null) {
// 这里会将当前选中的服务信息的cur设置成一个负数,也就是设置成所有权重的之和的负数,那么这个服务的cur就是最小的一个了,目的是为了保证下一次不会再次选中该服务
selectedwrr.sel(totalweight);
return selectedinvoker;
}
// 如果上述流程之后,还未选中服务,那么直接就返回第一个服务。
return invokers.get(0);
}
总结:
- roundrobinloadbalance会自身缓存一个服务列表选取信息,当真正的服务列表信息有变动的时候,就更新自身的缓存信息,即将缓存信息中超过1分钟还未被更新服务信息的服务删除。
- 每一次调用都去服务列表中找最大权重的服务,如果找到了就记录,如果没有找到,就直接返回第一个服务。
- 循环服务列表的时候,首先都会更新服务时间,并设置一个值cur,这个值很重要,会直接决定当前服务能否被选中,选种的服务后续会将这个cur值,设置成一个负数,以保证下一次轮询,不会再次被选中。
最短响应时间算法:shortestresponseloadbalance
源码分析:
protected <t> invoker<t> doselect(list<invoker<t>> invokers, url url, invocation invocation) {
// 获取远程服务的个数
int length = invokers.size();
// 最大的响应时间,默认为long的最大值
long shortestresponse = long.max_value;
// 具有相同最短响应时间的服务的个数
int shortestcount = 0;
// 具有相同最短响应时间的服务的集合
int[] shortestindexes = new int[length];
// 每一个服务的权重集合
int[] weights = new int[length];
// 所有服务总权重和
int totalweight = 0;
// 第一个最短响应时间的服务的权重,类似于选中了一个标准,之后用这个标准和每一个服务的权重进行比较,用来判断是否所有的服务的具有相同的权重。
int firstweight = 0;
// 标志是否所有的服务的具有相同的权重,默认为true
boolean sameweight = true;
// 循环所有的服务
for (int i = 0; i < length; i++) {
invoker<t> invoker = invokers.get(i);
// 获取当前服务的rpc信息
rpcstatus rpcstatus = rpcstatus.getstatus(invoker.geturl(), invocation.getmethodname());
// 获取当前服务成功响应所花费的平均响应时间
long succeededaverageelapsed = rpcstatus.getsucceededaverageelapsed();
// 获取当前服务的活跃数,加1的目的就是算上本次请求。
int active = rpcstatus.getactive() + 1;
// 计算处理完这些请求需要花费的总体响应时间
long estimateresponse = succeededaverageelapsed * active;
// 获取当前服务的权重
int afterwarmup = getweight(invoker, invocation);
// 按照序号记录权重
weights[i] = afterwarmup;
// 如果当前服务处理完这些请求需要花费的总体响应时间小于设置最大的响应时间,这里实际上就是在找最小响应时间的服务。
if (estimateresponse < shortestresponse) {
// 设置最大的响应时间为当前服务的总体响应时间
shortestresponse = estimateresponse;
// 最短响应时间的数量为1,也就是本服务一个
shortestcount = 1;
// 记录最短响应时间的这个服务
shortestindexes[0] = i;
// 总权重就是当前这个服务的权重
totalweight = afterwarmup;
// 当前服务的权重就是第一个最短响应时间的服务的权重
firstweight = afterwarmup;
// 这种情况下,所有的服务的权重都是一样的。
sameweight = true;
// 如果当前服务的最短响应时间和之前记录的最短响应时间是一样的
} else if (estimateresponse == shortestresponse) {
// 当前服务加入到具有最短响应时间的服务集合中,具有相同最短响应时间的服务个数加1
shortestindexes[shortestcount++] = i;
// 总权重加上当前服务的权重
totalweight += afterwarmup;
// 如果所有的服务的权重都是一样的,并且当前服务不是列表中第一个服务,并且当前服务的权重和第一个最短响应时间的服务的权重不相等
if (sameweight && i > 0
&& afterwarmup != firstweight) {
// 说明所有的服务的权重不一样的
sameweight = false;
}
}
}
// 如果上边的所有的流程完了之后,只有一个最短响应时间的服务
if (shortestcount == 1) {
// 那么就直接将这个服务返回即可,这个服务就是目前最短响应时间的服务
return invokers.get(shortestindexes[0]);
}
// 如果所有的服务的权重不一样,并且总权重之和大于0
if (!sameweight && totalweight > 0) {
// 随机从总权重中获取一个数
int offsetweight = threadlocalrandom.current().nextint(totalweight);
// 循环具有相同最短响应时间的服务集合
for (int i = 0; i < shortestcount; i++) {
int shortestindex = shortestindexes[i];
// 从权重集合中获取当前服务的权重,并用随机权重减去这个服务的权重
offsetweight -= weights[shortestindex];
// 如果随机权重减去这个服务的权重之后小于0,那么就表明这个服务就是符合条件的服务。
if (offsetweight < 0) {
return invokers.get(shortestindex);
}
}
}
// 如果所有服务的权重都是一样的,并且总权重为0,那么随机选取一个服务即可
return invokers.get(shortestindexes[threadlocalrandom.current().nextint(shortestcount)]);
}
总结:
- 从所有的服务中找最短响应时间相同的服务,记录这些服务的权重集合,服务个数,并判断出是否所有的服务都具有相同的权重。
- 如果所有的服务都具有相同的权重,那么直接从找出来的这些最短响应时间相同的服务中随机返回一个服务即可。
- 如果所有的服务权重不相同,并且最短响应时间相同的服务只有一个,那么直接返回这个服务即可。
- 如果所有的服务权重不相同,并且最短响应时间相同的服务有多个,那么根据总权重获取一个随机数,之后和找出来的每一个最短响应时间相同的服务的权重相减,得到的值小于0,那么当前这个服务就是符合条件的服务,返回即可。
发表评论