Mqtt Broker

常见的 Mqtt Broker

MQTT Broker开源说明
Mosquitto完全开源,单机版,支持 10 万并发,伪集群。安装在单机版
EMQX有开源版本,有商业版本。开源版本单机支持 50 万并发,并且可以配置集群。
apache rocketmq阿里捐赠给 Apache 的开源消息组件、高可用、消息集群。

安装 Mqtt 插件后,可以作为 Mqtt Broker+Kafka 来看
阿里云 Mqtt 服务云服务,收费的,不用自己安装了。

最终,在 EMQX 与 rocketmq 之间摇摆,EMQX 优势是专业,不足是公司规模小,担心开源版本被商业版本蚕食。rocketmq 优势是 Apache 品牌的加持,开源会持续下去,不足是没来想要一个 MqttBroker,全引入了一个消息队列,担心速度慢。另外与后面的 kafka 功能有点重复了。

小孩子才做选择题呢,两个都要,两个要做到随时替换,想用那个就用那个。

1、Mqtt 背景知识

1.1 基本功能要求

  • 必要功能

    • 支持 MQTT 5.0 和 3.x 协议标准

    • 完全开源

    • 有集群功能,防止单点故障

  • 加分功能

    • Kafka 集成
    • 支持 websocket 协议。微信小程序通过 websocket 来连接 mqtt 服务
  • 为了便于移植,下面功能不用

1.2 MQTT 连接的基本概念

MQTT 的连接地址通常包含 :服务器 IP 或者域名、服务器端口、连接协议。

连接地址

基于 TCP 的 MQTT 连接

mqtt 是普通的 TCP 连接,端口一般为 1883。

mqtts 是基于 TLS/SSL 的安全连接,端口一般为 8883。

比如 mqtt://broker.emqx.io:1883 是一个基于普通 TCP 的 MQTT 连接地址。

基于 WebSocket 的连接

ws 是普通的 WebSocket 连接,端口一般为 8083。

wss 是基于 WebSocket 的安全连接,端口一般为 8084。

当使用 WebSocket 连接时,连接地址还需要包含 Path,EMQX 默认配置的 Path 是 /mqtt。比如 ws://broker.emqx.io:8083/mqtt 是一个基于 WebSocket 的 MQTT 连接地址。

其他参数

  • 客户端 ID(Client ID)
  • 用户名与密码(Username & Password)
  • 连接超时(Connect Timeout)
  • 保活周期(Keep Alive):保活周期,是一个以秒为单位的时间间隔。客户端在无报文发送时,将按 Keep Alive 设定的值定时向服务端发送心跳报文,确保连接不被服务端断开。
  • 清除会话(Clean Session)
    • false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

遗嘱消息(Last Will)

遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给其他客户端的能力。

遗嘱消息可以看作是一个简化版的 MQTT 消息,它也包含 Topic、Payload、QoS、Retain 等信息。

  • 当设备意外断线时,遗嘱消息将被发送至遗嘱 Topic;
  • 遗嘱 Payload 是待发送的消息内容;
  • 遗嘱 QoS 与普通 MQTT 消息的 QoS 一致,详细请见MQTT QoS(服务质量)介绍
  • 遗嘱 Retain 为 true 时表明遗嘱消息是保留消息。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。详细请见MQTT 保留消息是什么?如何使用?

更多关于遗嘱消息的介绍可查看博客:MQTT 遗嘱消息(Will Message)的使用

这里介绍一下如何将 Retained 消息与 Will 消息结合起来进行使用。

  1. 客户端 A 遗嘱消息内容设定为 offline,该遗嘱主题与一个普通发送状态的主题设定成同一个 A/status
  2. 当客户端 A 连接时,向主题 A/status 发送内容为 online 的 Retained 消息,其它客户端订阅主题 A/status 的时候,将获取到 Retained 消息为 online
  3. 当客户端 A 异常断开时,系统自动向主题 A/status 发送内容为 offline 的消息,其它订阅了此主题的客户端会马上收到 offline 消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱消息。

如何建立一个安全的 MQTT 连接?

