C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

news2025/1/22 17:41:46

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction)
 {
     //订阅
 }

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{
    
    lock (_topicActionsLock)
    {
        if (!_topicActions.ContainsKey(topic))
        {
            _topicActions.Add(topic, topicAction);
            if (_mqttClient.IsConnected)
            {
                _mqttClient.SubscribeAsync(topic);
            }
        }
    }
    
}

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{
    while (!_mqttClient.IsConnected)
    {
        try
        {
            Console.WriteLine($"正在连接……");
            _mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            Task.Delay(1000).Wait();
            Console.WriteLine("连接mqtt服务器失败");
        }
    }
    lock (_topicActionsLock)
    {
        foreach (var item in _topicActions)
        {
            _mqttClient.SubscribeAsync(item.Key);
        }
    }

}
  1. 添加接收消息事件
 //客户端接收消息事件
 _mqttClient.ApplicationMessageReceivedAsync +=
     MqttClient_ApplicationMessageReceivedAsync;
     
     

  /// <summary>
  /// 接收消息
  /// </summary>
  /// <param name="args"></param>
  /// <returns></returns>
  private async Task MqttClient_ApplicationMessageReceivedAsync(
      MqttApplicationMessageReceivedEventArgs args
  )
  {
      try
      {
          Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
          if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
          {
              _topicActions[args.ApplicationMessage.Topic]
                  .Invoke(
                      args.ApplicationMessage.Topic,
                      Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
                  );
          }
      }
      catch (Exception ex)
      {
          Console.WriteLine(ex.Message);
      }
  }

完整Mqtt代码

 public class MqttControllor
 {
     private MqttConfig _config;
     private string _clientId;
     MqttClientOptions _clientOptions;

     private IMqttClient _mqttClient;

     private readonly object _topicActionsLock = new object();
     private Dictionary<string, Action<string, string>> _topicActions;

     public MqttControllor(MqttConfig config, bool isAutoConnect = true)
     {
         _topicActions = new Dictionary<string, Action<string, string>>();

         _config = config;
         _clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;
         MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder()
             .WithTcpServer(_config.Ip, _config.Port)
             .WithCredentials(_config.Username, _config.Password)
             .WithClientId(_clientId);

         _clientOptions = optionsBuilder.Build();

         _mqttClient = new MqttFactory().CreateMqttClient();

         // 客户端连接关闭事件
         _mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
         //客户端接收消息事件
         _mqttClient.ApplicationMessageReceivedAsync +=
             MqttClient_ApplicationMessageReceivedAsync;
         if (isAutoConnect)
         {
             Task.Run(() =>
             {
                 MqttConnect();
             });
         }
     }

     /// <summary>
     /// 接收消息
     /// </summary>
     /// <param name="args"></param>
     /// <returns></returns>
     private async Task MqttClient_ApplicationMessageReceivedAsync(
         MqttApplicationMessageReceivedEventArgs args
     )
     {
         try
         {
             Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
             if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
             {
                 _topicActions[args.ApplicationMessage.Topic]
                     .Invoke(
                         args.ApplicationMessage.Topic,
                         Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
                     );
             }
         }
         catch (Exception ex)
         {
             Console.WriteLine(ex.Message);
         }
     }

     private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
     {
         Console.WriteLine($"客户端已断开与服务端的连接……");
         //断开重连
         _mqttClient = new MqttFactory().CreateMqttClient();
         MqttConnect();
         return Task.CompletedTask;
     }

     public void MqttConnect()
     {
         while (!_mqttClient.IsConnected)
         {
             try
             {
                 Console.WriteLine($"正在连接……");
                 _mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
             }
             catch (Exception ex)
             {
                 Task.Delay(1000).Wait();
                 Console.WriteLine("连接mqtt服务器失败");
             }
         }
         Console.WriteLine($"客户端已连接到服务端……");
         //连接成功,订阅主题
         lock (_topicActionsLock)
         {
             foreach (var item in _topicActions)
             {
                 _mqttClient.SubscribeAsync(item.Key);
             }
         }
     }

     /// <summary>
     /// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
     /// </summary>
     /// <param name="topic"></param>
     /// <param name="topicAction"></param>
     public void SubscribeTopic(string topic, Action<string, string> topicAction)
     {
         lock (_topicActionsLock)
         {
             if (!_topicActions.ContainsKey(topic))
             {
                 _topicActions.Add(topic, topicAction);
                 if (_mqttClient.IsConnected)
                 {
                     _mqttClient.SubscribeAsync(topic);
                 }
             }
         }
     }

     /// <summary>
     /// 推送消息
     /// </summary>
     /// <param name="topic">主题</param>
     /// <param name="data">消息内容</param>
     /// <param name="qsLevel"></param>
     /// <param name="retain"></param>
     public void Publish(string topic, string data, int qsLevel = 0, bool retain = false)
     {
         qsLevel = Math.Clamp(qsLevel, 0, 2);

         if (!_mqttClient.IsConnected)
         {
             throw new Exception("mqtt未连接");
         }
         var message = new MqttApplicationMessage
         {
             Topic = topic,
             PayloadSegment = Encoding.UTF8.GetBytes(data),
             QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,
             Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
         };

         _mqttClient.PublishAsync(message);
     }
 }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2064377.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微信小程序】使用 npm 包 - Vant Weapp --定制主题

