当前位置: 代码网 > it编程>编程语言>Java > 【SpringCloud】Eureka源码解析 上

【SpringCloud】Eureka源码解析 上

2024年07月31日 Java 我要评论
SpringCloud的eureka组件解析,包括注册服务和拉取服务

e246a1dda09849a5a89535a62441565d.png

【springcloud】eureka源码解析 上-csdn博客

【springcloud】eureka源码解析 下-csdn博客

【springcloud】zuul源码解析-csdn博客

【springcloud】config源码解析-csdn博客

【springcloud】hystrix源码解析-csdn博客

【springcloud】ribbon源码解析-csdn博客

 往期系列:

【springboot】springboot源码解析第一章 springboot的构造方法-csdn博客

【springboot】springboot源码解析第二章 springboot的run方法-csdn博客

【springboot】springboot源码解析第三章 springboot的自动化配置-csdn博客

【springboot】springboot源码解析第四章 springboot的bean接口-csdn博客

【springboot】springboot源码解析第五章 springboot的beandefinition收集过程-csdn博客

【springboot】springboot源码解析第六章 springboot的getbean方法-csdn博客

【springboot】springboot源码解析第七章 springboot的感悟-csdn博客

eureka是一个服务发现与注册组件,它包含服务端和客户端,服务端管理服务的注册信息,客户端简化服务实例与服务端的交互。我们结合源码来分析下eureka组件的实现原理,内容分为上下两章,第一章分析eureka的服务注册,第二章分析eureka的心跳机制,本章节是第一章。

参考源码:<spring-cloud.version>hoxton.sr9</spring-cloud.version>

1、注册服务

1.1 服务端接收注册信息

spring-cloud-netflix-eureka-server依赖包下有一个spring.factories文件,文件内容如下

org.springframework.boot.autoconfigure.enableautoconfiguration=\
  org.springframework.cloud.netflix.eureka.server.eurekaserverautoconfiguration

根据springboot自动配置的原理可知,eurekaserverautoconfiguration会被标记成了一个自动配置类。eurekaserverautoconfiguration配置类中有一个jerseyapplication方法,这个方法会收集指定包下被path或provider注解标记的类的beandefinition,这些类可以看作是controller

// 扫描包路径
private static final string[] eureka_packages = 
	new string[]{"com.netflix.discovery", "com.netflix.eureka"};    


// 收集包下指定类的beandefinition,放入application对象
@bean
public application jerseyapplication(environment environment, resourceloader resourceloader) {
	classpathscanningcandidatecomponentprovider provider = new classpathscanningcandidatecomponentprovider(false, environment);

	// 收集的对象要求被path或provider注解标记
	provider.addincludefilter(new annotationtypefilter(path.class));
	provider.addincludefilter(new annotationtypefilter(provider.class));
	string[] var5 = eureka_packages;
	int var6 = var5.length;

	for(int var7 = 0; var7 < var6; ++var7) {
		string basepackage = var5[var7];

		// 扫描包路径,收集beandefinition
		set<beandefinition> beans = provider.findcandidatecomponents(basepackage);
		iterator var10 = beans.iterator();

		while(var10.hasnext()) {
			beandefinition bd = (beandefinition)var10.next();
			class<?> cls = classutils.resolveclassname(bd.getbeanclassname(), resourceloader.getclassloader());
			classes.add(cls);
		}
	}

	...
	return rc;
}

// 获取到application,将beandefinition置入servlet容器
@bean
public filterregistrationbean<?> jerseyfilterregistration(application eurekajerseyapp) {
	filterregistrationbean<filter> bean = new filterregistrationbean();
	bean.setfilter(new servletcontainer(eurekajerseyapp));
	bean.setorder(integer.max_value);
	bean.seturlpatterns(collections.singletonlist("/eureka/*"));
	return bean;
}

收集的beandefinition会通过jerseyfilterregistration方法放入servlet容器,这样接收请求时就能通过url映射给指定的bean来处理请求

com.netflix.eureka包下被扫描的类如下:

applicationresource类是controller中的一员,它有一个addinstance方法,这个方法就是服务端响应服务注册的方法

@post
@consumes({"application/json", "application/xml"})
public response addinstance(instanceinfo info, @headerparam("x-netflix-discovery-replication") string isreplication) {
		...
		
		// 执行注册
		this.registry.register(info, "true".equals(isreplication));
		return response.status(204).build();
}
调用链:
-> applicationresource.addinstance
-> instanceregistry.register
-> peerawareinstanceregistryimpl.register
-> abstractinstanceregistry.register

服务端使用currenthashmap来存储服务的信息,服务端响应注册的过程较为简单

// 用currenthashmap存储服务信息 
private final concurrenthashmap<string, map<string, lease<instanceinfo>>> registry
					 = new concurrenthashmap();    

public void register(instanceinfo registrant, int leaseduration, boolean isreplication)   
{
		...
		map<string, lease<instanceinfo>> gmap = (map)this.registry.get(registrant.getappname());
		lease<instanceinfo> lease = new lease(registrant, leaseduration);
		if (existinglease != null) {
			lease.setserviceuptimestamp(existinglease.getserviceuptimestamp());
		}
		
		// 将服务信息放入map中
		((map)gmap).put(registrant.getid(), lease);
		...
}