SSL/TLS 的出现很好的解决了通信中的风险问题。

单向认证

单向认证是一种仅通过验证服务器证书来建立安全通信的方式,它能保证通信是加密的,但是不能验证客户端的真伪,通常需要与用户名、密码、Client ID 等认证机制结合。读者可参考博客EMQX MQTT 服务器启用 SSL/TLS 安全连接来建立一个安全的单向认证 MQTT 连接。

双向认证是指在进行通信认证时要求服务端和客户端都提供证书,双方都需要进行身份认证,以确保通信中涉及的双方都是受信任的。 双方彼此共享其公共证书,然后基于该证书执行验证、确认。一些对安全性要求较高的应用场景,就需要开启双向 SSL/TLS 认证。读者查看博客EMQX 启用双向 SSL/TLS 安全连接了解如何建立一个安全的双向认证 MQTT 连接。

感兴趣的读者也可查看以下博客来学习物联网安全相关知识:

1.3 发布/订阅模式

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。

发布订阅模式的松耦合特性,也给 MQTT 带来了一些副作用。由于发布者并不知晓订阅者的状态,因此发布者也无法得知订阅者是否收到了消息,或者是否正确处理了消息。为此,MQTT 5.0 增加了请求响应特性,以实现订阅者收到消息后向某个主题发送应答,发布者收到应答后再进行后续操作。

MQTT 的发布/订阅机制可以很轻易地满足我们一对一、一对多、多对一的通信需要。这也在很大程度上拓宽了 MQTT 在 IoT 领域之外的应用,像网络直播互动、手机消息推送等行业场景,都非常适合使用 MQTT。

1.4 什么是 MQTT 主题?

通常不建议主题以 / 开头或结尾,例如 /chatchat/

chat/room/1
sensor/10/temperature
sensor/+/temperature
sensor/#

不同于消息队列中的主题(比如 Kafka),MQTT 主题不需要提前创建。MQTT 客户端在订阅或发布时即自动的创建了主题,开发者无需再关心主题的创建,并且也不需要手动删除主题。

主题通配符

单层通配符

加号 (“+” ) 是用于单个主题层级匹配的通配符。

如果客户端订阅了主题 sensor/+/temperature,将会收到以下主题的消息:

sensor/1/temperature
sensor/2/temperature
...
sensor/n/temperature

但是不会匹配以下主题:

sensor/temperature
sensor/bedroom/1/temperature
多层通配符

