使用 RabbitMQ 和 Go 构建异步订单处理系统

news2025/1/21 22:24:32

使用 RabbitMQ 和 Go 构建异步订单处理系统

我们可以通过构建一个订单处理系统来演示如何使用消息队列(MQ)实现异步任务处理。这个项目将使用 RabbitMQ 作为消息队列,并使用 Go 语言来实现。以下是项目的详细教程和相关环境配置。

项目描述

功能:模拟一个简单的电商订单处理系统,包括下单、库存扣减、邮件通知、以及发货通知。每个任务通过消息队列异步处理。

环境配置

1. 安装 Go 环境

确保已经安装 Go 语言开发环境,可以通过以下命令确认:

go version

如果没有安装,可以从 Go 官方网站 下载并安装。
2. 安装 RabbitMQ

RabbitMQ 可以通过 Docker 轻松安装:

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
RabbitMQ 管理控制台可以通过 http://localhost:15672 访问。
默认用户名和密码都是 guest。

3. 创建 Go 项目

mkdir order-processing
cd order-processing
go mod init order-processing

4. 安装 RabbitMQ Go 客户端库

go get github.com/streadway/amqp

项目结构

项目将包含以下文件和目录:

order-processing/
│
├── main.go                 // 入口文件,初始化 MQ 连接
├── producer.go             // 生产者代码,模拟下单请求
├── consumer_inventory.go   // 消费者代码,处理库存扣减
├── consumer_email.go       // 消费者代码,处理邮件发送
└── consumer_shipping.go    // 消费者代码,处理发货通知

代码实现

1. main.go - 初始化 MQ 连接

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明队列
    q, err := ch.QueueDeclare(
        "order_queue", // 队列名称
        true,          // 持久化
        false,         // 自动删除
        false,         // 独占
        false,         // 无需等待
        nil,           // 额外属性
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
    // 创建消费者
    msgs, err := ch.Consume(
        q.Name, // 队列名称
        "",     // 消费者标识符
        true,   // 自动应答
        false,  // 独占
        false,  // 无需等待
        false,  // no local
        nil,    // 额外属性
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    <-forever
}

2. producer.go - 生产者代码

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    body := "New Order"
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

3. consumer_inventory.go - 处理库存扣减

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Processing inventory for order: %s", d.Body)
            // 处理库存扣减逻辑
        }
    }()

    <-forever
}

4. consumer_email.go - 处理邮件发送

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Sending email for order: %s", d.Body)
            // 处理邮件发送逻辑
        }
    }()

    <-forever
}

5. consumer_shipping.go - 处理发货通知

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "order_queue",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Notifying shipping for order: %s", d.Body)
            // 处理发货通知逻辑
        }
    }()

    <-forever
}

运行步骤

1.启动 RabbitMQ(如果使用 Docker 已启动,则跳过这步)。

docker run -d --hostname rabbitmq --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

2.启动消费者:分别运行 consumer_inventory.goconsumer_email.goconsumer_shipping.go文件。

3.发送订单消息:运行 producer.go 文件,模拟用户下单。
在这里插入图片描述

运行后的验证

1.消费者处理消息的日志输出:

每个消费者程序启动后,会开始监听 RabbitMQ 中相应的队列。当生产者发送消息时,消费者会从队列中读取消息,并输出处理的日志信息。
消费者处理消息的流程如下:
    consumer_inventory.go 处理库存更新的消息。
    consumer_email.go 处理订单确认邮件的发送。
    consumer_shipping.go 处理发货通知的消息。

你可以在每个运行的消费者窗口中看到相应的日志输出,例如:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2.RabbitMQ 管理界面监控: 通过访问 RabbitMQ 管理界面,你可以检查 RabbitMQ 队列的状态:

查看哪些队列正在运行。
检查队列中是否有消息积压。
查看消息的流入和流出情况,确保消费者正在从队列中消费消息。

在这里插入图片描述

项目中的关键点解释

RabbitMQ 消息模型:
    生产者会将消息发布到 RabbitMQ 的一个交换机(Exchange)上,交换机会将消息路由到相应的队列(Queue)。
    消费者监听队列并从中获取消息进行处理。每个消费者可以处理一个特定类型的消息。

AMQP协议:
    Go 代码中的 amqp 库基于 AMQP 协议(Advanced Message Queuing Protocol),这是 RabbitMQ 使用的消息协议。它定义了如何在生产者和消费者之间传递消息。