1.2 客户端发送注册信息
1.2.1 client客户端

spring-cloud-netflix-eureka-client依赖包下也有一个spring.factories文件,文件内容如下

...
org.springframework.cloud.netflix.eureka.eurekaclientautoconfiguration
...

eurekaclientautoconfiguration被标记成自动配置类,它里面有一个创建eurekaclient类对象的bean方法,看类的名称我们知道这是一个客户端

@bean(
	destroymethod = "shutdown"
)
@conditionalonmissingbean(
	value = {eurekaclient.class},
	search = searchstrategy.current
)
@org.springframework.cloud.context.config.annotation.refreshscope
@lazy
public eurekaclient eurekaclient(applicationinfomanager manager, eurekaclientconfig config, eurekainstanceconfig instance, @autowired(required = false) healthcheckhandler healthcheckhandler) {
	applicationinfomanager appmanager;
	if (aoputils.isaopproxy(manager)) {
		appmanager = (applicationinfomanager)proxyutils.gettargetobject(manager);
	} else {
		appmanager = manager;
	}

	// 创建客户端
	cloudeurekaclient cloudeurekaclient = new cloudeurekaclient(appmanager, config, this.optionalargs, this.context);
	cloudeurekaclient.registerhealthcheck(healthcheckhandler);
	return cloudeurekaclient;
}
调用链:
-> eurekaautoserviceregistration.eurekaclient
-> new cloudeurekaclient(appmanager, config, this.optionalargs, this.context);
-> cloudeurekaclient.super(applicationinfomanager, config, args);
-> discoveryclient.discoveryclient

... 构造方法重载

-> discoveryclient.discoveryclient
-> initscheduledtasks

跟踪eurekaclient类的构造方法找到discoveryclient类,discoveryclient类的构造方法调用了initscheduledtasks方法,初始化了一个定时任务

private void initscheduledtasks() {
	...
		// 添加状态变更监听器
		this.statuschangelistener = new applicationinfomanager.statuschangelistener() {
			public string getid() {
				return "statuschangelistener";
			}

			public void notify(statuschangeevent statuschangeevent) {
				if (statuschangeevent.getstatus() == instancestatus.down) {
					discoveryclient.logger.error("saw local status change event {}", statuschangeevent);
				} else {
					discoveryclient.logger.info("saw local status change event {}", statuschangeevent);
				}

				// 监听器被通知后调用ondemandupdate方法
				discoveryclient.this.instanceinforeplicator.ondemandupdate();
			}
		};
	...

}

定时任务内添加了一个状态修改监听器,监听器调用notify方法时会回调ondemandupdate方法,追踪这个回调方法

调用链:
-> instanceinforeplicator.ondemandupdate
-> instanceinforeplicator.this.run
-> this.discoveryclient.register
-> this.eurekatransport.registrationclient.register(this.instanceinfo)
-> abstractjerseyeurekahttpclient.register

进入到abstractjerseyeurekahttpclient类的register方法

public eurekahttpresponse<void> register(instanceinfo info) {
	string urlpath = "apps/" + info.getappname();
	clientresponse response = null;

	eurekahttpresponse var5;
	try {
		// 向注册中心发送http请求
		webresource.builder resourcebuilder = this.jerseyclient
				.resource(this.serviceurl)
				.path(urlpath)
				.getrequestbuilder();
		this.addextraheaders(resourcebuilder);
		response = (clientresponse)((webresource.builder)((webresource.builder)((webresource.builder)resourcebuilder.header("accept-encoding", "gzip")).type(mediatype.application_json_type)).accept(new string[]{"application/json"})).post(clientresponse.class, info);
		var5 = eurekahttpresponse.aneurekahttpresponse(response.getstatus()).headers(headersof(response)).build();
	} finally {
		if (logger.isdebugenabled()) {
			logger.debug("jersey http post {}/{} with instance {}; statuscode={}", new object[]{this.serviceurl, urlpath, info.getid(), response == null ? "n/a" : response.getstatus()});
		}

		if (response != null) {
			response.close();
		}

	}

	return var5;
}

注册方法写得很直白了:客户端拿到注册中心地址,然后携带服务元数据,发送请求完成注册。不过还有一个问题,之前我们提到定时任务内初始化了一个监听器,这个监听器只有被通知了才会执行后续的注册方法,那么监听器是如何被通知的?它的触发时机又在何时?

1.2.2 监听器

eurekaclientautoconfiguration配置类还有一个创建eurekaautoserviceregistration类的bean方法

// 创建服务注册客户端 
@bean
@conditionalonbean({autoserviceregistrationproperties.class})
@conditionalonproperty(
	value = {"spring.cloud.service-registry.auto-registration.enabled"},
	matchifmissing = true
)
public eurekaautoserviceregistration eurekaautoserviceregistration(applicationcontext context, eurekaserviceregistry registry, eurekaregistration registration) {
	return new eurekaautoserviceregistration(context, registry, registration);
}