井字符号(“#” )是用于匹配主题中任意层级的通配符。

多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符,例如:

如果客户端订阅主题 senser/#,它将会收到以下主题的消息:

sensor
sensor/temperature
sensor/1/temperature

系统主题

$SYS/开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/ 主题标准,但大多数 MQTT 服务器都遵循该标准建议。

共享订阅

共享订阅是 MQTT 5.0 引入的新特性,用于在多个订阅者之间实现订阅的负载均衡,MQTT 5.0 规定的共享订阅主题以 $share 开头。

下图中,3 个订阅者用共享订阅的方式订阅了同一个主题 $share/g/topic,其中topic 是它们订阅的真实主题名,而 $share/g/ 是共享订阅前缀(g/ 是群组名,可为任意 UTF-8 编码字符串)。

另外,对于 MQTT 5.0 以下的版本,EMQX 还支持不带群组的共享订阅前缀 $queue,关于共享订阅的更多详情请查看 EMQX 共享订阅文档。

这个功能,可以增加消息消费的效率

不同场景中的主题设计

智能家居

比如我们用传感器监测卧室、客厅以及厨房的温度、湿度和空气质量,可以设计以下几个主题:

  • myhome/bedroom/temperature
  • myhome/bedroom/humidity
  • myhome/bedroom/airquality
  • myhome/livingroom/temperature
  • myhome/livingroom/humidity
  • myhome/livingroom/airquality
  • myhome/kitchen/temperature
  • myhome/kitchen/humidity
  • myhome/kitchen/airquality

接下来,可以通过订阅 myhome/bedroom/+ 主题获取卧室的温度、湿度及空气质量数据,订阅 myhome/+/temperature 主题获取三个房间的温度数据,订阅 myhome/# 获取所有的数据。

充电桩

充电桩的上行主题格式为 ocpp/cp/${cid}/notify/${action},下行主题格式为 ocpp/cp/${cid}/reply/${action}

  • ocpp/cp/cp001/notify/bootNotification

    充电桩上线时向该主题发布上线请求。

  • ocpp/cp/cp001/notify/startTransaction

    向该主题发布充电请求。

  • ocpp/cp/cp001/reply/bootNotification

    充电桩上线前需订阅该主题接收上线应答。

  • ocpp/cp/cp001/reply/startTransaction

    充电桩发起充电请求前需订阅该主题接收充电请求应答。

即时消息
  • chat/user/${user_id}/inbox

    一对一聊天:用户上线后订阅该收件箱主题 ,将能接收到好友发送给自己的消息。给好友回复消息时,只需要将该主题的 user_id 换为好友的的 id 即可。

  • chat/group/${group_id}/inbox

    群聊:用户加群成功后,可订阅该主题获取对应群组的消息,回复群聊时直接给该主题发布消息即可。

  • req/user/${user_id}/add

    添加好友:可向该主题发布添加好友的申请(user_id 为对方的 id)。

    接收好友请求:用户可订阅该主题(user_id 为自己的 id)接收其他用户发起的好友请求。

  • resp/user/${user_id}/add

    接收好友请求的回复:用户添加好友前,需订阅该主题接收请求结果(user_id 为自己的 id)。

    回复好友申请:用户向该主题发送消息表明是否同意好友申请(user_id 为对方的 id)。

  • user/${user_id}/state

    用户在线状态:用户可以订阅该主题获取好友的在线状态。

2、Emqx

2.1 快速开始

安装 broker

一上来就是集群。

emqx1:
image: emqx:5.0.24
container_name: emqx1
user: root
environment:
- "EMQX_NODE_NAME=emqx@node1.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
iot-bridge:
aliases:
- node1.emqx.io
ports:
- 1883:1883
- 8083:8083
- 8084:8084
- 8883:8883
- 18083:18083
volumes:
- ${DATA_PATH}/emqx/emqx1:/opt/emqx/data
- /etc/localtime:/etc/localtime:ro
emqx2:
image: emqx:5.0.24
container_name: emqx2
user: root
environment:
- "EMQX_NODE_NAME=emqx@node2.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
iot-bridge:
aliases:
- node2.emqx.io
volumes:
- ${DATA_PATH}/emqx/emqx2:/opt/emqx/data
- /etc/localtime:/etc/localtime:ro
networks:
iot-bridge:
driver: bridge
  • 注意事项
    • 要设置user: root
    • 设置networks,并且网络名称是node2.emqx.io中间要带点的。

执行命令

docker-compose up -d

查看集群状态

$ docker-compose exec emqx1 sh -c "emqx_ctl cluster status"
Cluster status: #{running_nodes => ['emqx@node1.emqx.io','emqx@node2.emqx.io'],
stopped_nodes => []}

安装客户端

下载与官网地址
建立连接
  • 支持 mqtt://ws://。如果使用 SSL/TLS 连接,需要修改为 mqtts://wss://
  • 用户认证信息:Broker 开启了用户认证,您可以在配置项中填写 Username 和 Password 信息。
  • 如果需要开启 SSL/TLS 认证,在配置中开启 SSL/TLS
    • 若选择 Self signed,需要进行证书配置。
      • 若是单向连接,只需选择您的 CA File。
      • 若是双向认证,还需要选择配置 Client Certificate File 和 Client key file。
  • MQTT 的协议版本:默认版本为 v3.1.1。如果选择了 v5.0,还可以配置 Session Expiry Interval、Receive Maximum 和 Topic Alias Maximum
  • 遗嘱消息:您可以配置遗嘱消息。Last-Will-QoSLast-Will-Retain 的值默认为 0 和 False
消息的发布与订阅
  • 订阅
    • 在 Topic 输入框中输入多个 Topic,使用逗号(,)进行分割
