C# MQTTNET 服务端+客户端 实现 源码示例

news2025/1/23 8:02:28

目录

1.演示效果

2.源码下载

3.服务端介绍

4.客户端介绍


1.演示效果

2.源码下载

下载地址:https://download.csdn.net/download/rotion135/89385802

3.服务端介绍

服务端用的控制台程序进行设计,实际使用可以套一层Windows服务的皮,进行服务部署。

调试用控制台显示收发的消息,便于直观

首先安装的MQTTNET 版本是4.3.6.1152 :

自定义了服务的客户端列表数据模型:

    public class MqttClientInfo
    {
        /// <summary>
        /// ID
        /// </summary>
        public string ClientId { get; set; }
        /// <summary>
        /// 客户端名称
        /// </summary>
        public string ClientName { get; set; }
        /// <summary>
        /// 订阅列表
        /// </summary>
        public List<MqttSubscription> Subscriptions { get; set; } = new List<MqttSubscription>();
    }

    public class MqttSubscription
    {
        /// <summary>
        /// 所属客户端
        /// </summary>
        public MqttClientInfo Parent { get; set; }
        /// <summary>
        /// 订阅消息
        /// </summary>
        public string Topic { get; set; }
    }

再对服务端的代码进行封装,添加响应的事件,做一些消息显示到控制台

