常见的 Mqtt Broker
MQTT Broker | 开源 | 说明 |
---|---|---|
Mosquitto | 完全开源,单机版,支持 10 万并发,伪集群。 | 安装在单机版 |
EMQX | 有开源版本,有商业版本。开源版本单机支持 50 万并发,并且可以配置集群。 | |
apache rocketmq | 阿里捐赠给 Apache 的开源消息组件、高可用、消息集群。 安装 Mqtt 插件后,可以作为 Mqtt Broker+Kafka 来看 | |
阿里云 Mqtt 服务 | 云服务,收费的,不用自己安装了。 |
最终,在 EMQX 与 rocketmq 之间摇摆,EMQX 优势是专业,不足是公司规模小,担心开源版本被商业版本蚕食。rocketmq 优势是 Apache 品牌的加持,开源会持续下去,不足是没来想要一个 MqttBroker,全引入了一个消息队列,担心速度慢。另外与后面的 kafka 功能有点重复了。
小孩子才做选择题呢,两个都要,两个要做到随时替换,想用那个就用那个。
必要功能
支持 MQTT 5.0 和 3.x 协议标准
完全开源
有集群功能,防止单点故障
加分功能
MQTT 的连接地址通常包含 :服务器 IP 或者域名、服务器端口、连接协议。
基于 TCP 的 MQTT 连接
mqtt
是普通的 TCP 连接,端口一般为 1883。
mqtts
是基于 TLS/SSL 的安全连接,端口一般为 8883。
比如 mqtt://broker.emqx.io:1883
是一个基于普通 TCP 的 MQTT 连接地址。
基于 WebSocket 的连接
ws
是普通的 WebSocket 连接,端口一般为 8083。
wss
是基于 WebSocket 的安全连接,端口一般为 8084。
当使用 WebSocket 连接时,连接地址还需要包含 Path,EMQX 默认配置的 Path 是 /mqtt
。比如 ws://broker.emqx.io:8083/mqtt
是一个基于 WebSocket 的 MQTT 连接地址。
false
时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。为 true
时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给其他客户端的能力。
遗嘱消息可以看作是一个简化版的 MQTT 消息,它也包含 Topic、Payload、QoS、Retain 等信息。
true
时表明遗嘱消息是保留消息。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。详细请见MQTT 保留消息是什么?如何使用?更多关于遗嘱消息的介绍可查看博客:MQTT 遗嘱消息(Will Message)的使用。
这里介绍一下如何将 Retained 消息与 Will 消息结合起来进行使用。
offline
,该遗嘱主题与一个普通发送状态的主题设定成同一个 A/status
。A/status
发送内容为 online
的 Retained 消息,其它客户端订阅主题 A/status
的时候,将获取到 Retained 消息为 online
。A/status
发送内容为 offline
的消息,其它订阅了此主题的客户端会马上收到 offline
消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status
主题的客户端上线,也将获取到内容为 offline
的遗嘱消息。SSL/TLS 的出现很好的解决了通信中的风险问题。
单向认证
单向认证是一种仅通过验证服务器证书来建立安全通信的方式,它能保证通信是加密的,但是不能验证客户端的真伪,通常需要与用户名、密码、Client ID 等认证机制结合。读者可参考博客EMQX MQTT 服务器启用 SSL/TLS 安全连接来建立一个安全的单向认证 MQTT 连接。
双向认证是指在进行通信认证时要求服务端和客户端都提供证书,双方都需要进行身份认证,以确保通信中涉及的双方都是受信任的。 双方彼此共享其公共证书,然后基于该证书执行验证、确认。一些对安全性要求较高的应用场景,就需要开启双向 SSL/TLS 认证。读者查看博客EMQX 启用双向 SSL/TLS 安全连接了解如何建立一个安全的双向认证 MQTT 连接。
感兴趣的读者也可查看以下博客来学习物联网安全相关知识:
MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。
在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。
发布订阅模式的松耦合特性,也给 MQTT 带来了一些副作用。由于发布者并不知晓订阅者的状态,因此发布者也无法得知订阅者是否收到了消息,或者是否正确处理了消息。为此,MQTT 5.0 增加了请求响应特性,以实现订阅者收到消息后向某个主题发送应答,发布者收到应答后再进行后续操作。
MQTT 的发布/订阅机制可以很轻易地满足我们一对一、一对多、多对一的通信需要。这也在很大程度上拓宽了 MQTT 在 IoT 领域之外的应用,像网络直播互动、手机消息推送等行业场景,都非常适合使用 MQTT。
通常不建议主题以 /
开头或结尾,例如 /chat
或 chat/
。
chat/room/1sensor/10/temperaturesensor/+/temperaturesensor/#
不同于消息队列中的主题(比如 Kafka),MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。
加号 (“+” ) 是用于单个主题层级匹配的通配符。
如果客户端订阅了主题 sensor/+/temperature
,将会收到以下主题的消息:
sensor/1/temperaturesensor/2/temperature...sensor/n/temperature
但是不会匹配以下主题:
sensor/temperaturesensor/bedroom/1/temperature
井字符号(“#” )是用于匹配主题中任意层级的通配符。
多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符,例如:
如果客户端订阅主题 senser/#
,它将会收到以下主题的消息:
sensorsensor/temperaturesensor/1/temperature
以 $SYS/
开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/
主题标准,但大多数 MQTT 服务器都遵循该标准建议。
共享订阅是 MQTT 5.0 引入的新特性,用于在多个订阅者之间实现订阅的负载均衡,MQTT 5.0 规定的共享订阅主题以 $share
开头。
下图中,3 个订阅者用共享订阅的方式订阅了同一个主题 $share/g/topic
,其中topic
是它们订阅的真实主题名,而 $share/g/
是共享订阅前缀(g/
是群组名,可为任意 UTF-8 编码字符串)。
另外,对于 MQTT 5.0 以下的版本,EMQX 还支持不带群组的共享订阅前缀 $queue
,关于共享订阅的更多详情请查看 EMQX 共享订阅文档。
这个功能,可以增加消息消费的效率
比如我们用传感器监测卧室、客厅以及厨房的温度、湿度和空气质量,可以设计以下几个主题:
myhome/bedroom/temperature
myhome/bedroom/humidity
myhome/bedroom/airquality
myhome/livingroom/temperature
myhome/livingroom/humidity
myhome/livingroom/airquality
myhome/kitchen/temperature
myhome/kitchen/humidity
myhome/kitchen/airquality
接下来,可以通过订阅 myhome/bedroom/+
主题获取卧室的温度、湿度及空气质量数据,订阅 myhome/+/temperature
主题获取三个房间的温度数据,订阅 myhome/#
获取所有的数据。
充电桩的上行主题格式为 ocpp/cp/${cid}/notify/${action}
,下行主题格式为 ocpp/cp/${cid}/reply/${action}
。
ocpp/cp/cp001/notify/bootNotification
充电桩上线时向该主题发布上线请求。
ocpp/cp/cp001/notify/startTransaction
向该主题发布充电请求。
ocpp/cp/cp001/reply/bootNotification
充电桩上线前需订阅该主题接收上线应答。
ocpp/cp/cp001/reply/startTransaction
充电桩发起充电请求前需订阅该主题接收充电请求应答。
chat/user/${user_id}/inbox
一对一聊天:用户上线后订阅该收件箱主题 ,将能接收到好友发送给自己的消息。给好友回复消息时,只需要将该主题的 user_id
换为好友的的 id 即可。
chat/group/${group_id}/inbox
群聊:用户加群成功后,可订阅该主题获取对应群组的消息,回复群聊时直接给该主题发布消息即可。
req/user/${user_id}/add
添加好友:可向该主题发布添加好友的申请(user_id
为对方的 id)。
接收好友请求:用户可订阅该主题(user_id
为自己的 id)接收其他用户发起的好友请求。
resp/user/${user_id}/add
接收好友请求的回复:用户添加好友前,需订阅该主题接收请求结果(user_id
为自己的 id)。
回复好友申请:用户向该主题发送消息表明是否同意好友申请(user_id
为对方的 id)。
user/${user_id}/state
用户在线状态:用户可以订阅该主题获取好友的在线状态。
一上来就是集群。
emqx1:image: emqx:5.0.24container_name: emqx1user: rootenvironment:- "EMQX_NODE_NAME=emqx@node1.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]interval: 5stimeout: 25sretries: 5networks:iot-bridge:aliases:- node1.emqx.ioports:- 1883:1883- 8083:8083- 8084:8084- 8883:8883- 18083:18083volumes:- ${DATA_PATH}/emqx/emqx1:/opt/emqx/data- /etc/localtime:/etc/localtime:roemqx2:image: emqx:5.0.24container_name: emqx2user: rootenvironment:- "EMQX_NODE_NAME=emqx@node2.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]interval: 5stimeout: 25sretries: 5networks:iot-bridge:aliases:- node2.emqx.iovolumes:- ${DATA_PATH}/emqx/emqx2:/opt/emqx/data- /etc/localtime:/etc/localtime:ronetworks:iot-bridge:driver: bridge
user: root
networks
,并且网络名称是node2.emqx.io
中间要带点的。执行命令
docker-compose up -d
查看集群状态
$ docker-compose exec emqx1 sh -c "emqx_ctl cluster status"Cluster status: #{running_nodes => ['emqx@node1.emqx.io','emqx@node2.emqx.io'],stopped_nodes => []}
mqtt://
,ws://
。如果使用 SSL/TLS
连接,需要修改为 mqtts://
,wss://
。Self signed
,需要进行证书配置。Last-Will-QoS
和 Last-Will-Retain
的值默认为 0 和 False
。访问http://localhost:18083/
./bin/emqx ctl admins passwd <Username> <Password>
配置估算: 4 核 8 GB 内存 2 节点
最终,还要靠压力测试来计算。
密码认证是最简单,也是使用最多的认证方式。此时,客户端需要提供能够表明身份的凭据,例如用户名、客户端 ID 以及对应的密码。
MQTT v5 增加了新特性 增强认证。可以实现对客户端和服务器的双向认证,服务器可以验证连接的客户端是否是真正的客户端,客户端也可以验证连接的服务器是否是真正的服务器,从而提供了更高的安全性。
用户很多,有一个批量的规则,来授权。
EMQX 还允许在主题中使用占位符,在匹配规则时将当前客户端信息等动态替换到主题中,支持的占位符如下:
${clientid}
${username}
下面是参考的一些数据格式。
%% 允许用户名是 dashboard 的客户端订阅 "$SYS/#" 这个主题{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.%% 允许来自127.0.0.1 的用户发布和订阅 "$SYS/#" 以及 "#"{allow, {ipaddr, "127.0.0.1"}, all, ["$SYS/#", "#"]}.%% 拒绝其他所有用户订阅 "$SYS/#" 和 "#" 主题{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.%% 如果前面的规则都没有匹配到,则允许所有操作{allow, all}.
也可以添加下面的规则
url -X 'POST' \'http://localhost:18083/api/v5/authorization/sources/built_in_database/clientid' \-H 'accept: */*' \-H 'Content-Type: application/json' \-d '[{"clientid": "client1","rules": [{"action": "publish","permission": "allow","topic": "test/toopic/1"},{"action": "subscribe","permission": "allow","topic": "test/toopic/2"},{"action": "all","permission": "deny","topic": "eq test/#"}]}]'
黑名单
按照以前的逻辑,这部分功能可以通过自己编写程序实现,现在可以通过规则引擎实现。如果使用规则引擎,那么会增加更换 Mqtt Broker 的成本
规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配 数据桥接 使用无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。
规则的典型应用场景举例
数据桥接是用来对接 EMQX 和外部数据系统的通道,比如 MySQL、MongoDB 等数据库, 或 Kafka,RabbitMQ 等消息中间件,或 HTTP 服务器等。
通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。
EMQX 开源版中仅支持 MQTT 桥接 和 Webhook
连接池是一组可重用的连接对象。通过连接池,用户无需在为每个请求重新创建连接,有助于降低资源消耗,提高连接效率和并发能力。
EMQX 会为每个需要创建数据桥的节点创建一个单独的连接池。例如,对一个包含 3 个 EMQX 节点的集群,如果将每个数据桥的连接池大小设置为 8,那么 EMQX 将创建 3 x 8 = 24 个连接。注意:请确保构建的连接池数量不要超过资源的连接限制。
EMQX 支持通过 Webhook 的方式将客户端消息和事件发送到外部 HTTP 服务。
如果 EMQX 放在了容器中,那么就要导入到容器中的内容。
搭建简易 HTTP 服务,今后格式定下来,可以转到一个类中。
@PostMapping("/emqx")public String emqx(@RequestBody String body) {HttpServletRequest request= RequestContextHolderUtil.getRequest();return body;}
创建 Webhook 数据桥接
填写 url
填写内容
{"payload": "${payload}" ,"topic": "${topic}","username":"${username}","timestamp":${timestamp},"qos":${qos}}
创建数据转发规则
调试
发送一个消息,看传输的内容。
MQTT 数据桥接是一种连接多个 EMQX 集群或其他 MQTT 服务的方式
Apache Kafka 数据桥接实现了 EMQX 客户端消息和事件与 Apache Kafka (包括 Confluent) 的桥接,能够提供 EMQX 与企业应用之间高性能、高可靠的数据集成,有效降低应用复杂度并提升扩展性。
以下是 EMQX 开源版与企业版功能列表:
项目 | EMQX 开源版 | EMQX 企业版 | RocketMQ |
---|---|---|---|
产品定位 | 全球领先的开源 MQTT Broker | 高可靠、可扩展的企业级 MQTT 物联网接入平台 | |
伸缩性 | 单集群至多 1 亿 MQTT 连接 | 单集群至多 1 亿 MQTT 连接 | |
性能 | > 500 万 MQTT 消息每秒 | > 500 万 MQTT 消息每秒 | |
可靠性 | 内存数据存储 | RocksDB 数据存储(即将支持) | |
延迟 | 1~5 毫秒 | 1~5 毫秒 | |
SLA | 99.99% | 至多 99.999% | |
数据集成(开箱即用) | 3 | 40+ | |
License | Apache Version 2.0 | Commercial license (Business source license) | |
技术支持 | 开源社区 | 7x24 全球支持 | |
MQTT 5.0 | |||
MQTT over QUIC | |||
MQTT 扩展 | |||
多协议网关 | |||
多租户 | |||
跨地域复制 | |||
数据持久化 | |||
Schema Registry | |||
消息编解码 | |||
规则引擎 | |||
Flow Editor | |||
文件传输 | |||
Kafka 集成 | |||
企业系统集成 | |||
故障排查 | |||
云原生 & K8s | |||
边缘计算 |
这是一个重要的功能,可以解决消费大量数据的要求。
下面模拟了
topic
发送消息。s1 s2 s3
,共享订阅topic
发送者往 pub 中发送
订阅者通过:$share/g1/topic
,就能实现负载均衡的接受数据