当前位置: 代码网 > it编程>编程语言>Java > Springboot对接mqtt的项目实践

Springboot对接mqtt的项目实践

2026年02月10日 Java 我要评论
在spring boot中对接mqtt协议,可以使用eclipse paho客户端和spring integration mqtt模块。以下是详细实现步骤:1. 添加依赖<dependencie

在spring boot中对接mqtt协议,可以使用eclipse paho客户端和spring integration mqtt模块。以下是详细实现步骤:

1. 添加依赖

<dependencies>
    <!-- spring boot starter -->
    <dependency>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter</artifactid>
    </dependency>
    
    <!-- spring integration mqtt -->
    <dependency>
        <groupid>org.springframework.integration</groupid>
        <artifactid>spring-integration-mqtt</artifactid>
    </dependency>
    
    <!-- eclipse paho mqtt client -->
    <dependency>
        <groupid>org.eclipse.paho</groupid>
        <artifactid>org.eclipse.paho.client.mqttv3</artifactid>
        <version>1.2.5</version>
    </dependency>
</dependencies>

2. 配置mqtt连接参数

# application.yml
mqtt:
  broker-url: tcp://localhost:1883
  username: admin
  password: password
  client-id: spring-boot-client
  default-topic: test/topic
  timeout: 30
  keepalive: 60
  completion-timeout: 30000

3. mqtt配置类

@configuration
@enableconfigurationproperties(mqttproperties.class)
public class mqttconfig {

    @autowired
    private mqttproperties mqttproperties;

    // mqtt连接配置
    @bean
    public mqttconnectoptions mqttconnectoptions() {
        mqttconnectoptions options = new mqttconnectoptions();
        options.setserveruris(new string[]{mqttproperties.getbrokerurl()});
        options.setusername(mqttproperties.getusername());
        options.setpassword(mqttproperties.getpassword().tochararray());
        options.setconnectiontimeout(mqttproperties.gettimeout());
        options.setkeepaliveinterval(mqttproperties.getkeepalive());
        options.setautomaticreconnect(true);
        options.setcleansession(true);
        return options;
    }

    // mqtt客户端工厂
    @bean
    public mqttpahoclientfactory mqttclientfactory() {
        defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory();
        factory.setconnectionoptions(mqttconnectoptions());
        return factory;
    }

    // 出站消息通道(用于发送消息)
    @bean
    @serviceactivator(inputchannel = "mqttoutboundchannel")
    public messagehandler mqttoutbound() {
        mqttpahomessagehandler messagehandler = 
            new mqttpahomessagehandler(mqttproperties.getclientid() + "-producer", mqttclientfactory());
        messagehandler.setasync(true);
        messagehandler.setdefaulttopic(mqttproperties.getdefaulttopic());
        return messagehandler;
    }

    // 出站通道
    @bean
    public messagechannel mqttoutboundchannel() {
        return new directchannel();
    }

    // 入站消息适配器(用于接收消息)
    @bean
    public messageproducer inbound() {
        mqttpahomessagedrivenchanneladapter adapter = 
            new mqttpahomessagedrivenchanneladapter(mqttproperties.getclientid() + "-consumer", 
                                                   mqttclientfactory(), mqttproperties.getdefaulttopic());
        adapter.setcompletiontimeout(mqttproperties.getcompletiontimeout());
        adapter.setconverter(new defaultpahomessageconverter());
        adapter.setqos(1);
        adapter.setoutputchannel(mqttinboundchannel());
        return adapter;
    }

    // 入站通道
    @bean
    public messagechannel mqttinboundchannel() {
        return new directchannel();
    }

    // 入站消息处理器
    @bean
    @serviceactivator(inputchannel = "mqttinboundchannel")
    public messagehandler handler() {
        return new messagehandler() {
            @override
            public void handlemessage(message<?> message) throws messagingexception {
                string topic = (string) message.getheaders().get("mqtt_receivedtopic");
                string payload = (string) message.getpayload();
                system.out.println("received message from topic: " + topic + ", payload: " + payload);
                // 处理接收到的消息
                processmessage(topic, payload);
            }
        };
    }
}