服务端的代码就这么简单

    public class LSMQTTServer
    {
        MqttServer mqttServer;
        List<MqttClientInfo> MqttClients = new List<MqttClientInfo>();

        /// <summary>
        /// 初始化Mqtt服务并启动服务
        /// </summary>
        /// <param name="ip"></param>
        /// <param name="port"></param>
        public virtual void InitMqttServer(string ip, int port)
        {
            var mqttServerOptions =
                    new MqttServerOptionsBuilder()
                    .WithDefaultEndpoint()
                    .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))//set the ip of the server
                    .WithDefaultEndpointPort(port)//set the port of the server                    
                    .Build();
            mqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
            mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
            mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
            mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
            mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
            mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;
            mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
            mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
            mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
            mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;

            mqttServer.StartAsync();
        }       

        private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ApplicationMessageNotConsumedAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_InterceptingClientEnqueueAsync(InterceptingClientApplicationMessageEnqueueEventArgs arg)
        {
            try
            {
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingClientEnqueueAsync", ex);
            }
            return Task.CompletedTask;
        }

        private Task MqttServer_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
        {
            try
            {

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientAcknowledgedPublishPacketAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            try
            {
                var client = arg.ClientId;
                var topic = arg.ApplicationMessage.Topic;
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
                GlobalEvents.OnMessage($"接收到消息:Client[{client}] Topic[{topic}] Message[{content}]");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_InterceptingPublishAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭Mqtt服务
        /// </summary>
        public async virtual Task StopMqttServer()
        {
            if (mqttServer != null)
            {
                if (mqttServer.IsStarted)
                {
                    await mqttServer.StopAsync();
                    mqttServer.Dispose();
                }
            }
        }

        /// <summary>
        /// 对客户端的连接进行验证
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            try
            {
                //验证ClientId
                if (string.IsNullOrWhiteSpace(arg.ClientId))
                {
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }

                //验证用户名和密码
                bool acceptflag = !(string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password));

                if (!acceptflag)
                {
                    //验证失败
                    arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    return Task.CompletedTask;
                }
                arg.ReasonCode = MqttConnectReasonCode.Success;
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ValidatingConnectionAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端连接成功
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            try
            {
                MqttClients.Add(new MqttClientInfo() { ClientId = arg.ClientId, ClientName = arg.UserName });
                GlobalEvents.OnMessage($"客户端上线- ID:【{arg.ClientId}】 Name:【{arg.UserName}】");
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientConnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端断开连接
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            try
            {
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttClients.Remove(mqttUser);
                    GlobalEvents.OnMessage($"客户端离线- ID:【{mqttUser.ClientId}】");
                }

            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientDisconnectedAsync", ex);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端发布订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    mqttUser.Subscriptions.Add(new MqttSubscription() { Parent = mqttUser, Topic = arg.TopicFilter.Topic });
                    GlobalEvents.OnMessage($"客户端发布订阅- Topic:【{arg.TopicFilter.Topic}】");
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientSubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


        /// <summary>
        /// 客户端取消订阅
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            try
            {
                if (arg == null)
                    return Task.CompletedTask;
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttSubscription? mqttSubedTopic = mqttUser.Subscriptions.FirstOrDefault(t => t.Topic == arg.TopicFilter);
                    if (mqttSubedTopic != null)
                    {
                        mqttUser.Subscriptions.Remove(mqttSubedTopic);
                        GlobalEvents.OnMessage($"客户端取消订阅- Topic:【{mqttSubedTopic.Topic}】");
                    }
                }
            }
            catch (Exception ex)
            {
                LogOperate.Error("MqttServer_ClientUnsubscribedTopicAsync", ex);
            }
            return Task.CompletedTask;
        }


    }

然后在控制台运行的时候,对服务进行实例化

var ip = IPHelper.GetLocalIP();
int port = 3303;
LSMQTTServer server = new LSMQTTServer();
server.InitMqttServer(ip,port);
GlobalEvents.OnMessage($"MQTT服务启动,IP:{ip},Port{port}");

4.客户端介绍

客户端设计,用的WPF,将每个客户端连接,用自定义控件进行封装,界面添加多个控件即表示多个客户端,添加订阅后,即可以显示收到的消息,便于多个客户端之间的消息调试

首先对连接的客户端也进行了类封装,包括连接,订阅,取消订阅和消息接收等

    public class LSMQTTClient
    {
        MqttClient mqttClient;
        public delegate void DelegateOutMessage(string message);
        public event DelegateOutMessage OnOutMessage;

        public void InitMqttClient(string serverIp, int serverPort, string clientId, string userName, string password)
        {
            try
            {
                var options = new MqttClientOptionsBuilder()
                     .WithCleanSession(true)
                     .WithCredentials(userName, password)
                     .WithClientId(clientId)
                     .WithTcpServer(serverIp,serverPort)
                     .Build();

                ConnectMQTTServer(options);
            }
            catch (Exception ex)
            {
                LogOperate.Error("InitMqttClient 发生异常", ex);
            }
        }

        /// <summary>
        /// 判断是否已连接服务
        /// </summary>
        /// <returns></returns>
        public bool IsConnect()
        {
            if (mqttClient == null)
                return false;
            if(!mqttClient.IsConnected)
                return false;
            return true;
        }


        public async void ConnectMQTTServer(MqttClientOptions options)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; ;
                mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; ;
                mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; ;
            }

            await mqttClient.ConnectAsync(options);
        }


        /// <summary>
        /// 断开服务连接
        /// </summary>
        public void DisConnectMQTTServer()
        {
            if(mqttClient!=null && mqttClient.IsConnected)
            {
                mqttClient.DisconnectAsync();
            }
        }

        /// <summary>
        /// 添加订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> AddSubscription(string topic)
        {
            if(!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.SubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 取消订阅
        /// </summary>
        /// <param name="topic"></param>
        /// <returns></returns>
        public async Task<BaseResult> UnSubscription(string topic)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.UnsubscribeAsync(topic);
            return BaseResult.Successed;
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="content"></param>
        /// <returns></returns>
        public async Task<BaseResult> SendMessage(string topic,string content)
        {
            if (!IsConnect())
            {
                return new BaseResult(false, "请先连接服务");
            }

            await mqttClient.PublishStringAsync(topic, content);
            return BaseResult.Successed;
        }

        private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            OnOutMessage?.Invoke("已断开服务连接");
            return Task.CompletedTask;
        }

        private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            if (arg.ConnectResult.ResultCode == MqttClientConnectResultCode.Success)
                OnOutMessage?.Invoke("已连接到服务");
            else
                OnOutMessage?.Invoke($"连接服务失败【{arg.ConnectResult.ReasonString}】");
            return Task.CompletedTask;
        }

        private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            if (arg.ApplicationMessage.PayloadSegment.Array != null)
            {
                var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);// BitConverter.ToString(arg.ApplicationMessage.PayloadSegment.Array,0, arg.ApplicationMessage.PayloadSegment.Count);
                OnOutMessage?.Invoke($"[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}]-收到消息>>{content}");
            }
            return Task.CompletedTask;
        }
    }

再设计自定义控件,将连接属性单独实例化

