当前位置: 代码网 > it编程>编程语言>Java > RocketMQ入坑指南(二):JAVA接入RocketMQ

RocketMQ入坑指南(二):JAVA接入RocketMQ

2024年07月28日 Java 我要评论
RocketMQ入坑指南(二):JAVA接入RocketMQ

前言

上篇内容介绍了rocketmq的基础知识和单机启动方式,同时也也简单介绍了监控仪表盘rocketmq dashboard,这次主要介绍java中如何接入rocketmq的。由于springboot提供的starter集成度太高,屏蔽了大量细节,不建议上来直接学习,我们先通过rocketmq-client整合,了解基本使用方法,后期再学习springboot的整合。

rocketmq整合

1.首先先导入rocketmq-client包

<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-client</artifactid>
    <version>4.9.4</version>
</dependency>

2.编写生产者代码

import com.alibaba.fastjson.jsonobject;
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;

public class producer {

    public static void producer() {
        //创建defaultmqproducer消息生产者对象
        defaultmqproducer producer = new defaultmqproducer("testproducergroup");
        //设置nameserver 多个节点间用分号分割
        producer.setnamesrvaddr("192.168.2.5:9876");
        try {
            //与nameserver建立长连接
            producer.start();
            //发送一百条数据
            for (int i = 1; i <= 100; i++) {
                //1s中发送一次
                thread.sleep(1000);
                jsonobject json = new jsonobject();
                json.put("orderid",i+1);
                json.put("desc","这是第"+i+1+"个订单");
                //数据正文
                string data = json.tojsonstring();
                /*创建消息
                message消息三个参数
                topic 代表消息主题,自定义自定义topicorder代表订单主题代表订单主题
                tags 代表标志,用于消费者接收数据时进行数据筛选。pay_tag代表支付相关信息
                body 代表消息内容
                */
                message message = new message("topicorder", "pay_tag", data.getbytes());
                //发送消息,获取发送结果
                sendresult result = producer.send(message);
                //将发送结果对象打印在控制台
                system.out.println("消息已发送:msgid:" + result.getmsgid() + ",发送状态:"
                        + result.getsendstatus());
            }
        }catch (exception e){
            e.printstacktrace();
        }finally {
            try {
                producer.shutdown();
            } catch (exception e) {
            }
        }
    }

    public static void main(string[] args) {
        producer.producer();;
    }
}

执行之后,可以看到如下信息

3.编写消费者代码

mq的消费模式可以大致分为两种,一种是推push,一种是拉pull。

  • push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

  • pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

apache rocketmq既提供了push模式也提供了pull模式。

(1)push推送模式

import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.common.message.messageext;

import java.util.list;

public class pushconsumer {

    public static void consumer() {
        //创建消费者对象
        defaultmqpushconsumer consumer = new defaultmqpushconsumer("testconsumergroup");
        try {
            //设置nameserver节点
            consumer.setnamesrvaddr("192.168.2.5:9876");
            /*订阅主题,
            consumer.subscribe包含两个参数:
            topic: 说明消费者从broker订阅哪一个主题,这一项要与provider保持一致。
            subexpression: 子表达式用于筛选tags。
            同一个主题下可以包含很多不同的tags,subexpression用于筛选符合条件的tags进行接收。
            例如:设置为*,则代表接收所有tags数据。
            例如:设置为pay_tag,则broker中只有tags=pay_tag的消息会被接收,而其他的就会被排除在外。
            */
            consumer.subscribe("topicorder", "*");
            //创建监听,当有新的消息监听程序会及时捕捉并加以处理。
            consumer.registermessagelistener(new messagelistenerconcurrently() {
                public consumeconcurrentlystatus consumemessage(
                        list<messageext> msgs, consumeconcurrentlycontext context) {
                    //批量数据处理
                    for (messageext msg : msgs) {
                        system.out.println("消费者获取数据:" + msg.getmsgid() + "==>" + new
                                string(msg.getbody()));
                    }
                    //返回数据已接收标识
                    return consumeconcurrentlystatus.consume_success;
                }
            });
            //启动消费者,与broker建立长连接,开始监听。
            consumer.start();
        } catch (exception e) {
            e.printstacktrace();
        }
    }

    public static void main(string[] args) {
        pushconsumer.consumer();
    }
}

(2)pull拉取模式

import org.apache.rocketmq.client.consumer.defaultlitepullconsumer;
import org.apache.rocketmq.common.message.messageext;

import java.util.list;

public class pullconsumer {
    public static volatile boolean running = true;
    public static void consumer() {
        //创建pull消费者对象
        defaultlitepullconsumer litepullconsumer = new defaultlitepullconsumer("testpullconsumergroup");
        //设置nameserver节点
        litepullconsumer.setnamesrvaddr("192.168.2.5:9876");
        try {
            //订阅主题,与push相同
            litepullconsumer.subscribe("topicorder", "*");
            //每次拉取数据条目数
            litepullconsumer.setpullbatchsize(10);
            //启动消费者
            litepullconsumer.start();
            while (running) {
                list<messageext> messageexts = litepullconsumer.poll();
                //批量数据处理
                for (messageext msg : messageexts) {
                    system.out.println("消费者获取数据:" + msg.getmsgid() + "==>" + new
                            string(msg.getbody()));
                }
            }
        }catch (exception e){
            e.printstacktrace();
        }finally {
            litepullconsumer.shutdown();
        }
    }

    public static void main(string[] args) {
        pullconsumer.consumer();
    }
}

两种方式大家都可以试一下,结果如下:

总结

这篇主要介绍的是rocketmq最基础的使用方式,目的在于让大家能更好的理解运行的方式,后面的章节,我们会慢慢的深入学习。

接下来将详细的介绍生产者的相关知识

关注我,不迷路

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com