前言
其实,“通过redis手动更新ribbon缓存来解决eureka微服务架构中服务下线感知的问题”是一种解,但不是最优解
1.痛点
上一篇文章的标题是:
通过redis手动更新ribbon缓存来解决eureka微服务架构中服务下线感知的问题
当时在文章的末尾就指出,使用redis+aop的方式有很多漏洞,只有在服务调用方发送调用请求的情况下才会触发切面中更新ribbon缓存的逻辑。如果每次在发布eureka新服务的场景下,告警的接口都能准确定位到,那将这些接口方法通过切面去针对性的加上更新ribbon缓存的前置操作完全是没问题的。但是如果告警接口数量众多,并且无法定位,上述方法就有些不够看了。
2.解决方案
于是,基于此种困境,我想到了用mq的事件驱动模式来推进ribbon缓存更新(“下线”这一事件驱动,而不是“发送跨服务调用请求”这一事件),具体如下:
即,当服务被调用方中调用了下线接口下线了指定服务,会生产消息到mq里,服务被调用方会监听这个队列去消费消息,并通过消费消息这一事件(消费下线服务端口信息)去驱动更新ribbon缓存。
说明:
在以前我觉得用mq不能做下线,压测了很多次也没成功,这本质还是没搞懂eureka-server,eureka-client,ribbon三者的关系和之间的动作,其实这个体系里有两个非常关键的点(在配置文件中设置),可以直接影响无感知下线的结果,需要动态调整:那就是要关闭eureka-server的三级缓存usereadonlyresponsecache: false
,并且缩短eureka-client端向eureka-server端拉取服务列表的时间registry-fetch-interval-seconds: 3
。可能这里大家看到去改配置有点鸡肋并且在实际场景中也不太现实,但别急,暂时先往下看,后面我会专门写一篇文章来解决这一问题
3.具体实现
3.1配置rabbitmq
1.配置rabbitmq(安装这些大家可以去看看平台比较成熟的文章)这里就不写了,我是直接在服务器上用docker容器化运行的:
在调用方与被调用方都配好mq
3.2生产下线消息
首先声明一个队列:
@configuration
@enablerabbit
public class rabbitmqconfig {
@bean
public queue thequeue() {
return new queue("server_list");
}
}
服务下线接口处,生产下线消息到mq,向这接口/service-down-list
发送get请求,传递指定的下线服务实例信息即可下线服务,即http://localhost:8081/control/service-down-list?portparams=8083
就下线了8083服务实例
@value("${eureka-server.ipaddress}")
private string ipaddress;
@value("${eureka-server.appname}")
private string appname;
@value("${diy_queue.value}")
private string queuename;
@getmapping(value = "/service-down-list")
public string offline(@requestparam list<integer> portparams) {
list<integer> successlist = new arraylist<>();
//得到服务信息
list<instanceinfo> instances = eurekaclient.getinstancesbyvipaddress(appname, false);
list<integer> serviceports = instances.stream().map(instanceinfo::getport).collect(collectors.tolist());
//去服务列表里挨个下线
okhttpclient client = new okhttpclient();
log.error("开始时间:{}", system.currenttimemillis());
portparams.parallelstream().foreach(temp -> {
if (serviceports.contains(temp)) {
string url = "http://" + ipaddress + ":" + temp + "/control/service-down";
try {
response response = client.newcall(new request.builder().url(url).build()).execute();
if (response.code() == 200) {
log.debug(temp + "服务下线成功");
successlist.add(temp);
} else {
log.debug(temp + "服务下线失败");
}
} catch (ioexception e) {
log.error(e.tostring());
}
}
});
//todo mq通知
hashmap<string, list<integer>> portinfo = new hashmap<>();
portinfo.put(appname,successlist);
rabbittemplate.convertandsend(queuename,portinfo);
return successlist + "优雅下线成功";
}
这里向mq的队列里传递了下线的服务实例端口信息
3.3更新ribbon缓存
服务调用方通过“下线“这一事件驱动ribbon缓存更新
/**
* 消费者
*/
@slf4j
@component
public class consumer {
@resource
springclientfactory springclientfactory;
@resource
clearribboncachebean clearribboncachebean;
@rabbitlistener(queues = "server_list")
public void listenworkqueue1(hashmap<string, list<integer>> message) {
log.debug("消费者1接收到消息——" + message + "时间为:" + localtime.now());
for (string key : message.keyset()) {
list<integer> value = message.get(key);
log.debug("key: " + key);
log.debug("value: " + value);
if (objectutils.isnotempty(value)) {
clearribboncachebean.clearribboncache(springclientfactory, value.tostring(), key);
}
log.debug("现在的所有服务列表:{}", springclientfactory.getloadbalancer(key).getallservers());
}
}
}
清理ribbon缓存的bean:
/**
* 手动清除ribbon缓存
*/
@configuration
@slf4j
public class clearribboncachebean {
/**
* 削减
*/
public static boolean cutdown(list<integer> ports, server index) {
return ports.contains(index.getport());
}
public void clearribboncache(springclientfactory clientfactory, string portparams,string appname) {
// 获取指定服务的负载均衡器
iloadbalancer loadbalancer = clientfactory.getloadbalancer(appname);
//在主动拉取可用列表,而不是走拦截器被动的方式——这里为什么获取可用的之后还要过滤,就是因为所谓的可用不是实时的可用而是缓存中的可用
list<server> reachableservers = loadbalancer.getreachableservers();//这里从客户端获取,会等待客户端同步三级缓存
//过滤掉已经下线的端口,符合条件端口的服务过滤出来
list<integer> portlist = stringchange.stringtolist(portparams);
list<server> ableservers = reachableservers.stream().filter(temp -> !cutdown(portlist, temp)).collect(collectors.tolist());
log.debug("可用服务列表:{}", ableservers);
// 在某个时机需要清除ribbon缓存
((baseloadbalancer) loadbalancer).setserverslist(ableservers); // 清除ribbon负载均衡器的缓存
}
3.4压测
运行项目,调用下线接口并压测来模拟一下线上场景:
此时我们调用下线接口,下线8083服务实例:
压测结果,均无异常:
观察服务实例的日志输出:
未下线的8081,8084
下线的8083
这说明,eureka服务下线感知的延迟已经完全被消除
4.优化
以上的mq还是采用简单队列的模式,即生产者生产一条消息到队列中,该消息也只能被一个消费者消费到。在微服务架构中,用户微服务肯定不只是被单方面调用,而是会被多方调用。那这就要求我们不能单纯只将消息生产到队列里,应该通过广播的模式进行消息的分发。为了更方便交换机与队列的灵活绑定,以及方便扩展,采用topic话题的模型进行消息的广播:
声明一个新队列:
@bean
public queue thequeue() {
return new queue("user-queue");
}
将生产消息的地方改为携带一个routingkey并发送到交换机中:
//todo mq通知
hashmap<string, list<integer>> portinfo = new hashmap<>();
portinfo.put(appname,successlist);
rabbittemplate.convertandsend(exchangename,"user.service-down",portinfo);// 这个队列以后可能会发user话题下的很多信息
将消费者端的消息监听器进行改造,变为监听指定话题的消息:
@rabbitlistener(bindings = @queuebinding(
value = @queue(name = "user-queue"),
exchange = @exchange(name = "user-topic", type = exchangetypes.topic),
key = "user.service-down")
)
public void listenworkqueue1(hashmap<string, list<integer>> message) {
log.debug("消费者1接收到消息——" + message + "时间为:" + localtime.now());
for (string key : message.keyset()) {
list<integer> value = message.get(key);
log.debug("key: " + key);
log.debug("value: " + value);
if (objectutils.isnotempty(value)) {
clearribboncachebean.clearribboncache(springclientfactory, value.tostring(), key);
}
log.debug("现在的所有服务列表:{}", springclientfactory.getloadbalancer(key).getallservers());
}
}
发表评论