基于MQTT协议实现微服务架构事件总线

news2025/2/28 3:11:59

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:

  * publish     publish a message to the broker
  * subscribe   subscribe for updates from the broker
  * version     the current MQTT.js version
  * help        help about commands

Launch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块

// 连接选项
const options = {
      clean: true, // true: 清除会话, false: 保留会话
      connectTimeout: 4000, // 超时时间
      // 认证信息
      clientId: 'user_id', // 要保证唯一性
      // 若在控制台配置了用户名和密码:
      // username: 'xxx',
      // password: 'xxx',
}

// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)

client.on('reconnect', error => {
  console.error('正在重连:', error)
})

client.on('error', error => {
  console.error('连接失败:', error)
})

//收到消息
client.on('message', (topic, message) => {
  console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})

//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息
  if(error){
    console.error('订阅主题失败:', error)
    return
  }
  console.log('订阅成功')
})

//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{
  if(error){
    console.error('发布消息失败:', error)
  }
})

//取消订阅
client.unsubscrib(topic, qos, error=>{
  if(error){
    console.error('取消订阅失败', error)
    return
  }
})

//断开连接
if(client.connected){
  try{
    client.end(false,()=>{
      console.log('成功断开连接')
    })catch(error){
      console.error('断开连接失败:', error)
    }
  }
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1477131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端-DOM树

dom树描述网页元素关系的一个专有名词,如html内包含了head、body,而head内包含meta、title、script等,body内包含div等元素;网页所有内容都在document里面,网页内容以树状形式排列,所以称之为dom树 dom树内…

进制转换md5绕过 [安洵杯 2019]easy_web1

打开题目 在查看url的时候得到了一串类似编码的东西,源码那里也是一堆base64,但是转换成图片就是网页上我们看见的那个表情包 ?imgTXpVek5UTTFNbVUzTURabE5qYz0&cmd 我们可以先试把前面的img那串解码了 解码的时候发现长度不够,那我们…

【Prometheus】基于Altertmanager发送告警到多个接收方、监控各种服务、pushgateway

基于Altertmanager发送报警到多个接收方 一、配置alertmanager-发送告警到qq邮箱1.1、告警流程1.2、告警设置【1】邮箱配置【2】告警规则配置【3】 部署prometheus【4】部署service 二、配置alertmanager-发送告警到钉钉三、配置alertmanager-发送告警到企业微信3.1、注册企业微…

DTD、XML阐述、XML的两种文档类型约束和DTD的使用

目录 ​编辑 一、DTD 什么是DTD? 为什么要使用 DTD? 内部 DTD 声明 具有内部 DTD 的 XML 文档 外部 DTD 声明 引用外部 DTD 的 XML 文档 二、XML 什么是XML? XML 不执行任何操作 XML 和 HTML 之间的区别 XML 不使用预定义的标记…

AI大预言模型——ChatGPT在地学、GIS、气象、农业、生态、环境等应用

原文链接:AI大预言模型——ChatGPT在地学、GIS、气象、农业、生态、环境等应用 一开启大模型 1 开启大模型 1)大模型的发展历程与最新功能 2)大模型的强大功能与应用场景 3)国内外经典大模型(ChatGPT、LLaMA、Gemini、DALLE、Midjourney、Stable Di…

Java Web(十一)--JSON Ajax

JSON JSon在线文档: JSON 简介 JSON(JavaScript Object Notation, JS 对象标记) 是一种轻量级的数据交换格式。轻量级指的是跟xml做比较。数据交换指的是客户端和服务器之间业务数据的传递格式。 它基于 ECMAScript (W3C制定的JS规范)的一个子集,采…

Dsco Dropship EDI需求分析

供应商要想从Dsco处通过EDI获取订单,需要部署自己的EDI系统,与Dsco的EDI供应商CommerceHub 建立连接,分为两个方向: 1.从CommerceHub 的 Dsco 平台获取 EDI 850 采购订单 2.向Dsco发送库存(846)、订单状态…

如何使用ArcGIS Pro创建最低成本路径

虽然两点之间直线最短,但是在实际运用中,还需要考虑地形、植被和土地利用类型等多种因素,需要加权计算最低成本路径,这里为大家介绍一下计算方法,希望能对你有所帮助。 数据来源 教程所使用的数据是从水经微图中下载…

JVM(3)

垃圾回收(GC)相关 在C/C中,当我们使用类似于malloc的内存开辟,还需要手动释放内存空间,这样的机制在使用时给我们造成了诸多不便,但在Java中,有垃圾回收这样的机制,这就是指:我们不再需要手动释放,程序会自动判定,某个内存空间是否可以继续使用,如果内存不使用了,就会自动释放…

(转载)SpringCloud 微服务(三)-Seata解决分布式事务问题

ps:这个原文写的很好,怕后续这个地址失效,备份一个留着自己学习 转自:SpringCloud 微服务(三)-Seata解决分布式事务问题_seata 黑马 代码-CSDN博客 看完了黑马程序员的免费课程,感觉受益匪浅,…

堆排序C++(Acwing)

代码&#xff1a; #include <iostream> #include <algorithm>using namespace std;const int N 100010;int n, m; int h[N], cnt;void down(int u) {int t u;if(u * 2 < cnt && h[u * 2] < h[t]) t u * 2;if(u * 2 1 < cnt && h[u *…

【架构之路】糟糕程序员的20个坏习惯,切记要改掉

文章目录 强烈推荐前言&#xff1a;坏习惯:总结&#xff1a;强烈推荐专栏集锦写在最后 强烈推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站:人工智能 前言&#xff1a; 优秀的程序员…

【重要公告】BSV区块链协会全新推出“网络访问规则NAR”

​​发表时间&#xff1a;2024年2月15日 BSV区块链协会正式宣布已为BSV区块链推出一套全新的网络访问规则&#xff08;Network Access Rules&#xff0c;以下简称“NAR”&#xff09;。 NAR是一整套规则&#xff0c;用于规范BSV协会与BSV网络节点之间的关系。它基于比特币最初…

Unity 常用操作

2D素材网站 https://craftpix.net/ https://itch.io/game-assets/tag-2d/tag-backgrounds 3D素材资源网址 https://www.mixamo.com/#/ 场景常用操作&#xff1a; 快捷键&#xff1a;QWER Q&#xff1a;Q键或鼠标中键&#xff0c;可以拉动场景。 W&#xff1a;选中物体后&…

雾锁王国服务器要开服务器吗?

雾锁王国要开服务器吗&#xff1f;可以使用官方服务器&#xff0c;也可以自己搭建多人联机服务器&#xff0c;更稳定不卡&#xff0c;畅玩开黑。阿腾云分享atengyun.com给大家目前阿里云和腾讯云均提供雾锁王国服务器和一键搭建程序&#xff0c;成本26元即可搭建一台自己的雾锁…

小白水平理解面试经典题目leetcode 606. Construct String from Binary Tree【递归算法】

Leetcode 606. 从二叉树构造字符串 题目描述 例子 小白做题 坐在自习室正在准备刷题的小白看到这道题&#xff0c;想想自己那可是没少和白月光做题呢&#xff0c;也不知道小美刷题刷到哪里了&#xff0c;这题怎么还没来问我&#xff0c;难道是王谦谦去做题了&#xff1f; 这…

换根DP,LeetCode 2581. 统计可能的树根数目

目录 一、题目 1、题目描述 2、接口描述 3、原题链接 二、解题报告 1、思路分析 2、复杂度 3、代码详解 一、题目 1、题目描述 Alice 有一棵 n 个节点的树&#xff0c;节点编号为 0 到 n - 1 。树用一个长度为 n - 1 的二维整数数组 edges 表示&#xff0c;其中 edges[…

特征值和特征向量及其在机器学习中的应用

特征值和特征向量是线性代数中的概念&#xff0c;用于分析和理解线性变换&#xff0c;特别是由方阵表示的线性变换。它们被用于许多不同的数学领域&#xff0c;包括机器学习和人工智能。 在机器学习中&#xff0c;特征值和特征向量用于表示数据、对数据执行操作以及训练机器学…

MVCC【重点】

参考链接 [1] https://www.bilibili.com/video/BV1YD4y1J7Qq/?spm_id_from333.1007.top_right_bar_window_history.content.click&vd_source0cb0c5881f5c7d76e7580fbd2f551074 [2]https://www.cnblogs.com/jelly12345/p/14889331.html [3]https://xiaolincoding.com/mysql…

第十三篇【传奇开心果系列】Python的文本和语音相互转换库技术点案例示例:Microsoft Azure的Face API开发人脸识别门禁系统经典案例

传奇开心果博文系列 系列博文目录Python的文本和语音相互转换库技术点案例示例系列 博文目录前言一、实现步骤和雏形示例代码二、扩展思路介绍三、活体检测深度解读和示例代码四、人脸注册和管理示例代码五、实时监控和报警示例代码六、多因素认证示例代码七、访客管理示例代码…