egg.js使用消息队列rabbitMQ

news2025/1/19 23:16:17

1. egg-amqplib: 基于 rabbitmq 消息队列封装的库

安装:

npm i egg-amqplib --save

引入

// {app_root}/config/plugin.js
exports.amqplib = {
  enable: true,
  package: 'egg-amqplib',
};

设置

// {app_root}/config/config.default.js
exports.amqplib = {
  client: {
    // url: 'amqp://localhost',
    connectOptions: {
      protocol: 'amqp',
      hostname: 'localhost',
      port: 5672,
      username: 'guest',
      password: 'guest',
      locale: 'en_US',
      frameMax: 0,
      heartbeat: 0,
      vhost: '/',
    },
    // socketOptions: {
    //   cert: certificateAsBuffer, // client cert
    //   key: privateKeyAsBuffer, // client key
    //   passphrase: 'MySecretPassword', // passphrase for key
    //   ca: [caCertAsBuffer], // array of trusted CA certs
    // },
  },
};

查看github: https://github.com/zubinzhang/egg-amqplib

控制层:

'use strict';

const Controller = require('egg').Controller;
const queueName = 'test';

class HomeController extends Controller {
  async publish() {
    const { msg } = this.ctx.query;

    const ch = await this.app.amqplib.createChannel();
    await ch.assertQueue(queueName, { durable: false });
    const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
    await ch.close();

    this.ctx.body = ok;
    this.ctx.status = 200;
  }

  async consume() {
    const ch = await this.app.amqplib.createChannel();
    await ch.assertQueue(queueName, { durable: false });
    const msg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg)));

    if (msg !== null) {
      ch.ack(msg);
      await ch.close();

      this.ctx.status = 200;
      this.ctx.body = { msg: msg.content.toString() };
    } else {
      this.ctx.status = 500;
    }
  }
}

module.exports = HomeController;

 路由:

'use strict';

module.exports = app => {
  const { router, controller } = app;

  router.get('/publish', controller.home.publish);
  router.get('/consume', controller.home.consume);
};

参考:egg-amqplib/test/fixtures/apps/amqplib-test/app/controller/home.js at master · zubinzhang/egg-amqplib · GitHub

2. 安装rabbitmq, 可使用docker安装rabbitmq

docker run --name rabbitmq -p 5672:567. -p 15672:15672 rabbitmq:3-management

访问地址: http://localhost:15672

默认的账号密码是: guest  :  guest

创建管理员:

3. 队列: 一对一

 

P 是我们的生产者  >  中间的框是一个队列,代表消费者保留的消息缓冲区  >  C 是我们的消费者 

'use strict';
const Controller = require('egg').Controller;
/**
 * 一对一队列演示
 */

// 频道名称
const queueName = 'hasone'

class UserController extends Controller {

  // 生成者
  async send() {
    // 1. 获取要发送的消息
    const { msg } = this.ctx.query
    // 2. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 3. 创建队列 durable 关闭持久化存储
    await ch.assertQueue(queueName, { durable: false }  );
    // 4. 发送消息
    const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
    // 5. 关闭连接
    await ch.close();

    this.ctx.body = ok;
    this.ctx.status = 200;
  }

  // 消费者
  async work() {
    // 1. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 2. 选择队列
    await ch.assertQueue(queueName, { durable: false });
    //3. 接收队列的消息
    const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg), { noAck: true }));

    // 4. 显示消息内容
    if (resultMsg !== null) {
      ch.ack(resultMsg);
      await ch.close();

      const { content } = resultMsg;
      this.status = 200;
      this.ctx.body = { msg: content.toString() }

    } else {
      this.ctx.body = '队列消费失败'
      this.ctx.status = 500;
    }
  }
}

module.exports = UserController;

 4. 队列: 一对多

'use strict';
const Controller = require('egg').Controller;
/**
 * 队列一对多演示
 * 生产者 ---->  队列 ----> 消费者
*                    ----> 消费者
                     ----> 消费者
 */

// 频道名称
const queueName = 'hasMany'

class UserController extends Controller {

  // 生成者
  async send() {
    const { msg } = this.ctx.query;
    //1. 创建频道
    const ch = await this.app.amqplib.createChannel();
    // 2. 创建队列 开启持久化存储
    await ch.assertQueue(queueName, { durable: true });
    // 3. 发送消息
    let ok = null;
    for(let i=0; i<50; i++) {
      // 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。
      ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
    }
    //4. 关闭连接
    await ch.close();

    this.ctx.body = ok;
    this.ctx.status = 200;
  }