小程序对 npm 的支持与限制 1. 什么是 Vant Weapp 官方文档地址 &#xff1a;https://youzan.github.io/vant-weapp 2. 安装 Vant 组件库 详细的操作步骤&#xff0c;大家可以参考 Vant 官方提供的快速上手教程&#xff1a; https://youzan.github.io/vant-weapp/#/quickst…

Texio电源维修德士直流电源维修PSW-1080M160

Texio德士电源维修类型有&#xff1a;指针式小型直流电源、数字显示直流电源、多路输出直流电源、直流电源、相控电源、低噪声开关电源、宽幅电源、高电压宽幅电源、开关电源、超薄直流电源、4象限双电源、高电压直流电源 Texio电源维修常见系列如下 PSW-系列是一款单路输出、…

框架——Mybatis(!!!MyBatis 环境搭建步骤)

目录 一、Mybatis 概述 1.背景 2.简介 3.Mybatis 中文官网 二、MyBatis 环境搭建&#xff08;超全&#xff01;&#xff01;&#xff01;&#xff09; 1.创建一张表和表对应的实体类 2.导入 MyBatis jar包,mysql数据库驱动包 3.创建 MyBatis全局配置文件 4. 在接口中…

2024年4款高质量的英语翻译工具推荐!

英语作为一门应用非常广泛的语言&#xff0c;其影响力还是非常深远的。虽然现在学英语的人很多&#xff0c;但对于非英语母语的人来说&#xff0c;英语多多少少会是一个壁垒&#xff0c;所以翻译工具也变得重要了起来。这次&#xff0c;我便要跟大家分享几个很专业的英语翻译工…

连锁门店收银系统源码+电子发票

传统纸质开票模式&#xff0c;流程复杂、时间长&#xff0c;为解决商户开票难的问题&#xff0c;千呼新零售2.0上线了电子发票功能&#xff0c;开通方便&#xff0c;使用简便&#xff01;商户只需要简单配置&#xff0c;就可以实现门店实现开票自由&#xff01; 一、线下订单开…

黑马程序员|8天Python第13章面向对象

一 初识对象 1.生活中数据的组织 2.程序中数据的组织 3.使用对象组织数据 类的属性&#xff1a; 二 成员方法 1.类的定义和使用 2.成员变量和成员方法 类外面是函数&#xff0c;类里面是方法。 3.成员方法的定义语法 self 只是写在这里&#xff0c;传参的时候可以当作不存在。…

QTCreator学习

1.新建程序 2. 设置项目名称 3. Build System选择qmake,若选择cmake则只会产生CmakeLists文件&#xff0c;不会产生pro文件。 4.Base class选择QDialog,表示该类继承于QDialog类 5.套件选择MinGW 32bit,取消掉其他的。 6. 双击ui文件&#xff0c;拖动可添加工具。 7.点击左…

深信服技术服务工程师面试全过程分享

吉祥知识星球http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247485367&idx1&sn837891059c360ad60db7e9ac980a3321&chksmc0e47eebf793f7fdb8fcd7eed8ce29160cf79ba303b59858ba3a6660c6dac536774afb2a6330#rd 《网安面试指南》http://mp.weixin.qq.com/s?…

这对二婚夫妻结婚半年,一起生活才一个月,就走到了婚姻尽头!

这对二婚夫妻结婚半年&#xff0c;一起生活才一个月&#xff0c;就走到了婚姻尽头&#xff01; 这是一篇涉离婚纠纷的民事起诉状 &#xff08;范文点评&#xff09; 离 婚 起 诉 状 原告&#xff1a;韩某斌&#xff0c;男&#xff0c;现年37岁&#xff0c;汉族&#xff0c;打…