自定义控件包括XAML和ViewModel的设计,详细的可以下载源码进行查看,此处不展示太多了,代码量也确实有一些些,无非就是 连接的各个参数,如IP、端口,客户端ID,用户名、密码等等

然后再主界面设计两个按钮,添加自定义控件和清理自定义控件

界面设计的东西不介绍太多了,因为客户端可以有很多种设计的方式,但通讯那一块就已经在上边展示的代码里边了;

到此服务端+客户端就已经实现了,是不是没有想象中那么复杂。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1793734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

了解一下Ubuntu Linux

1.3.1 什么是Ubuntu Ubuntu这个名字非常神奇&#xff0c;它取自非洲南部祖鲁语的ubuntu&#xff0c;是一个哲学名称&#xff0c;其意思为“人性”或者“我的存在是因为大家的存在”。对于中国人来说&#xff0c;一般称呼它为乌班图。 Ubuntu是在Debian的基础上开发出来的&am…

在Win10安装MySQL环境以及更改相关配置---附带每一步截图

下载MySQL数据库 MySQL官网链接 选择合适自己的版本&#xff0c;这里我选择5.7.17&#xff0c;选择安装包大的那一个&#xff0c;这个是离线安装&#xff0c;下载到本地后进行安装。 选择“No thanks&#xff0c;just start my download.”即进入下载状态。 安装 运行安…

SpringBoot+Vue房产销售网站(前后端分离)

技术栈 JavaSpringBootMavenMySQLMyBatisVueShiroElement-UI 角色对应功能 用户销售经理管理员 功能截图

Javascript系统学习(三)

前端模块化前端模块化CommonJS、AMD、CMD、ES6_commonjs amd cmd es6模块化-CSDN博客 ES6: <script type"module" src"main.js"></script> //默认导出 export default function(ctx) {... } ----------------------------------- //模块命名…

史上最走心midjourney教程多案例咒语注意点

一 &#xff1a;什么是Midjourney &#xff1f;&#xff08;文末附 MidJourney 知识库&#xff0c;从注册到使用教程还有高阶技巧应有尽有。&#xff09; Midjourney是一个由同名研究实验室开发的人工智能程式&#xff0c;可根据文本生成图像&#xff0c;于2022年7月12日进入公…

Linux 僵尸进程和孤儿进程

一.Z(zombie)-僵尸进程 1.僵死状态&#xff08;Zombies&#xff09;是一个比较特殊的状态。当进程退出并且父进程&#xff08;使用wait()系统调用后&#xff09;没有读取到子进程退出的返回代码时就会产生僵死(尸)进程 2.僵死进程会以终止状态保持在进程表中&#xff0c;并且会…

神经网络 torch.nn---Containers

torch.nn — PyTorch 2.3 documentation torch.nn - PyTorch中文文档 (pytorch-cn.readthedocs.io) nn是Neural Network的简称&#xff0c;帮助程序员方便执行如下的与神经网络相关的行为&#xff1a; &#xff08;1&#xff09;创建神经网络 &#xff08;2&#xff09;训练…

matlab(实例):滤波器(低通、带通、高通,使用butter函数、filter函数)

一、题目&#xff1a;已知一个时域信号&#xff0c;包含三个频率&#xff08;50Hz、150Hz、300Hz&#xff09;&#xff0c;分别设计并使用低通滤波器、带通滤波器、高通滤波器&#xff0c;对其进行滤波&#xff0c;画出滤波信号的时域图和频谱图。 二、解题过程&#xff1a; ①…

HQChart小程序教程4-动态控制手势滚动页面

动态控制手势滚动页面 示例效果canvas 控制页面滚动属性步骤1. 使用变量绑定disable-scroll2. 在手势处理函数中控制是否滚动页面 交流QQ群HQChart代码地址 示例效果 canvas 控制页面滚动属性 根据官方文档&#xff0c;disable-scroll 属性是控制画布手势是否可以滚动页面。 h…

一线教师教学工具汇总

亲爱的教师们&#xff01;我们的教学工具箱里也该更新换代啦&#xff01;今天&#xff0c;就让我来给大家安利一波超实用的教学神器&#xff1a; 百度文库小程序 —— 在线图书馆 百度文库&#xff0c;一个宝藏级的在线文档分享平台&#xff01;在这里&#xff0c;你可以找到海…

YashanDB携手宏杉科技助力国产软件生态发展

