spring boot 消息队列与异步处理
36.1 学习目标与重点提示
学习目标:掌握spring boot消息队列与异步处理的核心概念与使用方法,包括消息队列的定义与特点、异步处理的定义与特点、spring boot与消息队列的集成、spring boot与异步处理的集成、spring boot的实际应用场景,学会在实际开发中处理消息队列与异步处理问题。
重点:消息队列的定义与特点、异步处理的定义与特点、spring boot与消息队列的集成、spring boot与异步处理的集成、spring boot的实际应用场景。
36.2 消息队列与异步处理概述
消息队列与异步处理是java开发中的重要组件,用于实现系统的异步处理和消息传递。
36.2.1 消息队列的定义
定义:消息队列是一种用于存储和传递消息的中间件,支持异步通信和消息处理。
作用:
- 实现系统的异步处理。
- 提高系统的响应速度。
- 实现系统的解耦。
常见的消息队列:
- rabbitmq:开源的消息队列。
- activemq:开源的消息队列。
- kafka:分布式流处理平台。
✅ 结论:消息队列是一种用于存储和传递消息的中间件,作用是实现系统的异步处理、提高系统的响应速度、实现系统的解耦。
36.2.2 异步处理的定义
定义:异步处理是指系统在处理请求时,不需要等待请求完成就可以继续处理其他请求。
作用:
- 提高系统的响应速度。
- 提高系统的吞吐量。
- 实现系统的解耦。
常见的异步处理:
- 异步调用。
- 异步任务。
- 异步消息。
✅ 结论:异步处理是指系统在处理请求时,不需要等待请求完成就可以继续处理其他请求,作用是提高系统的响应速度、提高系统的吞吐量、实现系统的解耦。
36.3 spring boot与消息队列的集成
spring boot与消息队列的集成是java开发中的重要内容。
36.3.1 集成rabbitmq的步骤
定义:集成rabbitmq的步骤是指使用spring boot与rabbitmq集成的方法。
步骤:
- 创建spring boot项目。
- 添加所需的依赖。
- 配置rabbitmq。
- 创建消息发送类。
- 创建消息接收类。
- 创建控制器类。
- 测试应用。
示例:
pom.xml文件中的依赖:
<dependencies>
<!-- web依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<!-- 消息队列依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
<!-- 数据验证依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-validation</artifactid>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>
</dependencies>application.properties文件中的配置:
# 服务器端口 server.port=8080 # rabbitmq配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
消息发送类:
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
@component
public class productmessagesender {
@autowired
private rabbittemplate rabbittemplate;
public void sendproductmessage(string message) {
rabbittemplate.convertandsend("product-exchange", "product-routing-key", message);
system.out.println("发送产品消息:" + message);
}
}消息接收类:
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
public class productmessagereceiver {
@rabbitlistener(queues = "product-queue")
public void receiveproductmessage(string message) {
system.out.println("接收产品消息:" + message);
// 处理消息
processproductmessage(message);
}
private void processproductmessage(string message) {
// 模拟处理消息的耗时操作
try {
thread.sleep(1000);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("处理产品消息完成:" + message);
}
}配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class rabbitmqconfig {
@bean
public queue productqueue() {
return new queue("product-queue", true);
}
@bean
public directexchange productexchange() {
return new directexchange("product-exchange");
}
@bean
public binding productbinding(queue productqueue, directexchange productexchange) {
return bindingbuilder.bind(productqueue).to(productexchange).with("product-routing-key");
}
}控制器类:
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.validation.bindingresult;
import org.springframework.validation.fielderror;
import org.springframework.web.bind.annotation.*;
import javax.validation.valid;
import java.util.hashmap;
import java.util.list;
import java.util.map;
@restcontroller
@requestmapping("/api/products")
public class productcontroller {
@autowired
private productmessagesender productmessagesender;
@postmapping("/send")
public map<string, object> sendproductmessage(@requestparam string message) {
map<string, object> result = new hashmap<>();
productmessagesender.sendproductmessage(message);
result.put("success", true);
result.put("message", "产品消息发送成功");
return result;
}
}应用启动类:
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
@springbootapplication
public class rabbitmqapplication {
public static void main(string[] args) {
springapplication.run(rabbitmqapplication.class, args);
}
}测试类:
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.boot.test.web.client.testresttemplate;
import org.springframework.boot.web.server.localserverport;
import org.springframework.http.*;
import java.util.hashmap;
import java.util.map;
import static org.assertj.core.api.assertions.assertthat;
@springboottest(webenvironment = springboottest.webenvironment.random_port)
class rabbitmqapplicationtests {
@localserverport
private int port;
@autowired
private testresttemplate resttemplate;
@test
void contextloads() {
}
@test
void testsendproductmessage() {
responseentity<map> response = resttemplate.getforentity("http://localhost:" + port + "/api/products/send?message=测试产品消息", map.class);
assertthat(response.getstatuscode()).isequalto(httpstatus.ok);
assertthat(response.getbody().get("success")).isequalto(true);
}
}✅ 结论:集成rabbitmq的步骤包括创建spring boot项目、添加所需的依赖、配置rabbitmq、创建消息发送类、创建消息接收类、创建控制器类、测试应用。
36.4 spring boot与异步处理的集成
spring boot与异步处理的集成是java开发中的重要内容。
36.4.1 集成spring boot异步处理的步骤
定义:集成spring boot异步处理的步骤是指使用spring boot与异步处理集成的方法。
步骤:
- 创建spring boot项目。
- 添加所需的依赖。
- 配置异步处理。
- 创建异步任务类。
- 创建控制器类。
- 测试应用。
示例:
pom.xml文件中的依赖:
<dependencies>
<!-- web依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<!-- 异步处理依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-task</artifactid>
</dependency>
<!-- 数据验证依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-validation</artifactid>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-test</artifactid>
<scope>test</scope>
</dependency>
</dependencies>application.properties文件中的配置:
# 服务器端口 server.port=8080 # 异步处理配置 spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=10 spring.task.execution.pool.queue-capacity=100
异步任务类:
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.component;
@component
public class productasynctask {
@async
public void processproduct(string productid) {
system.out.println("开始处理产品:" + productid);
// 模拟处理产品的耗时操作
try {
thread.sleep(2000);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("产品处理完成:" + productid);
}
}控制器类:
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.validation.bindingresult;
import org.springframework.validation.fielderror;
import org.springframework.web.bind.annotation.*;
import javax.validation.valid;
import java.util.hashmap;
import java.util.list;
import java.util.map;
@restcontroller
@requestmapping("/api/products")
public class productcontroller {
@autowired
private productasynctask productasynctask;
@postmapping("/process")
public map<string, object> processproduct(@requestparam string productid) {
map<string, object> result = new hashmap<>();
productasynctask.processproduct(productid);
result.put("success", true);
result.put("message", "产品处理任务已提交");
return result;
}
}应用启动类:
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.scheduling.annotation.enableasync;
@springbootapplication
@enableasync
public class asyncapplication {
public static void main(string[] args) {
springapplication.run(asyncapplication.class, args);
}
}测试类:
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.boot.test.web.client.testresttemplate;
import org.springframework.boot.web.server.localserverport;
import org.springframework.http.*;
import java.util.hashmap;
import java.util.map;
import static org.assertj.core.api.assertions.assertthat;
@springboottest(webenvironment = springboottest.webenvironment.random_port)
class asyncapplicationtests {
@localserverport
private int port;
@autowired
private testresttemplate resttemplate;
@test
void contextloads() {
}
@test
void testprocessproduct() {
responseentity<map> response = resttemplate.getforentity("http://localhost:" + port + "/api/products/process?productid=p001", map.class);
assertthat(response.getstatuscode()).isequalto(httpstatus.ok);
assertthat(response.getbody().get("success")).isequalto(true);
}
}✅ 结论:集成spring boot异步处理的步骤包括创建spring boot项目、添加所需的依赖、配置异步处理、创建异步任务类、创建控制器类、测试应用。
36.5 spring boot的实际应用场景
在实际开发中,spring boot消息队列与异步处理的应用场景非常广泛,如:
- 实现订单创建的异步处理。
- 实现邮件发送的异步处理。
- 实现数据同步的异步处理。
- 实现消息通知的异步处理。
示例:
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
@component
class productmessagesender {
@autowired
private rabbittemplate rabbittemplate;
public void sendproductmessage(string message) {
rabbittemplate.convertandsend("product-exchange", "product-routing-key", message);
system.out.println("发送产品消息:" + message);
}
}
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
@component
class productmessagereceiver {
@rabbitlistener(queues = "product-queue")
public void receiveproductmessage(string message) {
system.out.println("接收产品消息:" + message);
processproductmessage(message);
}
private void processproductmessage(string message) {
try {
thread.sleep(1000);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("处理产品消息完成:" + message);
}
}
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
class rabbitmqconfig {
@bean
public queue productqueue() {
return new queue("product-queue", true);
}
@bean
public directexchange productexchange() {
return new directexchange("product-exchange");
}
@bean
public binding productbinding(queue productqueue, directexchange productexchange) {
return bindingbuilder.bind(productqueue).to(productexchange).with("product-routing-key");
}
}
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.component;
@component
class productasynctask {
@async
public void processproduct(string productid) {
system.out.println("开始处理产品:" + productid);
try {
thread.sleep(2000);
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println("产品处理完成:" + productid);
}
}
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.validation.bindingresult;
import org.springframework.validation.fielderror;
import org.springframework.web.bind.annotation.*;
import javax.validation.valid;
import java.util.hashmap;
import java.util.list;
import java.util.map;
@restcontroller
@requestmapping("/api/products")
class productcontroller {
@autowired
private productmessagesender productmessagesender;
@autowired
private productasynctask productasynctask;
@postmapping("/send")
public map<string, object> sendproductmessage(@requestparam string message) {
map<string, object> result = new hashmap<>();
productmessagesender.sendproductmessage(message);
result.put("success", true);
result.put("message", "产品消息发送成功");
return result;
}
@postmapping("/process")
public map<string, object> processproduct(@requestparam string productid) {
map<string, object> result = new hashmap<>();
productasynctask.processproduct(productid);
result.put("success", true);
result.put("message", "产品处理任务已提交");
return result;
}
}
@springbootapplication
@enableasync
public class asyncandrabbitmqapplication {
public static void main(string[] args) {
springapplication.run(asyncandrabbitmqapplication.class, args);
}
}
// 测试类
@springboottest(webenvironment = springboottest.webenvironment.random_port)
class asyncandrabbitmqapplicationtests {
@localserverport
private int port;
@autowired
private testresttemplate resttemplate;
@test
void contextloads() {
}
@test
void testsendproductmessage() {
responseentity<map> response = resttemplate.getforentity("http://localhost:" + port + "/api/products/send?message=测试产品消息", map.class);
assertthat(response.getstatuscode()).isequalto(httpstatus.ok);
assertthat(response.getbody().get("success")).isequalto(true);
}
@test
void testprocessproduct() {
responseentity<map> response = resttemplate.getforentity("http://localhost:" + port + "/api/products/process?productid=p001", map.class);
assertthat(response.getstatuscode()).isequalto(httpstatus.ok);
assertthat(response.getbody().get("success")).isequalto(true);
}
}输出结果:
- 发送产品消息:测试产品消息。
- 接收产品消息:测试产品消息。
- 处理产品消息完成:测试产品消息。
- 开始处理产品:p001。
- 产品处理完成:p001。
✅ 结论:在实际开发中,spring boot消息队列与异步处理的应用场景非常广泛,需要根据实际问题选择合适的消息队列和异步处理方法。
总结
本章我们学习了spring boot消息队列与异步处理,包括消息队列的定义与特点、异步处理的定义与特点、spring boot与消息队列的集成、spring boot与异步处理的集成、spring boot的实际应用场景,学会了在实际开发中处理消息队列与异步处理问题。其中,消息队列的定义与特点、异步处理的定义与特点、spring boot与消息队列的集成、spring boot与异步处理的集成、spring boot的实际应用场景是本章的重点内容。从下一章开始,我们将学习spring boot的其他组件、微服务等内容。
到此这篇关于spring boot 消息队列与异步处理的文章就介绍到这了,更多相关spring boot 消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论