Spring Cloud Consul面试题

​ ​ 您好&#xff0c;我是程序员小羊&#xff01; 前言 Spring Cloud Consul 是微服务架构中的一个重要组件&#xff0c;用于服务发现、配置管理以及健康检查。了解 Spring Cloud Consul 的工作原理和应用场景&#xff0c;对于微服务开发者和架构师来说至关重要。以下是一些常…

线程优先级调度

Windows优先级调度算法 系统维护了一个全局的处理器数组KiProcessorBlock&#xff0c;其中每个元素对应于一个处理器的KPRCB对象。其次&#xff0c;另有一个全局变量KiIdleSummary记录了哪些处理器当前是空闲的。所谓一个处理器是空闲的&#xff0c;是指该处理器正在执行空闲循…

《Techporters架构搭建》-Day08 Spring Boot日志实现

集成日志功能 日志介绍日志相关概念选择Logback还是Log4j2&#xff1f;LogBack相关知识LogbackLogback的基本概念Logback的日志级别Logback的配置文件Logback日志文件解析logback-spring.xml的配置项标签说明完整的logback-spring.xml配置示例知识点补充 整合Spring Boot和Logb…

【Lecture1】清华大学大模型公开课——大模型绪论

#清华大模型公开课第二季 #OpenBMB 目录 1. The Evolution of Artificial Intelligence --History 人工智能的演变--历史 1.1 Definition of AI --定义 1.2 Conceptualization of AI -- 概念 1.3 Birth of AI as a Discipline 1.4 Development of AI 1.4.1 Symbolic Int…

Oracle问题笔记

ORA-28040 没有匹配的验证协议 问题出现场景oracle数据库为12c,应用使用的jdbc或客户端工具是11g版本一下&#xff0c;连接12c数据库时会报ora-28040错误。解决办法在Oracle服务端的$ORACLE_HOME/network/admin/sqlnet.ora文件中添加&#xff1a; SQLNET.ALLOWED_LOGON_VERSI…

消息队列篇

1、队列简介 概念&#xff1a; 队列是任务到任务、任务到中断、中断到任务数据交流的一种机制&#xff0c;说白了&#xff0c;队列就是用来传递消息的。 ----------------------------------------------------------------------------------------------------------------…

【Python机器学习】NLP概述——深度处理

自然语言处理流水线的各个阶段可以看作是层&#xff0c;就像是前馈神经网络中的层一样。深度学习就是通过在传统的两层机器学习模型架构&#xff08;特征提取建模&#xff09;中添加额外的处理层来创建更复杂的模型和行为。 上图中&#xff0c;前四层对应于聊天机器人流水线中的…

MCtalk·CEO对话×每刻科技:经济挑战期,企业如何将“好钢”用在“刀刃”上?

2015 年 10 月&#xff0c;网易数智发布第一款产品&#xff0c;正式踏上了 ToB 商业化之路。从那以后&#xff0c;我们每年举办不同主题的科技峰会&#xff0c;分享最新的行业体感和洞察&#xff1b;访谈各界企业领导者&#xff0c;记录他们的创新与创业经历&#xff1b;走过大…

dubbo:dubbo+zookeeper整合nginx实现网关(四)

文章目录 0. 引言1. nginx简介2. 集成nginx2.1 负载均衡实现 3. 源码4. 总结 0. 引言 我们之前讲解过dubbozookeeper实现服务调用和注册中心&#xff0c;但是还缺乏一个统一的入口&#xff0c;即网关服务。dubbozookeeper的模式更加适合的网关组件为nginx&#xff0c;所以今天…

Unity编辑器扩展:创建一个欢迎窗口,在启动Editor的时候显示自定义窗口。

Unity编辑器扩展&#xff1a;创建一个欢迎窗口&#xff0c;在启动Editor的时候显示自定义窗口。 在Unity开发过程中&#xff0c;经常会遇到需要向其他人展示重要信息的情况&#xff0c;比如项目文档、脚本说明、插件介绍等。这个窗口不仅能够展示必要的文档信息&#xff0c;还…

苍穹外卖(瑞吉外卖)--环境搭建

作为软件开发工程师&#xff0c;在编码的过程中就不可避免地会接触多种软件环境&#xff0c;我们主要来分析在工作中经常遇到的三套环境&#xff0c; 分别是: 开发环境、测试环境、生产环境 开发环境 在开发阶段使用的环境&#xff0c;就是开发环境&#xff0c;一般外部用户无…