当前位置: 代码网 > it编程>编程语言>Java > Java连接Emqx实现订阅发布消息的步骤记录

Java连接Emqx实现订阅发布消息的步骤记录

2025年09月22日 Java 我要评论
一:前提安装了emqx开源版、mqttx客户端二:订阅发布实现步骤1.引入依赖<!--mqtt客户端--><dependency> <groupid>org.

一:前提

安装了emqx开源版、mqttx客户端

二:订阅发布实现步骤

1.引入依赖

<!--mqtt客户端-->
<dependency>
    <groupid>org.eclipse.paho</groupid>
    <artifactid>org.eclipse.paho.client.mqttv3</artifactid>
    <version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    id: mqtt-am-client-${random.uuid}
  # 订阅主题配置(支持多个)
  intopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # 发布主题配置(支持多个)
  outtopics:
    - topic: out/topic1
      qos: 0
  username: am
  password: lgyptuab4th5p
  keepaliveinterval: 60

3.读取配置文件

package com.wtzn.web.config;

import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;

import java.util.list;

@configuration
@configurationproperties(prefix = "mqtt")
@data
public class mqttproperties {
    private broker broker;
    private client client;
    private list<topicconfig> intopics;
    private list<topicconfig> outtopics;
    private string username;
    private string password;
    private int keepaliveinterval;

    @data
    public static class broker {
        private string uri;
    }

    @data
    public static class client {
        private string id;
    }
    @data
    public static class topicconfig {
        private string topic;
        private int qos;
    }

}

4.创建mqtt客户端

package com.wtzn.web.config;

import org.eclipse.paho.client.mqttv3.mqttclient;
import org.eclipse.paho.client.mqttv3.mqttconnectoptions;
import org.eclipse.paho.client.mqttv3.mqttexception;
import org.eclipse.paho.client.mqttv3.persist.memorypersistence;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class mqttconfig {

    @autowired
    private mqttproperties mqttproperties;

    @bean
    public mqttclient mqttclient() throws mqttexception {
        mqttclient client = new mqttclient(mqttproperties.getbroker().geturi(), mqttproperties.getclient().getid(), new memorypersistence());
        mqttconnectoptions options = new mqttconnectoptions();
        // 此客户端的用户名和密码
        options.setusername(mqttproperties.getusername());
        options.setpassword(mqttproperties.getpassword().tochararray());
        options.setcleansession(true);
        // 设置遗嘱消息
      //  options.setwill(mqttproperties.getouttopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getbytes(), 2, true);
        // 连接超时重试
        options.setconnectiontimeout(5000); //毫秒
        options.setkeepaliveinterval(mqttproperties.getkeepaliveinterval());
        options.setautomaticreconnect(true);//网络中断重连
        client.connect(options);
        return client;
    }
}

5.controller层

package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.saignore;
import com.wtzn.common.json.utils.jsonutils;
import com.wtzn.web.domain.bo.payload;
import com.wtzn.web.service.mqttservice;
import lombok.extern.slf4j.slf4j;
import org.eclipse.paho.client.mqttv3.mqttexception;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.*;

import java.util.linkedlist;


@restcontroller
@slf4j
@requestmapping("/mqtt")
public class mqttcontroller {

    @autowired
    private mqttservice mqttservice;

    @saignore
    @postmapping("/mqtt")
    public void publish() {
        try {
          //  linkedlist<payload> payloadlinkedlist=new linkedlist<>();
            for(int i=1; i<=10000; i++){
                payload payload=new payload();
                payload.settemperature(i);
              //  payloadlinkedlist.add(payload);
                mqttservice.publish("test/topic1",0,jsonutils.tojsonstring(payload));
            }

        } catch (mqttexception e) {
            log.error("发布消息失败{}", e.getmessage());
        }
        log.info("发布消息成功");
    }


}

6.service层

package com.wtzn.web.service;

import com.wtzn.common.json.utils.jsonutils;
import com.wtzn.web.config.mqttproperties;
import com.wtzn.web.domain.bo.payload;
import jakarta.annotation.postconstruct;
import jakarta.annotation.predestroy;
import lombok.sneakythrows;
import lombok.extern.slf4j.slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;

import java.util.arrays;


@service
@slf4j
public class mqttservice implements mqttcallbackextended {

    @autowired
    private mqttclient mqttclient;

    @autowired
    private mqttproperties mqttproperties;
    
    @postconstruct
    public void init() throws mqttexception {
        mqttclient.setcallback(this);
 /*       mqttclient.subscribe(mqttproperties.getintopic());
        log.info("订阅主题{}", mqttproperties.getintopic());
*/
        mqttproperties.getintopics().foreach(x -> {
            try {
                mqttclient.subscribe(x.gettopic(), x.getqos());
                log.info("订阅主题{}", x.gettopic());
            } catch (mqttexception e) {
                throw new runtimeexception(e);
            }
        });

    }

    @predestroy
    public void destroy() throws mqttexception {
        mqttclient.disconnect();
        log.info("与服务器断开连接");
    }

    /**
     * @description: 发送消息
     * @param: [message]
     * @return: void
     **/
    public void publish(string topic,int qos,string message) throws mqttexception {
        mqttmessage mqttmessage = new mqttmessage(message.getbytes());
        mqttmessage.setqos(qos);
        mqttclient.publish(topic, mqttmessage);
        log.info("向主题【{}】发布消息:【{}】", topic, message);
    }


    /**
     * @description: 接收消息
     * @param: [topic, message]
     * @return: void
     **/
    @override
    public void messagearrived(string topic, mqttmessage message) throws mqttexception {
        payload payload = jsonutils.parseobject(new string(message.getpayload()), payload.class);
        log.info("接收到来自【{}】的消息【{}】", topic, payload.gettemperature());
      /*  if (payload.gettemperature() > 37) {
            publish("发烧");
        }*/


    }


    @override
    public void connectionlost(throwable cause) {
        log.error("连接丢失:{}", cause.getmessage());
    }

    @sneakythrows
    @override
    public void deliverycomplete(imqttdeliverytoken token) {
        if( token!=null ){
            mqttmessage message = null;
            try {
                message = token.getmessage();
            } catch (mqttexception e) {
                throw new runtimeexception(e);
            }
            string topic = token.gettopics()==null ? null : arrays.aslist(token.gettopics()).tostring();
            string str = message==null ? null : new string(message.getpayload());
            log.info("deliverycomplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliverycomplete: null");
        }

        log.info("消息已送达");
    }

    @override
    public void connectcomplete(boolean b, string s) {

            mqttproperties.getintopics().foreach(x -> {
                try {
                    mqttclient.subscribe(x.gettopic(), x.getqos());
                    log.info("订阅主题{}", x.gettopic());
                } catch (mqttexception e) {
                    throw new runtimeexception(e);
                }
            });
    }
}

7.dao层

package com.wtzn.web.domain.bo;

import lombok.data;

@data
public class payload {
    private integer temperature;
}

三:测试

1.postman直接调用测试

2、下载mqttx客户端进行测试

总结 

到此这篇关于java连接emqx实现订阅发布消息的文章就介绍到这了,更多相关java emqx订阅发布消息内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com