在Go中迅速使用RabbitMQ

news2025/1/11 23:53:54

文章目录

  • 1 认识
    • 1.1 MQ分类
    • 1.2 安装
    • 1.3 基本流程
  • 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)
  • 3 交换机
    • 3.1 fanout
    • 3.2 direct
    • 3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go)
  • 4 Golang创建交换机/队列/Publish/Consume/Bind
  • 5 可靠性
    • 5.1 生产者可靠性
    • 5.2 MQ可靠性
      • 5.2.1 Lazy Queue
    • 5.3 消费者可靠性
    • 5.4 业务幂等性
    • 5.4 Golang实现可靠性
      • 1. 确保消息生产者的可靠性
      • 2. 确保消息队列的可靠性
      • 3. 确保消息消费者的可靠性
      • 4. 容错处理
  • 6 延迟消息
    • 6.1 死信交换机
    • 6.2 延迟消息插件
      • 6.2.1 安装
      • 6.2.2 使用
      • 6.2.3 应用场景

  • 为什么要使用消息队列

image-20240903160417835

1 认识

1.1 MQ分类

  • 有Broker

    • 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka
    • 轻Topic —— topic只是一种中转模式 —— rabbitMQ
  • 无Broker

1.2 安装

# latest RabbitMQ 3.13
docker run \
	-e RABBITMQ_DEFAULT_USER=dusong \  #默认账号和密码均为:guest
	-e RABBITMQ_DEFAULT_PASS=123123 \
	-d \  #detached mode
	-v mq-plugins:/plugins \   #插件挂载
	--rm \
   	--name rabbitmq \
    -p 5672:5672 \    #消息通信端口
    -p 15672:15672 \  #管理界面端口
    rabbitmq:3.13-management

1.3 基本流程

image-20240904110326213

  • exchange只能转发消息,不能存储消息
  • 通过bind将queue绑定到exchange

2 Work模型

  • 多个消费者绑定到一个队列

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)

    image-20240904144344585

    err = ch.Qos(
      1,     // prefetch count
      0,     // prefetch size
      false, // global
    )
    

3 交换机

3.1 fanout

fanout类型的交换机会将消息转发给所有绑定到改交换机的队列

3.2 direct

image-20240904151234823

err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
  "logs_direct",         // exchange
  "log", // routing key
  false, // mandatory
  false, // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

3.3 topic

image-20240904151944421

4 Golang创建交换机/队列/Publish/Consume/Bind

  • 创建交换机

    err = ch.ExchangeDeclare(
            "logs_direct", // name
            "direct",      // type
            true,          // durable
            false,         // auto-deleted
            false,         // internal
            false,         // no-wait
            nil,           // arguments
    )
    
  • 创建队列

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable(是否持久化)
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    
  • 绑定

    err = ch.QueueBind(
            q.Name,        // queue name
            "log",             // routing key
            "logs_direct", // exchange
            false,
            nil
    )
    
  • 发送

    body := "this is log"
    err = ch.PublishWithContext(ctx,
            "logs_direct",         // exchange
            "log", // routing key
            false,                 // mandatory
            false,                 // immediate
            amqp.Publishing{
                    ContentType: "text/plain",
                    Body:        []byte(body),
            })
    
  • 接收

    msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto ack
            false,  // exclusive
            false,  // no local
            false,  // no wait
            nil,    // args
    )
    

5 可靠性

5.1 生产者可靠性

  • 生产者重连

  • 生产者确认(ack)

5.2 MQ可靠性

  • 交换机/队列持久化
  • 消息持久化

5.2.1 Lazy Queue

image-20240904172117264

image-20240904163818387

5.3 消费者可靠性

  • 消费者确认机制

    image-20240904172521990

5.4 业务幂等性

  • 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
  1. 给消息加上uuid
  2. 在业务逻辑上做修改

5.4 Golang实现可靠性

在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:

1. 确保消息生产者的可靠性

  • 消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用 Channel.Confirm() 方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。

    ch.Confirm(false) // 启用发布确认模式
    confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
    
    // 发布消息
    err = ch.Publish(exchange, routingKey, mandatory, immediate, msg)
    if err != nil {
        // 处理发布失败的情况
    }
    
    select {
    case confirmed := <-confirm:
        if confirmed.Ack {
            fmt.Println("消息已确认")
        } else {
            fmt.Println("消息未确认")
        }
    case <-time.After(time.Second * 5):
        fmt.Println("消息确认超时")
    }
    
  • 消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置 DeliveryModeamqp.Persistent 来实现:

    msg := amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte("Hello, RabbitMQ!"),
    }
    

2. 确保消息队列的可靠性

  • 队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。

    _, err = ch.QueueDeclare(
        "my_queue",  // 队列名
        true,        // 是否持久化
        false,       // 是否自动删除
        false,       // 是否排他
        false,       // 是否阻塞
        nil,         // 其他参数
    )
    if err != nil {
        log.Fatalf("Failed to declare a queue: %s", err)
    }
    