近日&#xff0c;深圳计算科学研究院崖山数据库系统YashanDB与宏杉科技系列存储、系列服务器与数据库一体机等多款产品顺利完成兼容性互认证。经严格测试&#xff0c;双方产品完全兼容&#xff0c;稳定运行&#xff0c;共同提供高效、稳定、安全的国产软硬件一体化解决方案&…

超声波清洗机真的有用吗?四款宝藏超声波清洗机千万别错过

越来越多的人开始依赖眼镜来帮助他们纠正视力问题。然而&#xff0c;眼镜的使用也带来了一些麻烦&#xff0c;其中之一就是如何保持眼镜的清洁和清晰。传统的洗眼镜方法可能效果不佳&#xff0c;甚至可能会划伤镜片。幸运的是&#xff0c;现在有一种新兴的技术可以解决这个问题…

最新一站式AI创作中文系统网站源码+系统部署+支持GPT对话、Midjourney绘画、Suno音乐、GPT-4o文档分析等大模型

一、系统简介 本文将介绍最新的一站式AI创作中文系统&#xff08;集成ChatGPTMidjourneySunoStable Diffusion&#xff09;——「星河易创AI」系统&#xff0c;该系统基于ChatGPT的核心技术&#xff0c;融合了自然语言问答、绘画、音乐、文档分享、图片识别等创作功能&#xf…

ctfshow web web签到--web14

web签到 查看源代码解码即可 web2 SQL注入&#xff0c;我之前是没遇到这种格式的长了新姿势 or 11 union select 1,database(),3 limit 1,2;# or 11 union select 1,(select table_name from information_schema.tables where table_schemaweb2 limit 0,1),3 limit 1,2;# or…

招待宴请、工作餐、加班餐,YonSuite让企业支出餐餐可控

随着企业规模的不断扩大和员工需求的日益多样化&#xff0c;餐饮管理成为了企业日常运营中不可忽视的一环。传统的餐饮管理方式&#xff0c;如员工垫付、手工报销、招待费用高、费用去向不明等&#xff0c;不仅效率低下&#xff0c;而且难以实时掌控支出情况&#xff0c;给企业…

减肥经验分享,坚持好的习惯。

今天早上看到一篇丁香医生的液断减肥&#xff0c;看到评论&#xff0c;很多人都觉得减肥很难&#xff0c;都在常年减肥&#xff0c;于是想给大家分享一下我的减肥经验。 目录 一 自我介绍 二 减肥四个阶段 第一阶段 少吃主食 第二阶段 不吃主食 第三阶段 黄瓜鸡蛋 第四阶…

HTML开发 Vue2.x + Element-UI 动态生成表单项并添加表单校验

基于vue2.x 和element-ui 动态生成表单项并添加表单校验&#xff1b; 1、需求问题 如下图&#xff0c;项目有个需求&#xff0c;点击添加按钮&#xff0c;新增一行设备信息&#xff0c;且每项信息必填&#xff1b; 2、代码 看到这个需求&#xff0c;首先想到要使用v-for的形…

python 巡检报告中的邮件处理

00.创作背景,在每天的巡检报告中要 要检查oa相关服务器的备份作业是否备份成功 那个备份软件有个功能&#xff0c;就是完成备份作业后&#xff0c;可以发送信息到我的邮箱。 01.通过检查我邮箱的信息&#xff0c;就可以了解那个备份作业的情况。 通过解释邮件的名称可以了解备…

618大促该买哪些数码好物?数码好物选购清单来啦,闭眼不踩坑!

618年中大促&#xff0c;无疑是一场数码爱好者的盛宴&#xff0c;在这个时刻&#xff0c;各大品牌和商家纷纷推出超值优惠&#xff0c;让众多心仪的数码产品以历史最低价呈现在消费者面前。面对如此丰富的选择&#xff0c;你是否也在犹豫哪些数码好物值得在这个节点入手呢&…

斯坦福 AI 团队被指抄袭清华大模型:细节揭秘

近日&#xff0c;斯坦福AI团队因发布的AI模型被指抄袭清华大学的研究成果而陷入争议。本文将详细探讨这一事件的背景、关键细节及其对开源社区的影响。 事件背景 斯坦福的AI团队发布了一个名为“LLaMA-3V”的模型&#xff0c;声称只花了500美元且只用了GPT-4的1%的体量便达到…