MQTT 5
  • MQTT 5.0 引入了 Clean Start 和会话过期间隔(Session Expiry Interval)这两项特性
  • 用户属性配置,连接成功后,MQTT 服务器就能获取到该客户端的用户属性内容。
多窗口
  • 上面有一个 ➕ 号图表,可以打开多个窗口。

管理界面

访问http://localhost:18083/

  • 默认密码是:admin public。 第一登陆后,我把密码修改成 fanhl@pku
  • 如果忘记密码,可以通过 CLI 来修改:./bin/emqx ctl admins passwd <Username> <Password>

2.2 所需服务器

  • 最大连接数 (支持的设备连接数上限):3w
  • 消息上下行 TPS(每秒钟消息发送和接收条数的总和,和消息大小无关):3W

配置估算: 4 核 8 GB 内存 2 节点

最终,还要靠压力测试来计算。

2.3 访问控制

认证

密码认证是最简单,也是使用最多的认证方式。此时,客户端需要提供能够表明身份的凭据,例如用户名、客户端 ID 以及对应的密码。

MQTT v5 增加了新特性 增强认证。可以实现对客户端和服务器的双向认证,服务器可以验证连接的客户端是否是真正的客户端,客户端也可以验证连接的服务器是否是真正的服务器,从而提供了更高的安全性。

  • 可以提供外接数据库就更好了。
  • 可以提供 API 函数就更好了。

授权

用户很多,有一个批量的规则,来授权。

EMQX 还允许在主题中使用占位符,在匹配规则时将当前客户端信息等动态替换到主题中,支持的占位符如下:

  • ${clientid}
  • ${username}

下面是参考的一些数据格式。

%% 允许用户名是 dashboard 的客户端订阅 "$SYS/#" 这个主题
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
%% 允许来自127.0.0.1 的用户发布和订阅 "$SYS/#" 以及 "#"
{allow, {ipaddr, "127.0.0.1"}, all, ["$SYS/#", "#"]}.
%% 拒绝其他所有用户订阅 "$SYS/#""#" 主题
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
%% 如果前面的规则都没有匹配到,则允许所有操作
{allow, all}.

也可以添加下面的规则

url -X 'POST' \
'http://localhost:18083/api/v5/authorization/sources/built_in_database/clientid' \
-H 'accept: */*' \
-H 'Content-Type: application/json' \
-d '[
{
"clientid": "client1",
"rules": [
{
"action": "publish",
"permission": "allow",
"topic": "test/toopic/1"
},
{
"action": "subscribe",
"permission": "allow",
"topic": "test/toopic/2"
},
{
"action": "all",
"permission": "deny",
"topic": "eq test/#"
}
]
}
]'

黑名单

2.4 规则引擎

按照以前的逻辑,这部分功能可以通过自己编写程序实现,现在可以通过规则引擎实现。如果使用规则引擎,那么会增加更换 Mqtt Broker 的成本

规则引擎是 EMQX 内置基于 SQL 的数据处理组件,搭配 数据桥接 使用无需编写代码即可实现一站式的 IoT 数据提取、过滤、转换、存储与处理,以加速应用集成和业务创新。

规则的典型应用场景举例

  • 动作监听:智慧家庭智能门锁开发中,门锁会因为网络、电源故障、人为破坏等原因离线导致功能异常,使用规则配置监听离线事件向应用服务推送该故障信息,可以在接入层实现第一时间的故障检测的能力;
  • 数据筛选:车联网的卡车车队管理,车辆传感器采集并上报了大量运行数据,应用平台仅关注车速大于 40 km/h 时的数据,此场景下可以使用规则对消息进行条件过滤,向业务消息队列写入满足条件的数据;
  • 消息路由:智能计费应用中,终端设备通过不同主题区分业务类型,可通过配置规则将计费业务的消息接入计费消息队列并在消息抵达设备端后发送确认通知到业务系统,非计费信息接入其他消息队列,实现业务消息路由配置;
  • 消息编解码:其他公共协议 / 私有 TCP 协议接入、工控行业等应用场景下,可以通过规则的本地处理函数(可在 EMQX 上定制开发)做二进制 / 特殊格式消息体的编解码工作;亦可通过规则的消息路由将相关消息流向外部计算资源如函数计算进行处理(可由用户自行开发处理逻辑),将消息转为业务易于处理的 JSON 格式,简化项目集成难度、提升应用快速开发交付能力。

