先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里p7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加v获取:vip204888 (备注大数据)
正文
后续的操作都是围绕这几个核心概念来增删改查的。
### 数据库管理
由于mysql数据库比较重量,在此就选择使用相对轻量的数据库sqlite. mysql是客户端服务器结构的程序,而sqlite是一个本地的数据库,操作它相当于是直接操作本地文件。
在java中使用sqlite,也不需要额外的安装,可以直接使用maven,将相关依赖引入,然后进行配置就可以正常使用了。
**sqlite的依赖:**
<!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
<dependency>
<groupid>org.xerial</groupid>
<artifactid>sqlite-jdbc</artifactid>
<version>3.42.0.0</version>
</dependency>
**application.yml:**
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.jdbc
sqlite是把数据存储在当前硬盘的某个指定文件中,在这里就存储在了工作路径的data目录中的meta.db文件了。
sqlite也不需要指定用户名和密码,毕竟是自己一个人用的,只有本地主机才能使用。
接下来就是建库和建表。在mysql中,需要自己手动进行创建database,但在sqlite中,则无需手动建库了,每一个数据库文件就是一个数据库,即当前的meta.db文件就相当于是mysql中的database。
为了能够即用即创建表,在这考虑部署的时候就自动创建表,即通过mybatis的update注解来建表并添加一些基础的查询、增加、删除操作。
@mapper
public interface metamapper {
//建表操作
@update(“create table if not exists exchange(” +
“name varchar(50) primary key,” +
“type int,” +
“durable boolean,” +
“autodelete boolean,” +
“arguments varchar(1024))”)
public void createexchangetable();
@update("create table if not exists queue(" +
"name varchar(50) primary key," +
"exclusive boolean," +
"durable boolean," +
"autodelete boolean," +
"arguments varchar(1024))")
public void createqueuetable();
@update("create table if not exists binding(" +
"exchangename varchar(50)," +
"queuename varchar(50)," +
"bindingkey varchar(256))")
public void createbindingtable();
//基础添加、删除、查询操作
@insert("insert into exchange values(#{name}, #{type}, #{durable}, #{autodelete}, #{arguments})")
public void insertexchange(exchange exchange);
@delete("delete from exchange where name = #{exchangename}")
public void deleteexchange(string exchangename);
@select("select * from exchange")
public list<exchange> selectallexchanges();
@insert("insert into queue values(#{name}, #{exclusive}, #{durable}, #{autodelete}, #{arguments})")
public void insertqueue(msgqueue msgqueue);
@delete("delete from queue where name = #{queuename}")
public void deletequeue(string queuename);
@select("select * from queue")
public list<msgqueue> selectallqueues();
@insert("insert into binding values(#{exchangename}, #{queuename}, #{bindingkey})")
public void insertbinding(binding binding);
@delete("delete from binding where exchangename = #{exchangename} and queuename = #{queuename}")
public void deletebinding(string exchangename, string queuename);
@select("select * from binding")
public list<binding> selectallbindings();
}
由于sqlite中也不支持hashmap这种数据结构,因此使用string来存储。
mybatis在写入数据的时候,会自动的调用对象的getter方法,而在读出数据的时候,会自动调用setter方法,因此为了能够将arguments这个hashmap类型的数据存入到数据库中,我们需要转成字符串。
在此使用jackson来处理,引入相关依赖即可。
**jackson依赖:**
<dependency>
<groupid>com.fasterxml.jackson.core</groupid>
<artifactid>jackson-databind</artifactid>
<version>2.9.6</version>
</dependency>
**getter和setter:**
public string getarguments() {
objectmapper objectmapper = new objectmapper();
try {
objectmapper.writevalueasstring(arguments);
} catch (jsonprocessingexception e) {
e.printstacktrace();
}
return “{}”;
}
public void setarguments(string arguments) {
objectmapper objectmapper = new objectmapper();
try {
this.arguments = objectmapper.readvalue(arguments, new typereference<map<string, object>>() {});
} catch (ioexception e) {
e.printstacktrace();
}
}
为了使程序的结构更加清晰,程序员更方便的操作数据库,考虑创建一个databasemanager类来封装数据库的相关操作。
**databasemanager:**
//封装数据库操作
//databasemanager不交给spring管理,由程序员手动管理
@slf4j
public class databasemanager {
//从spring上下文中手动获取metamapper的bean
private metamapper metamapper;
//带有业务逻辑的初始化操作
//完成建库建表操作
//如果不存在表就进行创建,并添加默认初始数据
public void init(){
metamapper = mqblogapplication.context.getbean(metamapper.class);
if(!checkdbexists()){
//创建data目录
file file = new file("./data");
file.mkdirs();
//建表并初始化数据
createtable();
createdefaultdata();
log.info("[databasemanager] 数据库初始化完成");
}else{
log.info("[databasemanager] 数据库已经初始化");
}
}
//删除数据库目录
//删除目录前,需要保证目录是空的
public void deletedb(){
file file = new file("./data/meta.db");
//删除目录里面的文件
boolean issuccess = file.delete();
if(issuccess){
log.info("[databasemanager] 删除数据库文件成功");
}else{
log.info("[databasemanager] 删除数据库文件失败");
}
//删除目录
file = new file("./data");
issuccess = file.delete();
if(issuccess){
log.info("[databasemanager] 删除数据库目录成功");
}else {
log.info("[databasemanager] 删除数据库目录失败");
}
}
//判断数据库(meta.db)是否存在
private boolean checkdbexists() {
file dbfile = new file("./data/meta.db");
return dbfile.exists();
}
//对 exchange,queue,binding 进行建表
private void createtable() {
metamapper.createexchangetable();
metamapper.createqueuetable();
metamapper.createbindingtable();
log.info("[databasemanager] 创建表完成");
}
//rabbitmq初始带有一个匿名交换机, 类型为 direct
private void createdefaultdata() {
exchange exchange = new exchange();
exchange.setname("");
exchange.settype(exchangetype.direct);
exchange.setdurable(true);
exchange.setautodelete(false);
metamapper.insertexchange(exchange);
log.info("[databasemanager] 初始数据构造完成");
}
//增删查操作
public void insertexchange(exchange exchange){
metamapper.insertexchange(exchange);
}
public void deleteexchange(string exchangename){
metamapper.deleteexchange(exchangename);
}
public list<exchange> selectallexchanges(){
list<exchange> exchanges = metamapper.selectallexchanges();
return exchanges;
}
public void insertqueue(msgqueue queue){
metamapper.insertqueue(queue);
}
public void deletequeue(string queuename){
metamapper.deletequeue(queuename);
}
public list<msgqueue> selectallqueues(){
return metamapper.selectallqueues();
}
public void insertbinding(binding binding){
metamapper.insertbinding(binding);
}
public void deletebinding(string exchangename, string queuename){
metamapper.deletebinding(exchangename, queuename);
}
public list<binding> selectallbindings(){
return metamapper.selectallbindings();
}
}
### 文件管理
文件管理主要是对消息进行存储,消息是进行序列化存储的数据,不便于读取,因此我们无法区分一条消息的开始和结束。如果对每条消息都单独保存在一个文件中,可能会创建大量的文件。
我们现在做出如下约定:一条消息在硬盘上存储需要先花费4个字节存储消息体的长度,然后再存储消息本体。

在定义消息的时候,我们定义了两个辅助字段:offsetbeg、offsetend,分别记录一个消息在文件中存储的开始和结束。