4. 配置属性类

@configurationproperties(prefix = "mqtt")
@component
@data
public class mqttproperties {
    private string brokerurl;
    private string username;
    private string password;
    private string clientid;
    private string defaulttopic;
    private int timeout;
    private int keepalive;
    private int completiontimeout;
}

5. mqtt服务类

@service
public class mqttservice {

    @autowired
    private messagechannel mqttoutboundchannel;

    // 发送消息到指定主题
    public void sendmessage(string topic, string message) {
        mqttoutboundchannel.send(messagebuilder.withpayload(message)
                .setheader("mqtt_topic", topic)
                .build());
    }

    // 发送消息到默认主题
    public void sendmessage(string message) {
        mqttoutboundchannel.send(messagebuilder.withpayload(message).build());
    }

    // 发送带qos的消息
    public void sendmessage(string topic, string message, int qos) {
        mqttoutboundchannel.send(messagebuilder.withpayload(message)
                .setheader("mqtt_topic", topic)
                .setheader("mqtt_qos", qos)
                .build());
    }
}

6. 消息处理器

@component
public class mqttmessageprocessor {

    private static final logger logger = loggerfactory.getlogger(mqttmessageprocessor.class);

    public void processmessage(string topic, string payload) {
        logger.info("processing mqtt message - topic: {}, payload: {}", topic, payload);
        
        // 根据不同的主题进行不同的处理
        switch (topic) {
            case "test/topic":
                handletesttopic(payload);
                break;
            case "sensor/data":
                handlesensordata(payload);
                break;
            default:
                handledefaultmessage(topic, payload);
        }
    }

    private void handletesttopic(string payload) {
        logger.info("处理测试主题消息: {}", payload);
        // 具体的业务逻辑
    }

    private void handlesensordata(string payload) {
        logger.info("处理传感器数据: {}", payload);
        try {
            // 解析json数据等操作
            // objectmapper mapper = new objectmapper();
            // sensordata data = mapper.readvalue(payload, sensordata.class);
        } catch (exception e) {
            logger.error("解析传感器数据失败", e);
        }
    }

    private void handledefaultmessage(string topic, string payload) {
        logger.info("处理默认消息 - topic: {}, payload: {}", topic, payload);
    }
}

7. 控制器示例

@restcontroller
@requestmapping("/mqtt")
public class mqttcontroller {

    @autowired
    private mqttservice mqttservice;

    @postmapping("/publish")
    public responseentity<string> publishmessage(@requestparam string topic, 
                                               @requestparam string message) {
        try {
            mqttservice.sendmessage(topic, message);
            return responseentity.ok("message published successfully");
        } catch (exception e) {
            return responseentity.status(500).body("failed to publish message: " + e.getmessage());
        }
    }

    @postmapping("/publish/default")
    public responseentity<string> publishtodefaulttopic(@requestparam string message) {
        try {
            mqttservice.sendmessage(message);
            return responseentity.ok("message published to default topic");
        } catch (exception e) {
            return responseentity.status(500).body("failed to publish message: " + e.getmessage());
        }
    }
}

8. 主应用类

@springbootapplication
@enableconfigurationproperties
public class mqttapplication {
    public static void main(string[] args) {
        springapplication.run(mqttapplication.class, args);
    }
}

9. 测试mqtt服务

可以使用mqtt.fx或其他mqtt客户端工具进行测试:

  1. 启动spring boot应用
  2. 使用mqtt客户端订阅主题 test/topic
  3. 调用api发送消息:
curl -x post "http://localhost:8080/mqtt/publish?topic=test/topic&message=hello mqtt"

主要特性

  • 自动重连: 配置了自动重连机制
  • qos支持: 支持不同的服务质量等级
  • 多主题订阅: 可以订阅多个主题
  • 异步处理: 消息发送支持异步模式
  • 配置灵活: 通过配置文件管理连接参数

这样你就实现了一个完整的spring boot mqtt集成方案,可以方便地进行消息的发布和订阅。

到此这篇关于springboot对接mqtt的项目实践的文章就介绍到这了,更多相关springboot对接mqtt内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com