RabbitMQ 消息队列 优化发送邮件

news2025/2/23 0:21:00

express 发送邮件

  • 最简单的异步发送邮件方法
  • 为何要使用 RabbitMQ?
  • 如何在 Node 项目中集成 RabbitMQ?

一、 不用 await 发送邮件

在实际开发之前,不妨先思考下,我们最终的目的是为了让邮件异步发送。那发送邮件这里有个await,我们干嘛不直接把这个await去掉,这不就完事了吗?这样不就是不等待邮件发送完成,直接提示成功了吗?

我要告诉大家,你想的一点也没错,这样做 100% 可以的。当然这样做,也会造成一些问题。因为不等待异步执行的结果,如果发送邮件出现错误了,会导致catch里无法捕获错误

1、参考案例:
router.post('/sign_up', validateCaptcha, async function (req, res) {
  try {
    const body = {
      email: req.body.email,
      username: req.body.username,
      nickname: req.body.nickname,
      password: req.body.password,
      sex: 2,
      role: 0
    }

    const user = await User.create(body);
    delete user.dataValues.password;         // 删除密码
    // 请求成功,删除验证码,防止重复使用
    await delKey(req.body.captchaKey);

    const html = `
  您好,<span style="color: red">${user.nickname}。</span><br><br>
  恭喜,您已成功注册会员!<br><br>
  xw`
    await sendMail(user.email, '「xw」的注册成功通知', html);

    success(res, '创建用户成功。', {user}, 201);
  } catch (error) {
    failure(res, error);
  }
});
2、去掉sendMail前的await
router.post('/sign_up', validateCaptcha, async function (req, res) {
  try {
    const body = {
      email: req.body.email,
      username: req.body.username,
      nickname: req.body.nickname,
      password: req.body.password,
      sex: 2,
      role: 0
    }

    const user = await User.create(body);
    delete user.dataValues.password;         // 删除密码
    // 请求成功,删除验证码,防止重复使用
    await delKey(req.body.captchaKey);

    const html = `
  您好,<span style="color: red">${user.nickname}。</span><br><br>
  恭喜,您已成功注册会员!<br><br>
  xw`
    sendMail(user.email, '「xw」的注册成功通知', html);

    success(res, '创建用户成功。', {user}, 201);
  } catch (error) {
    failure(res, error);
  }
});

结果 :可以注册成功 但是终端崩溃了,再去请求其他任何接口,都是无法访问的了

在这里插入图片描述

在这里插入图片描述

3、原因就是因为,这是一个异步操作,我们没有等待异步操作的结果,所以没法在这里捕获异常。解决方法也很简单,打开utils/mail.js,给发送邮件函数,加上try…catch即可。
/**
 * 发送邮件
 * @param email
 * @param subject
 * @param html
 * @returns {Promise<void>}
 */
const sendMail = async (email, subject, html) => {
  try {
    await transporter.sendMail({
      from: process.env.MAILER_USER,
      to: email,
      subject,
      html,
    });
  } catch (error) {
    console.log('邮件发送失败:', error);
  }
};

在这里插入图片描述

最后运行:虽然现在还是报错了,但是程序自身并没有崩溃。你去访问其他接口,也是完全不受影响的。

那么现在,我们就用非常简单的方式,实现了异步发送邮件了。这样是完全可行的

二、 为何要使用 RabbitMQ

既然这么简单就能实现了,那为何还要使用RabbitMQ呢?我觉得主要有以下三点目的

  • 1、解耦:将发送邮件的功能独立出去,这样即使邮件服务出现问题,也不会直接影响主应用的性能和可用性。
  • 2、增加并发:对于高并发的情况,将任务放入队列中可以起到缓冲作用,增加程序的吞吐量,防止后端服务因瞬间请求过多而崩溃。
  • 3、错误处理:如果消息队列中的任务处理失败,可以进行重试,确保任务最终能够成功完成。
    所以,对于非常简单的功能,你确实可以不写await来达到异步的目的。但对于大型程序来说,为了有更好的性能,增加并发处理能力,提高错误处理的可靠性,使用消息队列是一个更好的选择。

所以,对于非常简单的功能,你确实可以不写await来达到异步的目的。但对于大型程序来说,为了有更好的性能,增加并发处理能力,提高错误处理的可靠性,使用消息队列是一个更好的选择。

三. 在 Node 项目中集成 RabbitMQ

3.1. 环境变量

打开项目的.env文件,加入

RABBITMQ_URL=amqp://admin:xw@localhost

这样连接到RabbitMQ的时候,相关的信息,就可以从环境变量读取。

.env.example中加入

RABBITMQ_URL=

README.md中加入对应的说明

RABBITMQ_URL=你的连接

- `RABBITMQ_URL`配置为消息队列服务器地址。
3.2.、连接到 RabbitMQ

新建utils/rabbit-mq.js,里面创建一个连接

const amqp = require('amqplib');
const sendMail = require('./mail');

// 创建全局的 RabbitMQ 连接和通道
let connection;
let channel;

/**
 * 连接到 RabbitMQ
 * @returns {Promise<*>}
 */
const connectToRabbitMQ = async () => {
  if (connection && channel) return;  // 如果已经连接,直接返回

  try {
    connection = await amqp.connect(process.env.RABBITMQ_URL);
    channel = await connection.createChannel();
    await channel.assertQueue('mail_queue', { durable: true });
  } catch (error) {
    console.error('RabbitMQ 连接失败:', error);
  }
};

  • 顶部做了相关的引用。
  • 创建了全局的连接和通道。
  • 如果已经连接了,就直接返回。如果没有连接就连上,并创建通道。
  • 创建了队列,名字叫做:mail_queue。
  • 使用了durable: true,表示队列需要持久化。

3.3、 邮件队列生产者

接着继续建一个方法,来发送邮件

/**
 * 邮件队列生产者(发送消息)
 */
const mailProducer = async (msg) => {
  try {
    await connectToRabbitMQ(); // 确保已连接

    channel.sendToQueue('mail_queue', Buffer.from(JSON.stringify(msg)), { persistent: true });
  } catch (error) {
    console.error('邮件队列生产者错误:', error);
  }
};

  • 这里就直接将消息,发送到队列中。
  • 注意用JSON.stringify转了一下,说明我们传过来的msg将会是对象格式。
  • 使用了persistent: true,表示消息需要持久化。
3.4、 邮件队列消费者
/**
 * 邮件队列消费者(接收消息)
 */
const mailConsumer = async () => {
  try {
    await connectToRabbitMQ();
    channel.consume('mail_queue',
      async (msg) => {
        const message = JSON.parse(msg.content.toString());
        await sendMail(message.to, message.subject, message.html);
      }, {
        noAck: true,
      }
    );
  } catch (error) {
    console.error('邮件队列消费者错误:', error);
  }
};

module.exports = {
  mailProducer,
  mailConsumer,
};

  • 我们监听了mail_queue队列。
  • 如过收到消息了,就执行发送邮件。
  • 使用了noAck: true,表示自动确认消息。

四. 实际运用

4.1、启用生产者

打开routes/auth.js

// const sendMail = require('../utils/mail');
const { mailProducer } = require('../utils/rabbit-mq');

/**
 * 用户注册
 * POST /auth/sign_up
 */
router.post('/sign_up', validateCaptcha, async function (req, res) {
  try {
    // ...

    // 将邮件发送请求放入队列
    const msg = {
      to: user.email,
      subject: '「xw」的注册成功通知',
      html: `
        您好,<span style="color: red">${user.nickname}</span>。<br><br>
        恭喜,您已成功注册会员!<br><br>

        xw
      `,
    };
    await mailProducer(msg);

    success(res, '创建用户成功。', { user }, 201);
  } catch (error) {
    failure(res, error);
  }
});

  • 顶部将发送邮件的方法去掉,并引用一下生产者
  • 将要发送的信息,改为对象格式。
  • 调用生产者方法,就将信息发送到队列中了。
