前言
上篇内容介绍了rocketmq的基础知识和单机启动方式,同时也也简单介绍了监控仪表盘rocketmq dashboard,这次主要介绍java中如何接入rocketmq的。由于springboot提供的starter集成度太高,屏蔽了大量细节,不建议上来直接学习,我们先通过rocketmq-client整合,了解基本使用方法,后期再学习springboot的整合。
rocketmq整合
1.首先先导入rocketmq-client包
<dependency>
<groupid>org.apache.rocketmq</groupid>
<artifactid>rocketmq-client</artifactid>
<version>4.9.4</version>
</dependency>
2.编写生产者代码
import com.alibaba.fastjson.jsonobject;
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;
public class producer {
public static void producer() {
//创建defaultmqproducer消息生产者对象
defaultmqproducer producer = new defaultmqproducer("testproducergroup");
//设置nameserver 多个节点间用分号分割
producer.setnamesrvaddr("192.168.2.5:9876");
try {
//与nameserver建立长连接
producer.start();
//发送一百条数据
for (int i = 1; i <= 100; i++) {
//1s中发送一次
thread.sleep(1000);
jsonobject json = new jsonobject();
json.put("orderid",i+1);
json.put("desc","这是第"+i+1+"个订单");
//数据正文
string data = json.tojsonstring();
/*创建消息
message消息三个参数
topic 代表消息主题,自定义自定义topicorder代表订单主题代表订单主题
tags 代表标志,用于消费者接收数据时进行数据筛选。pay_tag代表支付相关信息
body 代表消息内容
*/
message message = new message("topicorder", "pay_tag", data.getbytes());
//发送消息,获取发送结果
sendresult result = producer.send(message);
//将发送结果对象打印在控制台
system.out.println("消息已发送:msgid:" + result.getmsgid() + ",发送状态:"
+ result.getsendstatus());
}
}catch (exception e){
e.printstacktrace();
}finally {
try {
producer.shutdown();
} catch (exception e) {
}
}
}
public static void main(string[] args) {
producer.producer();;
}
}
执行之后,可以看到如下信息
3.编写消费者代码
mq的消费模式可以大致分为两种,一种是推push,一种是拉pull。
-
push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
-
pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
apache rocketmq既提供了push模式也提供了pull模式。
(1)push推送模式
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.common.message.messageext;
import java.util.list;
public class pushconsumer {
public static void consumer() {
//创建消费者对象
defaultmqpushconsumer consumer = new defaultmqpushconsumer("testconsumergroup");
try {
//设置nameserver节点
consumer.setnamesrvaddr("192.168.2.5:9876");
/*订阅主题,
consumer.subscribe包含两个参数:
topic: 说明消费者从broker订阅哪一个主题,这一项要与provider保持一致。
subexpression: 子表达式用于筛选tags。
同一个主题下可以包含很多不同的tags,subexpression用于筛选符合条件的tags进行接收。
例如:设置为*,则代表接收所有tags数据。
例如:设置为pay_tag,则broker中只有tags=pay_tag的消息会被接收,而其他的就会被排除在外。
*/
consumer.subscribe("topicorder", "*");
//创建监听,当有新的消息监听程序会及时捕捉并加以处理。
consumer.registermessagelistener(new messagelistenerconcurrently() {
public consumeconcurrentlystatus consumemessage(
list<messageext> msgs, consumeconcurrentlycontext context) {
//批量数据处理
for (messageext msg : msgs) {
system.out.println("消费者获取数据:" + msg.getmsgid() + "==>" + new
string(msg.getbody()));
}
//返回数据已接收标识
return consumeconcurrentlystatus.consume_success;
}
});
//启动消费者,与broker建立长连接,开始监听。
consumer.start();
} catch (exception e) {
e.printstacktrace();
}
}
public static void main(string[] args) {
pushconsumer.consumer();
}
}
(2)pull拉取模式
import org.apache.rocketmq.client.consumer.defaultlitepullconsumer;
import org.apache.rocketmq.common.message.messageext;
import java.util.list;
public class pullconsumer {
public static volatile boolean running = true;
public static void consumer() {
//创建pull消费者对象
defaultlitepullconsumer litepullconsumer = new defaultlitepullconsumer("testpullconsumergroup");
//设置nameserver节点
litepullconsumer.setnamesrvaddr("192.168.2.5:9876");
try {
//订阅主题,与push相同
litepullconsumer.subscribe("topicorder", "*");
//每次拉取数据条目数
litepullconsumer.setpullbatchsize(10);
//启动消费者
litepullconsumer.start();
while (running) {
list<messageext> messageexts = litepullconsumer.poll();
//批量数据处理
for (messageext msg : messageexts) {
system.out.println("消费者获取数据:" + msg.getmsgid() + "==>" + new
string(msg.getbody()));
}
}
}catch (exception e){
e.printstacktrace();
}finally {
litepullconsumer.shutdown();
}
}
public static void main(string[] args) {
pullconsumer.consumer();
}
}
两种方式大家都可以试一下,结果如下:
总结
这篇主要介绍的是rocketmq最基础的使用方式,目的在于让大家能更好的理解运行的方式,后面的章节,我们会慢慢的深入学习。
接下来将详细的介绍生产者的相关知识
关注我,不迷路
发表评论