2.5 数据桥接

数据桥接是用来对接 EMQX 和外部数据系统的通道,比如 MySQL、MongoDB 等数据库, 或 Kafka,RabbitMQ 等消息中间件,或 HTTP 服务器等。

通过数据桥接,用户可以实时地将消息从 EMQX 发送到外部数据系统,或者从外部数据系统拉取数据并发送到 EMQX 的某个主题。

EMQX 开源版中仅支持 MQTT 桥接 和 Webhook

数据桥接特性

连接池

连接池是一组可重用的连接对象。通过连接池,用户无需在为每个请求重新创建连接,有助于降低资源消耗,提高连接效率和并发能力。

EMQX 会为每个需要创建数据桥的节点创建一个单独的连接池。例如,对一个包含 3 个 EMQX 节点的集群,如果将每个数据桥的连接池大小设置为 8,那么 EMQX 将创建 3 x 8 = 24 个连接。注意:请确保构建的连接池数量不要超过资源的连接限制。

Webhook

EMQX 支持通过 Webhook 的方式将客户端消息和事件发送到外部 HTTP 服务。

  • 搭建简易 HTTP 服务
  • 创建 Webhook 数据桥接
  • 创建数据转发规则

如果 EMQX 放在了容器中,那么就要导入到容器中的内容。

搭建简易 HTTP 服务,今后格式定下来,可以转到一个类中。

@PostMapping("/emqx")
public String emqx(@RequestBody String body) {
HttpServletRequest request= RequestContextHolderUtil.getRequest();
return body;
}

创建 Webhook 数据桥接

填写 url

填写内容

{
"payload": "${payload}" ,
"topic": "${topic}",
"username":"${username}",
"timestamp":${timestamp},
"qos":${qos}
}

创建数据转发规则

调试

发送一个消息,看传输的内容。

MQTT

MQTT 数据桥接是一种连接多个 EMQX 集群或其他 MQTT 服务的方式

Apache Kafka

Apache Kafka 数据桥接实现了 EMQX 客户端消息和事件与 Apache Kafka (包括 Confluent) 的桥接,能够提供 EMQX 与企业应用之间高性能、高可靠的数据集成,有效降低应用复杂度并提升扩展性。

2.6 功能区别

以下是 EMQX 开源版与企业版功能列表:

项目EMQX 开源版EMQX 企业版RocketMQ
产品定位全球领先的开源 MQTT Broker高可靠、可扩展的企业级 MQTT 物联网接入平台
伸缩性单集群至多 1 亿 MQTT 连接单集群至多 1 亿 MQTT 连接
性能> 500 万 MQTT 消息每秒> 500 万 MQTT 消息每秒
可靠性内存数据存储RocksDB 数据存储(即将支持)
延迟1~5 毫秒1~5 毫秒
SLA99.99%至多 99.999%
数据集成(开箱即用)340+
LicenseApache Version 2.0Commercial license (Business source license)
技术支持开源社区7x24 全球支持
MQTT 5.0imgimg
MQTT over QUICimgimg
MQTT 扩展imgimg
多协议网关imgimg
多租户imgimg
跨地域复制imgimg
数据持久化imgimg
Schema Registryimgimg
消息编解码imgimg
规则引擎imgimg
Flow Editorimgimg
文件传输imgimg
Kafka 集成imgimg
企业系统集成imgimg
故障排查imgimg
云原生 & K8simgimg
边缘计算imgimg

3、共享订阅

这是一个重要的功能,可以解决消费大量数据的要求。

下面模拟了

  • 一个发布者: admin,向 topic发送消息。
  • 一个共享组,有 3 个接受者:s1 s2 s3,共享订阅topic

发送者往 pub 中发送

订阅者通过:$share/g1/topic ,就能实现负载均衡的接受数据