当前位置: 代码网 > it编程>编程语言>Java > RabbitMQ的基本使用&入门

RabbitMQ的基本使用&入门

2024年07月31日 Java 我要评论
RabbitMQ可以实现可靠的异步通信,解耦系统中的组件,以及处理高并发和大规模的消息流。它在许多领域,如微服务架构、日志处理、任务调度等方面都有广泛的应用RabbitMQ作为消息中间件,它允许应用程序之间通过消息进行通信。消息被发送到队列中,然后由消费者从队列中接收并处理。生产者是将消息发送到RabbitMQ的应用程序。它将消息发布到一个或多个队列。消费者是从RabbitMQ接收消息并进行处理的应用程序。一个队列可以有多个消费者,但每条消息只会被一个消费者处理。交换机负责将消息路由到一个或多个队列。

前言

rabbitmq是一个开源的消息代理软件,用于处理应用程序之间的消息传递。它实现了高级消息队列协议(amqp),是一个可靠且灵活的消息中间件,广泛用于构建分布式系统、微服务架构以及异构系统之间的通信 ,本期文章将为大家带来rabbitmq的基本使用及入门

 一.rabbitmq基本介绍

rabbitmq可以实现可靠的异步通信,解耦系统中的组件,以及处理高并发和大规模的消息流。它在许多领域,如微服务架构、日志处理、任务调度等方面都有广泛的应用

消息队列(message queue): rabbitmq作为消息中间件,它允许应用程序之间通过消息进行通信。消息被发送到队列中,然后由消费者从队列中接收并处理。

生产者(producer): 生产者是将消息发送到rabbitmq的应用程序。它将消息发布到一个或多个队列。

消费者(consumer): 消费者是从rabbitmq接收消息并进行处理的应用程序。一个队列可以有多个消费者,但每条消息只会被一个消费者处理。

交换机(exchange): 交换机负责将消息路由到一个或多个队列。生产者将消息发送到交换机,而交换机则将消息路由到相关的队列。

队列(queue): 队列是存储消息的地方,它是消息的终点。消费者从队列中获取消息并进行处理。

绑定(binding): 绑定是交换机和队列之间的关系,它定义了如何将消息从交换机路由到队列。

  1. 持久性(durable): rabbitmq支持将队列和消息标记为持久性,以确保在服务器重启时数据不会丢失。

  2. 虚拟主机(virtual host): 虚拟主机是rabbitmq中消息隔离的单位,允许在同一rabbitmq服务器上创建多个独立的消息代理。

  3. ack(acknowledgment): 消费者在处理完消息后发送ack,通知rabbitmq该消息已被成功处理。这确保消息不会在处理失败时丢失。

  4. 插件系统: rabbitmq具有丰富的插件系统,允许扩展和定制其功能

1.常见消息队列

2.docker安装部署rabbitmq

1.拉取镜像

docker pull  rabbitmq:management

2.创建容器

在虚拟机中运行以下命令 

docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e rabbitmq_default_vhost=my_vhost \
-e rabbitmq_default_user=admin \
-e rabbitmq_default_pass=admin \
--restart=always \
rabbitmq:management

开放端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

容器创建运行完成后我们可以在自己的主机上通过ip:15672进入到管理员界面,账号密码为我们上面命令指定的admin,登录即可

3.添加用户

 添加用户输入账户名和密码指定当前用户身份

4.分配操作虚拟主机

刚创建完的用户还不能进行一个登录,需要给它分配一个能够操作的虚拟主机

 virtal host是可以分配的虚拟主机,set permission可以对其进行分配

 分配完成后的用户就可以对当前虚拟主机进行登录等操作了

二.rabbitmq实现简单队列

1.项目创建

我们以生产者和消费者的形式创建springboot项目

 

创建时需要注意,以springboot项目为例在勾选依赖时,要将spring for rabbit勾选

2.对生产者和消费者进行配置

生产者和消费者的配置文件都需要更改

server:
    port: 8888
spring:
    rabbitmq:
        host: rabbit所在主机的ip
        username: yu
        password: 123456
        port: 5672
        virtual-host: my_vhost

3.创建rabbitmq队列的spring配置类

通过@bean注解,它定义了一个名为firstqueue的bean,该bean是一个queue实例,代表了一个名为"firstqueue"的rabbitmq队列。这样的配置允许应用程序使用该队列进行消息的生产和消费