eurekaautoserviceregistration类实现了smartlifecycle接口。当spring容器加载完所有bean后会调用smartlifecycle接口实现类的start方法,start方法调用eurekaserviceregistry类的regiser方法

public class eurekaautoserviceregistration implements 
    autoserviceregistration, 
    smartlifecycle, 
    ordered, 
    smartapplicationlistener 
{


    public void start() {
        if (this.port.get() != 0) {
            if (this.registration.getnonsecureport() == 0) {
                this.registration.setnonsecureport(this.port.get());
            }

            if (this.registration.getsecureport() == 0 && this.registration.issecure()) {
                this.registration.setsecureport(this.port.get());
            }
        }

        if (!this.running.get() && this.registration.getnonsecureport() > 0) {
            
            // 调用eurekaserviceregistry的regiser
            this.serviceregistry.register(this.registration);
            this.context.publishevent(new instanceregisteredevent(this, 
            this.registration.getinstanceconfig()));
            this.running.set(true);
        }

    }
}

eurekaserviceregistry类的regiser方法会设置实例的状态。进入applicationinfomanager类的setinstancestatus方法

// 设置实例状态        
reg.getapplicationinfomanager().setinstancestatus(
		  reg.getinstanceconfig().getinitialstatus());
      

setinstancestatus方法触发了一个状态修改事件,并且通知了监听器

public synchronized void setinstancestatus(instanceinfo.instancestatus status) {
	instanceinfo.instancestatus next = this.instancestatusmapper.map(status);
	if (next != null) {
		instanceinfo.instancestatus prev = this.instanceinfo.setstatus(next);
		if (prev != null) {
			iterator var4 = this.listeners.values().iterator();

			while(var4.hasnext()) {
				statuschangelistener listener = (statuschangelistener)var4.next();

				try {
					// 通知监听器
					listener.notify(new statuschangeevent(prev, next));
				} catch (exception var7) {
					logger.warn("failed to notify listener: {}", listener.getid(), var7);
				}
			}
		}

	}
}

这里的监听器和上面提到的状态修改监听器其实是同一个监听器,在调用eurekaautoserviceregistration对象的start方法后,监听器会收到通知然后调用客户端的register方法,这就是发送注册服务请求的执行时机

2、拉取服务

2.1 初次拉取

客户端第一次拉取服务和discoveryclient类的构造方法有关,详情如下:

@inject
discoveryclient(...){
	...
	// 调用fetchregistry方法,拉取服务
	boolean primaryfetchregistryresult = this.fetchregistry(false);
	if (!primaryfetchregistryresult) {
		 logger.info("initial registry fetch from primary servers failed");
	}
	...
}  

private boolean fetchregistry(boolean forcefullregistryfetch) {
	...
	// 调用getandstorefullregistry方法,拉取全部服务
	this.getandstorefullregistry();
	...
}

private void getandstorefullregistry() throws throwable {
	...
	long currentupdategeneration = this.fetchregistrygeneration.get();
	// 启动时会打印这行日志
	logger.info("getting all instance registry info from the eureka server");
	applications apps = null;
	// 发送http请求
	eurekahttpresponse<applications> httpresponse = this.clientconfig.getregistryrefreshsinglevipaddress() == null ? this.eurekatransport.queryclient.getapplications((string[])this.remoteregionsref.get()) : this.eurekatransport.queryclient.getvip(this.clientconfig.getregistryrefreshsinglevipaddress(), (string[])this.remoteregionsref.get());
	if (httpresponse.getstatuscode() == status.ok.getstatuscode()) {
		apps = (applications)httpresponse.getentity();
	}
	...
}
2.2 定时拉取

为了保证服务信息真实可信,客户端会定时拉取远程注册列表更新本地数据。提到到定时任务,自然的联想到discoveryclient类的initscheduledtasks方法(1.2.1的内容)

private void initscheduledtasks() {
	int renewalintervalinsecs;
	int expbackoffbound;
	if (this.clientconfig.shouldfetchregistry()) {
		renewalintervalinsecs = this.clientconfig.getregistryfetchintervalseconds();
		expbackoffbound = this.clientconfig.getcacherefreshexecutorexponentialbackoffbound();

		// 定时刷新本地服务列表任务,具体任务在cacherefreshthread内
		this.cacherefreshtask = new timedsupervisortask("cacherefresh", this.scheduler, this.cacherefreshexecutor, renewalintervalinsecs, timeunit.seconds, expbackoffbound, new 
		// 执行任务的线程
		cacherefreshthread());
		this.scheduler.schedule(this.cacherefreshtask, (long)renewalintervalinsecs, timeunit.seconds);
	}
}

class cacherefreshthread implements runnable {
	cacherefreshthread() {
	}

	public void run() {
		// 刷新服务列表
		discoveryclient.this.refreshregistry();
	}
}

@visiblefortesting
void refreshregistry() {
	...
	// 获取服务列表
	boolean success = this.fetchregistry(remoteregionsmodified);
	...
}

3、总结

eureka服务端启动后通过自动配置加载com.netflix.eureka包下的处理器,处理器会响应注册、拉取、剔除服务等http请求

eureka客户端启动后会发送注册请求,并定时更新服务列表

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com