  // 消费者
  async work1() {
   // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 关闭消息自动确认模式,需要手动 ack
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {

    setTimeout(() => {
      resolve(msg)
     }, 500)

   }, { noAck: false }) );

   if (resultMsg !== null) {
    const { content } = resultMsg;
     //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work1: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者1号失败'
     this.ctx.status = 500
   }

  }

  async work2() {
     // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 开启自动确认模式
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
    setTimeout(() => {
      resolve(msg)
     }, 1000)

   }, { noAck: false }) );

   if (resultMsg !== null) {
    const { content } = resultMsg;
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work2: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者2号失败'
     this.ctx.status = 500
   }
  }

  async work3() {
     // 1. 创建频道
   const ch = await this.app.amqplib.createChannel();
   //2. 选择队列
   await ch.assertQueue(queueName, { durable: true });
   // 3. 接收消息 noAck 开启自动确认模式
   const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {

    setTimeout(() => {
      resolve(msg)
     }, 1500)


   }, { noAck: false }) );


   if (resultMsg !== null) {
    const { content } = resultMsg;
    //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
    ch.ack(resultMsg);
    await ch.close();

    this.ctx.body = { work3: content.toString() };
    this.ctx.status = 200;
   } else {
     this.ctx.body = '消费者3号失败'
     this.ctx.status = 500
   }

  }
}

module.exports = UserController;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1919884.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Zero-shot learning for requirements classification: An exploratory study

Zero-shot learning for requirements classification: An exploratory study A B S T R A C T 背景:需求工程(RE)研究人员一直在试验机器学习(ML)和深度学习(DL)方法来完成一系列的需求工程任务,比如需求分类、需求跟踪、歧义检测和建模。然而,今天的…

VSCode上通过C++实现单例模式

单例模式实际上就是为了确保一个类最多只有一个实例,并且在程序的任何地方都可以访问这个实例,也就是提供一个全局访问点,单例对象不需要手动释放,交给系统来释放就可以了,单例模式的设计初衷就是为了在整个应用程序的…

Web3 ETF 软件系统的开发框架

Web3 ETF 软件系统的开发框架主要包括智能合约层、前端层、后端层和基础设施层,下面进行详细的介绍。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 智能合约层 智能合约是运行在区块链上的程序,用于定义和执…

基于ssm的图书管理系统的设计与实现

摘 要 在当今信息技术日新月异的时代背景下,图书管理领域正经历着深刻的变革,传统的管理模式已难以适应现代社会的快节奏和高要求,逐渐向数字化、智能化的方向演进。本论文聚焦于这一转变趋势,致力于设计并成功实现一个基于 SSM&…

[计算机网络] VPN技术

VPN技术 1. 概述 虚拟专用网络(VPN)技术利用互联网服务提供商(ISP)和网络服务提供商(NSP)的网络基础设备,在公用网络中建立专用的数据通信通道。VPN的主要优点包括节约成本和提供安全保障。 优…

博物馆地图导航系统:高精度地图引擎与AR/VR融合,实现博物馆数字化转型

在人民日益追求精神文化的时代下,博物馆作为传承与展示人类文明的璀璨殿堂,其重要性不言而喻。然而,随着博物馆规模的不断扩大和藏品种类的日益丰富,游客在享受知识盛宴的同时,也面临着“迷路”与“错过”的困扰。博物…

综合实验作业

node01:192.168.175.146 node02:192.168.175.147 【node01】 node01 与 node02 防火墙在本实验中都需要放行的服务; [rootlocalhost ~]# firewall-cmd --permanent --add-servicedns success [rootlocalhost ~]# firewall-cmd --permanent -…

【C语言】 —— 预处理详解(下)

【C语言】 —— 预处理详解(下) 前言七、# 和 \##7.1 # 运算符7.2 ## 运算符 八、命名约定九、# u n d e f undef undef十、命令行定义十一、条件编译11.1、单分支的条件编译11.2、多分支的条件编译11.3、判断是否被定义11.4、嵌套指令 十二、头文件的包…

以太网中的各种帧结构

