具体实现
- 前言
- MQTT协议概念
- 组成部分
- 实现mqtt协议
- 测试
- 其他
前言
首先说明一下,netty实现并封装了mqtt协议,同时也为其写好了编解码器,但是再了解并搭建之前,尤其是还不了解netty和mqtt的同学,必须要清楚一件事:mqtt协议的所具备的功能都是需要你自己实现的。
简单举个例子,rabbitmq消息中间件应该都知道,我们在使用rabbit的时候只需要定义交换机、队列,然后生产者和消费者分别往指定队列发送消息和监听指定队列消息即可互相收发,但是MQTT只是一种协议,说白了就是一种概念,告诉你这种协议是什么样的,netty并没有帮你实现如何订阅发布,你需要根据自己具体的需求,按照mqtt协议的规范去实现主题订阅发布的功能。
不单是netty,凡是用到mqtt协议的,大概都是这种情况,也可能是博主开始研究的时候走入了误区,混淆了概念,后来才反应过来,当然,明白的就直接看正文吧。
MQTT协议概念
组成部分
- 固定头
包含消息的类型(Message Type)和QoS级别等标志位。
消息类型:
名称 | 描述 | 方向 |
---|---|---|
CONNECT | 客户端请求与服务端建立连接 | C->S(服务端接收) |
CONNACK | 服务端确认连接建立 | S->C(客户端接收) |
PUBLISH | 发布消息【QoS 0级别,最多分发一次】,生产者只会发送一次消息,不关心消息是否被代理服务端或消费者收到 | 双向都可 |
PUBACK | 收到发布消息确认,客户端接收【QoS 1级别,至少分发一次】,保证消息发送到服务端(也就是代理服务器broker),如果没收到或一定时间没收到服务端的ack,就会重发消息 | 双向都可 |
PUBREC | 收到发布消息【QoS 2级别】只分发一次消息,且保证到达 ,这三步保证消息有且仅有一次传递给消费者 | 双向都可 |
PUBREL | 释放发布消息【QoS 2级别】 | 双向都可 |
PUBCOMP | 完成发布消息【QoS 2级别】 | 双向都可 |
SUBSCRIBE | 订阅请求 | C->S(服务端接收) |
SUBACK | 订阅确认 | S->C(客户端接收) |
UNSUBSCRIBE | 取消订阅 | C->S(服务端接收) |
UNSUBACK | 取消订阅确认 | S->C(客户端接收) |
PING | 客户端发送PING(连接保活)命令 | |
PINGRSP | PING命令回复 | S->C(客户端接收) |
DISCONNECT | 断开连接 |
- 可变头
包含协议名,协议版本,连接标志,心跳间隔时间,连接返回码,主题名等。 - 消息体
包含消息内容,也就是payload。
实现mqtt协议
NettyServer服务端
/**
* @author: zhouwenjie
* @description: netty启动配置类
* @create: 2020-04-03 11:43
**/
@Slf4j
@Component
public class NettyServer {
@Autowired
private NettyServerChannelInitializer nettyServerChannelInitializer;
@Value("${netty.socket_port}")
private int socketPort;
public void start() {
//创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义)
NioEventLoopGroup bossGroup = new NioEventLoopGroup(3);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(6);
try {
//创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组)
ServerBootstrap socketBs = new ServerBootstrap();
//channel 方法用于指定服务器端监听套接字通道
//socket配置
socketBs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(nettyServerChannelInitializer)
//ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
.option(ChannelOption.SO_BACKLOG, 1024)
//如果TCP_NODELAY没有设置为true,那么底层的TCP为了能减少交互次数,会将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。在互联网应用中,通常希望服务是低延迟的,建议将TCP_NODELAY设置为true
.option(ChannelOption.TCP_NODELAY, true)
//快速复用,防止服务端重启端口被占用的情况发生
.option(ChannelOption.SO_REUSEADDR, true)
//默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture socketFuture = socketBs.bind(socketPort).sync();
if (socketFuture.isSuccess()) {
log.info("[*Netty服务端启动成功]");
socketFuture.channel().closeFuture().sync();
}else {
log.info("[~~~Netty服务端启动失败~~~]");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
配置管道
/**
* @author: zhouwenjie
* @description: 配置管道 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
* @create: 2020-04-03 14:14
**/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private NettyServerHandler nettyServerHandler;
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MqttDecoder(1024 * 8));
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast(nettyServerHandler);
}
}
配置处理器
/**
* @author: zhouwenjie
* @description: 服务端业务处理类
* @create: 2020-04-03 14:13
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<MqttMessage> {
public static final ConcurrentHashMap<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<String, ChannelHandlerContext>();
@Value("${netty.address_list}")
private List<String> addressList;
@Autowired
private MqttMsgBack mqttMsgBack;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
//判断连接是否合法
if (addressList.contains(address.getHostString())) {
clientMap.put(ctx.channel().id().toString(), ctx);
super.handlerAdded(ctx);
} else {
ctx.close();
}
}
/**
* 功能描述: 客户端终止连接服务器会触发此函数
*
* @param ctx
* @return void
* @author zhouwenjie
* @date 2020/4/3 16:47
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
clientMap.remove(ctx.channel().id().toString());
super.channelInactive(ctx);
}
/**
* 功能描述: 有客户端发消息会触发此函数
*
* @param ctx
* @param mqttMessage
* @return void
* @author zhouwenjie
* @date 2020/4/3 16:48
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
if (null != mqttMessage) {
log.info("接收mqtt消息:" + mqttMessage);
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
if (mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)) {
// 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
// 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
mqttMsgBack.connectionAck(ctx, mqttMessage);
}
switch (mqttFixedHeader.messageType()) {
case PUBLISH: // 客户端发布消息
// PUBACK报文是对QoS 1等级的PUBLISH报文的响应
mqttMsgBack.publishAck(ctx, mqttMessage);
break;
case PUBREL: // 发布释放
// PUBREL报文是对PUBREC报文的响应
// to do
mqttMsgBack.publishComp(ctx, mqttMessage);
break;
case SUBSCRIBE: // 客户端订阅主题
// 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
// 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
// SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
// to do
mqttMsgBack.subscribeAck(ctx, mqttMessage);
break;
case UNSUBSCRIBE: // 客户端取消订阅
// 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
// to do
mqttMsgBack.unsubscribeAck(ctx, mqttMessage);
break;
case PINGREQ: // 客户端发起心跳
// 客户端发送PINGREQ报文给服务端的
// 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
// 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
mqttMsgBack.pingResp(ctx, mqttMessage);
break;
case DISCONNECT: // 客户端主动断开连接
// DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
break;
default:
break;
}
}
}
/**
* 功能描述: 心跳检测
*
* @param ctx 这里的作用主要是解决断网,弱网的情况发生
* @param evt
* @return void
* @author zhouwenjie
* @date 2020/4/3 17:02
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("Client: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
}
}
}
/**
* 功能描述:
*
* @param ctx
* @param cause
* @return void
* @author 发生异常会触发此函数
* @date 2020/4/3 16:49
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
}
mqtt订阅发布具体实现
@Slf4j
@Component
public class MqttMsgBack {
@Value("${netty.user_name}")
private String userName;
@Value("${netty.password}")
private String password;
public static final ConcurrentHashMap<String, HashSet<String>> subMap = new ConcurrentHashMap<String, HashSet<String>>();
/**
* 确认连接请求
*
* @param ctx
* @param mqttMessage
*/
public void connectionAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
//获取连接者的ClientId
String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
//查询用户名密码是否正确
String userNameNow = mqttConnectMessage.payload().userName();
String passwordNow = mqttConnectMessage.payload().password();
if (userName.equals(userNameNow) && password.equals(passwordNow)) {
MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
//构建返回报文, 可变报头
MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
//构建返回报文, 固定报头 至多一次(至少—次,只有一次)
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
//构建连接回复消息体
MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
ctx.writeAndFlush(connAck);
//设置节点名
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
log.info("终端登录成功,ID号:{},IP信息:{},终端号:{}", clientIdentifier, address.getHostString(), address.getPort());
} else {
ctx.close();
}
}
/**
* 根据qos发布确认
*
* @param ctx
* @param mqttMessage
*/
public void publishAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
//得到主题
String topicName = mqttPublishMessage.variableHeader().topicName();
//将消息发送给订阅的客户端
HashSet<String> clientHashSet = subMap.get(topicName);
if (clientHashSet != null) {
ByteBuf byteBuf = mqttPublishMessage.payload();
for (String id : clientHashSet) {
ChannelHandlerContext context = ServerMqttHandler.clientMap.get(id);
if (context == null) {
//防止客户端频繁上下线导致id变化,带来不必要的空指针
ServerMqttHandler.clientMap.remove(id);
} else {
//因为ByteBuf每次发送之后就会被清空了,下次发送就拿不到payload,所以提前复制一份
ByteBuf payload = byteBuf.retainedDuplicate();
MqttPublishMessage pubMessage = new MqttPublishMessage(mqttFixedHeaderInfo, mqttPublishMessage.variableHeader(), Unpooled.buffer().writeBytes(payload));
context.writeAndFlush(pubMessage);
}
}
}
//获取消息体
// ByteBuf msgBodyBuf = mqttPublishMessage.payload();
// byte[] tmp = new byte[msgBodyBuf.readableBytes()];
// msgBodyBuf.readBytes(tmp);
// String s = new String(tmp);
// log.info("收到:{},主题{}", s, topicName);
//返回消息给发送端
switch (qos) {
//至多一次
case AT_MOST_ONCE:
break;
//至少一次
case AT_LEAST_ONCE:
//构建返回报文, 可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
//构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK, mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
//构建PUBACK消息体
MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("Qos:AT_LEAST_ONCE:{}", pubAck.toString());
ctx.writeAndFlush(pubAck);
break;
//刚好一次
case EXACTLY_ONCE:
//构建返回报文,固定报头
MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_LEAST_ONCE, false, 0x02);
//构建返回报文,可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2, mqttMessageIdVariableHeaderBack2);
log.info("Qos:EXACTLY_ONCE回复:{}" + mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
break;
default:
break;
}
}
/**
* 发布完成 qos2
*
* @param ctx
* @param mqttMessage
*/
public void publishComp(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
//构建返回报文, 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
//构建返回报文, 可变报头
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("发布完成回复:{}" + mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
}
/**
* 订阅确认
*
* @param ctx
* @param mqttMessage
*/
public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
//构建返回报文, 可变报头
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
int i = 0;
for (String topic : topics) {
HashSet<String> contexts = subMap.get(topic);
if (contexts == null) {
contexts = new HashSet<>();
}
//存储订阅客户端
contexts.add(ctx.channel().id().toString());
subMap.put(topic, contexts);
grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
i++;
}
// 构建返回报文 有效负载
MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
// 构建返回报文 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + topics.size());
// 构建返回报文 订阅确认
MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack);
ctx.writeAndFlush(subAck);
}
/**
* 取消订阅确认
*
* @param ctx
* @param mqttMessage
*/
public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttUnsubscribeMessage.variableHeader();
// 构建返回报文 可变报头
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
// 构建返回报文 固定报头
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
// 构建返回报文 取消订阅确认
MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack);
log.info("取消订阅回复:{}", unSubAck);
//删除本地订阅客户端
List<String> topics = mqttUnsubscribeMessage.payload().topics();
for (String topic : topics) {
HashSet<String> hashSet = subMap.get(topic);
if (hashSet != null) {
hashSet.remove(ctx.channel().id().toString());
}
}
ctx.writeAndFlush(unSubAck);
}
/**
* 心跳响应
*
* @param ctx
* @param mqttMessage
*/
public void pingResp(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
log.info("心跳回复:{}", mqttMessageBack.toString());
ctx.writeAndFlush(mqttMessageBack);
}
}
测试
这里使用MQTT.fx工具,免费使用版本1.7.1,下载地址,密码:6nst。
这个软件是可以多开的,多开的时候记得设置好,尤其是clientId,不要一样。
中文可能会乱码,这个工具好像不支持中文。
其他
如果想了解更详细的协议可以参考这篇文章,MQTT协议分析。