当前位置: 代码网 > it编程>游戏开发>游戏引擎 > Rocketmq之NameServer地址配置及更新过程

Rocketmq之NameServer地址配置及更新过程

2026年04月12日 游戏引擎 我要评论
前言nameserver在整个rocketmq的模块划分中占据重要的地位,起到类似于注册中心的作用。brokerserver启动时需要向nameserver注册自身元数据信息以及主题topic信息,而

前言

nameserver在整个rocketmq的模块划分中占据重要的地位,起到类似于注册中心的作用。

brokerserver启动时需要向nameserver注册自身元数据信息以及主题topic信息,而producer发送消息到brokerserver、consumer从brokerserver订阅消息,则需要经过nameserver才能确定最终要进行数据通讯的brokerserver的地址,所以,brokerserver、producer、consumer程序启动均需要配置nameserver的地址。

本篇文章就来聊一聊,brokerserver、producer、consumer程序如何设置nameserver的地址以及nameserver的地址是否可以动态更新。

配置方式

首要我们需要清楚一点,代理服务器brokerserver的配置信息最后会封装到brokerconfig对象中,而生产者producer、消费者consumer的配置信息最终会封装到clientconfig对象中,那么在这两个类中应该有nameserver地址相关的变量。

clientconfig代码片段

public class clientconfig {
    // ...省略部分代码
    private string namesrvaddr = nameserveraddressutils.getnameserveraddresses();
    // ...省略部分代码
}

nameserveraddressutils#getnameserveraddresses

public class nameserveraddressutils {
    // ...省略部分代码
    public static string getnameserveraddresses() {
        return system.getproperty(mixall.namesrv_addr_property, system.getenv(mixall.namesrv_addr_env));
    }
    // ...省略部分代码
}

brokerconfig代码片段

public class brokerconfig {
    // ...省略部分代码
    @importantfield
    private string namesrvaddr = system.getproperty(mixall.namesrv_addr_property, system.getenv(mixall.namesrv_addr_env));
    // ...省略部分代码
}

可见,不管是clientconfig,还是brokerconfig,nameserver地址变量的初始值,默认先取系统属性rocketmq.namesrv.addr的值,如果该系统属性未设置,则再取环境变量namesrv_addr的值,如果两者均未设置,那么配置类中nameserver地址变量初始值为null。 

注: 

通过上述分析,我们可以知晓两种配置nameserver地址的方法,不管你的程序是brokerserver、producer或者是consumer 

1.设置系统属性:rocketmq.namesrv.addr 

2.设置环境变量:namesrv_addr

如果配置类中nameserver地址变量初始值为null,那brokerserver、producer、consumer启动的时候是不是就会因为没有这个值而启动不了或者报错呢?

其实并不会,还有额外的补偿手段,程序会通过http请求去访问一个特定的url获取nameserver地址,我们继续分析。

producer或者consumer

producer或者consumer底层都会持有一个mqclientinstance类对象,而在mqclientinstance类中,我们可以看到通过url请求nameserver地址的代码。

原生api发送消息或者消费消息的代码大致如下所列,我们以消息发送者的start方法为切入口进行分析。

// 原生api发送消息
defaultmqproducer producer = new defaultmqproducer("test-group");
producer.start();
sendresult sendresult = producer.send(new message("test-topic", "test-message".getbytes(standardcharsets.utf_8)));
system.out.println(sendresult);

// 原生api消费消息
defaultmqpushconsumer defaultmqpushconsumer = new defaultmqpushconsumer("test-group");
defaultmqpushconsumer.registermessagelistener((messagelistenerconcurrently) (msglist, context) -> {
    try {
        msglist.foreach(system.out::println);
    } catch (exception e) {
        e.printstacktrace();
        return consumeconcurrentlystatus.reconsume_later;
    }
    return consumeconcurrentlystatus.consume_success;
});
defaultmqpushconsumer.subscribe("test-topic", "*");
defaultmqpushconsumer.start();