队列持久化与自动应答:
    RabbitMQ 队列可以配置为持久化消息,确保在系统重启时消息不会丢失。
    你可以在消费者中配置是否自动应答(acknowledgment),以确认消息处理成功后再从队列中移除。当前例子中是使用自动应答的模式。

问题排查

如果在运行过程中遇到问题,可以按以下步骤排查:

1.检查 RabbitMQ 是否正常运行:确保 Docker 容器正常运行,并且端口没有冲突。

docker ps

如果 RabbitMQ 没有启动,可以通过命令 docker start some-rabbit 来启动。

2.确保消费者能够连接到 RabbitMQ:如果消费者无法连接到 RabbitMQ,可能是因为配置错误或 RabbitMQ 服务未启动。检查消费者的日志输出,确认连接是否成功。

3.查看 RabbitMQ 日志:通过 RabbitMQ 管理界面可以查看日志,排查是否有消息未路由到队列或者连接失败的错误。

通过这些步骤,你应该能够顺利运行和验证这个消息队列项目。如果有更多问题或需要其他帮助,随时告诉我!

总结

通过这篇博客,你应当掌握了 RabbitMQ 的基本使用方法,了解了如何将 RabbitMQ 与 Go 应用集成,从而构建可靠的消息传递系统。这些技能将为你在开发异步消息处理和微服务架构方面奠定坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2108870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp+vue3实现双通道透明MP4播放支持小程序和h5

双通道透明MP4视频播放的截图 以下是合成后结果&#xff0c;二个合并在一起进行播放 下载资源&#xff0c;打开运行直接使用看到效果 https://download.csdn.net/download/qq_40039641/89715780

[iBOT] Image BERT Pre-Training with Online Tokenizer

1、目的 探索visual tokenizer编码下的MIM&#xff08;Masked Image Modeling&#xff09; 2、方法 iBOT&#xff08;image BERT pre-training with Online Tokenizer&#xff09; 1&#xff09;knowledge distillation&#xff08;KD&#xff09; distill knowledge from the…

六、桥接模式

桥接模式&#xff08;Bridge Pattern&#xff09;是一种结构型设计模式&#xff0c;旨在将抽象与实现分离&#xff0c;使得两者可以独立变化。通过使用桥接模式&#xff0c;可以避免在多个维度上进行继承&#xff0c;降低代码的复杂度&#xff0c;从而提高系统的可扩展性。 组成…

c# Csv文件读写示例,如果文件存在追加写入

功能 1.写入 2.读取 导出文件效果 调用示例 注意示例中的ToDataTable()方法是自己的封装的扩展方法&#xff0c;源码在集合扩展方法-CSDN博客 private List<MarkDataModel> createMarkDataList(int count){var markDataModels new List<MarkDataModel>();for (…

WSA事件模型

服务端骨架&#xff1a; #include <iostream> #include <WinSock2.h> #pragma comment(lib,"ws2_32.lib") #include <windows.h>int main() {WSADATA lpWSAData;WSAStartup(MAKEWORD(2, 2), &lpWSAData);SOCKADDR_IN saddr{ 0 };saddr.sin_add…

【生日视频制作】海上绿色摩托艇汽车艇车身AE模板修改文字软件生成器教程特效素材【AE模板】

生日视频制作教程海上绿色摩托艇汽车艇车身AE模板修改文字特效广软件告生成神器素材祝福玩法AE模板替换工程 怎么如何做的【生日视频制作】海上绿色摩托艇汽车艇车身AE模板修改文字软件生成器教程特效素材【AE模板】 生日视频制作步骤&#xff1a; 安装AE软件 下载AE模板 把AE模…

单点登录问题【拼多多0905一面】

说一些今晚情况&#xff0c;7点腾讯音乐笔试&#xff0c;因为8点拼多多一面&#xff0c;哪个都拒不了。硬着头皮50分钟写了1.2题然后去面试。刚开始状态真的很差&#xff0c;大脑思考不动&#xff0c;面试中2个手撕&#xff0c;做出来一个&#xff0c;两个项目问题&#xff0c;…

用Java实现一个简易的炸金花小游戏

最近闲暇时间写了个用Java实现的简易版的炸金花小游戏&#xff0c;还是很有趣的&#xff0c;下面具体来介绍下具体实现。 下面这个是初始化页面。 一、设计思路 1、首先要了解炸金花的游戏规则&#xff0c;针对整个游戏过程来考虑。从游戏开始后的抽牌选出庄家&#xff0c;再到…

