基于EMQ的企信说明文档(包含EMQ安装步骤、JAVA服务端、VUE客户端)
整体数据流图:
VUE简单demo:
- 什么是EMQ
EMQ X R3.1 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,简单来说,EMQ是基于MQTT协议的一个发布订阅模式的消息服务器。
- 什么是MQTT协议
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议
MQTT特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
-
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。 [1]
3. EMQ的安装部署
3.1 下载安装
从EMQ官网下载emq的安装包,以最新的基于Centos7系统的3.2版本zip包为例,下载后用unzip解压出emqx目录。(旧版本为emqttd,新版本为emqx)
第一步:打开EMQ官网,下载emq安装包
打开官网https://www.emqx.io/cn/
在右上角有个下载按钮,点击下载,进入下载页面,选择版本v3.2.0, 类型为Linux/centos7/zip 点下载
下载地址:https://www.emqx.io/cn/downloads#broker
此处选择Broker类型
第二部:拷贝安装包到Centos服务器root目录
采用cp命令将包推送到centos服务器:cp ./emqx-centos7-v3.2.0.zip root@192.168.6.118:/root/
第三部:解压安装包,启动程序
登陆到centos服务器终端,采用unzip命令加压刚推上来的zip包,得到/root/emqx文件夹。该文件夹内即为emq的所有程序。可以执行./bin/emqx console 启动emq服务
命令: unzip ./emqx-centoos7-v3.2.0.zip
如无特殊更改直接进入emqx输入命令./bin/emqx console启动即可,默认后台仪表板访问地址为http://服务器ip:18083/#/ 默认的代码接入地址为: tcp://服务器ip :1883
第五步:查看仪表板
如果能访问后台仪表板,则表示启动正常,后台默认用户名密码为admin public
查看TOPIC
后续如果有客户端接入订阅了topic则可以在 MONITORING/Topics栏下查看topic列表, 如下图,当前topic为用户的id,每个用户订阅接收自己id的消息:
查看EMQ链接
其中clientId = transfer的为JAVA后端, clientId为用户id的为web客户端。当前测试客户端为网页,clientId上报的都为用户id,便于区分。
3.2 EMQ命令配置说明
emqx console 表示控制台方式启动,能查看到打印,
exqx start 为守护进程方式启动,
emqx status 可以查看emqx的启动状态。
MQ 消息服务器主要配置文件:
etc/emqx.conf | EMQ消息服务器参数设置 |
etc/plugins/*.conf | EMQ 插件配置文件 |
etc/emqx.conf 中两个重要的虚拟机启动参数:
node.process_limit | Erlang虚拟机允许的最大进程数,emqttd一个连接会消耗2个Erlang进程 |
node.max_ports | Erlang虚拟机允许的最大Port数量,emqttd一个连接消耗1个Port |
Erlang的Port非TCP端口,可以理解为文件句柄。
node.process_limit = 参数值 > 最大允许连接数 * 2
node.max_ports = 参数值 > 最大允许连接数
实际连接数量超过Erlang虚拟机参数设置,会引起EMQ消息服务器宕机!
vim ./etc/emqx.conf
节点名称:
node.name = emqx@127.0.0.1
节点名格式: Name@Host, Host必须是IP地址或FQDN(主机名.域名)
erlang VM 参数 (以100万连接参考)
node.process_limit = 2097152
node.max_ports = 1048576
TCP(SSL) Socket Options
mqtt.listener.tcp = 11883
mqtt.listener.tcp.acceptors = 64 mqtt.listener.tcp.max_clients = 1000000
mqtt.listener.ssl = 18883
mqtt.listener.ssl.acceptors = 64
mqtt.listener.ssl.max_clients = 1000000
mqtt.listener.ssl.keyfile = etc/certs/key.pem
mqtt.listener.ssl.certfile = etc/certs/cert.pem
## HTTP(SSL) Listener
mqtt.listener.http = 18083
mqtt.listener.http.acceptors = 32
mqtt.listener.http.max_clients = 100000
mqtt.listener.https = 18084
mqtt.listener.https.acceptors = 32
mqtt.listener.https.max_clients = 100000
mqtt.listener.https.certfile = etc/certs/cert.pem
mqtt.listener.https.keyfile = etc/certs/key.pem
## console 日志配置
log.console = file
log.console.level = debug
log.console.file = log/console.log
Dashboard配置
vim /home/hzmail/wm/emqtt/emqttd1/etc/plugins/emq_dashboard.conf
修改 端口
dashboard.listener.http = 18083
http://0.0.0.0:18083
修改管理员密码
4. 代码接入EMQ, 发布订阅消息
4.1 整体框架图
EMQ仪表板上显示的客户端列表如下:
4.2 JAVA后端接入
JAVA后端为SpringBoot工程,主要负责消息的接收,入库,和分发。
接入步骤如下:
第一步: 数据库设计
主要包含两个核心表,t_chat, t_message
T_chat 表负责记录人跟人,人跟群的聊天关系,同一个发送者和同一个接收者只包含一条记录,同时记录对应聊天的消息未读数,和最后一条消息id.
T_chat表列结构
字段名 | 描述 |
tenant_id | 公司id |
source_id | 发送者id |
target_id | 接收者id |
owner_id | 群组创建者 |
target_type | 接收者类型 |
last_msg_id | 用户上次访问最大消息ID |
new_msg_id | 最新消息ID |
fullname | 聊天名(单聊为对方名称,群聊为群名) |
pinned | 置顶标志,1置顶,0不置顶 |
notify | 通知标志,1弹出通知,0不弹出通知 |
bloked | 阻止消息标志,1阻止,0不组织 |
status | 0正常,1在聊天界面删除,3失效,4群解散 |
unread | 未读数 |
updated_time | 更新时间 |
created_time | 创建时间 |
version | 版本号,用来排序 |
实际表里数据样例:
T_chat表建表SQL
DROP TABLE IF EXISTS `t_chat`;
CREATE TABLE `t_chat` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`tenant_id` bigint(20) DEFAULT NULL COMMENT '公司id',
`source_id` bigint(20) DEFAULT NULL COMMENT '发送者id',
`target_id` bigint(20) DEFAULT NULL COMMENT '接收者id',
`owner_id` bigint(20) DEFAULT NULL COMMENT '消息所有者',
`target_type` tinyint(4) DEFAULT '0' COMMENT '接收目标类型,0 :人 1: 群',
`last_msg_id` bigint(20) DEFAULT NULL COMMENT '最后一个消息的id',
`new_msg_id` bigint(20) DEFAULT NULL COMMENT '信消息的id',
`fullname` varchar(128) DEFAULT NULL COMMENT '如果和人聊天则为对方名称,如果和群聊天则为群名称',
`pinned` tinyint(4) DEFAULT NULL COMMENT '是否置顶',
`notify` tinyint(4) DEFAULT NULL COMMENT '是否推送通知',
`bloked` tinyint(4) DEFAULT NULL COMMENT '是否被锁定',
`status` tinyint(4) DEFAULT NULL COMMENT '消息状态',
`unread` bigint(20) DEFAULT NULL COMMENT '未读数',
`version` bigint(20) DEFAULT NULL COMMENT '版本号',
`created_time` datetime DEFAULT NULL COMMENT '创建时间',
`updated_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;
T_message表 负责记录具体的消息内容,消息的类型,消息的发送者接收者
T_message表列结构
列名 | 类型 | 备注 |
id | BIGINT | 消息id,自增长主键 |
uuid | VARCHAR | 消息的唯一id,随机生成 |
tenant_id | BIGINT | 公司id |
created_by_id | BIGINT | 创建者id |
updated_by_id | BIGINT | 修改者id |
created_at | DATETIME | 创建时间 |
updated_at | DATETIME | 更新时间 |
version | INT | 版本号 |
source_id | BIGINT | 发送者id |
source_type | INT | 发送者类型 |
target_id | BIGINT | 接收者id |
target_type | INT | 接收者类型 |
content_type | INT | 消息内容类型 |
content | VARCHAR | 消息内容 |
表内数据样例:
T_message表建表SQL
DROP TABLE IF EXISTS `t_message`;
CREATE TABLE `t_message` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`tenant_id` bigint(20) DEFAULT NULL,
`uuid` varchar(128) DEFAULT NULL,
`created_by_id` bigint(20) DEFAULT NULL,
`updated_by_id` bigint(20) DEFAULT NULL,
`created_at` datetime DEFAULT NULL COMMENT '创建时间',
`updated_at` datetime DEFAULT NULL COMMENT '更新时间',
`source_id` bigint(20) DEFAULT NULL,
`source_type` tinyint(4) DEFAULT '0',
`target_id` bigint(20) DEFAULT NULL,
`target_type` tinyint(4) DEFAULT '0',
`content` varchar(2048) CHARACTER SET utf8 DEFAULT NULL,
`content_type` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=141 DEFAULT CHARSET=utf8mb4;
第二步: 创建SpringBoot工程
工程结构如下:
第三步: 编写Service,Dao对接表
当前采用的mybitise数据库中间件,编写mapper封装类。
第四步: 在service中启动EMQ订阅
首先需要在pom里引用paho的库:
Pom.xml引入jar包
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
在ChatService的构造时建立和EMQ的连接,当前与EMQ建立链接采用的Paho的jar库,订阅通用topic: IM_MSG, 接收所有发送者发送的企信消息,将消息解析出来存储chat表和Message表,再将消息以接收者id为topic发布emq消息。
对应文件:com.qiyu.transfer.service.ChatService
第五步: 编写Controller,对终端提供接口
编写controller主要提供给客户端按照发送者和接受者拉取离线消息,接口支持翻页查询
对应文件:com.qiyu.transfer.controller.ChatController
读取历史消息接口
返回json如下:
[
{
"id": 145,
"tenantId": null,
"uuid": "b748a781-a1ee-410f-8137-0f46cd5ef34c",
"createdById": null,
"updatedById": null,
"createdAt": "2019-07-25 15:30:27",
"updatedAt": null,
"sourceId": 800001,
"sourceType": null,
"targetId": 800002,
"targetType": null,
"content": "李四,你好啊,我是张三",
"contentType": null
},
{
"id": 146,
"tenantId": null,
"uuid": "cb2a9b4a-69e5-4e95-8142-4250c78ee3fd",
"createdById": null,
"updatedById": null,
"createdAt": "2019-07-25 15:31:40",
"updatedAt": null,
"sourceId": 800002,
"sourceType": null,
"targetId": 800001,
"targetType": null,
"content": "张三,你好,我是李四",
"contentType": null
}
]
标记消息已读接口
场景: 当在聊天记录里点击和某个人的聊天的后,调用该接口将未读数标记为0.
4.2.1 发布消息
消息质量配置
message.setQos(qos); // 设置消息的服务质量
qos值含义: 0 最多一次,1 至少一次, 2 只有一次
消息队列与飞行窗口
EMQ的会话层通过一个内存消息队列和飞行窗口处理下发消息:
飞行窗口(Inflight Window)保存当前正在发送未确认的 Qos1/2 消息。窗口值越大,吞吐越高;窗口值越小,消息顺序越严格。
当客户端离线或者飞行窗口(Inflight Window)满时,消息缓存到队列。如果消息队列满,先丢弃 Qos=0 消息或最早进入队列的消息。
发布函数原型
代码位于:com.qiyu.transfer.mqtt.MqttPahoHelper.java
发布消息参数说明
- Topic : 个人理解为消息管道名称,此处为服务器端,发送消息的topic为用户id,表示向哪个用户发送消息。
- Content: 消息内容,可以是文本形式,也可能是上传到七牛等cdn服务器后得到的文档、图片、语音的url, 客户端根据conentType来区分如何展示消息体内容。
MqttClient构造参数说明
- ServerURI: 为emqx服务器端ip端口,如http://192.168.4.152:8083
- ClientId: emqx服务器上查看到的接入的客户端id,此处为服务器端,我们定义为”transfer”
- Persistence: 为paho库内数据缓冲容器,我们只需要new一个对象传入即可。
public boolean publishMsg(String topic,String content) {
try {
// 创建客户端
if (mMqttClient == null) {
mMqttClient = new MqttClient(serverURI, clientId, persistence);
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(false);
// 设置连接的用户名
//connOpts.setUserName(userName);
//connOpts.setPassword(password.toCharArray());
// 建立连接
mMqttClient.connect(connOpts);
}
// 创建消息
MqttMessage message = new MqttMessage(content.getBytes());
// 设置消息的服务质量
message.setQos(qos);
// 发布消息
mMqttClient.publish(topic, message);
} catch (Exception me) {
me.printStackTrace();
return false;
}
return true;
}
4.2.2 订阅消息
订阅说明
订阅消息传入两个参数,一个是topic,表示订阅哪个topic的消息,另一个是回调函数,当有消息的走回调函数处理。
在服务器端只订阅一个topic为”IM_MSG”的消息,所有客户端聊天消息都先发到该topic。 该topic名称目前在application.propertity中配置。
订阅函数原型
代码位于:com.qiyu.transfer.mqtt.MqttPahoHelper.java
订阅消息参数说明
- Topic : transfer服务端订阅消息的名称,此处固定为IM_MSG
- MsgCallback: 当收到客户端发送的topic=IM_MSG后的回调函数
/**
* 订阅消息
* @param topic
* @param cb
* @return
*/
@Override
public boolean subscribeMsg(String topic ,MsgCallback cb) {
try {
// serverURI为主机名,transfer为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
// 创建客户端
if (mMqttClient == null) {
mMqttClient = new MqttClient(serverURI, clientId, persistence);
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名,密码
//options.setUserName(userName);
//options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
mMqttClient.connect(options);
}
// 设置回调函数
mMqttClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
//System.out.println("topic:"+topic);
//System.out.println("Qos:"+message.getQos());
//System.out.println("message content:"+new String(message.getPayload()));
cb.onRecived(new MsgData(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
//System.out.println("deliveryComplete---------"+ token.isComplete());
}
});
//订阅消息
mMqttClient.subscribe(topic, qos);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
4.2.3 自动重连机制
当EMQ服务断开时所有订阅的客户端会被迫断开,如果客户端不自己添加重连机制,则当EMQ服务器恢复时不能恢复正常通讯。此时需要客户端监听onConnectionLost回调,并在回调里起定时器间隔重连,代码如下:
public void connectionLost(Throwable cause) {
log.info("connectionLost, reconnect in 30 seconds..");
reConnect();
}
public void reConnect() {
if (mMqttClient == null || mMqttClient.isConnected()) {
return;
}
while (true) {
try {
Thread.sleep(10000);
if (mMqttClient != null && ! mMqttClient.isConnected()) {
mMqttClient.connect(mOptions);
//订阅消息
if (mTopic != null && !mTopic.isEmpty())
mMqttClient.subscribe(mTopic, qos);
log.info("reconnect done!");
}
break;
} catch (Exception e) {
//e.printStackTrace();
log.error("reconnect error! e=" + e.getMessage());
continue;
}
}
}
4.3 WEB前端接入,实现聊天功能
4.3.1 界面展现
4.3.2 连接方式-Paho JS库
前端测试页面为html单页面,页面采用html + vue实现展示和数据交互,采用了paho.js库实现mqtt协议对接,采用axios实现http离线消息拉取.
位于后台工程的resource下:
核心代码块:
initWebSocket(fromUserId,serverIp){ //初始化weosocket
var thiz = this;
//建立客户端实例
client = new Paho.MQTT.Client(serverIp,8083,"/mqtt",fromUserId);
client.connect({onSuccess:onConnect});//连接服务器并注册连接成功处理事件
client.onConnectionLost = onConnectionLost;//注册连接断开处理事件
client.onMessageArrived = onMessageArrived;//注册消息接收处理事件
var unsubscribeCb={ onSuccess(){ console.log("退订成功") },
onFailure(error){ console.log("退订失败 e=" + error) }}
function onConnect() {
console.log("mqtt 连接成功 " + fromUserId);
if (thiz.lastTopic != '' && thiz.lastTopic != fromUserId) {
client.unsubscribe(thiz.lastTopic,unsubscribeCb);//取消订阅主题
}
client.subscribe(fromUserId);//订阅主题
thiz.lastTopic = fromUserId;
if (thiz.reconnectTimer) {
clearTimeout(thiz.reconnectTimer)
}
}
function keepTryReconnect() {
this.reconnectTimer = setInterval(doReconnect,5000);
}
4.3.3 自动重连机制
当EMQ服务断开时所有订阅的客户端会被迫断开,如果客户端不自己添加重连机制,则当EMQ服务器恢复时不能恢复正常通讯。此时需要客户端监听onConnectionLost回调,并在回调里起定时器间隔重连,代码如下:
function keepTryReconnect() {
this.reconnectTimer = setInterval(doReconnect,5000);
}
function doReconnect() {
try {
console.log("触发重连..");
if (client.isConnected()) {
console.log("触发重连.. 已连接退出重连");
clearInterval(this.reconnectTimer)
return;
}
client.connect({onSuccess: onConnect});//连接服务器并注册连接成功处理事件
} catch (e) {
console.log("重连失败.." + e);
}
}
5. EMQ的集群配置
EMQ多核服务器和现代操作系统内核层面,可以很轻松支持100万 TCP 连接,核心问题是应用层面如何处理业务瓶颈。
EMQ X 消息服务器在业务和应用层面,解决了单节点承载100万连接的各类瓶颈问题。但基于稳定性考量,最好还是采用集群的方式。
5.1 集群的方式
EMQ支持手动集群和自动发现集群。
配置集群前需要先将每个EMQ节点命名,命名方式为编辑配置文件:
emqx/etc/emqx.conf:
node.name = emqx@s1.emqx.io
或 node.name = emqx@192.168.0.10
5.2 手动集群
通过手动执行命令来配置集群。
节点加入集群
启动两台节点后,emqx@s2.emqx.io 上执行:
$ ./bin/emqx_ctl cluster join emqx@s1.emqx.io
Join the cluster successfully.
Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
或,emq@s1.emqx.io 上执行:
$ ./bin/emqx_ctl cluster join emqx@s2.emqx.io
Join the cluster successfully.
Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
任意节点上查询集群状态:
$ ./bin/emqx_ctl cluster status
Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
节点退出集群
节点退出集群,两种方式:
- leave: 本节点退出集群
- force-leave: 从集群删除其他节点
emqx@s2.emqx.io 主动退出集群:
$ ./bin/emqx_ctl cluster leave
或 emqx@s1.emqx.io 节点上,从集群删除 emqx@s2.emqx.io 节点:
$ ./bin/emqx_ctl cluster force-leave emqx@s2.emqx.io
5.3 自动发现组建集群
基于 static 节点列表自动集群
配置固定的节点列表,自动发现并创建集群:
cluster.discovery = static
cluster.static.seeds = emq1@127.0.0.1,ekka2@127.0.0.1
基于 mcast 组播自动集群
基于 UDP 组播自动发现并创建集群:
cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on
这些集群配置的位置为emqx解压目录下: ./etc/emqx.conf ,文件中已有这些配置原型,配置截图如下: