前言
nameserver在整个rocketmq的模块划分中占据重要的地位,起到类似于注册中心的作用。
brokerserver启动时需要向nameserver注册自身元数据信息以及主题topic信息,而producer发送消息到brokerserver、consumer从brokerserver订阅消息,则需要经过nameserver才能确定最终要进行数据通讯的brokerserver的地址,所以,brokerserver、producer、consumer程序启动均需要配置nameserver的地址。
本篇文章就来聊一聊,brokerserver、producer、consumer程序如何设置nameserver的地址以及nameserver的地址是否可以动态更新。
配置方式
首要我们需要清楚一点,代理服务器brokerserver的配置信息最后会封装到brokerconfig对象中,而生产者producer、消费者consumer的配置信息最终会封装到clientconfig对象中,那么在这两个类中应该有nameserver地址相关的变量。
clientconfig代码片段
public class clientconfig {
// ...省略部分代码
private string namesrvaddr = nameserveraddressutils.getnameserveraddresses();
// ...省略部分代码
}
nameserveraddressutils#getnameserveraddresses
public class nameserveraddressutils {
// ...省略部分代码
public static string getnameserveraddresses() {
return system.getproperty(mixall.namesrv_addr_property, system.getenv(mixall.namesrv_addr_env));
}
// ...省略部分代码
}
brokerconfig代码片段
public class brokerconfig {
// ...省略部分代码
@importantfield
private string namesrvaddr = system.getproperty(mixall.namesrv_addr_property, system.getenv(mixall.namesrv_addr_env));
// ...省略部分代码
}
可见,不管是clientconfig,还是brokerconfig,nameserver地址变量的初始值,默认先取系统属性rocketmq.namesrv.addr的值,如果该系统属性未设置,则再取环境变量namesrv_addr的值,如果两者均未设置,那么配置类中nameserver地址变量初始值为null。
注:
通过上述分析,我们可以知晓两种配置nameserver地址的方法,不管你的程序是brokerserver、producer或者是consumer
1.设置系统属性:rocketmq.namesrv.addr
2.设置环境变量:namesrv_addr
如果配置类中nameserver地址变量初始值为null,那brokerserver、producer、consumer启动的时候是不是就会因为没有这个值而启动不了或者报错呢?
其实并不会,还有额外的补偿手段,程序会通过http请求去访问一个特定的url获取nameserver地址,我们继续分析。
producer或者consumer
producer或者consumer底层都会持有一个mqclientinstance类对象,而在mqclientinstance类中,我们可以看到通过url请求nameserver地址的代码。
原生api发送消息或者消费消息的代码大致如下所列,我们以消息发送者的start方法为切入口进行分析。
// 原生api发送消息
defaultmqproducer producer = new defaultmqproducer("test-group");
producer.start();
sendresult sendresult = producer.send(new message("test-topic", "test-message".getbytes(standardcharsets.utf_8)));
system.out.println(sendresult);
// 原生api消费消息
defaultmqpushconsumer defaultmqpushconsumer = new defaultmqpushconsumer("test-group");
defaultmqpushconsumer.registermessagelistener((messagelistenerconcurrently) (msglist, context) -> {
try {
msglist.foreach(system.out::println);
} catch (exception e) {
e.printstacktrace();
return consumeconcurrentlystatus.reconsume_later;
}
return consumeconcurrentlystatus.consume_success;
});
defaultmqpushconsumer.subscribe("test-topic", "*");
defaultmqpushconsumer.start();
defaultmqproducer#start
@override
public void start() throws mqclientexception {
this.setproducergroup(withnamespace(this.producergroup));
this.defaultmqproducerimpl.start();
if (null != tracedispatcher) {
try {
tracedispatcher.start(this.getnamesrvaddr(), this.getaccesschannel());
} catch (mqclientexception e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
我们可以看到,defaultmqproducer的start方法主要逻辑委托给了defaultmqproducerimpl的start方法
defaultmqproducerimpl#start
public void start() throws mqclientexception {
this.start(true);
}
public void start(final boolean startfactory) throws mqclientexception {
switch (this.servicestate) {
case create_just:
this.servicestate = servicestate.start_failed;
this.checkconfig();
if (!this.defaultmqproducer.getproducergroup().equals(mixall.client_inner_producer_group)) {
this.defaultmqproducer.changeinstancenametopid();
}
this.mqclientfactory = mqclientmanager.getinstance().getorcreatemqclientinstance(this.defaultmqproducer, rpchook);
boolean registerok = mqclientfactory.registerproducer(this.defaultmqproducer.getproducergroup(), this);
if (!registerok) {
this.servicestate = servicestate.create_just;
throw new mqclientexception("the producer group[" + this.defaultmqproducer.getproducergroup()
+ "] has been created before, specify another name please." + faqurl.suggesttodo(faqurl.group_name_duplicate_url),
null);
}
this.topicpublishinfotable.put(this.defaultmqproducer.getcreatetopickey(), new topicpublishinfo());
if (startfactory) {
mqclientfactory.start();
}
log.info("the producer [{}] start ok. sendmessagewithvipchannel={}", this.defaultmqproducer.getproducergroup(),
this.defaultmqproducer.issendmessagewithvipchannel());
this.servicestate = servicestate.running;
break;
case running:
case start_failed:
case shutdown_already:
throw new mqclientexception("the producer service state not ok, maybe started once, "
+ this.servicestate
+ faqurl.suggesttodo(faqurl.client_service_not_ok),
null);
default:
break;
}
this.mqclientfactory.sendheartbeattoallbrokerwithlock();
this.startscheduledtask();
}
在defaultmqproducerimpl的start(final boolean startfactory)方法中,创建了mqclientinstance对象实例,并调用了其start方法
mqclientinstance#start
public void start() throws mqclientexception {
synchronized (this) {
switch (this.servicestate) {
case create_just:
// 省略部分代码
// 没有手动指定namesrv的值,从远端服务器获取并更新本地缓存
if (null == this.clientconfig.getnamesrvaddr()) {
this.mqclientapiimpl.fetchnameserveraddr();
}
// 省略部分代码
// start various schedule tasks
this.startscheduledtask();
// 省略部分代码
break;
case start_failed:
throw new mqclientexception("the factory object[" + this.getclientid() + "] has been created before, and failed.", null);
default:
break;
}
}
}
mqclientinstance的start方法中,与我们该篇分析的内容相关的大致就是上述代码所列的两处。
第一,判断clientconfig对象的namesrvaddr值是否为null,若是,调用mqclientapiimpl的fetchnameserveraddr方法从远端服务器拉取nameserver服务器的地址,并更新用到的地方
mqclientapiimpl#fetchnameserveraddr
public string fetchnameserveraddr() {
try {
string addrs = this.topaddressing.fetchnsaddr();
if (addrs != null) {
if (!addrs.equals(this.namesrvaddr)) {
log.info("name server address changed, old=" + this.namesrvaddr + ", new=" + addrs);
this.updatenameserveraddresslist(addrs);
this.namesrvaddr = addrs;
return namesrvaddr;
}
}
} catch (exception e) {
log.error("fetchnameserveraddr exception", e);
}
return namesrvaddr;
}
topaddressing#fetchnsaddr
public final string fetchnsaddr() {
return fetchnsaddr(true, 3000);
}
public final string fetchnsaddr(boolean verbose, long timeoutmills) {
string url = this.wsaddr;
try {
if (!utilall.isblank(this.unitname)) {
url = url + "-" + this.unitname + "?nofix=1";
}
httptinyclient.httpresult result = httptinyclient.httpget(url, null, null, "utf-8", timeoutmills);
if (200 == result.code) {
string responsestr = result.content;
if (responsestr != null) {
return clearnewline(responsestr);
} else {
log.error("fetch nameserver address is null");
}
} else {
log.error("fetch nameserver address failed. statuscode=" + result.code);
}
} catch (ioexception e) {
if (verbose) {
log.error("fetch name server address exception", e);
}
}
if (verbose) {
string errormsg =
"connect to " + url + " failed, maybe the domain name " + mixall.getwsaddr() + " not bind in /etc/hosts";
errormsg += faqurl.suggesttodo(faqurl.name_server_addr_not_exist_url);
log.warn(errormsg);
}
return null;
}
向远端请求的url地址就是wsaddr变量值,那么这个变量是在什么地方赋值的呢?发现该变量是在topaddressing的构造方法中赋值,而topaddressing对象又是在mqclientapiimpl中创建,我们找到具体的创建逻辑
mqclientapiimpl#constructor
public mqclientapiimpl(final nettyclientconfig nettyclientconfig,
final clientremotingprocessor clientremotingprocessor,
rpchook rpchook, final clientconfig clientconfig) {
this.clientconfig = clientconfig;
topaddressing = new topaddressing(mixall.getwsaddr(), clientconfig.getunitname());
// 省略部分逻辑
}
传入topaddressing构造方法的值取自mixall的getwsaddr方法的返回值
mixall#getwsaddr
public static string getwsaddr() {
string wsdomainname = system.getproperty("rocketmq.namesrv.domain", default_namesrv_addr_lookup);
string wsdomainsubgroup = system.getproperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
string wsaddr = "http://" + wsdomainname + ":8080/rocketmq/" + wsdomainsubgroup;
if (wsdomainname.indexof(":") > 0) {
wsaddr = "http://" + wsdomainname + "/rocketmq/" + wsdomainsubgroup;
}
return wsaddr;
}
通过分析上述方法,可以知道,默认会从http://jmenv.tbsite.net:8080/rocketmq/nsaddr这个链接处拉取nameserver的地址。当然,如果你想更改这个链接,可以修改系统属性"rocketmq.namesrv.domain"和"rocketmq.namesrv.domain.subgroup"的值以达到目的。
第二,判断clientconfig对象的namesrvaddr值是否为null,若是,开启定时任务,周期性地更新nameserver服务器的地址
mqclientinstance#startscheduledtask
private void startscheduledtask() {
// 没有手动指定name-server地址的情况下,两分钟更新一次name-server地址
// 这个就是name-server可以动态变化的唯一途径
if (null == this.clientconfig.getnamesrvaddr()) {
// 两分钟拉取更新一次name-server地址
this.scheduledexecutorservice.scheduleatfixedrate(new runnable() {
@override
public void run() {
try {
mqclientinstance.this.mqclientapiimpl.fetchnameserveraddr();
} catch (exception e) {
log.error("scheduledtask fetchnameserveraddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, timeunit.milliseconds);
}
}
可以看出,默认每隔2分钟会调用mqclientapiimpl的fetchnameserveraddr方法更新nameserver地址,具体逻辑上述第一条已经解释过不再赘述
brokerserver
brokerserver的启动类是brokerstartup,在brokerstartup的main方法中,会创建brokercontroller的实例对象并调用其start方法,而在创建brokercontroller的实例对象时会一并调用其initialize方法进行初始化,就在这个初始化方法中,有跟我们该篇分析相关的内容。
brokerstartup#main
public static void main(string[] args) {
start(createbrokercontroller(args));
}
brokerstartup#createbrokercontroller
public static brokercontroller createbrokercontroller(string[] args) {
// 省略部分代码
try {
// 省略部分代码
boolean initresult = controller.initialize();
// 省略部分代码
return controller;
} catch (throwable e) {
e.printstacktrace();
system.exit(-1);
}
return null;
}
brokercontroller#initialize
public boolean initialize() throws clonenotsupportedexception {
// 将本地文件中存储的数据加载至内存
boolean result = this.topicconfigmanager.load();
result = result && this.consumeroffsetmanager.load();
result = result && this.subscriptiongroupmanager.load();
result = result && this.consumerfiltermanager.load();
// 省略部分代码
result = result && this.messagestore.load();
if (result) {
// 省略部分代码
if (this.brokerconfig.getnamesrvaddr() != null) {
this.brokerouterapi.updatenameserveraddresslist(this.brokerconfig.getnamesrvaddr());
log.info("set user specified name server address: {}", this.brokerconfig.getnamesrvaddr());
} else if (this.brokerconfig.isfetchnamesrvaddrbyaddressserver()) {
// 没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址
// 每隔2分钟从name-server地址服务器拉取最新的配置
this.scheduledexecutorservice.scheduleatfixedrate(new runnable() {
@override
public void run() {
try {
brokercontroller.this.brokerouterapi.fetchnameserveraddr();
} catch (throwable e) {
log.error("scheduledtask fetchnameserveraddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, timeunit.milliseconds);
}
// 省略部分代码
}
return result;
}
可以看到,brokerconfig中如果namesrvaddr变量值为null,默认每隔2分钟会调用brokerouterapi的fetchnameserveraddr方法更新nameserver地址
brokerouterapi#fetchnameserveraddr
public string fetchnameserveraddr() {
try {
string addrs = this.topaddressing.fetchnsaddr();
if (addrs != null) {
if (!addrs.equals(this.namesrvaddr)) {
log.info("name server address changed, old: {} new: {}", this.namesrvaddr, addrs);
this.updatenameserveraddresslist(addrs);
this.namesrvaddr = addrs;
return namesrvaddr;
}
}
} catch (exception e) {
log.error("fetchnameserveraddr exception", e);
}
return namesrvaddr;
}
该方法与mqclientapiimpl的fetchnameserveraddr方法逻辑几乎一模一样,也不再进行赘述。
注:
第三种配置nameserver地址的方法,那就是系统启动的时候,不配置系统属性rocketmq.namesrv.addr和环境变量namesrv_addr,这样系统会自动访问一个url从远端服务器拉取nameserver的地址,同时会开启相应的定时任务定时刷新nameserver地址。
总结
brokerserver、producer、consumer程序启动获取nameserver地址的三种方式
- 1.配置系统属性:rocketmq.namesrv.addr
- 2.配置环境变量:namesrv_addr
- 3.不配置系统属性rocketmq.namesrv.addr和环境变量namesrv_addr,让程序自动访问一个url从远端服务器拉取nameserver的地址,同时会开启相应的定时任务定时刷新nameserver地址。该url地址默认为http://jmenv.tbsite.net:8080/rocketmq/nsaddr,可以通过修改系统属性rocketmq.namesrv.domain和rocketmq.namesrv.domain.subgroup以达到修改url地址的目的。
只有上述第3种方式才可以实现程序运行过程中动态更新nameserver地址值。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论