shell 中的grep sed awk命令

目录 一、grep命令 1.基本格式 2.常用命令 3. sort 命令 3.1 格式 3.2 常用选项 4. uniq 快捷去重 4.1 格式 5. tr 命令 5.1 格式 5.2 常用选项 练习&#xff1a; 二、sed 命令 1. sed基本概念 2. 基本操作格式 3. 常用选项 4. 应用 5. 文本模式过滤行内容 6. s…

C#自定义控件的放置与拖动

1、自定义控件 using System; using System.Collections.Generic; using System.ComponentModel; using System.Drawing; using System.Drawing.Drawing2D; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;namespace PartA…

【mysql】mysql修改sql_mode之后无法启动

现象&#xff1a;修改后mysql无法启动&#xff0c;不报错 原因&#xff1a;MySQL在8以后sql_mode已经取消了NO_AUTO_CREATE_USER这个关键字。去掉这个关键字后&#xff0c;启动就可以了 修改前&#xff1a; sql_modeSTRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR…

jenkins 工具使用

使用方式 替代手动&#xff0c;自动化拉取、集成、构建、测试&#xff1b;是CI/CD持续集成、持续部署主流开发模式中重要的环节&#xff1b;必须组件 jenkins-gitlab&#xff0c;代码公共仓库服务器&#xff08;至少6G内存&#xff09;&#xff1b;jenkins-server&#xff0c;…

查看当前主机的硬盘是固态硬盘还是机械硬盘

windows主机下查看硬盘类型方法&#xff1a; 打开dos界面&#xff0c;输入 powershell进入powershell界面 在PowerShell窗口中&#xff0c;输入 Get-PhysicalDisk 命令并按回车。 发现MediaType下面的值为HDD,即为机械硬盘&#xff0c;如果是固态硬盘&#xff0c;则为SSD

什么是CDN及其如何影响SEO?

有没有想过&#xff0c;为什么你的网站在谷歌搜索结果的后几页徘徊&#xff0c;即使你已经优化了每一个网页&#xff1f; 有时候&#xff0c; 慢速的网站性能可能是罪魁祸首。 如果这个问题引起了你的共鸣&#xff0c;那么你可能想要探索一下内容分发网络&#xff08;Content…

Android平台通过RTSP服务实现摄像头麦克风共享

技术背景 前些年&#xff0c;我们在完成Android平台RTMP直播推送模块后&#xff0c;遇到这样的技术需求&#xff0c;好多开发者希望在Android平台&#xff0c;实现摄像头和麦克风音视频数据采集编码打包后&#xff0c;对外提供RTSP&#xff08;Real Time Streaming Protocol&a…

【DEV工具-IDEA】新建springboot项目,无法选择java8?

问题&#xff1a;新建springboot项目&#xff0c;无法选择java8。 #解决&#xff1a; 替换为 &#xff1a;https://start.aliyun.com

基于蜣螂优化最小二乘支持向量机的数据分类预测Matlab程序DBO-LSSVM 多特征输入多类别输出 含基础程序

基于蜣螂优化最小二乘支持向量机的数据分类预测Matlab程序DBO-LSSVM 多特征输入多类别输出 含基础程序 文章目录 一、基本原理DBO&#xff08;Dung Beetle Optimization&#xff09;算法原理LSSVM&#xff08;Least Squares Support Vector Machine&#xff09;模型原理DBO-LSS…

【C#跨平台开发详解】C#跨平台开发技术之.NET Core基础学习及快速入门

1. C#与.NET的发展历程 C#是由Microsoft开发的现代编程语言&#xff0c;最初伴随着.NET Framework发布。随着技术的进步&#xff0c;特别是针对跨平台开发的需求&#xff0c;Microsoft推出了.NET Core&#xff0c;这是一个开源且跨平台的框架&#xff0c;支持Windows、macOS和…

微信小程序手写签名

微信小程序手写签名组件 该组件基于signature_pad封装&#xff0c;signature_pad本身是web端的插件&#xff0c;此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…

Windows10 安全加固之禁止光驱、U盘等自动播放

在使用Windows10系统的电脑中插入插入光盘或者U盘时,默认是自动播放的,这样会引入一些可能不安全的因素。因此,为了系统安全,有必要禁止光驱、U盘等自动播放。具体方法如下: 方法一:通过设置页面关闭 第1步:单击win10系统的“开始”菜单->“设置”,打开“windows设…