3. 确保消息消费者的可靠性

  • 手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。

    msgs, err := ch.Consume(
        "my_queue", // 队列名
        "",         // 消费者标识
        false,      // 自动确认
        false,      // 是否排他
        false,      // 是否阻塞
        false,      // 是否在同一个连接上消费
        nil,        // 其他参数
    )
    if err != nil {
        log.Fatalf("Failed to register a consumer: %s", err)
    }
    
    for d := range msgs {
        // 处理消息
        fmt.Printf("Received a message: %s", d.Body)
    
        // 手动确认
        d.Ack(false)
    }
    
  • QoS(Quality of Service): 设置消费者的 QoS 参数,例如 prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。

    err = ch.Qos(
        1,    // 每次处理一条消息
        0,    // 消息大小限制(不限制)
        false, // 是否应用于整个通道
    )
    if err != nil {
        log.Fatalf("Failed to set QoS: %s", err)
    }
    

4. 容错处理

  • 重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。

  • 死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。

通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。

6 延迟消息

6.1 死信交换机

image-20240905145924200

6.2 延迟消息插件

6.2.1 安装

  1. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

  2. 将插件放在该目录

    image-20240905153455222

  3. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange

6.2.2 使用

 // 3. 声明延迟交换机
    err = ch.ExchangeDeclare(
        "delay_exchange",               // 交换机名称
        "x-delayed-message",            // 交换机类型
        true,                           // 是否持久化
        false,                          // 是否自动删除
        false,                          // 是否内部使用
        false,                          // 是否等待
        amqp.Table{"x-delayed-type": "direct"}, // 交换机类型的设置
    )
    failOnError(err, "Failed to declare an exchange")

    // 4. 发送消息
    body := "Hello World with delay"
    err = ch.Publish(
        "delay_exchange", // 交换机名称
        "routing_key",    // 路由键
        false,            // 是否强制发送
        false,            // 是否立即发送
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers: amqp.Table{
                "x-delay": int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
            },
        })

6.2.3 应用场景

  • 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

image-20240905155732697
false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})


### 6.2.3 应用场景

- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

