1.什么是spring cloud consul?
spring cloud consul 是 spring cloud 提供的对 hashicorp consul 的支持。它是一种基于服务网格的工具,用于实现服务注册、发现、配置管理和健康检查。 主要功能包括:
- 服务注册与发现:通过 consul 的服务注册功能,spring cloud consul 可以实现微服务的动态注册和发现,简化服务间通信。
- 分布式配置管理:通过 consul 的 key/value 存储机制,提供对分布式配置的管理。
- 健康检查:支持服务实例的健康检查,确保只有健康的实例可供其他服务调用。
- 选举与分布式锁:通过 consul 的会话机制,支持分布式锁和领导选举。
spring cloud consul 的选举机制
spring cloud consul 的选举机制基于 consul 会话(session) 和 键值存储(key/value store) 实现分布式领导选举。
工作原理:
- 会话创建:
- 服务实例向 consul 创建一个会话(session),这是一个临时的、与实例绑定的对象。
- 会话带有 ttl(生存时间),需要定期续约,保持活跃状态。
- 获取锁(lock):
- 通过将一个 key 的值设置为当前会话 id,服务尝试获取该 key 的锁。
- consul 使用 cas(compare and swap)操作来确保只有一个服务实例可以成功获取锁。
- 锁定成功:
- 成功获取锁的服务实例被视为领导者(leader)。
- 其他实例会定期尝试获取锁,但只能等待当前锁被释放或超时。
- 锁释放或失效:
- 如果领导实例未能及时续约会话(例如宕机或网络中断),consul 会释放与该会话相关联的锁,其他实例可以竞争成为新的领导者。
2.环境搭建
run consul agent
docker run -d --name=dev-consul -p 8500:8500 consul
web ui
http://localhost:8500
3.代码工程
实验目标
- 使用 consul 提供的会话机制和键值存储来实现 分布式领导选举。
- 通过
@inboundchanneladapter和@serviceactivator实现周期性检查领导身份并执行领导任务。
pom.xml
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactid>springcloud-demo</artifactid>
<groupid>com.et</groupid>
<version>1.0-snapshot</version>
</parent>
<modelversion>4.0.0</modelversion>
<artifactid>leaderelection</artifactid>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-actuator</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>
<!-- spring cloud starter consul discovery -->
<dependency>
<groupid>org.springframework.cloud</groupid>
<artifactid>spring-cloud-starter-consul-discovery</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-core</artifactid>
</dependency>
</dependencies>
</project>
leaderelectionconfig.java
package com.et;
import jakarta.annotation.predestroy;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.http.httpentity;
import org.springframework.http.httpheaders;
import org.springframework.http.httpmethod;
import org.springframework.http.responseentity;
import org.springframework.integration.annotation.inboundchanneladapter;
import org.springframework.integration.annotation.poller;
import org.springframework.integration.annotation.serviceactivator;
import org.springframework.integration.core.messagesource;
import org.springframework.integration.support.messagebuilder;
import org.springframework.messaging.message;
import org.springframework.messaging.messagehandler;
import org.springframework.messaging.messagingexception;
import org.springframework.web.client.resttemplate;
@configuration
public class leaderelectionconfig {
private static final string leader_key = "service/leader";
private static final string consul_url = "http://localhost:8500";
private string sessionid;
@bean
@inboundchanneladapter(value = "leaderchannel", poller = @poller(fixeddelay = "5000"))
public messagesource<string> leadermessagesource() {
return () -> {
// implement logic to check if this instance is the leader
boolean isleader = checkleadership();
return messagebuilder.withpayload(isleader ? "i am the leader" : "i am not the leader").build();
};
}
@bean
@serviceactivator(inputchannel = "leaderchannel")
public messagehandler leadermessagehandler() {
return new messagehandler() {
@override
public void handlemessage(message<?> message) throws messagingexception {
system.out.println(message.getpayload());
// implement logic to perform leader-specific tasks
}
};
}
private final resttemplate resttemplate = new resttemplate();
public leaderelectionconfig() {
this.sessionid = createsession();
}
private string createsession() {
string url = consul_url + "/v1/session/create";
httpheaders headers = new httpheaders();
httpentity<string> entity = new httpentity<>("{\"name\": \"leader-election-session\"}", headers);
//responseentity<string> response = resttemplate.postforentity(url, entity, string.class);
// put
responseentity<string> response = resttemplate.exchange(url, httpmethod.put, entity, string.class);
// extract session id from response
return response.getbody().split("\"")[3]; // this is a simple way to extract the session id
}
public boolean checkleadership() {
string url = consul_url + "/v1/kv/" + leader_key + "?acquire=" + sessionid;
httpheaders headers = new httpheaders();
httpentity<string> entity = new httpentity<>(headers);
responseentity<boolean> response = resttemplate.exchange(url, httpmethod.put, entity, boolean.class);
return boolean.true.equals(response.getbody());
}
public void releaseleadership() {
string url = consul_url + "/v1/kv/" + leader_key + "?release=" + sessionid;
httpheaders headers = new httpheaders();
httpentity<string> entity = new httpentity<>(headers);
responseentity<boolean> response = resttemplate.exchange(url, httpmethod.put, entity, boolean.class);
if (boolean.true.equals(response.getbody())) {
system.out.println("released leadership successfully");
} else {
system.out.println("failed to release leadership");
}
}
@predestroy
public void onexit() {
releaseleadership();
}
}
代码解释
- 初始化:
- 启动时通过
createsession()向 consul 注册会话。
- 启动时通过
- 周期性任务:
- 每 5 秒通过
checkleadership()检查领导身份。 - 如果是领导者,执行特定任务(如打印日志、执行业务逻辑)。
- 每 5 秒通过
- 释放资源:
- 应用关闭时,通过
releaseleadership()释放锁。
- 应用关闭时,通过
leaderelectionapplication.java
package com.et;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.client.discovery.enablediscoveryclient;
import org.springframework.integration.config.enableintegration;
@springbootapplication
@enablediscoveryclient
@enableintegration
public class leaderelectionapplication {
public static void main(string[] args) {
springapplication.run(leaderelectionapplication.class, args);
}
}
配置文件
node1
server.port=8081
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
node2
server.port=8082
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
以上只是一些关键代码。
4.测试
启动node1节点
java -jar myapp.jar --spring.profiles.active=node1
启动node2节点
java -jar myapp.jar --spring.profiles.active=node2
通过控制台观察日志,其中只有一台机器能选为主机
以上就是spring cloud consul实现选举机制的代码工程的详细内容,更多关于spring cloud consul选举机制的资料请关注代码网其它相关文章!
发表评论