package com.yu.publisher;

import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
@suppresswarnings("all")
public class rabbitconfig {
@bean
    public queue firstqueue() {
        return new queue("firstqueue");
    }
}

4.创建控制层进行访问测试

1.生产者

我们通过导入spring amqp框架中用于发送消息的amqptemplate接口,调用amqptemplateconvertandsend方法向名为"firstqueue"的rabbitmq队列发送消息,消息内容为"hello world",发送成功后返回一个字符串"🚀",作为http响应体。

package com.yu.publisher;

import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class testcontroller {
    @autowired
    private amqptemplate amqptemplate;

    @requestmapping("send1")
    public string send1(){
        //向消息队列发送消息
        amqptemplate.convertandsend("firstqueue","hello world");
        return "🚀";
    }
}

测试阶段:

启动生产者,在浏览器调用send1(controller中的方法)进行访问测试

这时我们打开rabbitmq控制面板中的消息队列就可以看到我们刚刚发送的消息了

 

 2.消费者

我们通过注解@rabbithandler指定该类监听名为"firstqueue"的rabbitmq队列和@rabbithandler注解标识该方法为处理rabbitmq消息的方法,使用lombok生成的日志记录器记录接收到的消息,将"接收到:"与消息内容拼接并记录为警告级别的日志,当队列"firstqueue"中有消息到达时,process方法会被调用,将接收到的消息记录到日志中

package com.yu.consumer;

import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;

@component
@suppresswarnings("all")
@slf4j
@rabbitlistener(queues = "firstqueue")
public class receiver {
@rabbithandler
public void process(string msg) {
log.warn("接收到:" + msg);
}
}

启动消费者,这时,当我们再去访问生产者时,在日志信息中就会返回生产者所发送的消息

5.通过消息队列发送对象

 在rabbitmq中发送对象通常需要进行对象的序列化和反序列化,以便将对象转换为消息体发送到队列,然后从队列接收消息并还原为对象

1.在生产者中创建实体user

实现序列化接口

package com.yu.publisher;

import lombok.allargsconstructor;
import lombok.data;
import lombok.noargsconstructor;

import java.io.serializable;

@suppresswarnings("all")
@data
@allargsconstructor
@noargsconstructor
public class user implements serializable {
private string username;
private string userpwd;
}

2.配置rabbitmq的quene实例

package com.yu.publisher;

import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
@suppresswarnings("all")
public class rabbitconfig {
    @bean
    public queue secondqueue() {
        return new queue("secondqueue");
    }

}

3.创建发送信息方法

使用objectmapper将其序列化为json字符串,然后通过amqptemplate.convertandsend将这个json字符串发送到名为"secondqueue"的rabbitmq队列

package com.yu.publisher;

import com.fasterxml.jackson.databind.objectmapper;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class testcontroller {
    @autowired
    private amqptemplate amqptemplate;
    @autowired
    private objectmapper objectmapper;

    @requestmapping("send2")
    public string send2() throws exception{
        user user = new user("yu","123");
        string yu = objectmapper.writevalueasstring(user);
        //向消息队列发送消息
        amqptemplate.convertandsend("secondqueue",yu);
        return "🚀";
    }
}

4.创建接受信息方法

声明一个方法作为消息处理器,标记方法用于处理从rabbitmq队列接收到的消息,处理消息的方法,该方法接收一个json字符串参数使用objectmapper将json字符串转换为user对象,使用slf4j日志框架记录接收到的user对象,这里使用warn级别,接收并处理从"secondqueue"队列中接收到的消息

package com.yu.consumer;

import com.fasterxml.jackson.core.jsonprocessingexception;
import com.fasterxml.jackson.databind.objectmapper;
import lombok.extern.slf4j.slf4j;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

@component
@suppresswarnings("all")
@slf4j
@rabbitlistener(queues = "secondqueue")
public class pojoreceiver {

    @autowired
    private objectmapper objectmapper;
    @rabbithandler
    public void process(string json) throws exception {
        user user = objectmapper.readvalue(json, user.class);
        log.warn("接收到:" + user);
    }
}

测试阶段:

当我们向消息队列中发送一个对象,消费者再进行接收

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com