defaultmqproducer#start

@override
public void start() throws mqclientexception {
    this.setproducergroup(withnamespace(this.producergroup));
    this.defaultmqproducerimpl.start();
    if (null != tracedispatcher) {
        try {
            tracedispatcher.start(this.getnamesrvaddr(), this.getaccesschannel());
        } catch (mqclientexception e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

我们可以看到,defaultmqproducer的start方法主要逻辑委托给了defaultmqproducerimpl的start方法

defaultmqproducerimpl#start

public void start() throws mqclientexception {
    this.start(true);
}

public void start(final boolean startfactory) throws mqclientexception {
    switch (this.servicestate) {
        case create_just:
            this.servicestate = servicestate.start_failed;

            this.checkconfig();

            if (!this.defaultmqproducer.getproducergroup().equals(mixall.client_inner_producer_group)) {
                this.defaultmqproducer.changeinstancenametopid();
            }

            this.mqclientfactory = mqclientmanager.getinstance().getorcreatemqclientinstance(this.defaultmqproducer, rpchook);

            boolean registerok = mqclientfactory.registerproducer(this.defaultmqproducer.getproducergroup(), this);
            if (!registerok) {
                this.servicestate = servicestate.create_just;
                throw new mqclientexception("the producer group[" + this.defaultmqproducer.getproducergroup()
                    + "] has been created before, specify another name please." + faqurl.suggesttodo(faqurl.group_name_duplicate_url),
                    null);
            }

            this.topicpublishinfotable.put(this.defaultmqproducer.getcreatetopickey(), new topicpublishinfo());

            if (startfactory) {
                mqclientfactory.start();
            }

            log.info("the producer [{}] start ok. sendmessagewithvipchannel={}", this.defaultmqproducer.getproducergroup(),
                this.defaultmqproducer.issendmessagewithvipchannel());
            this.servicestate = servicestate.running;
            break;
        case running:
        case start_failed:
        case shutdown_already:
            throw new mqclientexception("the producer service state not ok, maybe started once, "
                + this.servicestate
                + faqurl.suggesttodo(faqurl.client_service_not_ok),
                null);
        default:
            break;
    }

    this.mqclientfactory.sendheartbeattoallbrokerwithlock();

    this.startscheduledtask();

}

在defaultmqproducerimpl的start(final boolean startfactory)方法中,创建了mqclientinstance对象实例,并调用了其start方法

mqclientinstance#start

public void start() throws mqclientexception {

    synchronized (this) {
        switch (this.servicestate) {
            case create_just:
                // 省略部分代码
                
                // 没有手动指定namesrv的值,从远端服务器获取并更新本地缓存
                if (null == this.clientconfig.getnamesrvaddr()) {
                    this.mqclientapiimpl.fetchnameserveraddr();
                }
                
                // 省略部分代码
                
                // start various schedule tasks
                this.startscheduledtask();
                
                // 省略部分代码
                break;
            case start_failed:
                throw new mqclientexception("the factory object[" + this.getclientid() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

mqclientinstance的start方法中,与我们该篇分析的内容相关的大致就是上述代码所列的两处。 

第一,判断clientconfig对象的namesrvaddr值是否为null,若是,调用mqclientapiimpl的fetchnameserveraddr方法从远端服务器拉取nameserver服务器的地址,并更新用到的地方

mqclientapiimpl#fetchnameserveraddr

public string fetchnameserveraddr() {
    try {
        string addrs = this.topaddressing.fetchnsaddr();
        if (addrs != null) {
            if (!addrs.equals(this.namesrvaddr)) {
                log.info("name server address changed, old=" + this.namesrvaddr + ", new=" + addrs);
                this.updatenameserveraddresslist(addrs);
                this.namesrvaddr = addrs;
                return namesrvaddr;
            }
        }
    } catch (exception e) {
        log.error("fetchnameserveraddr exception", e);
    }
    return namesrvaddr;
}

topaddressing#fetchnsaddr

public final string fetchnsaddr() {
    return fetchnsaddr(true, 3000);
}

public final string fetchnsaddr(boolean verbose, long timeoutmills) {
    string url = this.wsaddr;
    try {
        if (!utilall.isblank(this.unitname)) {
            url = url + "-" + this.unitname + "?nofix=1";
        }
        httptinyclient.httpresult result = httptinyclient.httpget(url, null, null, "utf-8", timeoutmills);
        if (200 == result.code) {
            string responsestr = result.content;
            if (responsestr != null) {
                return clearnewline(responsestr);
            } else {
                log.error("fetch nameserver address is null");
            }
        } else {
            log.error("fetch nameserver address failed. statuscode=" + result.code);
        }
    } catch (ioexception e) {
        if (verbose) {
            log.error("fetch name server address exception", e);
        }
    }

    if (verbose) {
        string errormsg =
            "connect to " + url + " failed, maybe the domain name " + mixall.getwsaddr() + " not bind in /etc/hosts";
        errormsg += faqurl.suggesttodo(faqurl.name_server_addr_not_exist_url);

        log.warn(errormsg);
    }
    return null;
}

向远端请求的url地址就是wsaddr变量值,那么这个变量是在什么地方赋值的呢?发现该变量是在topaddressing的构造方法中赋值,而topaddressing对象又是在mqclientapiimpl中创建,我们找到具体的创建逻辑

mqclientapiimpl#constructor

public mqclientapiimpl(final nettyclientconfig nettyclientconfig,
    final clientremotingprocessor clientremotingprocessor,
    rpchook rpchook, final clientconfig clientconfig) {
    this.clientconfig = clientconfig;
    topaddressing = new topaddressing(mixall.getwsaddr(), clientconfig.getunitname());
    
    // 省略部分逻辑
}

传入topaddressing构造方法的值取自mixall的getwsaddr方法的返回值

mixall#getwsaddr

public static string getwsaddr() {
    string wsdomainname = system.getproperty("rocketmq.namesrv.domain", default_namesrv_addr_lookup);
    string wsdomainsubgroup = system.getproperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    string wsaddr = "http://" + wsdomainname + ":8080/rocketmq/" + wsdomainsubgroup;
    if (wsdomainname.indexof(":") > 0) {
        wsaddr = "http://" + wsdomainname + "/rocketmq/" + wsdomainsubgroup;
    }
    return wsaddr;
}

通过分析上述方法,可以知道,默认会从http://jmenv.tbsite.net:8080/rocketmq/nsaddr这个链接处拉取nameserver的地址。当然,如果你想更改这个链接,可以修改系统属性"rocketmq.namesrv.domain"和"rocketmq.namesrv.domain.subgroup"的值以达到目的。

第二,判断clientconfig对象的namesrvaddr值是否为null,若是,开启定时任务,周期性地更新nameserver服务器的地址

mqclientinstance#startscheduledtask

private void startscheduledtask() {
    // 没有手动指定name-server地址的情况下,两分钟更新一次name-server地址
    // 这个就是name-server可以动态变化的唯一途径
    if (null == this.clientconfig.getnamesrvaddr()) {
        // 两分钟拉取更新一次name-server地址
        this.scheduledexecutorservice.scheduleatfixedrate(new runnable() {

            @override
            public void run() {
                try {
                    mqclientinstance.this.mqclientapiimpl.fetchnameserveraddr();
                } catch (exception e) {
                    log.error("scheduledtask fetchnameserveraddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, timeunit.milliseconds);
    }
}

可以看出,默认每隔2分钟会调用mqclientapiimpl的fetchnameserveraddr方法更新nameserver地址,具体逻辑上述第一条已经解释过不再赘述

brokerserver

brokerserver的启动类是brokerstartup,在brokerstartup的main方法中,会创建brokercontroller的实例对象并调用其start方法,而在创建brokercontroller的实例对象时会一并调用其initialize方法进行初始化,就在这个初始化方法中,有跟我们该篇分析相关的内容。

brokerstartup#main

public static void main(string[] args) {
    start(createbrokercontroller(args));
}

brokerstartup#createbrokercontroller

public static brokercontroller createbrokercontroller(string[] args) {
    // 省略部分代码
    try {
        // 省略部分代码
        boolean initresult = controller.initialize();
        // 省略部分代码
        return controller;
    } catch (throwable e) {
        e.printstacktrace();
        system.exit(-1);
    }

    return null;
}

brokercontroller#initialize

public boolean initialize() throws clonenotsupportedexception {
    // 将本地文件中存储的数据加载至内存
    boolean result = this.topicconfigmanager.load();
    result = result && this.consumeroffsetmanager.load();
    result = result && this.subscriptiongroupmanager.load();
    result = result && this.consumerfiltermanager.load();

    // 省略部分代码

    result = result && this.messagestore.load();

    if (result) {
        // 省略部分代码
        if (this.brokerconfig.getnamesrvaddr() != null) {
            this.brokerouterapi.updatenameserveraddresslist(this.brokerconfig.getnamesrvaddr());
            log.info("set user specified name server address: {}", this.brokerconfig.getnamesrvaddr());
        } else if (this.brokerconfig.isfetchnamesrvaddrbyaddressserver()) {
            // 没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址
            // 每隔2分钟从name-server地址服务器拉取最新的配置
            this.scheduledexecutorservice.scheduleatfixedrate(new runnable() {

                @override
                public void run() {
                    try {
                        brokercontroller.this.brokerouterapi.fetchnameserveraddr();
                    } catch (throwable e) {
                        log.error("scheduledtask fetchnameserveraddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, timeunit.milliseconds);
        }

        // 省略部分代码
    }
    return result;
}

可以看到,brokerconfig中如果namesrvaddr变量值为null,默认每隔2分钟会调用brokerouterapi的fetchnameserveraddr方法更新nameserver地址

brokerouterapi#fetchnameserveraddr

public string fetchnameserveraddr() {
    try {
        string addrs = this.topaddressing.fetchnsaddr();
        if (addrs != null) {
            if (!addrs.equals(this.namesrvaddr)) {
                log.info("name server address changed, old: {} new: {}", this.namesrvaddr, addrs);
                this.updatenameserveraddresslist(addrs);
                this.namesrvaddr = addrs;
                return namesrvaddr;
            }
        }
    } catch (exception e) {
        log.error("fetchnameserveraddr exception", e);
    }
    return namesrvaddr;
}

该方法与mqclientapiimpl的fetchnameserveraddr方法逻辑几乎一模一样,也不再进行赘述。

注: 

第三种配置nameserver地址的方法,那就是系统启动的时候,不配置系统属性rocketmq.namesrv.addr和环境变量namesrv_addr,这样系统会自动访问一个url从远端服务器拉取nameserver的地址,同时会开启相应的定时任务定时刷新nameserver地址。

总结

brokerserver、producer、consumer程序启动获取nameserver地址的三种方式

  • 1.配置系统属性:rocketmq.namesrv.addr
  • 2.配置环境变量:namesrv_addr
  • 3.不配置系统属性rocketmq.namesrv.addr和环境变量namesrv_addr,让程序自动访问一个url从远端服务器拉取nameserver的地址,同时会开启相应的定时任务定时刷新nameserver地址。该url地址默认为http://jmenv.tbsite.net:8080/rocketmq/nsaddr,可以通过修改系统属性rocketmq.namesrv.domain和rocketmq.namesrv.domain.subgroup以达到修改url地址的目的。

只有上述第3种方式才可以实现程序运行过程中动态更新nameserver地址值。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com