4.2、 启动消费者

根目录的app.js。在里面加上:


require('dotenv').config();

// 启动邮件消费者
const { mailConsumer } = require('./utils/rabbit-mq');
(async () => {
  await mailConsumer();
  console.log('邮件消费者已启动');
})();

这样只要 Node 项目一启动,消费者就会一直自动运行起来。在真实大型项目里,为了不影响主程序的稳定运行。更好的方式,应该将发送邮件解耦了,可以让消费者在专门的程序中独立启动

解耦封装

1、封装生产者、队列/utils/rabbit-mq.js
const amqp = require('amqplib');
const sendMail = require('./mail');

// 创建全局的 RabbitMQ 连接和通道
let connection;
let channel;

// 封装一个重试连接的函数,增加连接的稳定性
const connectWithRetry = async (url, retries = 5, delay = 5000) => {
  let attempt = 0;
  while (attempt < retries) {
    try {
      return await amqp.connect(url);
    } catch (error) {
      attempt++;
      console.error(`RabbitMQ 连接尝试 ${attempt} 失败:`, error);
      if (attempt < retries) {
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }
  throw new Error('无法连接到 RabbitMQ,已达到最大重试次数');
};

/**
 * 连接到 RabbitMQ
 * @returns {Promise<*>}
 */
const connectToRabbitMQ = async () => {
  if (connection && channel) return;

  try {
    connection = await connectWithRetry(process.env.RABBITMQ_URL);
    channel = await connection.createChannel();

    // 监听连接关闭事件,方便处理异常
    connection.on('close', () => {
      console.warn('RabbitMQ 连接已关闭,尝试重新连接...');
      connection = null;
      channel = null;
    });

    // 监听连接错误事件,增强错误处理能力
    connection.on('error', (err) => {
      console.error('RabbitMQ 连接发生错误:', err);
    });

    await channel.assertQueue('mail_queue', { durable: true });
  } catch (error) {
    console.error('RabbitMQ 连接失败:', error);
    throw error;
  }
};

/**
 * 邮件队列生产者(发送消息)
 */
const mailProducer = async (msg) => {
  try {
    await connectToRabbitMQ(); // 确保已连接

    // 消息持久化设置,提高消息可靠性
    const options = { persistent: true };
    const sent = channel.sendToQueue('mail_queue', Buffer.from(JSON.stringify(msg)), options);
    if (!sent) {
      console.warn('消息未能立即入队,等待下次机会');
    }
  } catch (error) {
    console.error('邮件队列生产者错误:', error);
    throw error;
  }
};

/**
 * 邮件队列消费者(接收消息)
 */
const mailConsumer = async () => {
  try {
    await connectToRabbitMQ();

    // 消费消息时,手动确认消息,避免消息丢失
    channel.consume('mail_queue', async (msg) => {
      if (msg) {
        try {
          const message = JSON.parse(msg.content.toString());
          await sendMail(message.to, message.subject, message.html);
          channel.ack(msg); // 手动确认消息
        } catch (error) {
          console.error('处理邮件消息时出错:', error);
          channel.nack(msg, false, true); // 消息处理失败,重新入队
        }
      }
    }, { noAck: false }); // 关闭自动确认

    console.log('邮件队列消费者已开始监听');
  } catch (error) {
    console.error('邮件队列消费者错误:', error);
    throw error;
  }
};

module.exports = {
  mailProducer,
  mailConsumer,
};
2、封装消费者/utils/mail-consumer.js
require('dotenv').config();
const { mailConsumer } = require('./rabbit-mq');

// 封装启动消费者的函数,方便后续扩展和错误处理
const startMailConsumer = async () => {
  try {
    await mailConsumer();
    console.log('邮件消费者已启动');
  } catch (error) {
    console.error('启动邮件消费者时出错:', error);
    process.exit(1);
  }
};

// 启动消费者
startMailConsumer();

// 监听进程信号,优雅关闭消费者
process.on('SIGINT', async () => {
  console.log('收到 SIGINT 信号,正在优雅关闭邮件消费者...');
  try {
    // 这里可以添加关闭连接和通道的逻辑
    process.exit(0);
  } catch (error) {
    console.error('关闭邮件消费者时出错:', error);
    process.exit(1);
  }
});

如果在app.js里面使用了消费者 把代码去掉

3、安装pm2
npm i pm2
4、根目录新建cosystem.config.js
module.exports = {
  apps: [
    {
      name: "express-app",
      script: "./bin/www", // 实际路径
      watch: process.env.NODE_ENV === 'development', // 根据环境变量决定是否开启监听
      interpreter: "node",
      env: {
        NODE_ENV: "development"
      },
      env_production: {
        NODE_ENV: "production"
      }
    },
    {
      name: "mail-consumer",
      script: "./utils/mail-consumer.js", // 实际路径
      interpreter: "node",
      env: {
        NODE_ENV: "development"
      },
      env_production: {
        NODE_ENV: "production"
      }
    }
  ]
};
4、本地运行
pm2 start ecosystem.config.js

pm2 简单的方法

命令说明示例
pm2 start <app.js>启动一个 Node.js 应用程序。可指定 JavaScript 文件、JSON 配置文件或其他可执行文件。pm2 start app.js
pm2 list列出所有由 PM2 管理的应用程序,显示应用程序的状态、进程 ID、名称等信息。pm2 list
pm2 stop <app_name_or_id>停止指定名称或 ID 的应用程序,但不会从 PM2 的列表中删除。pm2 stop my_apppm2 stop 0
pm2 stop all停止所有由 PM2 管理的应用程序。pm2 stop all
pm2 restart <app_name_or_id>重启指定名称或 ID 的应用程序。pm2 restart my_apppm2 restart 0
pm2 restart all重启所有由 PM2 管理的应用程序。pm2 restart all
pm2 delete <app_name_or_id>从 PM2 的管理列表中删除指定名称或 ID 的应用程序,同时停止该应用程序。pm2 delete my_apppm2 delete 0
pm2 delete all从 PM2 的管理列表中删除所有应用程序。pm2 delete all
pm2 show <app_name_or_id>显示指定名称或 ID 的应用程序的详细信息,如环境变量、执行模式等。pm2 show my_apppm2 show 0
pm2 logs <app_name_or_id>显示指定名称或 ID 的应用程序的日志信息。若不指定则显示所有应用日志。pm2 logs my_apppm2 logs 0
pm2 monit实时监控由 PM2 管理的所有应用程序的 CPU 和内存使用情况。pm2 monit
pm2 save保存当前 PM2 管理的应用程序列表,以便在系统重启后自动恢复这些应用。pm2 save
pm2 resurrect恢复之前使用 pm2 save 保存的应用程序列表。pm2 resurrect
5、 宝塔中部署

启动选项

 pm2 start ecosystem.config.js --no-daemon

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2303673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NetLogon 权限提升漏洞

参考文章&#xff1a;CVE-2020-1472NetLogon权限提升漏洞_cve-2020-1472复现 谢公子-CSDN博客 域控机器账户&#xff1a;WIN-0V0GAORDC17 域控 ip&#xff1a;192.168.72.163 域内攻击者机器 ip&#xff1a;192.168.72.158&#xff0c;host&#xff1a;WIN10-01 攻击者 kali…

【C++】 Flow of Control

《C程序设计基础教程》——刘厚泉&#xff0c;李政伟&#xff0c;二零一三年九月版&#xff0c;学习笔记 文章目录 1、选择结构1.1、if 语句1.2、嵌套的 if 语句1.3、条件运算符 ?:1.4、switch 语句 2、循环结构2.1、while 语句2.2、do-while 语句2.3、 for 循环2.4、循环嵌套…

图论 之 迪斯科特拉算法求解最短路径

文章目录 题目743.网络延迟时间3341.到达最后一个房间的最少时间I 求解最短路径的问题&#xff0c;分为使用BFS和使用迪斯科特拉算法&#xff0c;这两种算法求解的范围是有区别的 BFS适合求解&#xff0c;边的权值都是1的图中的最短路径的问题 图论 之 BFS迪斯科特拉算法适合求…

Spring Boot 中事务的用法详解

引言 在 Spring Boot 中&#xff0c;事务管理是一个非常重要的功能&#xff0c;尤其是在涉及数据库操作的业务场景中。Spring 提供了强大的事务管理支持&#xff0c;能够帮助我们简化事务的管理和控制。本文将详细介绍 Spring Boot 中事务的用法&#xff0c;包括事务的基本概…

【react18】如何使用useReducer和useContext来实现一个todoList功能

重点知识点就是使用useReducer来攻坚小型的公共状态管理&#xff0c;useImmerReducer来实现数据的不可变 实现效果 实现代码 项目工程结构 App.js文件 import logo from "./logo.svg"; import "./App.css"; import TodoLists from "./comps/TodoLi…

一篇搞懂vue3中如何使用ref、reactive实现响应式数据

ref 可实现 基本类型、对象类型响应式数据 reactive&#xff1a;只能实现 对象类型响应式 ref实现 基本类型 数据响应式&#xff1a; <template><div class"person"><h2>姓名&#xff1a;{{ name }}</h2><h2>年龄&#xff1a;{{ ag…

【HeadFirst系列之HeadFirst设计模式】第7天之命令模式:封装请求,轻松实现解耦!

命令模式&#xff1a;封装请求&#xff0c;轻松实现解耦&#xff01; 大家好&#xff01;今天我们来聊聊设计模式中的命令模式&#xff08;Command Pattern&#xff09;。如果你曾经需要将请求封装成对象&#xff0c;或者希望实现请求的撤销、重做等功能&#xff0c;那么命令模…

Linux-Ansible自动化运维

文章目录 自动化运维Ansible &#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Linux专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年02月21日17点38分 自动化运维 自动化运维常用工具 Ansible 和 SaltStack 自动化运维优势&#xff1a; 服…

uni-app(位置1)

文章目录 一、获取当前的地理位置、速度 uni.getLocation(OBJECT)二、打开地图选择位置 uni.chooseLocation(OBJECT)三、使用应用内置地图查看位置。uni.openLocation(OBJECT) 一、获取当前的地理位置、速度 uni.getLocation(OBJECT) App平台 manifest中配置好自己的地图厂商k…

RabbitMQ服务异步通信

消息队列在使用过程中&#xff0c;面临着很多实际问题需要思考&#xff1a; 1. 消息可靠性 消息从发送&#xff0c;到消费者接收&#xff0c;会经理多个过程&#xff1a; 其中的每一步都可能导致消息丢失&#xff0c;常见的丢失原因包括&#xff1a; 发送时丢失&#xff1a; 生…

CSS基础(浮动、相对定位、绝对定位、固定定位、粘性定位、版心、重置默认样式)

文章目录 1. 浮动&#xff08;float&#xff09;1.1 简介1.2 元素浮动后的特点1.3 脱离文档流示例图1.4 浮动产生的影响1.4.1 积极影响1.4.2 消极影响 1.5 解决浮动产生的影响1.5.1 清除浮动&#xff08;Clearfix&#xff09;1.5.2 创建新的块格式化上下文&#xff08;BFC&…

Spring Cloud — Hystrix 服务隔离、请求缓存及合并

Hystrix 的核心是提供服务容错保护&#xff0c;防止任何单一依赖耗尽整个容器的全部用户线程。使用舱壁隔离模式&#xff0c;对资源或失败单元进行隔离&#xff0c;避免一个服务的失效导致整个系统垮掉&#xff08;雪崩效应&#xff09;。 1 Hystrix监控 Hystrix 提供了对服务…

RagFlow+Ollama 构建RAG私有化知识库

RagFlowOllama 构建RAG私有化知识库 关于RAG一、什么是RAGFlow一、RAGFlow 安装配置测服已有服务&#xff1a; mysql、redis、elasticsearch 二、RAGFlow 配置 ollama&#xff1a;本地运行大型语言模型的工具软件。用户可以轻松下载、运行和管理各种开源 LLM。降低使用门槛&…

【Linux】【网络】不同子网下的客户端和服务器通信

【Linux】【网络】不同子网下的客户端和服务器通信 前两天在进行socket()网络编程并进行测试时&#xff0c;发现在不同wifi下两个电脑无法进行连接&#xff0c;大概去查找了如何解决 看到可以使用 frp 这个快速反向代理实现。 frp 可让您将位于 NAT 或防火墙后面的本地服务器…

SpringBoot教程(十四) SpringBoot之集成Redis

SpringBoot教程&#xff08;十四&#xff09; | SpringBoot之集成Redis 一、Redis集成简介二、集成步骤 2.1 添加依赖2.2 添加配置2.3 项目中使用之简单使用 &#xff08;举例讲解&#xff09;2.4 项目中使用之工具类封装 &#xff08;正式用这个&#xff09;2.5 序列化 &…

OpenHarmony分布式数据管理子系统

OpenHarmony分布式数据管理子系统 简介 目录 组件说明 分布式数据对象数据共享分布式数据服务Key-Value数据库首选项关系型数据库标准数据化通路 相关仓 简介 子系统介绍 分布式数据管理子系统支持单设备的各种结构化数据的持久化&#xff0c;以及跨设备之间数据的同步、…

单片机 Bootloade与二进制文件的生成

单片机的 Bootloader 是一种特殊的程序&#xff0c;负责在单片机上电后初始化硬件、更新用户应用程序&#xff08;固件&#xff09;&#xff0c;并将控制权移交给用户程序。以下是其运行机制和关键流程的详细说明&#xff1a; 1、单片机 Bootloader 的核心作用 固件更新&…

MySQL数据库(3)—— 表操作

目录 一&#xff0c;创建表 1.1 创建表的SQL 1.2 演示 二&#xff0c;查看表 三&#xff0c;修改表 四&#xff0c;删除表 常用的表操作会涉及到两种SWL语句 DDL&#xff08;Data Definition Language&#xff09;数据定义语言&#xff1a;建表、改表、删表等&#xff0…

Springboot + Ollama + IDEA + DeepSeek 搭建本地deepseek简单调用示例

1. 版本说明 springboot 版本 3.3.8 Java 版本 17 spring-ai 版本 1.0.0-M5 deepseek 模型 deepseek-r1:7b 需要注意一下Ollama的使用版本&#xff1a; 2. springboot项目搭建 可以集成在自己的项目里&#xff0c;也可以到 spring.io 生成一个项目 生成的话&#xff0c;如下…

七星棋牌源码高阶技术指南:6端互通、200+子游戏玩法深度剖析与企业级搭建实战(完全开源)

在棋牌游戏行业高速发展的今天&#xff0c;如何构建一个具备高并发、强稳定性与多功能支持的棋牌游戏系统成为众多开发者和运营团队关注的焦点。七星棋牌全开源修复版源码 凭借其 六端互通、200子游戏玩法、多省区本地化支持&#xff0c;以及 乐豆系统、防沉迷、比赛场、AI智能…