文章目录
flutter 是 google 推出并开源的移动应用开发框架,主打跨平台、高保真、高性能。开发者可以通过 dart 语言开发 app,一套代码同时运行在 ios 和 android 平台。 flutter 提供了丰富的组件、接口,开发者可以快速地为 flutter 添加 native 扩展。同时 flutter 还使用 native 引擎渲染视图,这无疑能为用户提供良好的体验。
mqtt 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 qos、报文小等特点,占据了物联网协议的半壁江山。
本文主要介绍如何在 flutter 项目中使用 mqtt,实现客户端与 mqtt 服务器的连接、订阅、取消订阅、收发消息等功能。
一、项目初始化
1.新建项目
新建一个项目,可以参考以下链接:
2.安装依赖
添加依赖到 pubspec.yaml
文件中
dependencies:
mqtt_client: ^7.2.1
安装依赖:
flutter pub get
导入:
import 'package:mqtt_client/mqtt_client.dart';
结果:
name: flutter_mqtt
description: a new flutter application.
publish_to: 'none'
version: 1.0.0+1
environment:
sdk: ">=3.2.3 <4.0.0"
dependencies:
flutter:
sdk: flutter
cupertino_icons: ^1.0.6
mqtt_client: ^10.2.0
二、mqtt 的使用
连接 mqtt 服务器使用 emqx 提供的 免费公共 mqtt 服务器,该服务基于 emqx 的 mqtt 物联网云平台 创建。服务器接入信息如下:
- broker: broker.emqx.io
- tcp port: 1883
- websocket port: 8083
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:flutter/services.dart';
import 'dart:io';
future<mqttclient> connect() async {
mqttserverclient client =
mqttserverclient.withport('broker.emqx.io', 'flutter_client', 1883);
client.logging(on: true);
client.onconnected = onconnected;
client.ondisconnected = ondisconnected;
client.onunsubscribed = onunsubscribed;
client.onsubscribed = onsubscribed;
client.onsubscribefail = onsubscribefail;
client.pongcallback = pong;
// 设置心跳保持时间为 60 秒,替换mqttconnectmessage().keepalivefor(60)
client.keepaliveperiod = 60;
final connmess = mqttconnectmessage()
.withclientidentifier("flutter_client")
.authenticateas("test", "test")
.withwilltopic('willtopic')
.withwillmessage('my will message')
.startclean()
.withwillqos(mqttqos.atleastonce);
client.connectionmessage = connmess;
// 如果您想使用tls连接,请使用以下代码
// security context
// list<int> cabytes = await loadbytes('assets/certs/server-ca.crt');
// list<int> clientcertbytes = await loadbytes('assets/certs/client.crt');
// list<int> clientkeybytes = await loadbytes('assets/certs/client.key');
// securitycontext context = new securitycontext()
// ..settrustedcertificatesbytes(cabytes)
// ..useprivatekeybytes(clientkeybytes)
// ..usecertificatechainbytes(clientcertbytes);
// client.secure = true;
// client.securitycontext = context;
try {
print('connecting');
await client.connect();
} catch (e) {
print('exception: $e');
client.disconnect();
}
if (client.connectionstatus.state == mqttconnectionstate.connected) {
// 属性'state'不能被无条件访问,因为接收方可以是'null'。
print('emqx client connected');
client.updates.listen((list<mqttreceivedmessage<mqttmessage>> c) {
final mqttpublishmessage message = c[0].payload;
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print('接收消息:$payload from topic: ${c[0].topic}>');
});
client.published.listen((mqttpublishmessage message) {
print('published');
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print(
'published message: $payload to topic: ${message.variableheader.topicname}');
});
} else {
print(
'emqx client connection failed - disconnecting, status is ${client.connectionstatus}');
client.disconnect();
exit(-1);
}
return client;
}
future<list<int>> loadbytes(string assetpath) async {
bytedata data = await rootbundle.load(assetpath);
list<int> bytes = data.buffer.asuint8list();
return bytes;
}
// 连接成功
void onconnected() {
print('connected');
}
// 连接断开
void ondisconnected() {
print('disconnected');
}
// 订阅主题成功
void onsubscribed(string topic) {
print('subscribed topic: $topic');
}
// 订阅主题失败
void onsubscribefail(string topic) {
print('failed to subscribe topic: $topic');
}
// 成功取消订阅 topic可空
void onunsubscribed(string? topic) {
print('unsubscribed topic: $topic');
}
// 收到 ping 响应
void pong() {
print('ping response client callback invoked');
}
1.建立异步连接
future<mqttclient> connect() async {}
future<mqttclient> connect() async {}
是一个 dart 中的异步函数(async function),用于连接到 mqtt 代理(broker)并返回一个 future
对象,该对象最终会产生一个 mqttclient
实例。
future<mqttclient>
: 这表示该函数返回一个future
对象,该对象的结果类型为mqttclient
。在 dart 中,future
表示一个可能会在未来完成的操作,并且可以用于处理异步任务的结果或错误。async
关键字: 这个关键字用于定义一个异步函数,即函数内部可能会包含await
表达式来等待其他异步操作的完成。connect()
函数名: 这是异步函数的名称,表示该函数用于执行连接操作。
在一个异步函数中,可以使用 await
关键字来等待其他异步操作的完成,例如网络请求、文件读写等。可以使用 await
关键字或者 .then()
方法来处理连接成功后返回的 mqttclient
实例。
2.创建客户端
mqttserverclient client =
mqttserverclient.withport('broker.emqx.io', 'flutter_client', 1883);
client.logging(on: true);
client.onconnected = onconnected;
client.ondisconnected = ondisconnected;
client.onunsubscribed = onunsubscribed;
client.onsubscribed = onsubscribed;
client.onsubscribefail = onsubscribefail;
client.pongcallback = pong;
// 设置心跳保持时间为 60 秒,替换mqttconnectmessage().keepalivefor(60)
client.keepaliveperiod = 60;
上面的代码片段初始化了一个 mqtt 客户端(mqttserverclient
),并设置了日志记录、连接状态回调、订阅相关回调以及心跳保持时间等功能。
1)创建客户端对象
mqttserverclient client = mqttserverclient.withport('broker.emqx.io', 'flutter_client', 1883);
这行代码创建了一个 mqtt 客户端对象 client
,使用 withport
方法指定了连接的 mqtt 代理(broker)地址为 'broker.emqx.io'
,客户端标识符为 'flutter_client'
,连接端口为 1883
。
2)启用日志记录
client.logging(on: true);
使用 logging
方法开启了客户端的日志记录功能,通过传入 on: true
参数,表示启用日志记录。
3)设置连接状态
设置连接状态回调函数:
client.onconnected = onconnected;
client.ondisconnected = ondisconnected;
这里设置了客户端连接状态的回调函数,当客户端连接或断开连接时,会触发相应的回调函数 onconnected
和 ondisconnected
。您需要在代码中实现这些回调函数,并在需要时处理相应的逻辑。
4)设置订阅
设置订阅相关回调函数:
client.onunsubscribed = onunsubscribed;
client.onsubscribed = onsubscribed;
client.onsubscribefail = onsubscribefail;
这些回调函数用于处理订阅主题的相关事件,包括取消订阅成功 (onunsubscribed
)、订阅成功 (onsubscribed
) 和订阅失败 (onsubscribefail
)。您也需要在代码中实现这些回调函数,并根据实际情况处理相应的逻辑。
5)设置心跳保持
设置心跳保持时间:
client.keepaliveperiod = 60;
这行代码设置了客户端与服务器之间的心跳保持时间为 60 秒。这里使用了 keepaliveperiod
属性来设置心跳保持时间,代替了之前使用 mqttconnectmessage().keepalivefor(60)
的方式,因为新版本的 mqtt 客户端库提供了直接设置心跳间隔时间的属性。
.keepalivefor(60)
是设置 mqtt 连接消息中的心跳保持时间(keep alive)。 mqtt 中的心跳保持机制以及这个具体的参数含义:
-
心跳保持(keep alive): 在 mqtt 协议中,客户端和服务器之间通过心跳保持机制来维持连接的活跃性。客户端会定期向服务器发送心跳消息,服务器在接收到心跳消息后会确认连接仍然活跃。如果服务器长时间没有收到客户端的心跳消息,就会认为连接断开,并进行相应的处理(如关闭连接或触发重连)。
-
.keepalivefor(60)
: 这里的keepalivefor
方法用于设置客户端与服务器之间的心跳间隔时间,参数60
表示心跳间隔为 60 秒。这意味着客户端将每 60 秒向服务器发送一次心跳消息,以确保连接保持活跃。
在 mqtt 中,心跳保持机制的作用主要有两点:
- 确保连接的活跃性:通过定期发送心跳消息,可以防止连接因为长时间无活动而被服务器断开。
- 检测连接状态:服务器通过接收客户端的心跳消息来确认连接是否仍然可用,从而及时处理连接异常或断开的情况。
因此,使用 .keepalivefor(60)
方法可以设置客户端与服务器之间的心跳保持时间为 60 秒,以维持连接的稳定性和可靠性。您可以根据实际需求调整心跳间隔时间,但一般建议保持在合理的范围内,避免频繁发送心跳消息导致网络资源浪费或服务器负担过大。
这将被删除,你现在必须通过客户端keepaliveperiod设置它
在 mqtt 客户端库(例如 mqtt_client
)的源代码中提到 keepalivefor
方法要被移除,并且应该通过客户端的 keepaliveperiod
属性来设置。这意味着在将来的版本中,您应该使用客户端对象的 keepaliveperiod
属性来设置心跳保持时间,而不是调用 keepalivefor
方法。
-
keepalivefor
方法:这是旧版本 mqtt 客户端库提供的一种设置心跳保持时间的方法。通过调用keepalivefor
方法并传递心跳间隔时间(以秒为单位),来设置客户端与服务器之间的心跳保持时间。 -
keepaliveperiod
属性:新版本 mqtt 客户端库将采用属性的方式来设置心跳保持时间。您需要直接访问客户端对象的keepaliveperiod
属性,并将心跳间隔时间(以秒为单位)赋值给该属性来设置心跳保持时间。
下面是一个示例代码,展示了如何使用 keepaliveperiod
属性来设置心跳保持时间:
import 'package:mqtt_client/mqtt_client.dart';
void main() {
// 创建 mqtt 客户端
mqttclient client = mqttclient('test.mosquitto.org', '');
// 设置心跳保持时间为 60 秒
client.keepaliveperiod = 60;
// 连接到 mqtt 代理
client.connect();
}
在这个示例中,我们直接通过 client.keepaliveperiod = 60;
来设置心跳保持时间为 60 秒。这样就避免了使用即将被移除的 keepalivefor
方法。
3.设置连接消息
final connmess = mqttconnectmessage()
.withclientidentifier("flutter_client")
.authenticateas("test", "test")
.withwilltopic('willtopic')
.withwillmessage('my will message')
.startclean()
.withwillqos(mqttqos.atleastonce);
client.connectionmessage = connmess;
这段代码是用来创建一个 mqtt 连接消息(mqttconnectmessage
),然后将这个连接消息设置到 mqtt 客户端(mqttserverclient
)中的 connectionmessage
属性上。
1)创建连接消息
final connmess = mqttconnectmessage()
.withclientidentifier("flutter_client")
.authenticateas("test", "test")
.withwilltopic('willtopic')
.withwillmessage('my will message')
.startclean()
.withwillqos(mqttqos.atleastonce);
这段代码使用了链式调用的方式创建了一个 mqtt 连接消息对象 connmess
。
.withclientidentifier("flutter_client")
设置了客户端标识符为 “flutter_client”。.authenticateas("test", "test")
指定了用户名和密码为 “test”。.withwilltopic('willtopic')
设置了遗嘱消息(will message)的主题为 “willtopic”。.withwillmessage('my will message')
设置了遗嘱消息的内容为 “my will message”。.startclean()
表示客户端会话(session)将以干净会话的方式开始,即不保留任何之前的会话状态。.withwillqos(mqttqos.atleastonce)
设置了遗嘱消息的服务质量等级(qos)为至少一次传递。
2)部署连接消息
client.connectionmessage = connmess;
这行代码将刚刚创建的 mqtt 连接消息 connmess
设置到了 mqtt 客户端 client
的 connectionmessage
属性上。这样设置后,当客户端发起连接时,会使用这个连接消息进行连接操作,包括指定客户端标识符、认证信息、遗嘱消息等信息。
4.配置加密通信
// 如果您想使用tls连接,请使用以下代码
// security context
list<int> cabytes = await loadbytes('assets/certs/server-ca.crt');
list<int> clientcertbytes = await loadbytes('assets/certs/client.crt');
list<int> clientkeybytes = await loadbytes('assets/certs/client.key');
securitycontext context = new securitycontext()
..settrustedcertificatesbytes(cabytes)
..useprivatekeybytes(clientkeybytes)
..usecertificatechainbytes(clientcertbytes);
client.secure = true;
client.securitycontext = context;
这段代码片段展示了如何在 mqtt 客户端中配置 tls(transport layer security)连接以进行加密通信。tls 是一种安全协议,用于在客户端和服务器之间创建加密通道,保护数据的传输安全性和隐私性。
通过上述配置,您可以确保 mqtt 客户端在与 mqtt 代理建立连接时使用 tls 加密通道进行数据传输,从而提高数据的安全性和保密性。这对于需要在不安全网络环境下进行通信的应用程序非常重要,例如需要保护用户隐私信息或敏感数据的应用场景。
1)加载证书文件
list<int> cabytes = await loadbytes('assets/certs/server-ca.crt');
list<int> clientcertbytes = await loadbytes('assets/certs/client.crt');
list<int> clientkeybytes = await loadbytes('assets/certs/client.key');
这里使用 loadbytes
方法加载了 tls 连接所需的证书文件。server-ca.crt
是服务器端的 ca(certificate authority)证书,client.crt
是客户端的证书,client.key
是客户端的私钥。这些证书文件应该包含在您的应用程序的 assets 目录中。
2)创建安全上下文
securitycontext context = new securitycontext()
..settrustedcertificatesbytes(cabytes)
..useprivatekeybytes(clientkeybytes)
..usecertificatechainbytes(clientcertbytes);
使用加载的证书文件创建了一个安全上下文(securitycontext)对象 context
。在安全上下文中,设置了信任的证书、客户端的私钥和证书链。
3)配置安全属性
设置 mqtt 客户端的安全相关属性:
client.secure = true;
client.securitycontext = context;
将 mqtt 客户端的 secure
属性设置为 true
表示启用 tls 安全连接。然后将之前创建的安全上下文 context
设置到客户端的 securitycontext
属性中,以便客户端在连接时使用这个安全上下文进行加密通信。
5.连接 mqtt 代理
try {
print('connecting');
await client.connect();
} catch (e) {
print('exception: $e');
client.disconnect();
}
这段代码展示了在尝试连接 mqtt 代理时如何处理可能发生的异常情况。
-
try
块:try { print('connecting'); await client.connect(); }
在
try
块中尝试连接 mqtt 代理。首先输出一条提示信息 “connecting”,然后使用await
关键字等待连接操作的完成。使用await
关键字可以暂停当前异步函数的执行,直到client.connect()
方法执行完成并返回结果(成功连接或抛出异常)。 -
catch
块:catch (e) { print('exception: $e'); client.disconnect(); }
如果
try
块中的代码抛出了异常,那么程序会立即跳转到catch
块来处理异常。在这里,我们捕获到了异常e
,然后打印异常信息,并调用client.disconnect()
方法断开客户端连接。
总体来说,这段代码的目的是尝试连接 mqtt 代理,如果连接过程中发生了异常(例如网络问题、认证失败等),则在捕获到异常后立即执行断开连接的操作。这样可以确保在连接过程中出现问题时及时处理,并执行一些清理操作以保证程序的稳定性和可靠性。
6.连接状态检查
if (client.connectionstatus?.state == mqttconnectionstate.connected) {
// 属性'state'不能被无条件访问,因为接收方可以是'null'。
print('emqx client connected');
client.updates?.listen((list<mqttreceivedmessage<mqttmessage>> c) {
final mqttpublishmessage message = c[0].payload;
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print('接收消息:$payload from topic: ${c[0].topic}>');
});
client.published?.listen((mqttpublishmessage message) {
print('published');
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print(
'published message: $payload to topic: ${message.variableheader.topicname}');
});
} else {
print(
'emqx client connection failed - disconnecting, status is ${client.connectionstatus}');
client.disconnect();
exit(-1);
}
这段代码是一个连接状态检查的示例,它首先检查 mqtt 客户端 client
的连接状态,如果连接状态为已连接 (mqttconnectionstate.connected
),则执行一系列操作;如果连接状态不是已连接,则执行断开连接并退出程序的操作。
1)检查连接状态
if (client.connectionstatus?.state == mqttconnectionstate.connected) {
// 连接状态为已连接时执行以下操作
// ...
} else {
// 连接状态不是已连接时执行以下操作
// ...
}
这部分代码通过访问 client.connectionstatus?.state
属性来获取客户端的连接状态。如果连接状态为已连接,则执行 if
块内的代码;否则执行 else
块内的代码。
在 mqtt 客户端库中,client.connectionstatus.state
和 mqttconnectionstate.connected
都是用来表示 mqtt 客户端的连接状态的概念。
-
client.connectionstatus?.state:
- 这是指
client
对象的connectionstatus
属性的state
属性,用来获取当前 mqtt 客户端的连接状态。具connectionstatus
是一个对象,它包含了描述 mqtt 连接状态的信息,例如连接是否已建立、连接中断的原因等。而state
属性则表示连接状态的枚举值,可能包括已连接、连接中、已断开等不同状态。 - 如果使用
client
或connectionstatus
时没有初始化或者赋值为 null,那么在访问它们的属性或方法时会导致空指针异常(nullpointerexception)。为避免这种情况,使用安全的访问方式?.
使对象为 null 时返回 null 而不是抛出异常。但客户端的接收方不能为空,因此client?.
中的空感知操作符?
没有必要。所以使用client.connectionstatus?.state
来访问连接状态,它会按照以下方式处理:- 如果
connectionstatus
为 null,则表达式client.connectionstatus?.state
也会返回 null,并且不会尝试访问state
属性。 - 如果
connectionstatus
不为 null,那么表达式client.connectionstatus?.state
将会返回连接状态的实际值,例如mqttconnectionstate.connected
或其他枚举值。
- 如果
- 这是指
-
mqttconnectionstate.connected:
这是 mqtt 客户端库中预定义的一个枚举值,用来表示 mqtt 连接状态为已连接。在 dart 的 mqtt 客户端库中,mqttconnectionstate
是一个枚举类型,其中包括了不同的连接状态值,如connected
、connecting
、disconnected
等。
因此使用 client.connectionstatus?.state
来检查 mqtt 客户端的连接状态时,通常会将其与预定义的连接状态枚举值比较,例如 mqttconnectionstate.connected
,以确定客户端当前的连接状态是否为已连接状态。这样可以在代码中根据不同的连接状态执行相应的逻辑,如处理接收消息、发布消息等操作。
(1)已连接状态
print('emqx client connected');
client.updates?.listen((list<mqttreceivedmessage<mqttmessage>> c) {
// 监听接收消息事件
// ...
});
client.published?.listen((mqttpublishmessage message) {
// 监听发布消息事件
// ...
});
如果连接状态为已连接,会输出连接成功的消息,并且设置了对客户端接收和发布消息的监听器。通过 client.updates.listen
和 client.published.listen
方法,可以监听客户端接收和发布消息的事件,并对接收到的消息进行处理。
(2)连接失败
print('emqx client connection failed - disconnecting, status is ${client.connectionstatus}');
client.disconnect();
exit(-1);
如果连接状态不是已连接,会输出连接失败的消息,并断开客户端连接,并通过 exit(-1)
退出程序。
总体来说,这段代码展示了在连接 mqtt 代理时如何进行连接状态的检查和处理,以及如何监听接收和发布消息的事件。这样可以确保在连接过程中出现问题时及时处理,并对 mqtt 消息进行相应的操作。
2)处理消息
client.updates?.listen((list<mqttreceivedmessage<mqttmessage>> c) {
final mqttpublishmessage message = c[0].payload;
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print('接收消息:$payload from topic: ${c[0].topic}>');
});
client.published?.listen((mqttpublishmessage message) {
print('published');
final payload =
mqttpublishpayload.bytestostringasstring(message.payload.message);
print(
'published message: $payload to topic: ${message.variableheader.topicname}');
});
(1)stream监听
通过 client.updates.listen
和 client.published.listen
方法,可以监听客户端接收和发布消息的事件,并对接收到的消息进行处理。
-
client.updates
是一个 stream,它用于监听客户端接收到的消息更新事件。调用client.updates.listen()
方法可以订阅这个 stream,并在消息更新时触发回调函数来处理消息。streamsubscription<list<mqttreceivedmessage<mqttmessage>>> subscription = client.updates.listen((list<mqttreceivedmessage<mqttmessage>> c) { // 处理接收到的消息更新事件 }); // 取消订阅 subscription.cancel();
-
client.published
是一个 stream,它用于监听客户端发布消息的事件。调用client.published.listen()
方法可以订阅这个 stream,并在消息发布时触发回调函数来处理发布事件。streamsubscription<mqttpublishmessage> subscription = client.published.listen((mqttpublishmessage message) { // 处理消息发布事件 }); // 取消订阅 subscription.cancel();
-
client.updates.listen()
和client.published.listen()
都是用来监听 mqtt 客户端的消息更新或发布事件的方法。这两个方法都返回一个 streamsubscription 对象,用于订阅消息事件并监听消息的到达或发布,通过这个对象可以管理对该 stream 的订阅,包括取消订阅以释放资源。
(2)listen() 订阅
在 dart 中,subscription
是一个 streamsubscription 对象,它表示对一个 stream 的订阅。当我们调用 stream.listen()
方法来订阅一个 stream 时,listen
方法会返回一个 streamsubscription 对象,通过这个对象我们可以管理对该 stream 的订阅,包括取消订阅以释放资源。streamsubscription 的作用和用法:
-
订阅 stream:
stream<int> stream = stream<int>.periodic(duration(seconds: 1), (index) => index); streamsubscription<int> subscription = stream.listen((data) { print('received data: $data'); });
在上面的示例中,我们创建了一个周期性的 stream,每隔1秒产生一个新的整数。然后通过调用
stream.listen()
方法订阅这个 stream,并将返回的 streamsubscription 对象赋值给变量subscription
。 -
处理事件:
通过subscription
对象,我们可以处理 stream 发出的事件。subscription.ondata((data) { print('received data: $data'); }); subscription.onerror((error) { print('error occurred: $error'); }); subscription.ondone(() { print('stream closed'); });
在上面的代码中,我们可以通过
ondata
方法处理数据事件,onerror
方法处理错误事件,ondone
方法处理 stream 结束事件。 -
取消订阅:
当不再需要监听 stream 的事件时,应该及时取消订阅以释放资源。subscription.cancel();
调用
cancel()
方法可以取消对 stream 的订阅,停止监听事件。
通过 streamsubscription,我们可以管理对 stream 的订阅状态,处理事件,并且在不需要继续监听时及时释放资源。这样可以确保代码的健壮性和性能,并避免不必要的资源浪费。
总体来说,这段代码块是一个用于处理 mqtt 客户端接收到的消息的回调函数。它从接收到的消息列表中取出第一个消息,提取出消息的内容和主题,并将其打印出来或者进行其他处理。这种回调函数通常用于在 mqtt 客户端订阅了某个主题后,处理接收到的消息的逻辑。
(3)接收消息
// 这是一个匿名函数或者回调函数,它接受一个名为 c 的参数,参数类型是 list<mqttreceivedmessage<mqttmessage>>。这个参数 c 是用来处理接收到的 mqtt 消息的列表。
(list<mqttreceivedmessage<mqttmessage>> c) {
// 在函数体内从参数 c 中取出第一个接收到的 mqtt 消息,并将其 payload 赋值给 message 变量。在 mqtt 中消息通常由 payload 组成,payload 可以是文本、二进制数据等。
final mqttpublishmessage message = c[0].payload as mqttpublishmessage;
// 接下来从 message 中获取其 payload,并将其转换为字符串形式。这里使用了 mqttpublishpayload.bytestostringasstring 方法将消息的二进制数据转换为字符串形式,以便进行打印或进一步处理。
final payload = mqttpublishpayload.bytestostringasstring(message.payload.message);
// 最后通过 print 方法打印接收到的消息内容和消息所属的主题(topic)。${c[0].topic} 获取了消息列表中第一个消息的主题,${payload} 则获取了转换后的消息内容。
print('接收消息: $payload from topic: ${c[0].topic}');
}
在 mqtt(message queuing telemetry transport)协议中,消息的格式有多种类型,其中包括 mqttmessage
和 mqttpublishmessage
。它们之间的关系和区别:
-
mqttmessage:
mqttmessage
是 mqtt 协议中所有消息的基类,它定义了一些通用的消息属性和方法。在 dart 的 mqtt_client 库中,mqttmessage
是一个抽象类,不能直接实例化,而是作为其他具体消息类型的父类。mqttmessage
包含了一些通用的消息属性,例如消息类型、qos(quality of service)、消息标识符等,同时也定义了一些方法来处理消息的序列化和反序列化操作。
-
mqttpublishmessage:
mqttpublishmessage
是mqttmessage
的具体子类,用来表示 mqtt 中的 publish(发布)消息。publish 消息是 mqtt 中最常用的消息类型之一,用于发布消息到指定的主题。- 在 dart 的 mqtt_client 库中,
mqttpublishmessage
继承自mqttmessage
,并且添加了一些特定于 publish 消息的属性和方法,例如消息主题、消息负载(payload)等。 - publish 消息是 mqtt 中的核心消息类型之一,它可以携带各种类型的数据,例如文本、json、二进制数据等,通过消息主题来区分不同的消息类型和目标接收者。
因此,mqttpublishmessage
类型是 mqttmessage
类型的一个具体子类,它扩展了基本的 mqtt 消息类型,专门用来表示 publish 消息,提供了更多与 publish 消息相关的属性和方法。在实际开发中,当处理 mqtt 消息时,根据消息的类型来选择合适的消息类进行处理,从而完成对 mqtt 协议的交互和通信。
final mqttpublishmessage message = c[0].payload as mqttpublishmessage;
c[0].payload
类型为 mqttmessage
,若赋给类型为 mqttpublishmessage
的变量会存在类型不匹配的问题。
在 dart 中,子类的实例不能直接赋值给父类的变量,因为子类可能包含父类没有的属性或方法,这样赋值就会丢失这些额外的信息。解决这个问题的方法是进行类型转换或者使用适当的构造函数来创建正确的对象。
/// represents a mqtt message that has been received from a broker.
class mqttreceivedmessage<t> extends observe.changerecord {
/// initializes a new instance of an mqttreceivedmessage class.
mqttreceivedmessage(this.topic, this.payload);
/// the topic the message was received on.
string topic;
/// the payload of the message received.
t payload;
}
根据自定义的list<mqttreceivedmessage<mqttmessage>> c
以及源码文件中的mqttreceivedmessage<t>
可推出t
即为mqttmessage
,既payload
的类型为mqttmessage
,在知道其类型情况下转换为 mqttpublishmessage
类型的对象,可以使用类型转换操作符 as
或者构造函数来处理这种情况。
-
使用类型转换操作符
as
:
如果您确定c[0].payload
实际上是一个mqttpublishmessage
类型的对象,可以使用as
操作符进行类型转换:final mqttpublishmessage message = c[0].payload as mqttpublishmessage;
-
使用构造函数创建对象:
如果无法直接进行类型转换,可以通过mqttpublishmessage
类的构造函数来创建一个新的对象,并将部分属性复制过去:final mqttpublishmessage message = mqttpublishmessage(c[0].payload.message);
(4)发送消息
// 使用安全调用 ?.,在 client.published 不为 null 时才调用 listen 方法。避免在 client.published 为 null 的情况下出现空指针异常。
client.published?.listen((mqttpublishmessage message) {
// 在 listen 方法中通过 message 参数获取到了已发布消息。mqttpublishmessage 是 mqtt 客户端库定义的一个消息类型,用于表示 mqtt 协议中的发布消息。
print('发表信息成功:');
// 将消息的有效负载(payload)从字节流转换为字符串。message.payload.message 表示消息的字节流数据,通过 bytestostringasstring 方法将其转换为字符串形式。
final payload = mqttpublishpayload.bytestostringasstring(message.payload.message);
print(// 'published message: $payload to topic: ${message.variableheader.topicname}');
'发表信息: $payload 对于主题: ${message.variableheader?.topicname}');
// 通过 message 对象的 variableheader 属性可以获取消息的可变头部信息,其中包含了消息的主题名(topicname)。
// 使用 ?. 安全调用操作符是为了避免在 variableheader 为 null 的情况下出现空指针异常。
}
7.加载资源文件
// 这是一个异步函数,返回一个 future 类型的对象,这个 future 的结果是一个list<int> 类型的字节列表。函数名为 loadbytes,接受一个参数 assetpath,表示要加载的资源路径。
future<list<int>> loadbytes(string assetpath) async {
bytedata data = await rootbundle.load(assetpath);
// 在函数体内部使用 await 关键字等待 rootbundle.load(assetpath) 方法的结果。
// rootbundle 是 flutter 提供的用于访问应用程序的资源的类
// load 方法用于异步加载指定路径的资源文件,并返回一个 bytedata 对象,表示加载的数据。
list<int> bytes = data.buffer.asuint8list();
// 接着将 bytedata 对象中的数据转换为 uint8list 类型的字节列表,以便后续使用。
// buffer 属性表示数据的缓冲区。
// asuint8list() 方法将缓冲区中的数据转换为 uint8list 类型。
return bytes;
// 最后使用 return 关键字返回加载的字节列表作为 future 的结果。
// 由于函数声明时指定了返回值类型为 future<list<int>>,因此必须返回一个符合该类型的对象,这里返回的是异步加载的字节列表。
}
这段代码可以用来异步加载应用程序中的资源文件并将其转换为字节列表。在 flutter 中常将一些静态资源文件(如图片、字体、配置文件等)放置在项目的 assets 文件夹中,然后通过异步加载的方式将这些资源文件读取为字节列表,以便在应用程序中使用。
-
加载图片资源:
可以将图片文件作为静态资源添加到 flutter 项目的 assets 文件夹中,然后使用上述代码异步加载图片资源的字节列表。加载后的字节列表可以用于创建 image 或者 imageprovider 对象,从而显示图片。 -
加载字体文件:
如果应用程序使用了自定义字体文件(如ttf或otf格式),可以将字体文件添加到 assets 文件夹中,并使用上述代码加载字体文件的字节列表。加载后的字节列表可以用于创建自定义字体的字体对象,以便应用程序中使用自定义字体。 -
加载配置文件:
有时应用程序可能需要加载一些配置文件(如json文件、xml文件等),这些配置文件可以作为静态资源添加到 assets 文件夹中,并使用上述代码异步加载配置文件的字节列表。加载后的字节列表可以用于解析配置文件的内容,从而动态配置应用程序的行为或者显示内容。
以加载图片资源为例,演示如何使用上述代码加载应用程序中的图片资源并显示到界面上。
1)配置资源
首先,在 flutter 项目的 pubspec.yaml
文件中添加图片资源的配置,例如:
flutter:
assets:
- assets/images/sample_image.png
2)加载字节列表
然后,在 dart 文件中使用上述代码异步加载图片资源的字节列表
import 'dart:async';
import 'dart:typed_data';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
future<list<int>> loadbytes(string assetpath) async {
bytedata data = await rootbundle.load(assetpath);
list<int> bytes = data.buffer.asuint8list();
return bytes;
}
3)界面显示
显示到界面上,例如:
import 'dart:async';
import 'dart:typed_data';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
future<list<int>> loadbytes(string assetpath) async {
bytedata data = await rootbundle.load(assetpath);
list<int> bytes = data.buffer.asuint8list();
return bytes;
}
void main() {
runapp(myapp());
}
class myapp extends statelesswidget {
widget build(buildcontext context) {
return materialapp(
title: 'image loading demo',
home: scaffold(
appbar: appbar(title: text('image loading demo')),
body: center(
child: futurebuilder<list<int>>(
future: loadbytes('assets/images/sample_image.png'),
builder: (context, snapshot) {
if (snapshot.connectionstate == connectionstate.waiting) {
return circularprogressindicator();
} else if (snapshot.haserror) {
return text('error: ${snapshot.error}');
} else if (!snapshot.hasdata) {
return text('no data available');
} else {
// 使用加载的字节列表创建image对象显示图片
uint8list imagebytes = uint8list.fromlist(snapshot.data!);
return image.memory(imagebytes);
}
},
),
),
),
);
}
}
在上面的示例中创建了一个简单的 flutter 应用程序,加载名为 sample_image.png
的图片资源,并将其显示到界面上。使用了 futurebuilder
来异步加载图片资源的字节列表,并根据加载状态显示不同的界面内容。加载完成后,使用 image.memory
构造函数将加载的字节列表转换为 image
对象,并在界面上显示图片。
8.证书文件
assets:
- assets/certs/server-ca.crt
- assets/certs/client.crt
- assets/certs/client.key
这个设置是用来告诉 flutter 应用程序在构建时包含哪些静态资源文件的配置。在 flutter 中,静态资源文件(如图片、字体、配置文件等)可以作为应用程序的一部分打包到应用程序的安装包中,以便应用程序在运行时可以访问这些资源。
配置示例 assets:
包含了三个文件路径:
assets/certs/server-ca.crt
assets/certs/client.crt
assets/certs/client.key
这些文件用于处理加密和安全连接的证书文件,通常在与服务器进行安全通信时需要使用。具体用途可能包括:
server-ca.crt
:服务器的 ca(certificate authority)证书,用于验证服务器的身份和信任链。client.crt
:客户端的证书,用于证明客户端的身份,并与服务器进行双向认证。client.key
:客户端的私钥,用于在安全连接中进行加密通信。
这些证书和密钥文件可以通过 flutter 的 assets 配置添加到应用程序中,在构建时会被打包到应用程序的安装包中,然后应用程序可以在运行时读取这些文件并使用它们进行安全连接和通信,例如在与服务器进行 https 连接或者 mqtt 安全连接时使用。
需要注意的是,这些文件应该是安全的敏感信息,应妥善保管,不要在应用程序中硬编码敏感信息或者在公开的代码库中包含这些证书和密钥文件,以免造成安全风险。通常建议将这些敏感文件放在安全的存储位置,并在应用程序中安全地访问这些文件。
三、设置组件
mqttconnectmessage
:设置连接选项,包含超时设置,认证以及遗愿消息等。
client.updates.listen
:用于监听已订阅主题的消息到达。
1.证书连接示例
/// security context
securitycontext context = new securitycontext()
..usecertificatechain('path/to/my_cert.pem')
..useprivatekey('path/to/my_key.pem', password: 'key_password')
..setclientauthorities('path/to/client.crt', password: 'password');
client.secure = true;
client.securitycontext = context;
2.其他 mqtt 操作
1)主题订阅
client.subscribe("topic/test", mqttqos.atleastonce)
2)消息发布
const pubtopic = 'topic/test';
final builder = mqttclientpayloadbuilder();
builder.addstring('hello mqtt');
client.publishmessage(pubtopic, mqttqos.atleastonce, builder.payload);
3)取消订阅
client.unsubscribe('topic/test');
4)断开连接
client.disconnect();
3.测试
我们给该项目编写了一个简单的 ui 界面,并配合 mqtt 5.0 客户端工具 - mqttx 进行以下测试:
- 连接
- 订阅
- 发布
- 取消订阅
- 断开连接
应用界面:
使用 mqttx 作为另一个客户端进行消息收发:
我们可以看到整个过程的日志。
4.总结
至此,我们完成了在 android 平台上利用 flutter 构建 mqtt 应用,实现了客户端与 mqtt 服务器的连接、订阅、取消订阅、收发消息等功能。
flutter 通过统一的开发语言和跨平台特性让开发强大的移动应用变得十分容易,它将来可能会是开发移动应用的最佳解决方案。结合 flutter、mqtt 协议及 mqtt 云服务,我们可以开发更多有趣的应用。在这里插入图片描述
发表评论