[外链图片转存中...(img-eA0QMPnx-1725527666228)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2117635.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视频监控系统选型:为何EasyCVR视频汇聚平台成为优选方案

随着科技的飞速发展&#xff0c;视频监控系统作为现代安防体系的核心组成部分&#xff0c;其重要性日益凸显。无论是智慧城市、智慧交通、智慧园区还是企业安防&#xff0c;高效、稳定、智能的视频监控解决方案都是不可或缺的。在众多视频监控平台中&#xff0c;EasyCVR视频汇聚…

《向量数据库指南》——如何评估 Embedding 模型

01. 简介 在此前发布的文章(https://zilliz.com/learn/sparse-and-dense-embeddings)中,我们探析了当前稠密 Embedding 模型的架构,并介绍了 sentence-transformers 库的一些基础用法。虽然通过 sentence-transformers 可以使用众多预训练模型,但这些模型几乎都采用了与…

【空气能热泵热水系统原理

高温直热循环系列&#xff1a;1、系统简图&#xff08;带电辅热&#xff09; 注&#xff1a;1)图适用于以一次加热式热泵热水机组为主机的热水系统。2&#xff09;系统所有机组的启、停都由生活热水箱中水位开关控制。机组以直热式动作的条件为&#xff1a;①当线控器设定水箱…

VM中创建CentOS 7及VM中如何修改DHCP的IP网段

一、创建虚拟机 1新建Centos虚拟机 2类型选择 3版本兼容性选择 4镜像选择 5安装系统选择 6虚拟机的创建路径&#xff08;选择C盘以外且不包含中文名称的路径&#xff09; 7硬件配置选择 网络类型的选择&#xff08;通常情况下选择NAT模式&#xff09; 8剩下的全部按推荐走&…

AcWing算法基础课-787归并排序-Java题解

大家好&#xff0c;我是何未来&#xff0c;本篇文章给大家讲解《AcWing算法基础课》787 题——归并排序。本文详细介绍了归并排序的算法思路&#xff0c;包括分解、合并和递归排序三个主要步骤。通过 Java 代码实现&#xff0c;展示了如何将数组递归分解至单个元素&#xff0c;…

揭秘!焦虑症与气血不足:是巧合还是内在关联?

在这个快节奏、高压力的时代&#xff0c;焦虑症仿佛成了现代人难以言说的“隐形伴侣”。失眠、心悸、易怒……这些症状让许多人苦不堪言。而另一边&#xff0c;中医理论中的“气血不足”也常常被视为身体虚弱、情绪不稳的根源。那么&#xff0c;焦虑症与气血不足之间&#xff0…

EMLOG程序单页友链和标签增加美化

单页友联效果图&#xff1a; 标签页面效果图&#xff1a; 源码介绍 EMLOG单页友情链接和TAG标签&#xff0c;友链单页文件代码main{width: 58%;是设置宽度 自己把设置成与您的网站宽度一样&#xff0c;如果自适应就填写100%&#xff0c;TAG文件不用修改 安装方法&#xff1a…

使用Selenium与WebDriver实现跨浏览器自动化数据抓取

背景/引言 在数据驱动的时代&#xff0c;网络爬虫成为了收集和分析海量数据的关键工具。为了应对不同浏览器环境下的兼容性问题&#xff0c;Selenium与WebDriver成为了开发者实现跨浏览器自动化数据抓取的首选工具。本文将深入探讨如何利用Selenium和WebDriver实现跨浏览器的数…

客户管理太难了?你可能忽视了这些常见问题

在客户管理中&#xff0c;你是不是常常感到力不从心&#xff1f;客户信息不准确、沟通不到位、客户流失毫无预警……这些问题不仅让管理者头疼&#xff0c;还严重影响企业的业绩增长。客户管理看似简单&#xff0c;但往往隐藏着很多不易察觉的细节问题。如果你觉得客户越来越难…

什么运动耳机好用?六大技巧助力选购优质产品

​开放式蓝牙耳机现在超流行&#xff0c;不仅年轻人爱用&#xff0c;连不少上了年纪的人也喜欢在公园里散步时戴上。这些耳机无论是听歌、学习、健身还是办公&#xff0c;都能派上用场。到了2024年&#xff0c;想要挑到一款既好用又好听的开放式蓝牙耳机&#xff0c;得好好比较…

Vue2+JS项目升级为Vue3+TS之jquery的maphilight引入项目

本人由于想提升自己的项目开发能力&#xff0c;所以将就项目的vue2JavaScriptwebpack的旧技术栈升级为vue3typescriptvite的技术栈&#xff0c;所以遇到很多坑&#xff0c;以下是maphilight的解决方法。 众所周知jquery是基于JavaScript进行开发&#xff0c;但是已有typescript…

LiveKit的agent介绍

概念 LiveKit核心概念&#xff1a; Room&#xff08;房间&#xff09;Participant&#xff08;参会人&#xff09;Track&#xff08;信息流追踪&#xff09; Agent 架构图 ​ 订阅信息流 ​ agent交互流程 客户端操作 加入房间 房间创建方式 手动 赋予用户创建房间的…

【原创】java+springboot+mysql校园疫情管理系统设计与实现

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…

【JAVA开源】基于Vue和SpringBoot的图书个性化推荐系统

本文项目编号 T 015 &#xff0c;文末自助获取源码 \color{red}{T015&#xff0c;文末自助获取源码} T015&#xff0c;文末自助获取源码 目录 一、系统介绍1.1 业务分析1.2 用例设计1.3 时序设计 二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究…

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习&#xff1a;参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归&#xff08;SAheart.csv&#xff09; 【学习笔记】 陈强-机器学习-Python-…

[AHK] 调用函数动态生成ListBox窗口

需求背景 动态生成向导对话框&#xff0c;由用户选一个选项&#xff0c;类似做选择题。 运行效果 AHK v1 代码 if(A_ScriptFullPathA_LineFile)MsgBox % ListBox("窗口标题", "这是一个生成listbox的Demo", "a|b|c|d|",3) return ;---------…

【小沐学OpenGL】Ubuntu环境下glew的安装和使用

文章目录 1、简介1.1 OpenGL简介1.2 glew简介 2、安装glew2.1 命令安装glew2.2 直接代码安装glew2.3 cmake代码安装glew 3、测试glew3.1 测试glewfreeglut3.2 测试glewglfw 结语 1、简介 1.1 OpenGL简介 Linux 系统中的 OpenGL 是一个跨语言、跨平台的应用程序编程接口&#…

智能的PHP开发工具PhpStorm v2024.2全新发布——支持日志文件

PhpStorm是一个轻量级且便捷的PHP IDE&#xff0c;其旨在提高用户效率&#xff0c;可深刻理解用户的编码&#xff0c;提供智能代码补全&#xff0c;快速导航以及即时错误检查。可随时帮助用户对其编码进行调整&#xff0c;运行单元测试或者提供可视化debug功能。 立即获取PhpS…

【私活儿分享】手串珠子管理小程序,便捷查询珠子(串手链的珠子)位置

前言 之间帮客户做了个查询手串珠子位置的小程序&#xff0c;便于帮助客户管理众多的珠子&#xff0c;这个珠子就是戴在手上串起来的饰品。好了&#xff0c;话不多说&#xff0c;进入正题&#xff01; 正文 小程序比较简单&#xff0c;采用云开发。两个页面&#xff0c;一个查…

Git 新手指南

Git 命令大全 Git 是目前最流行的分布式版本控制系统&#xff0c;用于跟踪文件的更改&#xff0c;协调不同开发者的协作。掌握 Git 命令能够极大提高工作效率&#xff0c;尤其在软件开发过程中。本文将详细介绍 Git 的一些常用命令&#xff0c;帮助你更好地理解和使用 Git。 1…