当前位置: 代码网 > 手机>品牌>华为 > 模拟实现RabbitMQ,2024年最新我在华为做大数据开发外包的真实经历

模拟实现RabbitMQ,2024年最新我在华为做大数据开发外包的真实经历

2024年07月31日 华为 我要评论
log.info(“[MemoryDataCenter] 消息已成功从队列中取出 queueName:” + queueName + “, messageId:” + message.getMessageId());log.info(“[MemoryDataCenter] 消息已成功添加到队列 queueName:” + queueName + “, messageId:” + message.getMessageId());

先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里p7

深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

如果你需要这些资料,可以添加v获取:vip204888 (备注大数据)
img

正文


后续的操作都是围绕这几个核心概念来增删改查的。


### 数据库管理


由于mysql数据库比较重量,在此就选择使用相对轻量的数据库sqlite. mysql是客户端服务器结构的程序,而sqlite是一个本地的数据库,操作它相当于是直接操作本地文件。


在java中使用sqlite,也不需要额外的安装,可以直接使用maven,将相关依赖引入,然后进行配置就可以正常使用了。


**sqlite的依赖:**



    <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
    <dependency>
        <groupid>org.xerial</groupid>
        <artifactid>sqlite-jdbc</artifactid>
        <version>3.42.0.0</version>
    </dependency>

**application.yml:**



spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.jdbc


sqlite是把数据存储在当前硬盘的某个指定文件中,在这里就存储在了工作路径的data目录中的meta.db文件了。


sqlite也不需要指定用户名和密码,毕竟是自己一个人用的,只有本地主机才能使用。


接下来就是建库和建表。在mysql中,需要自己手动进行创建database,但在sqlite中,则无需手动建库了,每一个数据库文件就是一个数据库,即当前的meta.db文件就相当于是mysql中的database。


为了能够即用即创建表,在这考虑部署的时候就自动创建表,即通过mybatis的update注解来建表并添加一些基础的查询、增加、删除操作。



@mapper
public interface metamapper {
//建表操作
@update(“create table if not exists exchange(” +
“name varchar(50) primary key,” +
“type int,” +
“durable boolean,” +
“autodelete boolean,” +
“arguments varchar(1024))”)
public void createexchangetable();

@update("create table if not exists queue(" +
        "name varchar(50) primary key," +
        "exclusive boolean," +
        "durable boolean," +
        "autodelete boolean," +
        "arguments varchar(1024))")
public void createqueuetable();

@update("create table if not exists binding(" +
        "exchangename varchar(50)," +
        "queuename varchar(50)," +
        "bindingkey varchar(256))")
public void createbindingtable();

//基础添加、删除、查询操作
@insert("insert into exchange values(#{name}, #{type}, #{durable}, #{autodelete}, #{arguments})")
public void insertexchange(exchange exchange);

@delete("delete from exchange where name = #{exchangename}")
public void deleteexchange(string exchangename);

@select("select * from exchange")
public list<exchange> selectallexchanges();

@insert("insert into queue values(#{name}, #{exclusive}, #{durable}, #{autodelete}, #{arguments})")
public void insertqueue(msgqueue msgqueue);

@delete("delete from queue where name = #{queuename}")
public void deletequeue(string queuename);

@select("select * from queue")
public list<msgqueue> selectallqueues();

@insert("insert into binding values(#{exchangename}, #{queuename}, #{bindingkey})")
public void insertbinding(binding binding);

@delete("delete from binding where exchangename = #{exchangename} and queuename = #{queuename}")
public void deletebinding(string exchangename, string queuename);

@select("select * from binding")
public list<binding> selectallbindings();

}


由于sqlite中也不支持hashmap这种数据结构,因此使用string来存储。


mybatis在写入数据的时候,会自动的调用对象的getter方法,而在读出数据的时候,会自动调用setter方法,因此为了能够将arguments这个hashmap类型的数据存入到数据库中,我们需要转成字符串。


在此使用jackson来处理,引入相关依赖即可。


**jackson依赖:**



    <dependency>
        <groupid>com.fasterxml.jackson.core</groupid>
        <artifactid>jackson-databind</artifactid>
        <version>2.9.6</version>
    </dependency>

**getter和setter:**



public string getarguments() {
objectmapper objectmapper = new objectmapper();
try {
objectmapper.writevalueasstring(arguments);
} catch (jsonprocessingexception e) {
e.printstacktrace();
}
return “{}”;
}

public void setarguments(string arguments) {
objectmapper objectmapper = new objectmapper();
try {
this.arguments = objectmapper.readvalue(arguments, new typereference<map<string, object>>() {});
} catch (ioexception e) {
e.printstacktrace();
}
}


为了使程序的结构更加清晰,程序员更方便的操作数据库,考虑创建一个databasemanager类来封装数据库的相关操作。


**databasemanager:**



//封装数据库操作
//databasemanager不交给spring管理,由程序员手动管理
@slf4j
public class databasemanager {
//从spring上下文中手动获取metamapper的bean
private metamapper metamapper;

//带有业务逻辑的初始化操作
//完成建库建表操作
//如果不存在表就进行创建,并添加默认初始数据
public void init(){
    metamapper = mqblogapplication.context.getbean(metamapper.class);
    if(!checkdbexists()){
        //创建data目录
        file file = new file("./data");
        file.mkdirs();
        //建表并初始化数据
        createtable();
        createdefaultdata();
        log.info("[databasemanager] 数据库初始化完成");
    }else{
        log.info("[databasemanager] 数据库已经初始化");
    }
}

//删除数据库目录
//删除目录前,需要保证目录是空的
public void deletedb(){
    file file = new file("./data/meta.db");
    //删除目录里面的文件
    boolean issuccess = file.delete();
    if(issuccess){
        log.info("[databasemanager] 删除数据库文件成功");
    }else{
        log.info("[databasemanager] 删除数据库文件失败");
    }
    //删除目录
    file = new file("./data");
    issuccess = file.delete();
    if(issuccess){
        log.info("[databasemanager] 删除数据库目录成功");
    }else {
        log.info("[databasemanager] 删除数据库目录失败");
    }

}

//判断数据库(meta.db)是否存在
private boolean checkdbexists() {
    file dbfile = new file("./data/meta.db");
    return dbfile.exists();
}

//对 exchange,queue,binding 进行建表
private void createtable() {
    metamapper.createexchangetable();
    metamapper.createqueuetable();
    metamapper.createbindingtable();
    log.info("[databasemanager] 创建表完成");
}

//rabbitmq初始带有一个匿名交换机, 类型为 direct
private void createdefaultdata() {
    exchange exchange = new exchange();
    exchange.setname("");
    exchange.settype(exchangetype.direct);
    exchange.setdurable(true);
    exchange.setautodelete(false);
    metamapper.insertexchange(exchange);
    log.info("[databasemanager] 初始数据构造完成");
}

//增删查操作
public void insertexchange(exchange exchange){
    metamapper.insertexchange(exchange);
}

public void deleteexchange(string exchangename){
    metamapper.deleteexchange(exchangename);
}

public list<exchange> selectallexchanges(){
    list<exchange> exchanges = metamapper.selectallexchanges();
    return exchanges;
}

public void insertqueue(msgqueue queue){
    metamapper.insertqueue(queue);
}

public void deletequeue(string queuename){
    metamapper.deletequeue(queuename);
}

public list<msgqueue> selectallqueues(){
    return metamapper.selectallqueues();
}

public void insertbinding(binding binding){
    metamapper.insertbinding(binding);
}

public void deletebinding(string exchangename, string queuename){
    metamapper.deletebinding(exchangename, queuename);
}

public list<binding> selectallbindings(){
    return metamapper.selectallbindings();
}

}


### 文件管理


文件管理主要是对消息进行存储,消息是进行序列化存储的数据,不便于读取,因此我们无法区分一条消息的开始和结束。如果对每条消息都单独保存在一个文件中,可能会创建大量的文件。


