Nodejs 第七十六章(MQ进阶)

news2025/1/16 17:57:15

MQ介绍和基本使用在上一章介绍过了,不再重复

  1. 消息:在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成

  2. 生产者Producer:生产者是消息的发送方,它将消息发送到RabbitMQ的交换器(Exchange)中

  3. 交换器Exchange:交换器接收从生产者发送的消息,并根据特定的规则将消息路由到一个或多个队列中

  4. 队列Queue:队列是消息的接收方,它存储了待处理的消息。消费者可以从队列中获取消息并进行处理

  5. 消费者Consumer:消费者是消息的接收方,它从队列中获取消息并进行处理

MQ进阶用法

发布订阅

发布订阅,消息的发送者称为发布者(Publisher),而接收消息的一个或多个实体称为订阅者(Subscriber

回顾上一篇,点对点通讯生产者发送一条消息通过路由投递到Queue,只有一个消费者能消费到 也就是一对一发送

请添加图片描述

回归主题 发布订阅就是生产者的消息通过交换机写到多个队列,不同的订阅者消费不同的队列,也就是实现了一对多

发布订阅的模式分为四种

  1. Direct(直连)模式:把消息放到交换机指定key的队列里面。
  2. Topic(主题)模式: 把消息放到交换机指定key的队列里面,额外增加使用"*“匹配一个单词或使用”#"匹配多个单词
  3. Headers(头部)模式:把消息放到交换机头部属性去匹配队列
  4. Fanout(广播)模式:把消息放入交换机所有的队列,实现广播

发布订阅-代码编写

1. direct模式编写

主要就是通过 routingKey 匹配实现路由 这里的zs就是routingKey

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/**
 * @param {String} exchange 交换机的名称
 * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
 * @param {Object} options {durable: true} //开启消息持久化
 */
await channel.assertExchange('logs', 'direct', {
    durable: true
})
//发送消息
/**
 * @param {String} exchange 交换机的名称
 * @param {String} routingKey 路由键
 * @param {Buffer} content 消息内容
 */
 //这里的zs就是routingKey
channel.publish('logs', 'zs', Buffer.from('小满direct模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(我们编写多个方便测试)

consume.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('logs', 'direct', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue1', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键
 */
//匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue1', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})

consume2.js

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('logs', 'direct', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue2', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键
 */
 //匹配对应的zs值才能收到
await channel.bindQueue(queue, 'logs', 'zs')
//接收消息
channel.consume('queue2', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})
2. Topic模式编写

我们把模式切换成了Topic 并且publish 发布的时候 routingKey 换成了 xm.xxxxxxxx

生产者

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
//声明一个交换机
/**
 * @param {String} exchange 交换机的名称
 * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
 * @param {Object} options {durable: true} //开启消息持久化
 */
await channel.assertExchange('topic', 'topic', {
    durable: true
})
//发送消息
/**
 * @param {String} exchange 交换机的名称
 * @param {String} routingKey 路由键
 * @param {Buffer} content 消息内容
 */
 //注意这儿匹配规则换了 换成xm.xxxxxxxxxxxxxxxxxxxxx
channel.publish('logs', 'xm.sadsdsdasdasdasdsda', Buffer.from('小满topic模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者匹配(注意这里匹配规则xm.*'使用了* 就是模糊匹配的意思)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道

await channel.assertExchange('topic', 'topic', {
    durable: true
})

//添加一个队列
const { queue } = await channel.assertQueue('queue1', {
    durable: true
})
//绑定交换机
/**
 * @param {String} queue 队列名称
 * @param {String} exchange 交换机名称
 * @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
 */
 //这儿变化了
await channel.bindQueue(queue, 'topic', 'xm.*')
//接收消息
channel.consume('queue1', (msg) => {
    console.log(msg.content.toString());
}, {
    noAck: true //自动确认消息被消费
})
3. Headers模式

生产者(注意 publish 增加第四个参数开启了header 添加了data参数)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel() 
   //声明一个交换机
   /**
    * @param {String} exchange 交换机的名称
    * @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
    * @param {Object} options {durable: true} //开启消息持久化
    */
   await channel.assertExchange('headers', 'headers', {
       durable: true
   })
   //发送消息
   /**
    * @param {String} exchange 交换机的名称
    * @param {String} routingKey 路由键
    * @param {Buffer} content 消息内容
    * @param {Object} options {headers: {'key': 'value'}} //定义匹配规则
    */
    //嘿 这儿变了
   channel.publish('headers', '', Buffer.from('小满headers模式发送的消息'),{
       headers: {
           data:'xmzs'
       }
   })

   //断开
   await channel.close()
   await connection.close()
   process.exit(0)

消费者(bindQueue 增加一个对象 属性跟生产者对应即可)

   import amqplib from 'amqplib'
   const connection = await amqplib.connect('amqp://localhost:5672')
   const channel = await connection.createChannel() //创建一个频道
   await channel.assertExchange('headers', 'headers')

   //添加一个队列
   const { queue } = await channel.assertQueue('queue1')
   //绑定交换机
   /**
    * @param {String} queue 队列名称
    * @param {String} exchange 交换机名称
    * @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
    */
   await channel.bindQueue(queue, 'headers', '',{
       data:'xmzs' //注意这儿不加headers 直接放值即可
   })
   //接收消息
   channel.consume(queue, (msg) => {
       console.log(msg.content.toString());
   }, {
       noAck: true //自动确认消息被消费
   })
4. Fanout模式

生产者(其实也就是routingKey 变成一个空值实现全体广播)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
//创建一个频道
const channel = await connection.createChannel()
//声明一个交换机
/**
* @param {String} exchange 交换机的名称
* @param {String} type "direct" | "topic" | "headers" | "fanout" | "match" | 使用广播模式
* @param {Object} options {durable: true} //开启消息持久化
*/
await channel.assertExchange('fanout', 'fanout')
//发送消息
/**
* @param {String} exchange 交换机的名称
* @param {String} routingKey 路由键
* @param {Buffer} content 消息内容
*/
channel.publish('fanout', '', Buffer.from('小满fanout模式发送的消息'))

//断开
await channel.close()
await connection.close()
process.exit(0)

消费者(routingKey接受空值即可 就算有值也会被忽略)

import amqplib from 'amqplib'
const connection = await amqplib.connect('amqp://localhost:5672')
const channel = await connection.createChannel() //创建一个频道
await channel.assertExchange('fanout', 'fanout')

//添加一个队列
const { queue } = await channel.assertQueue('queue1')
//绑定交换机
/**
* @param {String} queue 队列名称
* @param {String} exchange 交换机名称
* @param {String} routingKey 路由键 *匹配一个单词 #匹配多个单词
*/
await channel.bindQueue(queue, 'fanout', '')
//接收消息
channel.consume(queue, (msg) => {
console.log(msg.content.toString());
}, {
noAck: true //自动确认消息被消费
})

总结

通过使用RabbitMQ作为缓冲,避免数据库服务崩溃的风险。生产者将消息放入队列,消费者从队列中读取消息并进行处理,随后确认消息已被处理。在应用之间存在一对多的关系时,可以使用Exchange交换机根据不同的规则将消息转发到相应的队列:

  1. 直连交换机(direct exchange):根据消息的路由键(routing key)将消息直接转发到特定队列。
  2. 主题交换机(topic exchange):根据消息的路由键进行模糊匹配,将消息转发到符合条件的队列。
  3. 头部交换机(headers exchange):根据消息的头部信息进行转发。
  4. 广播交换机(fanout exchange):将消息广播到交换机下的所有队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810333.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

php 混合xml js,html 代码报错 ,结束标签关闭, short_open_tag 的作用,php关闭文件结束判断

结束标签关闭, short_open_tag 的作用,php关闭文件结束判断 有时候我们我们会将php,xml,js,html 混合编写 php文件只要开始标签而不要结尾标签? 混合代码看代码 直接运行 yntax error, unexpected version (T_STRING) in php…

【差分数组】1674. 使数组互补的最少操作次数

本文涉及知识点 差分数组 LeetCode1674. 使数组互补的最少操作次数 给你一个长度为 偶数 n 的整数数组 nums 和一个整数 limit 。每一次操作,你可以将 nums 中的任何整数替换为 1 到 limit 之间的另一个整数。 如果对于所有下标 i(下标从 0 开始&…

reverse入门刷题(6.9)

总结: 拿到附件,先运行看看有没有信息,再查壳,再IDA运行 1.Easy_vb 收获: 使用搜索:在String的时候用的是ctrlf 在IDA_view的时候使用搜索是Aitt 打开IDA,Aitt搜索MCTF(关键字即…

2024年6月最新开源电视影视TVAPP原生源码和后台管理平台源码及完整教程

本套源码为本人维护更新完善半年左右的还在使用开发的源码,与市面上倒卖的残次品不一样,没有可比性,向下兼容安卓4.0,向上兼容安卓13以上TV电视系统, 完全无闪退,弹窗报错,卡死、异常死循环残次…

加解密算法及国密算法应用

常见的加解密算法可以分为可逆和不可逆两种 不可逆算法 哈希算法,MD5,hs-256,SM3 一般系统中使用密码加密和数据防篡改校验字段就是不可逆算法 hs-256应用:JWT header头部payload荷载signature签名(防止篡改) 前两部分采用base…

数组双指针经典习题

合并两个有序数组 class Solution {public void merge(int[] nums1, int m, int[] nums2, int n) {int p1m-1,p2n-1;int p3nums1.length-1;while(p1>0&&p2>0){//放完一个数组if(nums1[p1]>nums2[p2]){nums1[p3--]nums1[p1];p1--;}else{nums1[p3--]nums2[p2];p…

微服务Day7学习-数据聚合、同步、补全

文章目录 数据聚合聚合分类 自动补全DSL实现Bucket聚合DSL实现Metrics聚合RestAPI实现聚合多条件聚合对接前端接口拼音分词器自定义分词器自动补全查询实现酒店搜索框自动补全 数据同步数据同步思路分析利用mq实现mysql与elasticsearch数据同步 集群介绍搭建ES集群 数据聚合 聚…

搭建vauditdemo靶场mysql为NO问题

一、问题 在搭建vauditdemo时,遇到如下显示问题: mysql版本检测为NO 二、解决 查找该方面问题时,并没有找到解决方法 然后换mysql版本换了五六个也没有解决问题 问了AI后给的答复有一条为将mysql改为mysqli 修改保存后解决问题 步骤如…

力扣 T62 不同路径

题目 连接 思路 思路1 &#xff1a; BFS爆搜 class Solution { public:queue<pair<int,int>>q;int uniquePaths(int m, int n) {q.push({1,1}); // 起始位置vector<pair<int, int>> actions;actions.push_back({0, 1}); // 向下actions.push_bac…

OBS 录屏软件:录制圆形头像画中画,设置卡通人像(保姆级教程,有步骤图,建议收藏)

Mac分享吧 文章目录 一、OBS录屏软件 圆形头像画中画效果注意&#xff1a;圆形画中画仅需要在软件中设置一次&#xff0c;每次录制&#xff0c;使用带有圆形头像画中画的场景 录制视频即可。该场景不可删除&#xff01;&#xff01;&#xff01;若删除&#xff0c;则需要重新设…

Vue3学习日记(day2)

目录 前言 注意事项 vite使用 1&#xff1a;控制台vite创建vue 2&#xff1a;使用可视化软件&#xff08;我使用为vscode&#xff09;npm安装对应依赖包 3&#xff1a;使用npm脚本或者直接在终端输入命令运行软件后打开生成网址 4&#xff1a;打开网址正常进入网页 rou…

【漏洞复现】飞企互联-FE企业运营管理平台 treeXml.jsp SQL注入漏洞

0x01 产品简介 飞企互联-FE企业运营管理平台是一个基于云计算、智能化、大数据、物联网、移动互联网等技术支撑的云工作台。这个平台可以连接人、链接端、联通内外&#xff0c;支持企业B2B、C2B与020等核心需求&#xff0c;为不同行业客户的互联网转型提供支持。其特色在于提供…

限流(服务降级):基于自定义注解+切面的方式实现接口调用频率限制

文章目录 引言I 基于GuavaCache实现频率限制1.1 基于LoadingCache实现(灵活控制,高效率)【推荐】1.2 基于LoadingCache自定义RateLimiter (无法灵活控制限制时间范围)1.3 基于google的RateLimiter实现(效率低)II 基于Redis实现限流引言 背景:提供接口给下游(外部厂商)…

DeepSpeed Huggingface模型的自动Tensor并行、kernel注入、训练阶段的优化版kernel

推理阶段。 在后台&#xff0c;1. DeepSpeed会把运行高性能kernel(kernel injection)&#xff0c;加快推理速度&#xff0c;这些对用户是透明的&#xff1b; 2. DeepSpeed会根据mp_size来将模型放置在多个GPU卡上&#xff0c;自动模型并行&#xff1b; import os import torch …

Mysql学习(八)——多表查询

文章目录 五、多表查询5.1 多表关系5.2 多表查询概述5.3 内连接5.4 外连接5.5 自连接5.6 联合查询5.7子查询5.8 总结 五、多表查询 5.1 多表关系 概述&#xff1a;项目开发中&#xff0c;在进行数据库表结构设计时&#xff0c;会根据业务需求及业务模块之间的关系&#xff0c;…

freertos源码分析DAY2 (消息队列 )

目录 1. 队列原理 1.1 顺序队列操作 1.2 循环队列操作 2.消息队列原理 2.1消息队列的构成 2.2 消息队列出入队原则 2.3 消息队列发送/接收消息原理 2.4 队列锁机制 3. 消息队列创建及删除 3.1 创建消息队列函数 3.1.1 xQueueGenericCreate通用任务创建函数 3.1.1.1 …

人工智能在医学领域的应用及技术实现

欢迎来到 Papicatch的博客 目录 &#x1f349;引言 &#x1f349; 医学影像分析 &#x1f348;技术实现 &#x1f34d;数据准备 &#x1f34d;模型构建 &#x1f34d;模型训练 &#x1f34d;模型评估 &#x1f34d;应用部署 &#x1f348;示例代码 &#x1f349; 基因…

简单动态字符串SDS

简单动态字符串&#xff08;simple dynamic string&#xff09;: redis虽然说是用C语言重写的&#xff0c;但它也进行了一些创新&#xff0c;自己构建了简单动态字符串SDS&#xff0c;从名字也看得出来有别于以空字符结尾的字符数组&#xff08;C字符串&#xff09; reids中只…

谷歌AI助力软件工程的进展及未来展望

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

【教学类-64-03】20240611色块眼力挑战(三)-2-10宫格色差10-50(10倍)适合中班幼儿园(星火讯飞)

背景需求&#xff1a; 【教学类-64-02】20240610色块眼力挑战&#xff08;二&#xff09;-2-25宫格&色差10-100&#xff08;10倍&#xff09;&#xff08;星火讯飞&#xff09;-CSDN博客文章浏览阅读360次&#xff0c;点赞17次&#xff0c;收藏13次。【教学类-64-02】2024…