帧结构(Ethernet Frame Structure)介绍 以太网信号帧结构(Ethernet Signal Frame Structure),有被称为以太网帧结构,一般可以分为两类 —— 数据帧和管理帧。 按照 IEEE 802.3,ISO/IEC8803-3 …

Django 框架下的media和static静态文件

Django有两种静态文件 static: 静态文件夹,存放CSS,JS,网站的一些图片等静态资源,为Templates下的html页面提供的。static是不会变化的 media:媒体文件夹,存放网站中用户所相关的一些文件,比如说用户的图片…

深度解析蚂蚁 SEO 蜘蛛池:提升网站流量的有效利器

在当今数字化时代,网站流量对于企业和个人的在线业务成功至关重要。为了在竞争激烈的网络环境中脱颖而出,众多站长和 SEO 从业者不断探索各种优化策略,其中蚂蚁 SEO 的蜘蛛池成为备受关注的工具之一。 蚂蚁 SEO 蜘蛛池是一种创新的技术手段&a…

24/7/12总结

axios Axios 是一个基于 promise 网络请求库&#xff0c;作用于node.js 和浏览器中。 它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中)。在服务端它使用原生 node.js http 模块, 而在客户端 (浏览端) 则使用 XMLHttpRequests。 get请求: <script>function…

Cannot resolve symbol ‘HttpServlet‘

问题&#xff1a;不自动导包。 解决方案&#xff1a; https://blog.csdn.net/chenyu_Yang/article/details/136597181

9.Python学习:Socket

1.网络通信要素&#xff08;IP端口传输协议&#xff09; 2.Socket编程 2.1TCP、UDP协议了解 2.2 Socket流程 服务端有两个socket对象&#xff0c;客户端有一个 3.Socket实战 服务端代码&#xff1a; import socket #创建Socket对象 sksocket.socket() #绑定ip与端口号-使…

一文搞定node.js和Vue脚手架的介绍以及安装

node.js的介绍以及安装 node.js的介绍 node.js提供了前端程序的运行环境&#xff0c;可以把node.js理解成是运行前端程序的服务器。node.js的安装 从官网下载安装即可&#xff1a;http://nodejs.cn/download/不要勾选这个,否则会下载很多东西 node -v 是 查看node的版本 npm…

Ubuntu 22.04.4 LTS (linux) 安装 Auditd 安全审计

1 安装auditd sudo apt update sudo apt-get install auditd 2 修改配置 #sudo vim /etc/audit/auditd.conf #日志文件位置 log_file /var/log/audit/audit.log #日志文件大小(Mb) max_log_file 8 #日志文件数量 num_logs 53 启动服务 sudo systemctl restart aud…

【TOOLS】Chrome扩展开发

Chrome Extension Development 1. 入门教程 入门案例&#xff0c;可以访问【 谷歌插件官网官方文档 】查看官方入门教程&#xff0c;这里主要讲解大概步骤 Chrome Extenson 没有固定的脚手架&#xff0c;所以项目的搭建需要根据开发者自己根据需求搭建项目&#xff08;例如通过…

前端工程化:Webpack配置全攻略

前端工程化&#xff1a;Webpack配置全攻略 前端小伙伴们&#xff0c;今天我们来聊聊那个让人又爱又恨的 Webpack。没错&#xff0c;就是那个配置起来让你想砸键盘&#xff0c;但又离不开它的构建工具。别担心&#xff0c;跟着我来&#xff0c;保证让你从 Webpack 小白变成配置…

【k8s部署elasticsearch】k8s环境下安装elasticsearch集群和kibana

文章目录 简介一.条件及环境说明二.需求说明三.实现原理及说明四.详细步骤4.1.规划节点标签4.2.创建三个statefulset和service headless配置4.3.创建service配置 五.安装kibana六.调整索引分区七.安装说明 简介 k8s集群中搭建有elasticsearch服务一般都会用到pvc&#xff0c;但…

苹果入局,AI手机或将实现“真智能”?

【潮汐商业评论/原创】 “AI应用智能手机不就是现在的AI手机。” 当被问到现阶段对AI手机的看法时&#xff0c;John如是说。“术业有专攻&#xff0c;那么多APP在做AI功能&#xff0c;下载用就是了&#xff0c;也用不着现在换个AI手机啊。” 对于AI手机&#xff0c;或许大多…