我们现在做出如下约定:一条消息在硬盘上存储需要先花费4个字节存储消息体的长度,然后再存储消息本体。


![](https://img-blog.csdnimg.cn/direct/e33f820358e7428e8892252a19f76359.png)


在定义消息的时候,我们定义了两个辅助字段:offsetbeg、offsetend,分别记录一个消息在文件中存储的开始和结束。


![](https://img-blog.csdnimg.cn/direct/3c11996f7e72427ba8cdac3ca044cac8.png)


通过上述分析,我们可以知道,需要对message对象进行序列化存储在文件中,而offsetbeg和offsetend则不需要,保留原始值即可。需要对message类实现serializable接口,由于basicproperties也包含在类中,也需要实现接口,而offsetbeg,offsetend使用transient关键字描述,程序就不会再序列化了。



@data
public class message implements serializable {
//消息的基本属性
private basicproperties basicproperties;
//存储数据
private byte[] body;
//位置偏移量,便于在文件中取出数据
//采用[offsetbeg, offsetend)区间
//这两个属性是帮助从文件中读取数据的,不需要进行序列化
private transient long offsetbeg;
private transient long offsetend;
//表示此消息在文件中是否有效(采用逻辑删除)
//0x00 表示无效, 0x01 表示有效
private byte isvalid = 0x01;
}

@data
public class basicproperties implements serializable {
//为防止重复,使用uuid生成
private string messageid;
private boolean isdurable = false;
//口令
private string routingkey;
}


**binarytool:**



public class binarytool {
//将一个java对象序列化成一个字节数组
//这个类必须实现了serializable
public static byte[] tobytes(object object) throws ioexception {
//此处的 bytearrayoutputstream 可以不用关闭,是一个纯内存的对象,不涉及到文件描述符
try(bytearrayoutputstream bytearrayoutputstream = new bytearrayoutputstream()) {
try (objectoutputstream objectoutputstream = new objectoutputstream(bytearrayoutputstream)) {
//此处 writeobject 会将 object 对象进行序列化,生成的二进制字节数据,写入到objectoutputstream中
//objectoutputstream 关联到了 bytearrayoutputstream, 最终就写到了 bytearrayoutputstream
objectoutputstream.writeobject(object);
}
//将 bytearrayoutputstream 中序列化后的二进制数据 转成byte[] 数组
return bytearrayoutputstream.tobytearray();
}
}

public static object frombytes(byte[] data) throws ioexception, classnotfoundexception {
    try(bytearrayinputstream bytearrayinputstream = new bytearrayinputstream(data)){
        try(objectinputstream objectinputstream = new objectinputstream(bytearrayinputstream)){
            //此处的 readobject 将 data byte[] 转成对象
            return objectinputstream.readobject();
        }
    }
}

}


由于每条消息都是存储在队列上的,因此就按照队列的维度进行存储。即每个队列都是一个目录,在每个目录下都有两个文件,一个是data数据文件,用来存储消息,另一个是state文件,用来描述data数据文件中的情况,为了后面可以进行gc。


此处的data文件是二进制文件,而state文件则是字符型文件,里面用一个数表示消息的总数,再用一个数表示消息的有效个数。因为采用的是逻辑删除的方式,也就是说被删除的消息不会真正意义上删除,当文件中存储很多数据的时候,就会大大影响我们的读取效率,因此我们需要使用gc来真正删除垃圾消息。在这规定:当文件有1000个消息的时候,并且垃圾消息占比50%,我们就进行一次gc。


在这我们也封装一个类来帮助我们处理文件的操作。


**messagefilemanager:**



@slf4j
//对硬盘上的message进行管理
//规定每个队列都对应一个目录,
//每个目录下都包含两个文件:queue_data.txt, queue_state.txt,分别用来存储数据和统计数据个数
//
public class messagefilemanager {
public void init() {
//为了接口的统一性
//暂时不处理,后续可自行添加
}

//表示该队列下统计的文件信息
public static class state{
    public int totalcount = 0; //总消息数量
    public int validcount = 0; //有效消息数量
}

//获取消息所处队列的目录
private string getqueuedir(string queuename){
    return "./data/" + queuename;
}

//获取消息所处队列的数据文件路径
private string getqueuedatapath(string queuename){
    return getqueuedir(queuename) + "/queue_data.txt";
}

//获取消息所处队列的统计文件路径
private string getqueuestatepath(string queuename){
    return getqueuedir(queuename) + "/queue_state.txt";
}

//创建队列对应的目录和文件
public void createqueuefiles(string queuename) throws ioexception {
    //1.创建队列对应目录
    file directory = new file(getqueuedir(queuename));
    if(!directory.exists()){
        boolean success = directory.mkdirs();
        if(!success){
            throw new ioexception("[messagefilemanager] 创建目录失败" + directory.getabsolutefile());
        }
    }
    //2.创建队列对应数据文件
    file datafile = new file(getqueuedatapath(queuename));
    if(!datafile.exists()){
        boolean success = datafile.createnewfile();
        if(!success){
            throw new ioexception("[managerfilemanager] 创建文件失败" + datafile.getabsolutefile());
        }
    }
    //3.创建队列对应统计文件
    file statefile = new file(getqueuestatepath(queuename));
    if(!statefile.exists()){
        boolean success = statefile.createnewfile();
        if(!success){
            throw new ioexception("[managerfilemanager] 创建文件失败" + statefile.getabsolutefile());
        }
    }
    //4.初始化队列文件
    state state = new state();
    writestate(queuename, state);
}

//删除队列对应的目录和文件
public void destroyqueuefiles(string queuename) throws ioexception {
    //要删除目录需要先删除里面的文件
    file datafile = new file(getqueuedatapath(queuename));
    boolean success1 = datafile.delete();
    file statefile = new file(getqueuestatepath(queuename));
    boolean success2 = statefile.delete();
    file directory = new file(getqueuedir(queuename));
    boolean success3 = directory.delete();
    //确保文件都删除了
    if(!success1 || !success2 || !success3){
        throw new ioexception("[messagefilemanager] 删除队列文件失败" + datafile.getabsolutefile());
    }
}

//从文件中读取统计信息
private state readstate(string queuename){
    //state文件是文本文件,可以使用scanner来读写
    state state = new state();
    try(inputstream inputstream = new fileinputstream(getqueuestatepath(queuename))){
        scanner scanner = new scanner(inputstream);
        state.totalcount = scanner.nextint();
        state.validcount = scanner.nextint();
        return state;
    } catch (ioexception e) {
        e.printstacktrace();
    }
    return null;
}

//向文件中写入统计信息
private void writestate(string queuename, state state){
    //outputstream默认情况打开文件,会清空文件
    try(outputstream outputstream = new fileoutputstream(getqueuestatepath(queuename))){
        printwriter printwriter = new printwriter(outputstream);
        printwriter.print(state.totalcount + "\t" + state.validcount);
        printwriter.flush();
    } catch (ioexception e) {
        e.printstacktrace();
    }
}

//判断队列对应的数据文件和统计文件都存在
private boolean checkqueuefilesexist(string name) {
    //如果两个文件都存在了,则说明目录也存在,不需要额外判断
    file datafile = new file(getqueuedatapath(name));
    file statefile = new file(getqueuestatepath(name));
    if(!datafile.exists() || !statefile.exists()){
        return false;
    }
    return true;
}

//将消息写入对应的队列文件
public void sendmessage(msgqueue queue, message message) throws ioexception, mqexception {
    //1.判断对应的队列文件是否存在
    if(!checkqueuefilesexist(queue.getname())){
        throw new mqexception("[messagefilemanager] 队列文件不存在 queuename:" + queue.getname());
    }
    //2.将消息转成二进制
    synchronized (queue){
        byte[] data = binarytool.tobytes(message);
        //3.获取到文件末尾位置,设置文件位置的偏移量
        file datafile = new file(getqueuedatapath(queue.getname()));
        //每条消息前4个字节用来表示消息的长度
        message.setoffsetbeg(datafile.length() + 4);
        message.setoffsetend(datafile.length() + 4 + data.length);
        //4.写入消息文件
        try(outputstream outputstream = new fileoutputstream(datafile, true)){
            try(dataoutputstream dataoutputstream = new dataoutputstream(outputstream)){
                //写入消息的长度(4个字节)
                dataoutputstream.writeint(data.length);
                //写入消息本体
                dataoutputstream.write(data);
            }
        }
        //5.更新统计文件
        state state = readstate(queue.getname());
        state.totalcount += 1;
        state.validcount += 1;
        writestate(queue.getname(), state);
    }
}

//从硬盘上删除消息(逻辑删除)
//此处传的消息必须为有效消息
public void deletemessage(msgqueue queue, message message) throws ioexception, classnotfoundexception, mqexception {
    //1.判断队列对应的文件是否存在
    if(!checkqueuefilesexist(queue.getname())){
        throw new mqexception("[messagefilemanager] 队列文件不存在 queuename:" + queue.getname());
    }
    synchronized (queue){
        //2.从文件中获取到消息
        byte[] data = new byte[(int)message.getoffsetend() - (int)message.getoffsetbeg()];
        try(randomaccessfile randomaccessfile = new randomaccessfile(getqueuedatapath(queue.getname()), "rw")){
            //移动文件光标
            randomaccessfile.seek(message.getoffsetbeg());
            randomaccessfile.read(data);
            //3.将消息反序列化并标记为无效
            message diskmessage = (message)binarytool.frombytes(data);
            diskmessage.setisvalid((byte)0x00);
            //4.将消息重新写入文件
            randomaccessfile.seek(message.getoffsetbeg());
            randomaccessfile.write(binarytool.tobytes(diskmessage));
        }
        //5.更新统计文件
        state state = readstate(queue.getname());
        //逻辑删除,不需要修改totalcount
        state.validcount -= 1;
        writestate(queue.getname(), state);
    }
}

//从队列文件中读取所有 有效消息
//在程序启动时,进行调用,读取文件中的所有有效消息,加载到内存中
public linkedlist<message> loadallmessagefromqueue(string queuename) throws ioexception, mqexception, classnotfoundexception {
    linkedlist<message> messages = new linkedlist<>();
    try(inputstream inputstream = new fileinputstream(getqueuedatapath(queuename))){
        try(datainputstream datainputstream = new datainputstream(inputstream)){
            //记录当前文件光标,方便赋值message中offsetbeg和offsetend值
            long currentoffset = 0;
            while(true){
                //1.读取当前消息的长度
                //当 readint 读取到文件末尾的时候,会抛出异常 eofexception
                int messagesize = datainputstream.readint();
                //2.根据读取到的长度,在文件中读取
                byte[] buffer = new byte[messagesize];
                int actualsize = datainputstream.read(buffer);
                if(messagesize != actualsize){
                    throw new mqexception("[messagefilemanager] 文件读取错误 queuename:" + queuename);
                }
                //3.将字节数组反序列化成对象
                message message = (message) binarytool.frombytes(buffer);
                //4.判断当前消息是否为有效消息
                if(message.getisvalid() == 0x00){
                    currentoffset += (4 + messagesize);
                    continue;
                }
                //5.计算在文件中偏移量并赋值,将消息添加到链表中
                message.setoffsetbeg(currentoffset + 4);
                message.setoffsetend(currentoffset + 4 + messagesize);
                currentoffset += (4 + messagesize);
                messages.add(message);
            }
        } catch (eofexception e){
            //文件读取到末尾,并非真的异常
            log.info("[messagefilemanager] 文件读取完毕");
        }
    }
    return messages;
}

//获取新的队列数据文件
private string getqueuenewdatapath(string queuename){
    return getqueuedir(queuename) + "/new_queued_data.txt";
}

//约定 当文件中总消息数超过 1000 并且 无效消息的个数 超过一半 就进行一次gc
public boolean checkgc(string queuename){
    state state = readstate(queuename);
    if(state.totalcount > 1000 && (double)state.validcount / (double) state.totalcount > 0.5){
        return true;
    }
    return false;
}

//对消息文件进行gc操作
//此处使用复制算法的方式进行
public void gc(msgqueue queue) throws ioexception, mqexception, classnotfoundexception {
    //当一个线程进行gc的时候,其他线程不允许进行操作
    synchronized (queue){
        //由于gc操作比较耗时,在这输出一下gc的时间
        //计算gc开始时间
        long gcbeg = system.currenttimemillis();
        //1.创建新文件
        file newdatafile = new file(getqueuenewdatapath(queue.getname()));
        if(newdatafile.exists()){
            log.info("[messagefilemanager] 存在复制的目标文件");
            newdatafile.delete();
        }
        if(!newdatafile.createnewfile()){
            throw new mqexception("[messagefilemanager] 复制文件创建失败,queuename:" + queue.getname());
        }
        //2.从旧文件中读取消息
        //此处读取到的信息都是有效信息
        linkedlist<message> messages = loadallmessagefromqueue(queue.getname());
        //3.将有效消息写入新文件
        try(outputstream outputstream = new fileoutputstream(newdatafile)){
            try(dataoutputstream dataoutputstream = new dataoutputstream(outputstream)) {
                for (message message : messages) {
                    //不调用sendmessage,防止outputstream频繁关闭影响性能
                    byte[] buffer = binarytool.tobytes(message);
                    dataoutputstream.writeint(buffer.length);
                    dataoutputstream.write(buffer);
                }
            }
        }
        //4.让新文件替换就旧文件
        file olddatafile = new file(getqueuedatapath(queue.getname()));
        if(!olddatafile.delete()){
            throw new mqexception("[messagefilemanager] 旧数据文件删除失败 olddatafile:" + olddatafile.getabsolutefile());
        }
        if(!newdatafile.renameto(olddatafile)){
            throw new mqexception("[messagefilemanager] 文件重命名失败 newdatafile:" + newdatafile.getabsolutefile());
        }
        //5.更新统计信息的文件
        state state = readstate(queue.getname());
        state.totalcount = state.validcount;
        writestate(queue.getname(), state);

        //计算gc结束时间
        long gcend = system.currenttimemillis();
        log.info("[messagefilemanager] " + queue.getname() + "gc完成, 总耗时: " + (gcend - gcbeg) + "ms");
    }
}

}


### 封装硬盘


为了方便后续对数据进行持久化,即不用去关注数据到底是存在数据库还是文件中,在此再封装一个diskdatacenter类来统一对数据持久化。



/*

  • 通过这个类来管理文件中的数据和数据库数据

  • 调用者不用再关注数据是储存在硬盘文件中还是数据库中

  • 通过这个类完成对 交换机、队列、绑定、消息 的增删查
    */
    public class diskdatacenter {
    private databasemanager databasemanager = new databasemanager();
    private messagefilemanager messagefilemanager = new messagefilemanager();

    //对实例进行初始化
    public void init(){
    databasemanager.init();
    messagefilemanager.init();
    }

    //交换机
    public void insertexchange(exchange exchange){
    databasemanager.insertexchange(exchange);
    }

    public void deleteexchange(string exchangename){
    databasemanager.deleteexchange(exchangename);
    }

    public list selectallexchanges(){
    return databasemanager.selectallexchanges();
    }

    //队列
    public void insertqueue(msgqueue queue) throws ioexception {
    databasemanager.insertqueue(queue);
    //每个队列都关联一个目录文件(子文件为queue_data.txt、queue_state.txt)
    //需要将对应的文件创建
    messagefilemanager.createqueuefiles(queue.getname());
    }

    public void deletequeue(string queuename) throws ioexception {
    databasemanager.deletequeue(queuename);
    //需要将队列对应的文件都删除
    messagefilemanager.destroyqueuefiles(queuename);
    }

    public list selectallqueues(){
    return databasemanager.selectallqueues();
    }

    //绑定
    public void insertbinding(binding binding){
    databasemanager.insertbinding(binding);
    }

    public void deletebinding(binding binding){
    databasemanager.deletebinding(binding.getexchangename(), binding.getqueuename());
    }

    public list selectallbindings(){
    return databasemanager.selectallbindings();
    }

    //消息
    public void insertmessage(msgqueue queue, message message) throws ioexception, mqexception {
    messagefilemanager.sendmessage(queue, message);
    }

    public void deletemessage(msgqueue queue, message message) throws ioexception, classnotfoundexception, mqexception {
    messagefilemanager.deletemessage(queue, message);
    //判断是否需要gc
    if(messagefilemanager.checkgc(queue.getname())){
    messagefilemanager.gc(queue);
    }
    }

    public linkedlist loadallmessagefromqueue(string queuename) throws ioexception, mqexception, classnotfoundexception {
    return messagefilemanager.loadallmessagefromqueue(queuename);
    }
    }


### 内存管理


上述只是完成了持久化操作,但是真正的主战场还是在内存上,硬盘只是用来保证服务器重启后,数据不丢失。


为了方便后续对象的查找,我们可以使用hashmap的数据结构来存储。


**交换机:使用hashmap,key 为 exchangename,value 为 exchange 对象.**


**队列:使用hashmap,key 为 queuename,value 为 queue 对象.**


**绑定:使用嵌套hashmap, 第一个 key 为exchangename ,第一个 value 为 hashmap<string, binding>**


**第二个 key 为 queuename,第二个 value 为 binding 对象.**


表示的含义是,先查找交换机邦迪的所有队列,然后再找到指定绑定.


**消息:使用hashmap,key 为 messageid, value 为 message 对象.**


由于我们的消息是依附于队列的,我们还需要使用数据结构来表示这样的关系,即在队列中存储着消息。


**队列和消息之间的关系:使用hashmap,key 为 queuename, value 为 linkedlist,里面存储message对象.**


**待确认的消息:使用嵌套hashmap,第一个 key 为 queuename,第一个 value 为 hashmap<string, message>**


**第二个 key 为 messageid,第二个 value 为 message对象.**


当消费者从队列中取了一个消息,我们无法保证有没有丢失或读错,但可以通过应答的方式来保证可靠性,即消费者消费完了这个消息后,发送一个确认应答告诉服务器,后续服务器就可以删除了,但在一些对可靠性不高的场景中,手动应答的方式就会比较鸡肋,于是我们在这里提供两种应答方式:1.自动应答 2.手动应答



/*

  • 管理内存中的数据

  • 由于会在多线程的场景下使用,需要注意线程安全
    */
    @slf4j
    public class memorydatacenter {
    //在内存中需要管理 交换机、队列、绑定、消息

    //key: exchangename, value: exchange对象
    private concurrenthashmap<string, exchange> exchangemap = new concurrenthashmap<>();
    //key: queuename, value: queue对象
    private concurrenthashmap<string, msgqueue> queuemap = new concurrenthashmap<>();
    //key: exchangename, value: 该交换机与队列的所有绑定关系映射集合(hashmap)
    private concurrenthashmap<string, concurrenthashmap<string, binding>> bindingmap = new concurrenthashmap<>();
    //key: messageid, value: message对象
    private concurrenthashmap<string, message> messagemap = new concurrenthashmap<>();
    //key: queuename, value: 该队列下的消息对象的集合
    private concurrenthashmap<string, linkedlist> queuemessagemap = new concurrenthashmap<>();
    //key: queuename, value: 该队列下未应答的消息映射集合
    //第二个key: messageid(uuid表示)
    private concurrenthashmap<string, concurrenthashmap<string, message>> queuemessagewaitackmap = new concurrenthashmap<>();

    //交换机
    public void insertexchange(exchange exchange){
    exchangemap.put(exchange.getname(), exchange);
    log.info(“[memorydatacenter] 交换机插入成功, exchangename:” + exchange.getname());
    }

    public void deleteexchange(string exchangename){
    exchangemap.remove(exchangename);
    log.info(“[memorydatacenter] 交换机删除成功, exchangename:” + exchangename);
    }

    public exchange getexchange(string exchangename){
    return exchangemap.get(exchangename);
    }

    //队列
    public void insertqueue(msgqueue queue){
    queuemap.put(queue.getname(), queue);
    log.info(“[memorydatacenter] 队列插入成功, queuename:” + queue.getname());
    }

    public void deletequeue(string queuename){
    queuemap.remove(queuename);
    log.info(“[memorydatacenter] 队列删除成功, queuename:” + queuename);
    }

    public msgqueue getqueue(string queuename){
    return queuemap.get(queuename);
    }

    //绑定
    public void insertbinding(binding binding) throws mqexception {
    // concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(binding.getexchangename());
    concurrenthashmap<string, binding> queuebindingmap = bindingmap.computeifabsent(binding.getexchangename(), k -> new concurrenthashmap<>());
    //检查嵌套的哈希表是否存在
    synchronized (queuebindingmap){
    //这样写需要再创建一把锁
    // if(queuebindingmap == null){
    // queuebindingmap = new concurrenthashmap<>();
    // bindingmap.put(binding.getexchangename(), queuebindingmap);
    // }
    if(queuebindingmap.get(binding.getqueuename()) != null){
    throw new mqexception(“[memorydatacenter] 绑定关系已经存在 exchangename:” + binding.getexchangename() +
    “, queuename” + binding.getqueuename());
    }
    queuebindingmap.put(binding.getqueuename(), binding);
    log.info(“[memorydatacenter] 绑定关系插入成功 exchangename:”+ binding.getexchangename() +
    “, queuename” + binding.getqueuename());
    }
    }

    public void deletebinding(binding binding) throws mqexception {
    concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(binding.getexchangename());
    if(queuebindingmap == null){
    throw new mqexception(“[memorydatacenter] 该绑定不存在 exchangename:” + binding.getexchangename() +
    “, queuename” + binding.getqueuename());
    }
    bindingmap.remove(binding.getexchangename());
    log.info(“[memorydatacenter] 绑定关系删除成功 exchangename:”+ binding.getexchangename() +
    “, queuename” + binding.getqueuename());
    }

    //两个版本的获取绑定:
    //1.获取队列和交换机之间的唯一绑定
    public binding getbinding(string exchangename, string queuename) throws mqexception {
    concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(exchangename);
    if(queuebindingmap == null){
    return null;
    }
    return queuebindingmap.get(queuename);
    }
    //2.获取当前当前交换机下的所有绑定
    public concurrenthashmap<string, binding> getbindings(string exchangename){
    return bindingmap.get(exchangename);
    }

    //消息
    //向消息中心添加消息
    public void addmessage(message message){
    messagemap.put(message.getmessageid(), message);
    log.info(“[memorydatacenter] 消息添加完成 messageid:” + message.getmessageid());
    }

    public void removemessage(string messageid){
    messagemap.remove(messageid);
    log.info(“[memorydatacenter] 消息删除完成 messageid:” + messageid);
    }

    public message getmessage(string messageid){
    return messagemap.get(messageid);
    }

    //将消息发送到指定队列
    public void sendmessage(string queuename, message message){
    // linkedlist messages = queuemessagemap.get(queuename);
    linkedlist messages = queuemessagemap.computeifabsent(queuename, k -> new linkedlist<>());
    synchronized (messages){
    //queuemessagemap.put(queuename, messages);
    //将消息添加到队列中
    messages.add(message);
    }
    //将消息添加到消息中心
    addmessage(message);
    log.info(“[memorydatacenter] 消息已成功添加到队列 queuename:” + queuename + “, messageid:” + message.getmessageid());
    }

    //从指定队列中取消息
    public message pollmessage(string queuename){
    linkedlist messages = queuemessagemap.get(queuename);
    //当链表中没有元素
    if(messages == null){
    return null;
    }
    synchronized (messages){
    if(messages.size() == 0){
    return null;
    }
    message message = messages.remove(0);
    log.info(“[memorydatacenter] 消息已成功从队列中取出 queuename:” + queuename + “, messageid:” + message.getmessageid());
    return message;
    }
    }

    //获取指定队列中消息的个数
    public int getmessagesize(string queuename){
    linkedlist messages = queuemessagemap.get(queuename);
    if(messages == null){
    return 0;
    }
    //linkedlist 线程不安全
    synchronized (messages){
    return messages.size();
    }
    }

    //添加未被确认消息
    public void addmessagewaitack(string queuename, message message){
    // concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
    concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.computeifabsent(queuename, k -> new concurrenthashmap<>());
    synchronized (messagewaitack){
    // if(messagewaitack == null){
    // messagewaitack = new concurrenthashmap<>();
    // queuemessagewaitackmap.put(queuename, messagewaitack);
    // }
    messagewaitack.put(message.getmessageid(), message);
    }
    log.info(“[memorydatacenter] 消息进入待确认队列 queuename:” + queuename + “, messageid:” + message.getmessageid());
    }

    //删除未确认消息(用户确认了消息)
    public void deletemessagewaitack(string queuename, string messageid){
    concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
    if(messagewaitack == null){
    return;
    }
    messagewaitack.remove(messageid);
    log.info(“[memorydatacenter] 消息已从待确认队列中删除 queuename:” + queuename + “, messageid:” + messageid);
    }

    //从待确认队列中获取指定消息
    public message getmessagewaitack(string queuename, string messageid){
    concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
    if(messagewaitack == null){
    return null;
    }
    return messagewaitack.get(messageid);
    }

    //从硬盘上恢复数据到内存中
    public void recovery(diskdatacenter diskdatacenter) throws mqexception, ioexception, classnotfoundexception {
    //1.清空之前的数据
    exchangemap.clear();
    queuemap.clear();
    bindingmap.clear();
    messagemap.clear();
    queuemessagemap.clear();
    //2.恢复交换机
    list exchangelist = diskdatacenter.selectallexchanges();
    for(exchange exchange : exchangelist){
    insertexchange(exchange);
    }
    //3.恢复队列
    list queuelist = diskdatacenter.selectallqueues();
    for(msgqueue queue : queuelist){
    insertqueue(queue);
    }
    //4.恢复绑定
    list bindinglist = diskdatacenter.selectallbindings();
    for(binding binding : bindinglist){
    insertbinding(binding);
    }
    //5.恢复消息
    for(msgqueue queue : queuelist){
    linkedlist messagelist = diskdatacenter.loadallmessagefromqueue(queue.getname());
    queuemessagemap.put(queue.getname(), messagelist);
    for(message message : messagelist){
    addmessage(message);
    }
    }
    }
    }


在内存中,需要考虑很多多线程的情况。对于add操作,往往会涉及到判空或者判断是否存在,有很大概率出现线程不安全的情况,因此需要加锁,而对于delete、select操作,加锁不一定是很必要的,需要具体问题具体分析。能用线程安全的尽量使用,但也要考虑锁粒度是否会影响效率~


### 虚拟主机


在虚拟主机中需要实现对内存硬盘的管理,并提供对交换机、队列、绑定、消息管理的核心api,供上层服务器调用(服务器主要处理网络请求,然后调用这边的api实现业务)。使用虚拟主机的目的是为了隔离交换机、队列、绑定、消息,一个服务器上可以有多个虚拟主机,在rabbitmq中,就提供了对虚拟主机的创建、删除操作,但在本项目中先不实现。


要想实现隔离的效果,我们得知道这些交换机是从属于哪个虚拟主机的。不·难发现,一个虚拟机可以对应多个交换机,而一个交换机只能对应一个虚拟主机,这是一个"一对多"的关系。


方案一:参照数据库中“一对多”的关系来设计,给交换机增加一个属性,表示从属的虚拟主机;


方案二:给交换机增加一个前缀名,即为虚拟主机的名字,然后可以根据前缀知道从属的虚拟主机;


方案三:为每个虚拟主机单独分配数据库和文件,从物理上真正实现隔离。


此处选择的是方案二。




---


先来对交换机、队列、绑定的增删.



public class virtualhost {
private string virtualhostname;
//硬盘数据管理
private diskdatacenter diskdatacenter = new diskdatacenter();
//内存数据管理
private memorydatacenter memorydatacenter = new memorydatacenter();

private router router = new router();

private final object exchangelocker = new object();

private final object queuelocker = new object();

public virtualhost(string name){
    //需要对数据进行初始化(数据库建库建表操作)
    this.virtualhostname = name;
    diskdatacenter.init();
    //将硬盘上的数据恢复到内存中
    try {
        memorydatacenter.recovery(diskdatacenter);
    } catch (mqexception | ioexception | classnotfoundexception e) {
        log.error("[virtualhost] 恢复内存数据失败");
        e.printstacktrace();
    }
}


//创建交换
public boolean exchangedeclare(string exchangename, exchangetype exchangetype, boolean durable,
                               boolean autodelete, map<string, object> arguments) {
    //为了进行虚拟主机的隔离,此处采用在交换机的前缀加上虚拟主机名的方式
    exchangename = virtualhostname + "_" + exchangename;
    try{
        synchronized (exchangelocker){
            //1.判断交换机是否存在
            exchange exsitsexchange = memorydatacenter.getexchange(exchangename);
            if(exsitsexchange != null){
                throw new mqexception("[virtualhost] 交换机已经存在 exchangename:" + exchangename);
            }
            //2.创建交换机
            exchange exchange = new exchange();
            exchange.setname(exchangename);
            exchange.settype(exchangetype);
            exchange.setdurable(durable);
            exchange.setautodelete(autodelete);
            exchange.setarguments(arguments);
            //3.将交换机写入硬盘
            if(durable){
                diskdatacenter.insertexchange(exchange);
            }
            //4.将交换机写入内存
            memorydatacenter.insertexchange(exchange);
            log.info("[virtualhost] 交换机创建成功 exchangename:" + exchangename);
            return true;
        }
    } catch (exception e){
        log.error("[virtualhost] 交换机创建失败 exchangename:" + exchangename);
        e.printstacktrace();
        return false;
    }
}

//删除交换机
public boolean exchangedelete(string exchangename){
    exchangename = virtualhostname + "_" + exchangename;
    try{
        synchronized (exchangelocker){
            //1.判断交换机是否存在
            exchange existsexchange = memorydatacenter.getexchange(exchangename);
            if(existsexchange == null){
                throw new mqexception("[virtualhost] 交换机不存在 exchangename:" + exchangename);
            }
            //2.将交换机从硬盘上删除
            //此处不要取反,isdurable 表示 是否存在硬盘上
            //如果对象存储在硬盘上才要删除,没有存在硬盘上就不用
            if(existsexchange.isdurable()){
                diskdatacenter.deleteexchange(exchangename);
            }
            //3.将交换机从内存中删除
            memorydatacenter.insertexchange(existsexchange);
            log.info("[virtualhost] 交换机删除成功 exchangename:" + exchangename);
        }
        return true;
    } catch (exception e){
        log.error("[virtualhost] 交换机删除失败 exchangename:" + exchangename);
        e.printstacktrace();
        return false;
    }
}

//创建队列
public boolean queuedeclare(string queuename, boolean exclusive, boolean durable, boolean autodelete,
                            map<string, object> arguments){
    queuename = virtualhostname + "_" + queuename;
    try{
        synchronized (queuelocker){
            msgqueue existsqueue = memorydatacenter.getqueue(queuename);
            if(existsqueue != null){
                throw new mqexception("[virtualhost] 队列已经存在 queuename:" + queuename);
            }
            //创建队列
            msgqueue queue = new msgqueue();
            queue.setname(queuename);
            queue.setdurable(durable);
            queue.setexclusive(exclusive);
            queue.setautodelete(autodelete);
            queue.setarguments(arguments);
            //写入硬盘
            if(durable){
                diskdatacenter.insertqueue(queue);
            }
            //写入内存
            memorydatacenter.insertqueue(queue);
            log.info("[virtualhost] 队列创建成功 queuename:" + queuename);
        }
        return true;
    } catch (exception e){
        log.error("[virtualhost] 队列创建失败 queuename:" + queuename);
        e.printstacktrace();
        return false;
    }
}

//删除队列
public boolean queuedelete(string queuename){
    queuename = virtualhostname + "_" + queuename;
    try{
        synchronized (queuelocker){
            msgqueue existsqueue = memorydatacenter.getqueue(queuename);
            if(existsqueue == null){
                throw new mqexception("[virtualhost] 队列不存在 queuename:" + queuename);
            }
            if(existsqueue.isdurable()){
                diskdatacenter.deletequeue(queuename);
            }
            memorydatacenter.deletequeue(queuename);
            log.error("[virtualhost] 队列删除成功 queuename:" + queuename);
            return true;
        }
    }catch (exception e){
        log.error("[virtualhost] 队列删除失败 queuename:" + queuename);
        e.printstacktrace();
        return false;
    }
}

//添加绑定关系
public boolean queuebind(string exchangename, string queuename, string bindingkey){
    exchangename = virtualhostname + "_" + exchangename;
    queuename = virtualhostname + "_" + queuename;
    try{
        synchronized (exchangelocker){
            synchronized (queuelocker){
                //1.验证转发规则
                //等下实现~
                if(!router.checkbindingkey(bindingkey)){
                    throw new mqexception("[virtualhost] bindingkey非法 bindingkey:" + bindingkey);
                }
                //2.判断绑定关系是否已经存在
                binding existsbinding = memorydatacenter.getbinding(exchangename, queuename);
                if(existsbinding != null){
                    throw new mqexception("[virtualhost] 绑定已经存在 exchangename:" + exchangename + ", queuename:" + queuename);
                }
                //3.判断交换机和队列是否都存在
                exchange existsexchange = memorydatacenter.getexchange(exchangename);
                msgqueue existsqueue = memorydatacenter.getqueue(queuename);
                if(existsexchange == null){
                    throw new mqexception("[virtualhost] 交换机不存在 exchangename:" + exchangename);
                }
                if(existsqueue == null){
                    throw new mqexception("[virtualhost] 队列不存在 queuename:" + queuename);
                }
                //4.创建绑定
                binding binding = new binding();
                binding.setexchangename(exchangename);
                binding.setqueuename(queuename);
                binding.setbindingkey(bindingkey);
                //5.将绑定写入硬盘
                diskdatacenter.insertbinding(binding);
                //6.将绑定写入内存
                memorydatacenter.insertbinding(binding);
                log.info("[virtualhost] 绑定添加成功 exchangename:" + exchangename + ", queuename:" + queuename);
            }
        }
        return true;
    } catch (exception e){
        log.error("[virtualhost] 绑定创建失败 exchangename:" + exchangename + ", queuename:" + queuename);
        e.printstacktrace();
    }
    return false;
}

//解除绑定
public boolean queueunbind(string exchangename, string queuename){
    exchangename = virtualhostname + "_" + exchangename;
    queuename = virtualhostname + "_" + queuename;
    try{
        synchronized (exchangelocker){
            synchronized (queuelocker){
                //检查是否有这个绑定
                binding existsbinding = memorydatacenter.getbinding(exchangename, queuename);
                if(existsbinding == null){
                    throw new mqexception("[virtualhost] 绑定不存在 exchangename:" + exchangename + ", queuename:" + queuename);
                }
                //从硬盘中删除绑定关系
                diskdatacenter.deletebinding(existsbinding);
                //从内存中删除绑定关系
                memorydatacenter.deletebinding(existsbinding);
                log.info("[virtualhost] 绑定解除成功 exchangename:" + exchangename + ", queuename:" + queuename);
            }
        }
        return true;
    } catch (exception e){
        log.error("[virtualhost] 绑定解除失败 exchangename:" + exchangename + ", queuename:" + queuename);
        e.printstacktrace();
        return false;
    }
}

虽然我们在memorydatacenter中已经进行过了加锁操作,但是现在又再virtualhost中又进行了加锁,是否之前的锁就没有意义了呢?


当然不是,因为无法事先知道什么时候、在哪去调用这些api,如果一个其他线程不安全的类去调用memorydatacenter中的一些api,就会引发线程安全问题。


在virtualhost中定义了一把 交换机锁、 一把队列锁,这些锁的力度比较大,当一个线程去操作交换机a,此时另一个线程即使操作的是交换机b,也会进行阻塞,这会大大降低程序效率。不过影响不是很大,因为上述的这些操作都属于是低频操作,消息队列主要还是消息的转发~




---


上述实现了一些辅助消息交换转发的api,现在要来实现消息队列中最核心的两个api:发布消息 basicpublish 和 订阅消息 basicconsume.


**basicpublish发布消息**


发布消息是指生产者将一个消息发送到指定交换机上,然后交换机根据自身的类型,进行消息的转发,发送到与之绑定的队列中。


交换机有3种类型(本项目中实现了3种):


1.dircet 直接交换机,消息中的routinkey就是队列名,直接将消息发送给队列即可.


2.fanout 扇出交换机,消息会发送到所有与之绑定的队列中,此时消息中的routingkey无效.


3.topic 主题交换机,消息会发送给所有与之绑定且binding中的bindingkey 与 消息中的 rouingkey 匹配的队列.


首先我们先来实现路由转发的规则,大致包含如下几个api:checkroutingkey、checkbindingkey、route.


**router:**



这个类用来实现路由转发的规则
/
public class router {
//规定:
//bindingkey 由 数字,字母,下划线组成,通过.来分隔,允许通配符
,#存在
//形如: 123.abc..aa.#
public boolean checkbindingkey(string bindingkey) {
for (int i = 0; i < bindingkey.length(); i++) {
char ch = bindingkey.charat(i);
if ((ch >= ‘a’ && ch <= ‘z’) || (ch >= ‘0’ && ch <= ‘9’) || (ch >= ‘a’ && ch <= ‘z’)) {
continue;
}
if (ch == ‘.’ || ch == '
’ || ch == '
’ || ch == ‘#’) {
continue;
}
return false;
}
//这里处理通配符不合法的情况
//在正则表达式中需要对.进行转义 -> .
//而 \ 也需要进行转义,再添加一个 \
string[] words = bindingkey.split(“\.”);
for (int i = 0; i < words.length; i++) {
string word = words[i];
if (word.length() > 1 && (word.contains(““) || word.contains(”#“))) {
return false;
}
if (word.equals(”#“)) {
if (i + 1 < words.length && (words[i + 1].equals(”#“) || words[i + 1].equals(”
”))) {
return false;
}
}
}
return true;
}

//规定:
//routingkey 由 数字,字母,下划线组成,通过.来分隔
//形如: 123.abc._123
public boolean checkroutingkey(string routingkey) {
    for (int i = 0; i < routingkey.length(); i++) {
        char ch = routingkey.charat(i);
        if ((ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z')) {
            continue;
        }
        if (ch == '.' || ch == '_') {
            continue;
        }
        return false;
    }
    return true;
}

//进行路由
public boolean route(exchangetype exchangetype, string routingkey, string bindingkey) {
    if (exchangetype == exchangetype.fanout) {
        return true;
    }
    if (exchangetype == exchangetype.topic) {
        if (routetopic(routingkey, bindingkey)) {
            return true;
        }
    }
    return false;
}

//主题交换机的路由
//*可以代表两个.之间的任意的字符串
//#可以代表0个或多个*
private boolean routetopic(string routingkey, string bindingkey) {
    string[] routingtokens = routingkey.split("\\.");
    string[] bindingtokens = bindingkey.split("\\.");
    int routingindex = 0;
    int bindingindex = 0;
    while(routingindex < routingtokens.length && bindingindex < bindingtokens.length){
        if(bindingtokens[bindingindex].equals("#")){
            if(bindingindex == bindingtokens.length - 1){
                return true;
            }else {
                bindingindex++;
                for(int i = routingindex; i < routingtokens.length; i++){
                    if(bindingtokens[bindingindex].equals(routingtokens[routingindex])){
                        break;
                    }
                }
                if(routingindex == routingtokens.length){
                    return false;
                }
            }
        }else{
            if(!routingtokens[routingindex].equals(bindingtokens[bindingindex])
            && !bindingtokens[bindingindex].equals("*")){
                return false;
            }
        }
        routingindex++;
        bindingindex++;
    }
    if(routingindex == routingtokens.length && bindingindex == bindingtokens.length){
        return true;
    }
    return false;
}

}


上述routingkey、bindingkey、路由规则 都是源自于amqp协议


接下来实现消息的发布。


**basicpublish:**



//向指定交换机发送消息,并根据交换机的转发规则,向队列发送消息
public boolean basicpublish(string exchangename, string routingkey, basicproperties basicproperties, byte[] body){
exchangename = virtualhostname + “" + exchangename;
try{
//1.判断交换机是否存在
exchange exchange = memorydatacenter.getexchange(exchangename);
if(exchange == null){
throw new mqexception(“[virtualhost] 交换机不存在 exchangename:” + exchangename);
}
//2.判断routingkey是否合法
if(!router.checkroutingkey(routingkey)){
throw new mqexception(“[virtualhost] routingkey非法 routingkey:” + routingkey);
}
//3.检查交换机类型
if(exchange.gettype() == exchangetype.direct){
//直接交换机, routingkey为queuename
//4.查找指定队列
string queuename = virtualhostname + "
” + routingkey;
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
throw new mqexception(“[virtualhost] 队列不存在 queuename:” + queuename);
}
//5.构造message对象
message message = message.createmessagewithid(basicproperties, routingkey, body);
//6.发送消息到指定队列
sendmessage(queue, message);
}else{
//4.获取到所有绑定关系
concurrenthashmap<string, binding> bindings = memorydatacenter.getbindings(exchangename);
for(map.entry<string, binding> entry : bindings.entryset()){
binding binding = entry.getvalue();
//5.查找指定队列
string queuename = virtualhostname + “_” + binding.getqueuename();
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
//如果当前不存在,还需要检查其他的队列是否存在
continue;
}
//6.检查路由规则:
// fanout, 则该交换机绑定的所有队列都要转发
// topic, 则需要判断routingkey和bindingkey是否能对上
if(!router.route(exchange.gettype(), routingkey, binding.getbindingkey())){
continue;
}
//7.构造message对象
message message = message.createmessagewithid(basicproperties, routingkey, body);
//8.发送消息到指定队列
sendmessage(queue, message);
}
log.info(“[virtualhost] 发送消息成功”);
}
return true;
} catch (exception e){
log.error("[virtualhost] 发送消息失败 exchangename: " + exchangename);
e.printstacktrace();
return false;
}
}


**basicconsume订阅消息**


订阅消息是指消费者绑定一个队列,后续队列中有消息就自动的推送过来。



//订阅消息
//添加一个队列订阅者,当队列收到消息,就把消息推送给订阅者
//consumertag: 消费者的唯一标识
//autoack: 是否自动应答
//consumer: 回调函数,并写成了函数式接口,描述了消费者的行为
public boolean basicconsume(string consumertag, string queuename, boolean autoack, consumer consumer){
queuename = virtualhostname + “_” + queuename;
try{
consumermanager.addconsumer(consumertag, queuename, autoack, consumer);
log.info(“[virtualhost] 订阅消息成功 queuename:” + consumertag + “, queuename:” + queuename);
return true;
}catch (exception e){
log.error(“[virtualhost] 订阅消息失败 consumertag:” + consumertag + “, queuename:” + queuename);
e.printstacktrace();
return false;
}
}


接下来实现consumermanager中的api:notifyconsumer 提醒消费者消费消息,addconsumer 添加消费者,consumemessage 消费消息。


首先我们需要描述一下消费者



@data
//描述消费者的参数
public class consumerenv {
private string consumertag;
private string queuename;
private boolean autoack;
//通过这个回调来处理收到的消息
private consumer consumer;

public consumerenv() {
}

public consumerenv(string consumertag, string queuename, boolean autoack, consumer consumer) {
    this.consumertag = consumertag;
    this.queuename = queuename;
    this.autoack = autoack;
    this.consumer = consumer;
}

}

@functionalinterface
public interface consumer {
//把消息推送给消费者
void handledelivery(string consumertag, basicproperties basicproperties, byte[] body) throws mqexception, ioexception;
}


在生产者和消费者的交互逻辑大致有两种:


1.生产者先调用basicpublish发布消息到了某个队列,此时还没有消费者订阅这个队列,后续有消费者订阅了,直接消费消息即可


2.消费者先订阅了一个队列,但生产者还没有生产消息到队列中,后续生产者生产完消息后,提醒消费者去队列中消费消息即可


由于一个队列可能有多个消费者订阅,我们需要使用一个数据结构来存储,并且还需要制定一个算法,实现将消息能给发送给多个消费者。此处采用轮询的方式,即类似取队列元素的方式。


需要在msgqueue中添加一个字段,用来存储这个队列中的订阅者,并提供相应的取出某一个订阅者的api  
  



@data
public class msgqueue {
private string name;
//是否独占(只能被一个消费者使用)
private boolean exclusive = false;
//是否可持久化
private boolean durable = false;
//是否自动删除
private boolean autodelete = false;
//可选参数
private map<string, object> arguments = new hashmap<>();
//订阅者
private linkedlist consumerenvs = new linkedlist<>();
//轮询序号
private atomicinteger consumerseq = new atomicinteger(0);

public void addconsumerenv(consumerenv consumerenv){
    consumerenvs.add(consumerenv);
}

public consumerenv chooseconsumer(){
    if(consumerenvs.size() == 0){
        return null;
    }
    int index = consumerseq.get() % consumerenvs.size();
    consumerseq.getandincrement();
    return consumerenvs.get(index);
}

}


程序如何知道有新的消息来呢?通过一个扫描线程扫描队列,然后再将这个消息提交到线程池中,让线程池去执行服务器端的回调函数。


![](https://img-blog.csdnimg.cn/direct/27fe74c834b94a0aa88a85dbe3fd1df6.png)


如果使用一个扫描线程,去扫描所有队列的话,可以去遍历存储队列对象的哈希表,理论上行,但是不够优雅,如果对每一个队列都使用一个线程的话,会开辟很多线程,也会影响效率。在这使用一个阻塞队列,当有一个新的消息添加到队列中,会将这个队列名插入到这个阻塞队列中,后续只要根据这个阻塞队列,去取数据然后处理即可。



@slf4j
/*

  • 通过这个类来实现消费消息

  • 每当有消息到队列的时候,就将队列名(令牌)添加到令牌队列中,线程池再根据这个队列进行处理
    */
    public class consumermanager {
    //方便操作数据
    private virtualhost parent;
    //使用线程池,用来处理回调任务
    private executorservice workpool = executors.newfixedthreadpool(4);
    //每当队列中来了消息,就触发这个令牌队列,通知可以去消费消息
    private blockingdeque tokenqueue = new linkedblockingdeque<>();
    //扫描线程,扫描令牌队列中是否有新令牌
    private thread scanthread = null;

    public consumermanager(virtualhost p){
    this.parent = p;
    //创建扫描线程
    scanthread = new thread(() ->{
    while(true){
    try {
    //获取令牌
    string tokenqueuename = tokenqueue.take();
    //根据令牌找到指定队列
    msgqueue queue = parent.getmemorydatacenter().getqueue(tokenqueuename);
    if(queue == null){
    throw new mqexception(“[consumermanager] 令牌队列不存在 queuename:” + tokenqueuename);
    }
    //从队列中消费消息
    synchronized (queue){
    consumemessage(queue);
    }
    } catch (interruptedexception | mqexception e) {
    e.printstacktrace();
    }
    }
    });

     //设置为后台线程
     scanthread.setdaemon(true);
     scanthread.start();
    

    }

    //通知消费消息
    public void notifyconsumer(string queuename) throws interruptedexception {
    tokenqueue.put(queuename);
    }

    //添加订阅者
    public void addconsumer(string consumertag, string queuename, boolean autoack, consumer consumer){
    msgqueue queue = parent.getmemorydatacenter().getqueue(queuename);
    try{
    if(queue == null){
    throw new mqexception(“[consumermanager] 队列不存在 queuename:” + queuename);
    }
    synchronized (queue){
    //添加消费者
    queue.addconsumerenv(new consumerenv(consumertag, queuename, autoack, consumer));
    //如果有消息就进行消费
    int n = parent.getmemorydatacenter().getmessagesize(queuename);
    for(int i = 0; i < n; i++){
    //消费
    consumemessage(queue);
    }
    }
    }catch (exception e){
    log.error(“[consumermanager] 添加订阅者失败”);
    e.printstacktrace();
    }
    }

    //消费消息
    private void consumemessage(msgqueue queue){
    //1.通过轮询方式,寻找消费者
    consumerenv luckdog = queue.chooseconsumer();
    if(luckdog == null){
    return;
    }
    //2.获取队列中的消息
    message message = parent.getmemorydatacenter().pollmessage(queue.getname());
    if(message == null){
    return;
    }
    //3.把消息带入到回调方法,交给线程池处理
    workpool.submit(() -> {
    try{
    //1.把消息添加到待确认队列中
    parent.getmemorydatacenter().addmessagewaitack(queue.getname(), message);
    //2.执行回调操作,消费消息
    luckdog.getconsumer().handledelivery(luckdog.getconsumertag(), message.getbasicproperties(), message.getbody());
    //3.如果当前是自动应答.则将消息删除
    // 如果是手动应答,则先不处理,让消费者后续调用 basicack 处理
    if(luckdog.isautoack()){
    //删除消息从两个方面进行,内存和硬盘
    //硬盘
    if(message.isdurable()){
    parent.getdiskdatacenter().deletemessage(queue, message);
    }
    //内存
    parent.getmemorydatacenter().deletemessagewaitack(queue.getname(), message.getmessageid());
    parent.getmemorydatacenter().removemessage(message.getmessageid());
    log.info(“[consumermanager] 消息被成功消费 queuename:” + queue.getname());
    }
    }catch (exception e){
    log.info(“[consumermanager] 消息消费失败 queuename:” + queue.getname());
    e.printstacktrace();
    }
    });
    }
    }


最后还差一个basicack手动应答,即消费者消费完消息,然后告诉服务器。



//手动应答
public boolean basicack(string queuename, string messageid){
queuename = virtualhostname + “_” + queuename;
try{
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
throw new mqexception(“[virtualhost] 要确认的队列不存在 queuename:” + queuename);
}
message message = memorydatacenter.getmessage(messageid);
if(message == null){
throw new mqexception(“[virtualhost] 要确认的消息不存在 messageid:” + messageid);
}
if(message.isdurable()){
diskdatacenter.deletemessage(queue, message);
}
memorydatacenter.deletemessagewaitack(queuename, messageid);
memorydatacenter.removemessage(messageid);
log.info(“[virtualhost] 消息确认收到 queuename:” + queuename);
return true;
}catch (exception e){
log.info(“[virtualhost] 消息确认失败 queuename:” + queuename);
e.printstacktrace();
return false;
}
}


### 约定应用层协议


在网络通信中,我们需要约定协议,方便对方接收并读取。在这里使用http协议和其他一些不是很妥,于是我们在tcp协议的基础上自定义实现应用层协议。


**协议格式:**


![](https://images.3wcode.com/3wcode/20240731/b_0_202407312146315737.jpg)


类型用来描述,这次请求/响应 是干啥的,例如:0x03 表示 这个请求是创建exchange.


由于进行网络通信也是使用的二进制,为了防止出现粘包等问题,使用长度来分隔.


对于请求,载荷中保存的是序列化的方法的参数,对于响应,载荷中保存的是序列化的方法的返回值.


我们自定义的应用层协议是建立在tcp协议的基础上,而每一次tcp协议都为了可靠性会进行三次握手、四次挥手,如果频繁的创建的话,会比较费时,在这我们使用多信道的方式,复用每一次tcp连接,即tcp连接是一直保持的,但是断开是逻辑上的断开。


**约定类型:**




| 类型 | 方法 |
| --- | --- |
| 0x01 | 创建 channel |
| 0x02 | 关闭 channel |
| 0x03 | 创建 exchange |
| 0x04 | 销毁 exchange |
| 0x05 | 创建 queue |
| 0x06 | 销毁 queue |
| 0x07 | 创建 binding |
| 0x08 | 销毁 binding |
| 0x09 | 发送 message |
| 0x0a | 订阅 queue |
| 0x0b | 确认应答 |
| 0x0c | 服务器给客户端推送的消息 |


### 创建载荷类


为了方便后续代码编写,使用两个类来描述网络通信:request,response


**request:**



//表示网络通信中的请求对象
//在此约定:
//一个请求由三部分组成:类型+长度+载荷
//在请求中,载荷为调用方法的参数
@data
public class request {
private int type;
private int length;
private byte[] payload;
}


**response:**





**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**

**需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)**
![img](https://img-blog.csdnimg.cn/img_convert/b8620bc36d227a73a9e1c965d8efb29f.png)

**一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**

ssageid:" + messageid);
        }
        if(message.isdurable()){
            diskdatacenter.deletemessage(queue, message);
        }
        memorydatacenter.deletemessagewaitack(queuename, messageid);
        memorydatacenter.removemessage(messageid);
        log.info("[virtualhost] 消息确认收到 queuename:" + queuename);
        return true;
    }catch (exception e){
        log.info("[virtualhost] 消息确认失败 queuename:" + queuename);
        e.printstacktrace();
        return false;
    }
}

约定应用层协议

在网络通信中,我们需要约定协议,方便对方接收并读取。在这里使用http协议和其他一些不是很妥,于是我们在tcp协议的基础上自定义实现应用层协议。

协议格式:

类型用来描述,这次请求/响应 是干啥的,例如:0x03 表示 这个请求是创建exchange.

由于进行网络通信也是使用的二进制,为了防止出现粘包等问题,使用长度来分隔.

对于请求,载荷中保存的是序列化的方法的参数,对于响应,载荷中保存的是序列化的方法的返回值.

我们自定义的应用层协议是建立在tcp协议的基础上,而每一次tcp协议都为了可靠性会进行三次握手、四次挥手,如果频繁的创建的话,会比较费时,在这我们使用多信道的方式,复用每一次tcp连接,即tcp连接是一直保持的,但是断开是逻辑上的断开。

约定类型:

类型方法
0x01创建 channel
0x02关闭 channel
0x03创建 exchange
0x04销毁 exchange
0x05创建 queue
0x06销毁 queue
0x07创建 binding
0x08销毁 binding
0x09发送 message
0x0a订阅 queue
0x0b确认应答
0x0c服务器给客户端推送的消息

创建载荷类

为了方便后续代码编写,使用两个类来描述网络通信:request,response

request:

//表示网络通信中的请求对象
//在此约定:
//一个请求由三部分组成:类型+长度+载荷
//在请求中,载荷为调用方法的参数
@data
public class request {
    private int type;
    private int length;
    private byte[] payload;
}

response:

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)
[外链图片转存中…(img-cmx91exl-1713197004039)]

一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com