通过上述分析,我们可以知道,需要对message对象进行序列化存储在文件中,而offsetbeg和offsetend则不需要,保留原始值即可。需要对message类实现serializable接口,由于basicproperties也包含在类中,也需要实现接口,而offsetbeg,offsetend使用transient关键字描述,程序就不会再序列化了。
@data
public class message implements serializable {
//消息的基本属性
private basicproperties basicproperties;
//存储数据
private byte[] body;
//位置偏移量,便于在文件中取出数据
//采用[offsetbeg, offsetend)区间
//这两个属性是帮助从文件中读取数据的,不需要进行序列化
private transient long offsetbeg;
private transient long offsetend;
//表示此消息在文件中是否有效(采用逻辑删除)
//0x00 表示无效, 0x01 表示有效
private byte isvalid = 0x01;
}
@data
public class basicproperties implements serializable {
//为防止重复,使用uuid生成
private string messageid;
private boolean isdurable = false;
//口令
private string routingkey;
}
**binarytool:**
public class binarytool {
//将一个java对象序列化成一个字节数组
//这个类必须实现了serializable
public static byte[] tobytes(object object) throws ioexception {
//此处的 bytearrayoutputstream 可以不用关闭,是一个纯内存的对象,不涉及到文件描述符
try(bytearrayoutputstream bytearrayoutputstream = new bytearrayoutputstream()) {
try (objectoutputstream objectoutputstream = new objectoutputstream(bytearrayoutputstream)) {
//此处 writeobject 会将 object 对象进行序列化,生成的二进制字节数据,写入到objectoutputstream中
//objectoutputstream 关联到了 bytearrayoutputstream, 最终就写到了 bytearrayoutputstream
objectoutputstream.writeobject(object);
}
//将 bytearrayoutputstream 中序列化后的二进制数据 转成byte[] 数组
return bytearrayoutputstream.tobytearray();
}
}
public static object frombytes(byte[] data) throws ioexception, classnotfoundexception {
try(bytearrayinputstream bytearrayinputstream = new bytearrayinputstream(data)){
try(objectinputstream objectinputstream = new objectinputstream(bytearrayinputstream)){
//此处的 readobject 将 data byte[] 转成对象
return objectinputstream.readobject();
}
}
}
}
由于每条消息都是存储在队列上的,因此就按照队列的维度进行存储。即每个队列都是一个目录,在每个目录下都有两个文件,一个是data数据文件,用来存储消息,另一个是state文件,用来描述data数据文件中的情况,为了后面可以进行gc。
此处的data文件是二进制文件,而state文件则是字符型文件,里面用一个数表示消息的总数,再用一个数表示消息的有效个数。因为采用的是逻辑删除的方式,也就是说被删除的消息不会真正意义上删除,当文件中存储很多数据的时候,就会大大影响我们的读取效率,因此我们需要使用gc来真正删除垃圾消息。在这规定:当文件有1000个消息的时候,并且垃圾消息占比50%,我们就进行一次gc。
在这我们也封装一个类来帮助我们处理文件的操作。
**messagefilemanager:**
@slf4j
//对硬盘上的message进行管理
//规定每个队列都对应一个目录,
//每个目录下都包含两个文件:queue_data.txt, queue_state.txt,分别用来存储数据和统计数据个数
//
public class messagefilemanager {
public void init() {
//为了接口的统一性
//暂时不处理,后续可自行添加
}
//表示该队列下统计的文件信息
public static class state{
public int totalcount = 0; //总消息数量
public int validcount = 0; //有效消息数量
}
//获取消息所处队列的目录
private string getqueuedir(string queuename){
return "./data/" + queuename;
}
//获取消息所处队列的数据文件路径
private string getqueuedatapath(string queuename){
return getqueuedir(queuename) + "/queue_data.txt";
}
//获取消息所处队列的统计文件路径
private string getqueuestatepath(string queuename){
return getqueuedir(queuename) + "/queue_state.txt";
}
//创建队列对应的目录和文件
public void createqueuefiles(string queuename) throws ioexception {
//1.创建队列对应目录
file directory = new file(getqueuedir(queuename));
if(!directory.exists()){
boolean success = directory.mkdirs();
if(!success){
throw new ioexception("[messagefilemanager] 创建目录失败" + directory.getabsolutefile());
}
}
//2.创建队列对应数据文件
file datafile = new file(getqueuedatapath(queuename));
if(!datafile.exists()){
boolean success = datafile.createnewfile();
if(!success){
throw new ioexception("[managerfilemanager] 创建文件失败" + datafile.getabsolutefile());
}
}
//3.创建队列对应统计文件
file statefile = new file(getqueuestatepath(queuename));
if(!statefile.exists()){
boolean success = statefile.createnewfile();
if(!success){
throw new ioexception("[managerfilemanager] 创建文件失败" + statefile.getabsolutefile());
}
}
//4.初始化队列文件
state state = new state();
writestate(queuename, state);
}
//删除队列对应的目录和文件
public void destroyqueuefiles(string queuename) throws ioexception {
//要删除目录需要先删除里面的文件
file datafile = new file(getqueuedatapath(queuename));
boolean success1 = datafile.delete();
file statefile = new file(getqueuestatepath(queuename));
boolean success2 = statefile.delete();
file directory = new file(getqueuedir(queuename));
boolean success3 = directory.delete();
//确保文件都删除了
if(!success1 || !success2 || !success3){
throw new ioexception("[messagefilemanager] 删除队列文件失败" + datafile.getabsolutefile());
}
}
//从文件中读取统计信息
private state readstate(string queuename){
//state文件是文本文件,可以使用scanner来读写
state state = new state();
try(inputstream inputstream = new fileinputstream(getqueuestatepath(queuename))){
scanner scanner = new scanner(inputstream);
state.totalcount = scanner.nextint();
state.validcount = scanner.nextint();
return state;
} catch (ioexception e) {
e.printstacktrace();
}
return null;
}
//向文件中写入统计信息
private void writestate(string queuename, state state){
//outputstream默认情况打开文件,会清空文件
try(outputstream outputstream = new fileoutputstream(getqueuestatepath(queuename))){
printwriter printwriter = new printwriter(outputstream);
printwriter.print(state.totalcount + "\t" + state.validcount);
printwriter.flush();
} catch (ioexception e) {
e.printstacktrace();
}
}
//判断队列对应的数据文件和统计文件都存在
private boolean checkqueuefilesexist(string name) {
//如果两个文件都存在了,则说明目录也存在,不需要额外判断
file datafile = new file(getqueuedatapath(name));
file statefile = new file(getqueuestatepath(name));
if(!datafile.exists() || !statefile.exists()){
return false;
}
return true;
}
//将消息写入对应的队列文件
public void sendmessage(msgqueue queue, message message) throws ioexception, mqexception {
//1.判断对应的队列文件是否存在
if(!checkqueuefilesexist(queue.getname())){
throw new mqexception("[messagefilemanager] 队列文件不存在 queuename:" + queue.getname());
}
//2.将消息转成二进制
synchronized (queue){
byte[] data = binarytool.tobytes(message);
//3.获取到文件末尾位置,设置文件位置的偏移量
file datafile = new file(getqueuedatapath(queue.getname()));
//每条消息前4个字节用来表示消息的长度
message.setoffsetbeg(datafile.length() + 4);
message.setoffsetend(datafile.length() + 4 + data.length);
//4.写入消息文件
try(outputstream outputstream = new fileoutputstream(datafile, true)){
try(dataoutputstream dataoutputstream = new dataoutputstream(outputstream)){
//写入消息的长度(4个字节)
dataoutputstream.writeint(data.length);
//写入消息本体
dataoutputstream.write(data);
}
}
//5.更新统计文件
state state = readstate(queue.getname());
state.totalcount += 1;
state.validcount += 1;
writestate(queue.getname(), state);
}
}
//从硬盘上删除消息(逻辑删除)
//此处传的消息必须为有效消息
public void deletemessage(msgqueue queue, message message) throws ioexception, classnotfoundexception, mqexception {
//1.判断队列对应的文件是否存在
if(!checkqueuefilesexist(queue.getname())){
throw new mqexception("[messagefilemanager] 队列文件不存在 queuename:" + queue.getname());
}
synchronized (queue){
//2.从文件中获取到消息
byte[] data = new byte[(int)message.getoffsetend() - (int)message.getoffsetbeg()];
try(randomaccessfile randomaccessfile = new randomaccessfile(getqueuedatapath(queue.getname()), "rw")){
//移动文件光标
randomaccessfile.seek(message.getoffsetbeg());
randomaccessfile.read(data);
//3.将消息反序列化并标记为无效
message diskmessage = (message)binarytool.frombytes(data);
diskmessage.setisvalid((byte)0x00);
//4.将消息重新写入文件
randomaccessfile.seek(message.getoffsetbeg());
randomaccessfile.write(binarytool.tobytes(diskmessage));
}
//5.更新统计文件
state state = readstate(queue.getname());
//逻辑删除,不需要修改totalcount
state.validcount -= 1;
writestate(queue.getname(), state);
}
}
//从队列文件中读取所有 有效消息
//在程序启动时,进行调用,读取文件中的所有有效消息,加载到内存中
public linkedlist<message> loadallmessagefromqueue(string queuename) throws ioexception, mqexception, classnotfoundexception {
linkedlist<message> messages = new linkedlist<>();
try(inputstream inputstream = new fileinputstream(getqueuedatapath(queuename))){
try(datainputstream datainputstream = new datainputstream(inputstream)){
//记录当前文件光标,方便赋值message中offsetbeg和offsetend值
long currentoffset = 0;
while(true){
//1.读取当前消息的长度
//当 readint 读取到文件末尾的时候,会抛出异常 eofexception
int messagesize = datainputstream.readint();
//2.根据读取到的长度,在文件中读取
byte[] buffer = new byte[messagesize];
int actualsize = datainputstream.read(buffer);
if(messagesize != actualsize){
throw new mqexception("[messagefilemanager] 文件读取错误 queuename:" + queuename);
}
//3.将字节数组反序列化成对象
message message = (message) binarytool.frombytes(buffer);
//4.判断当前消息是否为有效消息
if(message.getisvalid() == 0x00){
currentoffset += (4 + messagesize);
continue;
}
//5.计算在文件中偏移量并赋值,将消息添加到链表中
message.setoffsetbeg(currentoffset + 4);
message.setoffsetend(currentoffset + 4 + messagesize);
currentoffset += (4 + messagesize);
messages.add(message);
}
} catch (eofexception e){
//文件读取到末尾,并非真的异常
log.info("[messagefilemanager] 文件读取完毕");
}
}
return messages;
}
//获取新的队列数据文件
private string getqueuenewdatapath(string queuename){
return getqueuedir(queuename) + "/new_queued_data.txt";
}
//约定 当文件中总消息数超过 1000 并且 无效消息的个数 超过一半 就进行一次gc
public boolean checkgc(string queuename){
state state = readstate(queuename);
if(state.totalcount > 1000 && (double)state.validcount / (double) state.totalcount > 0.5){
return true;
}
return false;
}
//对消息文件进行gc操作
//此处使用复制算法的方式进行
public void gc(msgqueue queue) throws ioexception, mqexception, classnotfoundexception {
//当一个线程进行gc的时候,其他线程不允许进行操作
synchronized (queue){
//由于gc操作比较耗时,在这输出一下gc的时间
//计算gc开始时间
long gcbeg = system.currenttimemillis();
//1.创建新文件
file newdatafile = new file(getqueuenewdatapath(queue.getname()));
if(newdatafile.exists()){
log.info("[messagefilemanager] 存在复制的目标文件");
newdatafile.delete();
}
if(!newdatafile.createnewfile()){
throw new mqexception("[messagefilemanager] 复制文件创建失败,queuename:" + queue.getname());
}
//2.从旧文件中读取消息
//此处读取到的信息都是有效信息
linkedlist<message> messages = loadallmessagefromqueue(queue.getname());
//3.将有效消息写入新文件
try(outputstream outputstream = new fileoutputstream(newdatafile)){
try(dataoutputstream dataoutputstream = new dataoutputstream(outputstream)) {
for (message message : messages) {
//不调用sendmessage,防止outputstream频繁关闭影响性能
byte[] buffer = binarytool.tobytes(message);
dataoutputstream.writeint(buffer.length);
dataoutputstream.write(buffer);
}
}
}
//4.让新文件替换就旧文件
file olddatafile = new file(getqueuedatapath(queue.getname()));
if(!olddatafile.delete()){
throw new mqexception("[messagefilemanager] 旧数据文件删除失败 olddatafile:" + olddatafile.getabsolutefile());
}
if(!newdatafile.renameto(olddatafile)){
throw new mqexception("[messagefilemanager] 文件重命名失败 newdatafile:" + newdatafile.getabsolutefile());
}
//5.更新统计信息的文件
state state = readstate(queue.getname());
state.totalcount = state.validcount;
writestate(queue.getname(), state);
//计算gc结束时间
long gcend = system.currenttimemillis();
log.info("[messagefilemanager] " + queue.getname() + "gc完成, 总耗时: " + (gcend - gcbeg) + "ms");
}
}
}
### 封装硬盘
为了方便后续对数据进行持久化,即不用去关注数据到底是存在数据库还是文件中,在此再封装一个diskdatacenter类来统一对数据持久化。
/*
-
通过这个类来管理文件中的数据和数据库数据
-
调用者不用再关注数据是储存在硬盘文件中还是数据库中
-
通过这个类完成对 交换机、队列、绑定、消息 的增删查
*/
public class diskdatacenter {
private databasemanager databasemanager = new databasemanager();
private messagefilemanager messagefilemanager = new messagefilemanager();//对实例进行初始化
public void init(){
databasemanager.init();
messagefilemanager.init();
}//交换机
public void insertexchange(exchange exchange){
databasemanager.insertexchange(exchange);
}public void deleteexchange(string exchangename){
databasemanager.deleteexchange(exchangename);
}public list selectallexchanges(){
return databasemanager.selectallexchanges();
}//队列
public void insertqueue(msgqueue queue) throws ioexception {
databasemanager.insertqueue(queue);
//每个队列都关联一个目录文件(子文件为queue_data.txt、queue_state.txt)
//需要将对应的文件创建
messagefilemanager.createqueuefiles(queue.getname());
}public void deletequeue(string queuename) throws ioexception {
databasemanager.deletequeue(queuename);
//需要将队列对应的文件都删除
messagefilemanager.destroyqueuefiles(queuename);
}public list selectallqueues(){
return databasemanager.selectallqueues();
}//绑定
public void insertbinding(binding binding){
databasemanager.insertbinding(binding);
}public void deletebinding(binding binding){
databasemanager.deletebinding(binding.getexchangename(), binding.getqueuename());
}public list selectallbindings(){
return databasemanager.selectallbindings();
}//消息
public void insertmessage(msgqueue queue, message message) throws ioexception, mqexception {
messagefilemanager.sendmessage(queue, message);
}public void deletemessage(msgqueue queue, message message) throws ioexception, classnotfoundexception, mqexception {
messagefilemanager.deletemessage(queue, message);
//判断是否需要gc
if(messagefilemanager.checkgc(queue.getname())){
messagefilemanager.gc(queue);
}
}public linkedlist loadallmessagefromqueue(string queuename) throws ioexception, mqexception, classnotfoundexception {
return messagefilemanager.loadallmessagefromqueue(queuename);
}
}
### 内存管理
上述只是完成了持久化操作,但是真正的主战场还是在内存上,硬盘只是用来保证服务器重启后,数据不丢失。
为了方便后续对象的查找,我们可以使用hashmap的数据结构来存储。
**交换机:使用hashmap,key 为 exchangename,value 为 exchange 对象.**
**队列:使用hashmap,key 为 queuename,value 为 queue 对象.**
**绑定:使用嵌套hashmap, 第一个 key 为exchangename ,第一个 value 为 hashmap<string, binding>**
**第二个 key 为 queuename,第二个 value 为 binding 对象.**
表示的含义是,先查找交换机邦迪的所有队列,然后再找到指定绑定.
**消息:使用hashmap,key 为 messageid, value 为 message 对象.**
由于我们的消息是依附于队列的,我们还需要使用数据结构来表示这样的关系,即在队列中存储着消息。
**队列和消息之间的关系:使用hashmap,key 为 queuename, value 为 linkedlist,里面存储message对象.**
**待确认的消息:使用嵌套hashmap,第一个 key 为 queuename,第一个 value 为 hashmap<string, message>**
**第二个 key 为 messageid,第二个 value 为 message对象.**
当消费者从队列中取了一个消息,我们无法保证有没有丢失或读错,但可以通过应答的方式来保证可靠性,即消费者消费完了这个消息后,发送一个确认应答告诉服务器,后续服务器就可以删除了,但在一些对可靠性不高的场景中,手动应答的方式就会比较鸡肋,于是我们在这里提供两种应答方式:1.自动应答 2.手动应答
/*
-
管理内存中的数据
-
由于会在多线程的场景下使用,需要注意线程安全
*/
@slf4j
public class memorydatacenter {
//在内存中需要管理 交换机、队列、绑定、消息//key: exchangename, value: exchange对象
private concurrenthashmap<string, exchange> exchangemap = new concurrenthashmap<>();
//key: queuename, value: queue对象
private concurrenthashmap<string, msgqueue> queuemap = new concurrenthashmap<>();
//key: exchangename, value: 该交换机与队列的所有绑定关系映射集合(hashmap)
private concurrenthashmap<string, concurrenthashmap<string, binding>> bindingmap = new concurrenthashmap<>();
//key: messageid, value: message对象
private concurrenthashmap<string, message> messagemap = new concurrenthashmap<>();
//key: queuename, value: 该队列下的消息对象的集合
private concurrenthashmap<string, linkedlist> queuemessagemap = new concurrenthashmap<>();
//key: queuename, value: 该队列下未应答的消息映射集合
//第二个key: messageid(uuid表示)
private concurrenthashmap<string, concurrenthashmap<string, message>> queuemessagewaitackmap = new concurrenthashmap<>();//交换机
public void insertexchange(exchange exchange){
exchangemap.put(exchange.getname(), exchange);
log.info(“[memorydatacenter] 交换机插入成功, exchangename:” + exchange.getname());
}public void deleteexchange(string exchangename){
exchangemap.remove(exchangename);
log.info(“[memorydatacenter] 交换机删除成功, exchangename:” + exchangename);
}public exchange getexchange(string exchangename){
return exchangemap.get(exchangename);
}//队列
public void insertqueue(msgqueue queue){
queuemap.put(queue.getname(), queue);
log.info(“[memorydatacenter] 队列插入成功, queuename:” + queue.getname());
}public void deletequeue(string queuename){
queuemap.remove(queuename);
log.info(“[memorydatacenter] 队列删除成功, queuename:” + queuename);
}public msgqueue getqueue(string queuename){
return queuemap.get(queuename);
}//绑定
public void insertbinding(binding binding) throws mqexception {
// concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(binding.getexchangename());
concurrenthashmap<string, binding> queuebindingmap = bindingmap.computeifabsent(binding.getexchangename(), k -> new concurrenthashmap<>());
//检查嵌套的哈希表是否存在
synchronized (queuebindingmap){
//这样写需要再创建一把锁
// if(queuebindingmap == null){
// queuebindingmap = new concurrenthashmap<>();
// bindingmap.put(binding.getexchangename(), queuebindingmap);
// }
if(queuebindingmap.get(binding.getqueuename()) != null){
throw new mqexception(“[memorydatacenter] 绑定关系已经存在 exchangename:” + binding.getexchangename() +
“, queuename” + binding.getqueuename());
}
queuebindingmap.put(binding.getqueuename(), binding);
log.info(“[memorydatacenter] 绑定关系插入成功 exchangename:”+ binding.getexchangename() +
“, queuename” + binding.getqueuename());
}
}public void deletebinding(binding binding) throws mqexception {
concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(binding.getexchangename());
if(queuebindingmap == null){
throw new mqexception(“[memorydatacenter] 该绑定不存在 exchangename:” + binding.getexchangename() +
“, queuename” + binding.getqueuename());
}
bindingmap.remove(binding.getexchangename());
log.info(“[memorydatacenter] 绑定关系删除成功 exchangename:”+ binding.getexchangename() +
“, queuename” + binding.getqueuename());
}//两个版本的获取绑定:
//1.获取队列和交换机之间的唯一绑定
public binding getbinding(string exchangename, string queuename) throws mqexception {
concurrenthashmap<string, binding> queuebindingmap = bindingmap.get(exchangename);
if(queuebindingmap == null){
return null;
}
return queuebindingmap.get(queuename);
}
//2.获取当前当前交换机下的所有绑定
public concurrenthashmap<string, binding> getbindings(string exchangename){
return bindingmap.get(exchangename);
}//消息
//向消息中心添加消息
public void addmessage(message message){
messagemap.put(message.getmessageid(), message);
log.info(“[memorydatacenter] 消息添加完成 messageid:” + message.getmessageid());
}public void removemessage(string messageid){
messagemap.remove(messageid);
log.info(“[memorydatacenter] 消息删除完成 messageid:” + messageid);
}public message getmessage(string messageid){
return messagemap.get(messageid);
}//将消息发送到指定队列
public void sendmessage(string queuename, message message){
// linkedlist messages = queuemessagemap.get(queuename);
linkedlist messages = queuemessagemap.computeifabsent(queuename, k -> new linkedlist<>());
synchronized (messages){
//queuemessagemap.put(queuename, messages);
//将消息添加到队列中
messages.add(message);
}
//将消息添加到消息中心
addmessage(message);
log.info(“[memorydatacenter] 消息已成功添加到队列 queuename:” + queuename + “, messageid:” + message.getmessageid());
}//从指定队列中取消息
public message pollmessage(string queuename){
linkedlist messages = queuemessagemap.get(queuename);
//当链表中没有元素
if(messages == null){
return null;
}
synchronized (messages){
if(messages.size() == 0){
return null;
}
message message = messages.remove(0);
log.info(“[memorydatacenter] 消息已成功从队列中取出 queuename:” + queuename + “, messageid:” + message.getmessageid());
return message;
}
}//获取指定队列中消息的个数
public int getmessagesize(string queuename){
linkedlist messages = queuemessagemap.get(queuename);
if(messages == null){
return 0;
}
//linkedlist 线程不安全
synchronized (messages){
return messages.size();
}
}//添加未被确认消息
public void addmessagewaitack(string queuename, message message){
// concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.computeifabsent(queuename, k -> new concurrenthashmap<>());
synchronized (messagewaitack){
// if(messagewaitack == null){
// messagewaitack = new concurrenthashmap<>();
// queuemessagewaitackmap.put(queuename, messagewaitack);
// }
messagewaitack.put(message.getmessageid(), message);
}
log.info(“[memorydatacenter] 消息进入待确认队列 queuename:” + queuename + “, messageid:” + message.getmessageid());
}//删除未确认消息(用户确认了消息)
public void deletemessagewaitack(string queuename, string messageid){
concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
if(messagewaitack == null){
return;
}
messagewaitack.remove(messageid);
log.info(“[memorydatacenter] 消息已从待确认队列中删除 queuename:” + queuename + “, messageid:” + messageid);
}//从待确认队列中获取指定消息
public message getmessagewaitack(string queuename, string messageid){
concurrenthashmap<string, message> messagewaitack = queuemessagewaitackmap.get(queuename);
if(messagewaitack == null){
return null;
}
return messagewaitack.get(messageid);
}//从硬盘上恢复数据到内存中
public void recovery(diskdatacenter diskdatacenter) throws mqexception, ioexception, classnotfoundexception {
//1.清空之前的数据
exchangemap.clear();
queuemap.clear();
bindingmap.clear();
messagemap.clear();
queuemessagemap.clear();
//2.恢复交换机
list exchangelist = diskdatacenter.selectallexchanges();
for(exchange exchange : exchangelist){
insertexchange(exchange);
}
//3.恢复队列
list queuelist = diskdatacenter.selectallqueues();
for(msgqueue queue : queuelist){
insertqueue(queue);
}
//4.恢复绑定
list bindinglist = diskdatacenter.selectallbindings();
for(binding binding : bindinglist){
insertbinding(binding);
}
//5.恢复消息
for(msgqueue queue : queuelist){
linkedlist messagelist = diskdatacenter.loadallmessagefromqueue(queue.getname());
queuemessagemap.put(queue.getname(), messagelist);
for(message message : messagelist){
addmessage(message);
}
}
}
}
在内存中,需要考虑很多多线程的情况。对于add操作,往往会涉及到判空或者判断是否存在,有很大概率出现线程不安全的情况,因此需要加锁,而对于delete、select操作,加锁不一定是很必要的,需要具体问题具体分析。能用线程安全的尽量使用,但也要考虑锁粒度是否会影响效率~
### 虚拟主机
在虚拟主机中需要实现对内存硬盘的管理,并提供对交换机、队列、绑定、消息管理的核心api,供上层服务器调用(服务器主要处理网络请求,然后调用这边的api实现业务)。使用虚拟主机的目的是为了隔离交换机、队列、绑定、消息,一个服务器上可以有多个虚拟主机,在rabbitmq中,就提供了对虚拟主机的创建、删除操作,但在本项目中先不实现。
要想实现隔离的效果,我们得知道这些交换机是从属于哪个虚拟主机的。不·难发现,一个虚拟机可以对应多个交换机,而一个交换机只能对应一个虚拟主机,这是一个"一对多"的关系。
方案一:参照数据库中“一对多”的关系来设计,给交换机增加一个属性,表示从属的虚拟主机;
方案二:给交换机增加一个前缀名,即为虚拟主机的名字,然后可以根据前缀知道从属的虚拟主机;
方案三:为每个虚拟主机单独分配数据库和文件,从物理上真正实现隔离。
此处选择的是方案二。
---
先来对交换机、队列、绑定的增删.
public class virtualhost {
private string virtualhostname;
//硬盘数据管理
private diskdatacenter diskdatacenter = new diskdatacenter();
//内存数据管理
private memorydatacenter memorydatacenter = new memorydatacenter();
private router router = new router();
private final object exchangelocker = new object();
private final object queuelocker = new object();
public virtualhost(string name){
//需要对数据进行初始化(数据库建库建表操作)
this.virtualhostname = name;
diskdatacenter.init();
//将硬盘上的数据恢复到内存中
try {
memorydatacenter.recovery(diskdatacenter);
} catch (mqexception | ioexception | classnotfoundexception e) {
log.error("[virtualhost] 恢复内存数据失败");
e.printstacktrace();
}
}
//创建交换
public boolean exchangedeclare(string exchangename, exchangetype exchangetype, boolean durable,
boolean autodelete, map<string, object> arguments) {
//为了进行虚拟主机的隔离,此处采用在交换机的前缀加上虚拟主机名的方式
exchangename = virtualhostname + "_" + exchangename;
try{
synchronized (exchangelocker){
//1.判断交换机是否存在
exchange exsitsexchange = memorydatacenter.getexchange(exchangename);
if(exsitsexchange != null){
throw new mqexception("[virtualhost] 交换机已经存在 exchangename:" + exchangename);
}
//2.创建交换机
exchange exchange = new exchange();
exchange.setname(exchangename);
exchange.settype(exchangetype);
exchange.setdurable(durable);
exchange.setautodelete(autodelete);
exchange.setarguments(arguments);
//3.将交换机写入硬盘
if(durable){
diskdatacenter.insertexchange(exchange);
}
//4.将交换机写入内存
memorydatacenter.insertexchange(exchange);
log.info("[virtualhost] 交换机创建成功 exchangename:" + exchangename);
return true;
}
} catch (exception e){
log.error("[virtualhost] 交换机创建失败 exchangename:" + exchangename);
e.printstacktrace();
return false;
}
}
//删除交换机
public boolean exchangedelete(string exchangename){
exchangename = virtualhostname + "_" + exchangename;
try{
synchronized (exchangelocker){
//1.判断交换机是否存在
exchange existsexchange = memorydatacenter.getexchange(exchangename);
if(existsexchange == null){
throw new mqexception("[virtualhost] 交换机不存在 exchangename:" + exchangename);
}
//2.将交换机从硬盘上删除
//此处不要取反,isdurable 表示 是否存在硬盘上
//如果对象存储在硬盘上才要删除,没有存在硬盘上就不用
if(existsexchange.isdurable()){
diskdatacenter.deleteexchange(exchangename);
}
//3.将交换机从内存中删除
memorydatacenter.insertexchange(existsexchange);
log.info("[virtualhost] 交换机删除成功 exchangename:" + exchangename);
}
return true;
} catch (exception e){
log.error("[virtualhost] 交换机删除失败 exchangename:" + exchangename);
e.printstacktrace();
return false;
}
}
//创建队列
public boolean queuedeclare(string queuename, boolean exclusive, boolean durable, boolean autodelete,
map<string, object> arguments){
queuename = virtualhostname + "_" + queuename;
try{
synchronized (queuelocker){
msgqueue existsqueue = memorydatacenter.getqueue(queuename);
if(existsqueue != null){
throw new mqexception("[virtualhost] 队列已经存在 queuename:" + queuename);
}
//创建队列
msgqueue queue = new msgqueue();
queue.setname(queuename);
queue.setdurable(durable);
queue.setexclusive(exclusive);
queue.setautodelete(autodelete);
queue.setarguments(arguments);
//写入硬盘
if(durable){
diskdatacenter.insertqueue(queue);
}
//写入内存
memorydatacenter.insertqueue(queue);
log.info("[virtualhost] 队列创建成功 queuename:" + queuename);
}
return true;
} catch (exception e){
log.error("[virtualhost] 队列创建失败 queuename:" + queuename);
e.printstacktrace();
return false;
}
}
//删除队列
public boolean queuedelete(string queuename){
queuename = virtualhostname + "_" + queuename;
try{
synchronized (queuelocker){
msgqueue existsqueue = memorydatacenter.getqueue(queuename);
if(existsqueue == null){
throw new mqexception("[virtualhost] 队列不存在 queuename:" + queuename);
}
if(existsqueue.isdurable()){
diskdatacenter.deletequeue(queuename);
}
memorydatacenter.deletequeue(queuename);
log.error("[virtualhost] 队列删除成功 queuename:" + queuename);
return true;
}
}catch (exception e){
log.error("[virtualhost] 队列删除失败 queuename:" + queuename);
e.printstacktrace();
return false;
}
}
//添加绑定关系
public boolean queuebind(string exchangename, string queuename, string bindingkey){
exchangename = virtualhostname + "_" + exchangename;
queuename = virtualhostname + "_" + queuename;
try{
synchronized (exchangelocker){
synchronized (queuelocker){
//1.验证转发规则
//等下实现~
if(!router.checkbindingkey(bindingkey)){
throw new mqexception("[virtualhost] bindingkey非法 bindingkey:" + bindingkey);
}
//2.判断绑定关系是否已经存在
binding existsbinding = memorydatacenter.getbinding(exchangename, queuename);
if(existsbinding != null){
throw new mqexception("[virtualhost] 绑定已经存在 exchangename:" + exchangename + ", queuename:" + queuename);
}
//3.判断交换机和队列是否都存在
exchange existsexchange = memorydatacenter.getexchange(exchangename);
msgqueue existsqueue = memorydatacenter.getqueue(queuename);
if(existsexchange == null){
throw new mqexception("[virtualhost] 交换机不存在 exchangename:" + exchangename);
}
if(existsqueue == null){
throw new mqexception("[virtualhost] 队列不存在 queuename:" + queuename);
}
//4.创建绑定
binding binding = new binding();
binding.setexchangename(exchangename);
binding.setqueuename(queuename);
binding.setbindingkey(bindingkey);
//5.将绑定写入硬盘
diskdatacenter.insertbinding(binding);
//6.将绑定写入内存
memorydatacenter.insertbinding(binding);
log.info("[virtualhost] 绑定添加成功 exchangename:" + exchangename + ", queuename:" + queuename);
}
}
return true;
} catch (exception e){
log.error("[virtualhost] 绑定创建失败 exchangename:" + exchangename + ", queuename:" + queuename);
e.printstacktrace();
}
return false;
}
//解除绑定
public boolean queueunbind(string exchangename, string queuename){
exchangename = virtualhostname + "_" + exchangename;
queuename = virtualhostname + "_" + queuename;
try{
synchronized (exchangelocker){
synchronized (queuelocker){
//检查是否有这个绑定
binding existsbinding = memorydatacenter.getbinding(exchangename, queuename);
if(existsbinding == null){
throw new mqexception("[virtualhost] 绑定不存在 exchangename:" + exchangename + ", queuename:" + queuename);
}
//从硬盘中删除绑定关系
diskdatacenter.deletebinding(existsbinding);
//从内存中删除绑定关系
memorydatacenter.deletebinding(existsbinding);
log.info("[virtualhost] 绑定解除成功 exchangename:" + exchangename + ", queuename:" + queuename);
}
}
return true;
} catch (exception e){
log.error("[virtualhost] 绑定解除失败 exchangename:" + exchangename + ", queuename:" + queuename);
e.printstacktrace();
return false;
}
}
虽然我们在memorydatacenter中已经进行过了加锁操作,但是现在又再virtualhost中又进行了加锁,是否之前的锁就没有意义了呢?
当然不是,因为无法事先知道什么时候、在哪去调用这些api,如果一个其他线程不安全的类去调用memorydatacenter中的一些api,就会引发线程安全问题。
在virtualhost中定义了一把 交换机锁、 一把队列锁,这些锁的力度比较大,当一个线程去操作交换机a,此时另一个线程即使操作的是交换机b,也会进行阻塞,这会大大降低程序效率。不过影响不是很大,因为上述的这些操作都属于是低频操作,消息队列主要还是消息的转发~
---
上述实现了一些辅助消息交换转发的api,现在要来实现消息队列中最核心的两个api:发布消息 basicpublish 和 订阅消息 basicconsume.
**basicpublish发布消息**
发布消息是指生产者将一个消息发送到指定交换机上,然后交换机根据自身的类型,进行消息的转发,发送到与之绑定的队列中。
交换机有3种类型(本项目中实现了3种):
1.dircet 直接交换机,消息中的routinkey就是队列名,直接将消息发送给队列即可.
2.fanout 扇出交换机,消息会发送到所有与之绑定的队列中,此时消息中的routingkey无效.
3.topic 主题交换机,消息会发送给所有与之绑定且binding中的bindingkey 与 消息中的 rouingkey 匹配的队列.
首先我们先来实现路由转发的规则,大致包含如下几个api:checkroutingkey、checkbindingkey、route.
**router:**
这个类用来实现路由转发的规则
/
public class router {
//规定:
//bindingkey 由 数字,字母,下划线组成,通过.来分隔,允许通配符,#存在
//形如: 123.abc..aa.#
public boolean checkbindingkey(string bindingkey) {
for (int i = 0; i < bindingkey.length(); i++) {
char ch = bindingkey.charat(i);
if ((ch >= ‘a’ && ch <= ‘z’) || (ch >= ‘0’ && ch <= ‘9’) || (ch >= ‘a’ && ch <= ‘z’)) {
continue;
}
if (ch == ‘.’ || ch == '’ || ch == '’ || ch == ‘#’) {
continue;
}
return false;
}
//这里处理通配符不合法的情况
//在正则表达式中需要对.进行转义 -> .
//而 \ 也需要进行转义,再添加一个 \
string[] words = bindingkey.split(“\.”);
for (int i = 0; i < words.length; i++) {
string word = words[i];
if (word.length() > 1 && (word.contains(““) || word.contains(”#“))) {
return false;
}
if (word.equals(”#“)) {
if (i + 1 < words.length && (words[i + 1].equals(”#“) || words[i + 1].equals(””))) {
return false;
}
}
}
return true;
}
//规定:
//routingkey 由 数字,字母,下划线组成,通过.来分隔
//形如: 123.abc._123
public boolean checkroutingkey(string routingkey) {
for (int i = 0; i < routingkey.length(); i++) {
char ch = routingkey.charat(i);
if ((ch >= 'a' && ch <= 'z') || (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z')) {
continue;
}
if (ch == '.' || ch == '_') {
continue;
}
return false;
}
return true;
}
//进行路由
public boolean route(exchangetype exchangetype, string routingkey, string bindingkey) {
if (exchangetype == exchangetype.fanout) {
return true;
}
if (exchangetype == exchangetype.topic) {
if (routetopic(routingkey, bindingkey)) {
return true;
}
}
return false;
}
//主题交换机的路由
//*可以代表两个.之间的任意的字符串
//#可以代表0个或多个*
private boolean routetopic(string routingkey, string bindingkey) {
string[] routingtokens = routingkey.split("\\.");
string[] bindingtokens = bindingkey.split("\\.");
int routingindex = 0;
int bindingindex = 0;
while(routingindex < routingtokens.length && bindingindex < bindingtokens.length){
if(bindingtokens[bindingindex].equals("#")){
if(bindingindex == bindingtokens.length - 1){
return true;
}else {
bindingindex++;
for(int i = routingindex; i < routingtokens.length; i++){
if(bindingtokens[bindingindex].equals(routingtokens[routingindex])){
break;
}
}
if(routingindex == routingtokens.length){
return false;
}
}
}else{
if(!routingtokens[routingindex].equals(bindingtokens[bindingindex])
&& !bindingtokens[bindingindex].equals("*")){
return false;
}
}
routingindex++;
bindingindex++;
}
if(routingindex == routingtokens.length && bindingindex == bindingtokens.length){
return true;
}
return false;
}
}
上述routingkey、bindingkey、路由规则 都是源自于amqp协议
接下来实现消息的发布。
**basicpublish:**
//向指定交换机发送消息,并根据交换机的转发规则,向队列发送消息
public boolean basicpublish(string exchangename, string routingkey, basicproperties basicproperties, byte[] body){
exchangename = virtualhostname + “" + exchangename;
try{
//1.判断交换机是否存在
exchange exchange = memorydatacenter.getexchange(exchangename);
if(exchange == null){
throw new mqexception(“[virtualhost] 交换机不存在 exchangename:” + exchangename);
}
//2.判断routingkey是否合法
if(!router.checkroutingkey(routingkey)){
throw new mqexception(“[virtualhost] routingkey非法 routingkey:” + routingkey);
}
//3.检查交换机类型
if(exchange.gettype() == exchangetype.direct){
//直接交换机, routingkey为queuename
//4.查找指定队列
string queuename = virtualhostname + "” + routingkey;
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
throw new mqexception(“[virtualhost] 队列不存在 queuename:” + queuename);
}
//5.构造message对象
message message = message.createmessagewithid(basicproperties, routingkey, body);
//6.发送消息到指定队列
sendmessage(queue, message);
}else{
//4.获取到所有绑定关系
concurrenthashmap<string, binding> bindings = memorydatacenter.getbindings(exchangename);
for(map.entry<string, binding> entry : bindings.entryset()){
binding binding = entry.getvalue();
//5.查找指定队列
string queuename = virtualhostname + “_” + binding.getqueuename();
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
//如果当前不存在,还需要检查其他的队列是否存在
continue;
}
//6.检查路由规则:
// fanout, 则该交换机绑定的所有队列都要转发
// topic, 则需要判断routingkey和bindingkey是否能对上
if(!router.route(exchange.gettype(), routingkey, binding.getbindingkey())){
continue;
}
//7.构造message对象
message message = message.createmessagewithid(basicproperties, routingkey, body);
//8.发送消息到指定队列
sendmessage(queue, message);
}
log.info(“[virtualhost] 发送消息成功”);
}
return true;
} catch (exception e){
log.error("[virtualhost] 发送消息失败 exchangename: " + exchangename);
e.printstacktrace();
return false;
}
}
**basicconsume订阅消息**
订阅消息是指消费者绑定一个队列,后续队列中有消息就自动的推送过来。
//订阅消息
//添加一个队列订阅者,当队列收到消息,就把消息推送给订阅者
//consumertag: 消费者的唯一标识
//autoack: 是否自动应答
//consumer: 回调函数,并写成了函数式接口,描述了消费者的行为
public boolean basicconsume(string consumertag, string queuename, boolean autoack, consumer consumer){
queuename = virtualhostname + “_” + queuename;
try{
consumermanager.addconsumer(consumertag, queuename, autoack, consumer);
log.info(“[virtualhost] 订阅消息成功 queuename:” + consumertag + “, queuename:” + queuename);
return true;
}catch (exception e){
log.error(“[virtualhost] 订阅消息失败 consumertag:” + consumertag + “, queuename:” + queuename);
e.printstacktrace();
return false;
}
}
接下来实现consumermanager中的api:notifyconsumer 提醒消费者消费消息,addconsumer 添加消费者,consumemessage 消费消息。
首先我们需要描述一下消费者
@data
//描述消费者的参数
public class consumerenv {
private string consumertag;
private string queuename;
private boolean autoack;
//通过这个回调来处理收到的消息
private consumer consumer;
public consumerenv() {
}
public consumerenv(string consumertag, string queuename, boolean autoack, consumer consumer) {
this.consumertag = consumertag;
this.queuename = queuename;
this.autoack = autoack;
this.consumer = consumer;
}
}
@functionalinterface
public interface consumer {
//把消息推送给消费者
void handledelivery(string consumertag, basicproperties basicproperties, byte[] body) throws mqexception, ioexception;
}
在生产者和消费者的交互逻辑大致有两种:
1.生产者先调用basicpublish发布消息到了某个队列,此时还没有消费者订阅这个队列,后续有消费者订阅了,直接消费消息即可
2.消费者先订阅了一个队列,但生产者还没有生产消息到队列中,后续生产者生产完消息后,提醒消费者去队列中消费消息即可
由于一个队列可能有多个消费者订阅,我们需要使用一个数据结构来存储,并且还需要制定一个算法,实现将消息能给发送给多个消费者。此处采用轮询的方式,即类似取队列元素的方式。
需要在msgqueue中添加一个字段,用来存储这个队列中的订阅者,并提供相应的取出某一个订阅者的api
@data
public class msgqueue {
private string name;
//是否独占(只能被一个消费者使用)
private boolean exclusive = false;
//是否可持久化
private boolean durable = false;
//是否自动删除
private boolean autodelete = false;
//可选参数
private map<string, object> arguments = new hashmap<>();
//订阅者
private linkedlist consumerenvs = new linkedlist<>();
//轮询序号
private atomicinteger consumerseq = new atomicinteger(0);
public void addconsumerenv(consumerenv consumerenv){
consumerenvs.add(consumerenv);
}
public consumerenv chooseconsumer(){
if(consumerenvs.size() == 0){
return null;
}
int index = consumerseq.get() % consumerenvs.size();
consumerseq.getandincrement();
return consumerenvs.get(index);
}
}
程序如何知道有新的消息来呢?通过一个扫描线程扫描队列,然后再将这个消息提交到线程池中,让线程池去执行服务器端的回调函数。

如果使用一个扫描线程,去扫描所有队列的话,可以去遍历存储队列对象的哈希表,理论上行,但是不够优雅,如果对每一个队列都使用一个线程的话,会开辟很多线程,也会影响效率。在这使用一个阻塞队列,当有一个新的消息添加到队列中,会将这个队列名插入到这个阻塞队列中,后续只要根据这个阻塞队列,去取数据然后处理即可。
@slf4j
/*
-
通过这个类来实现消费消息
-
每当有消息到队列的时候,就将队列名(令牌)添加到令牌队列中,线程池再根据这个队列进行处理
*/
public class consumermanager {
//方便操作数据
private virtualhost parent;
//使用线程池,用来处理回调任务
private executorservice workpool = executors.newfixedthreadpool(4);
//每当队列中来了消息,就触发这个令牌队列,通知可以去消费消息
private blockingdeque tokenqueue = new linkedblockingdeque<>();
//扫描线程,扫描令牌队列中是否有新令牌
private thread scanthread = null;public consumermanager(virtualhost p){
this.parent = p;
//创建扫描线程
scanthread = new thread(() ->{
while(true){
try {
//获取令牌
string tokenqueuename = tokenqueue.take();
//根据令牌找到指定队列
msgqueue queue = parent.getmemorydatacenter().getqueue(tokenqueuename);
if(queue == null){
throw new mqexception(“[consumermanager] 令牌队列不存在 queuename:” + tokenqueuename);
}
//从队列中消费消息
synchronized (queue){
consumemessage(queue);
}
} catch (interruptedexception | mqexception e) {
e.printstacktrace();
}
}
});//设置为后台线程 scanthread.setdaemon(true); scanthread.start();
}
//通知消费消息
public void notifyconsumer(string queuename) throws interruptedexception {
tokenqueue.put(queuename);
}//添加订阅者
public void addconsumer(string consumertag, string queuename, boolean autoack, consumer consumer){
msgqueue queue = parent.getmemorydatacenter().getqueue(queuename);
try{
if(queue == null){
throw new mqexception(“[consumermanager] 队列不存在 queuename:” + queuename);
}
synchronized (queue){
//添加消费者
queue.addconsumerenv(new consumerenv(consumertag, queuename, autoack, consumer));
//如果有消息就进行消费
int n = parent.getmemorydatacenter().getmessagesize(queuename);
for(int i = 0; i < n; i++){
//消费
consumemessage(queue);
}
}
}catch (exception e){
log.error(“[consumermanager] 添加订阅者失败”);
e.printstacktrace();
}
}//消费消息
private void consumemessage(msgqueue queue){
//1.通过轮询方式,寻找消费者
consumerenv luckdog = queue.chooseconsumer();
if(luckdog == null){
return;
}
//2.获取队列中的消息
message message = parent.getmemorydatacenter().pollmessage(queue.getname());
if(message == null){
return;
}
//3.把消息带入到回调方法,交给线程池处理
workpool.submit(() -> {
try{
//1.把消息添加到待确认队列中
parent.getmemorydatacenter().addmessagewaitack(queue.getname(), message);
//2.执行回调操作,消费消息
luckdog.getconsumer().handledelivery(luckdog.getconsumertag(), message.getbasicproperties(), message.getbody());
//3.如果当前是自动应答.则将消息删除
// 如果是手动应答,则先不处理,让消费者后续调用 basicack 处理
if(luckdog.isautoack()){
//删除消息从两个方面进行,内存和硬盘
//硬盘
if(message.isdurable()){
parent.getdiskdatacenter().deletemessage(queue, message);
}
//内存
parent.getmemorydatacenter().deletemessagewaitack(queue.getname(), message.getmessageid());
parent.getmemorydatacenter().removemessage(message.getmessageid());
log.info(“[consumermanager] 消息被成功消费 queuename:” + queue.getname());
}
}catch (exception e){
log.info(“[consumermanager] 消息消费失败 queuename:” + queue.getname());
e.printstacktrace();
}
});
}
}
最后还差一个basicack手动应答,即消费者消费完消息,然后告诉服务器。
//手动应答
public boolean basicack(string queuename, string messageid){
queuename = virtualhostname + “_” + queuename;
try{
msgqueue queue = memorydatacenter.getqueue(queuename);
if(queue == null){
throw new mqexception(“[virtualhost] 要确认的队列不存在 queuename:” + queuename);
}
message message = memorydatacenter.getmessage(messageid);
if(message == null){
throw new mqexception(“[virtualhost] 要确认的消息不存在 messageid:” + messageid);
}
if(message.isdurable()){
diskdatacenter.deletemessage(queue, message);
}
memorydatacenter.deletemessagewaitack(queuename, messageid);
memorydatacenter.removemessage(messageid);
log.info(“[virtualhost] 消息确认收到 queuename:” + queuename);
return true;
}catch (exception e){
log.info(“[virtualhost] 消息确认失败 queuename:” + queuename);
e.printstacktrace();
return false;
}
}
### 约定应用层协议
在网络通信中,我们需要约定协议,方便对方接收并读取。在这里使用http协议和其他一些不是很妥,于是我们在tcp协议的基础上自定义实现应用层协议。
**协议格式:**

类型用来描述,这次请求/响应 是干啥的,例如:0x03 表示 这个请求是创建exchange.
由于进行网络通信也是使用的二进制,为了防止出现粘包等问题,使用长度来分隔.
对于请求,载荷中保存的是序列化的方法的参数,对于响应,载荷中保存的是序列化的方法的返回值.
我们自定义的应用层协议是建立在tcp协议的基础上,而每一次tcp协议都为了可靠性会进行三次握手、四次挥手,如果频繁的创建的话,会比较费时,在这我们使用多信道的方式,复用每一次tcp连接,即tcp连接是一直保持的,但是断开是逻辑上的断开。
**约定类型:**
| 类型 | 方法 |
| --- | --- |
| 0x01 | 创建 channel |
| 0x02 | 关闭 channel |
| 0x03 | 创建 exchange |
| 0x04 | 销毁 exchange |
| 0x05 | 创建 queue |
| 0x06 | 销毁 queue |
| 0x07 | 创建 binding |
| 0x08 | 销毁 binding |
| 0x09 | 发送 message |
| 0x0a | 订阅 queue |
| 0x0b | 确认应答 |
| 0x0c | 服务器给客户端推送的消息 |
### 创建载荷类
为了方便后续代码编写,使用两个类来描述网络通信:request,response
**request:**
//表示网络通信中的请求对象
//在此约定:
//一个请求由三部分组成:类型+长度+载荷
//在请求中,载荷为调用方法的参数
@data
public class request {
private int type;
private int length;
private byte[] payload;
}
**response:**
**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**
**需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)**

**一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
ssageid:" + messageid);
}
if(message.isdurable()){
diskdatacenter.deletemessage(queue, message);
}
memorydatacenter.deletemessagewaitack(queuename, messageid);
memorydatacenter.removemessage(messageid);
log.info("[virtualhost] 消息确认收到 queuename:" + queuename);
return true;
}catch (exception e){
log.info("[virtualhost] 消息确认失败 queuename:" + queuename);
e.printstacktrace();
return false;
}
}
约定应用层协议
在网络通信中,我们需要约定协议,方便对方接收并读取。在这里使用http协议和其他一些不是很妥,于是我们在tcp协议的基础上自定义实现应用层协议。
协议格式:
类型用来描述,这次请求/响应 是干啥的,例如:0x03 表示 这个请求是创建exchange.
由于进行网络通信也是使用的二进制,为了防止出现粘包等问题,使用长度来分隔.
对于请求,载荷中保存的是序列化的方法的参数,对于响应,载荷中保存的是序列化的方法的返回值.
我们自定义的应用层协议是建立在tcp协议的基础上,而每一次tcp协议都为了可靠性会进行三次握手、四次挥手,如果频繁的创建的话,会比较费时,在这我们使用多信道的方式,复用每一次tcp连接,即tcp连接是一直保持的,但是断开是逻辑上的断开。
约定类型:
类型 | 方法 |
---|---|
0x01 | 创建 channel |
0x02 | 关闭 channel |
0x03 | 创建 exchange |
0x04 | 销毁 exchange |
0x05 | 创建 queue |
0x06 | 销毁 queue |
0x07 | 创建 binding |
0x08 | 销毁 binding |
0x09 | 发送 message |
0x0a | 订阅 queue |
0x0b | 确认应答 |
0x0c | 服务器给客户端推送的消息 |
创建载荷类
为了方便后续代码编写,使用两个类来描述网络通信:request,response
request:
//表示网络通信中的请求对象
//在此约定:
//一个请求由三部分组成:类型+长度+载荷
//在请求中,载荷为调用方法的参数
@data
public class request {
private int type;
private int length;
private byte[] payload;
}
response:
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加v获取:vip204888 (备注大数据)
[外链图片转存中…(img-cmx91exl-1713197004039)]
一个人可以走的很快,但一群人才能走的更远!不论你是正从事it行业的老鸟或是对it行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
发表评论