1 搭建MQTT服务器
1.1 Dockerfile 内容
FROM rabbitmq:3.11.6-management
COPY install_rabbitmq_plus.sh /usr/local/
RUN chmod 777 /usr/local/install_rabbitmq_plus.sh
RUN /bin/sh /usr/local/install_rabbitmq_plus.sh
1.2 容器中需要安装插件的命令 放在 install_rabbitmq_plus.sh 文件中
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt
1.3 构件容器需要用到的yml配置 docker-compose-RabbitMQ.yaml
version: "3.6"
services:
#服务
rabbitmq:
build: #镜像构建
context: . #构建镜像时所在的资源路径
dockerfile: Dockerfile #构建镜像时需要的dockerfile文件路径
ports:
- 5672:5672
- 15672:15672
# mqtt端口
- 15675:15675
- 1883:1883
volumes:
- ./data/rabbitmq:/var/lib/rabbitmq
environment:
- TZ=Asia/Shanghai
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=1234
restart: always
1.4 docker-compose 启动RabbitMQ以及MQTT插件服务
docker-compose -f docker-compose-RabbitMQ.yaml up --build -d
2 .net core 实现
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
public class XJ_MQTT
{
public MqttClient GetClient()
{
MqttClient mqttClient = new MQTTnet.MqttFactory().CreateMqttClient() as MqttClient;
mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync1;
mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
mqttClient.ConnectingAsync += MqttClient_ConnectingAsync;
mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
mqttClient.InspectPacketAsync += MqttClient_InspectPacketAsync;
try
{
MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder();
optionsBuilder.WithTcpServer("mqtt的服务器IP", 1883);
string id = Guid.NewGuid().ToString("N");
optionsBuilder.WithClientId(id);
optionsBuilder.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);
optionsBuilder.WithCredentials("root", "1234");
mqttClient.ConnectAsync(optionsBuilder.Build()).Wait();
}
catch (Exception e)
{
Console.WriteLine($"连接到MQTT服务器失败" + Environment.NewLine + e.Message + Environment.NewLine);
}
return mqttClient;
}
private Task MqttClient_InspectPacketAsync(MQTTnet.Diagnostics.InspectMqttPacketEventArgs arg)
{
Console.WriteLine($"MqttClient_InspectPacketAsync :{UTF8Encoding.UTF8.GetString(arg.Buffer)}" + Environment.NewLine);
return Task.CompletedTask;
}
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine("已断开MQTT服务器" + Environment.NewLine);
return Task.CompletedTask;
}
private Task MqttClient_ConnectingAsync(MqttClientConnectingEventArgs arg)
{
Console.WriteLine("链接MQTT服务器中...." + Environment.NewLine);
return Task.CompletedTask;
}
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine("已连接到MQTT服务器" + Environment.NewLine);
return Task.CompletedTask;
}
private Task MqttClient_ApplicationMessageReceivedAsync1(MqttApplicationMessageReceivedEventArgs arg)
{
Console.WriteLine($">>ClientId:{arg.ClientId} \r\nTopic:{arg.ApplicationMessage.Topic}\r\nPayload:{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}{Environment.NewLine}" + Environment.NewLine);
return Task.CompletedTask;
}
}
2.1 实现发布
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
var mq = new XJ_MQTT();
var client = mq.GetClient();
while (true)
{
string input = Console.ReadLine();
client.PublishStringAsync("testTpic", "payload" + Guid.NewGuid().ToString("N") + ":" + input);
}
2.2 实现订阅
using MQTTnet.Packets;
var mq = new XJ_MQTT();
var client = mq.GetClient();
client.SubscribeAsync(new MQTTnet.Client.MqttClientSubscribeOptions()
{
TopicFilters = new List<MqttTopicFilter>() { new MqttTopicFilter() { Topic = "testTpic" } }
});
while (true)
Console.ReadLine();
3 运行效果