RabbitMQ快速上手(延迟队列)

news2024/11/24 0:10:10

安装

官网

参考文章:
​
https://blog.csdn.net/miaoye520/article/details/123207661
​
https://blog.csdn.net/lvoelife/article/details/126658695

安装Erlang,并添加环境变量ERLANG_HOME,命令行运行erl

安装rabbitmq,rabbitmq-server-3.12.0.exe

注意Erlang要选择对应的版本

安装RabbitMQ-Plugins插件,rabbitmq-plugins enable rabbitmq_management

访问 http://localhost:15672/

账号密码 guest,guest

使用

中文文档

参考文章:
https://blog.csdn.net/weixin_45698935/article/details/123481137
https://www.liwenzhou.com/posts/Go/rabbitmq-1/

Go实践:

go get github.com/streadway/amqp 

基本使用:

生产者

package main
​
import (
    "github.com/streadway/amqp"
    "log"
)
​
type App struct {
    Name string
    Num int
}
​
type Root struct {
    Apps []*App
}
​
func main() {
    // 1.尝试连接RabbitMQ,建立连接
    // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf(err.Error())
    }
    defer conn.Close()
​
    // 2.创建一个通道, 大多数API都是该通道操作的
    ch, err := conn.Channel()
    defer ch.Close()
​
    // 3.声明消息要发送的队列
    q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
    if err != nil {
        log.Fatalf(err.Error())
    }
​
    body := "hello world12"
    err = ch.Publish("", q.Name, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body: []byte(body),
    })
    if err != nil {
        log.Fatalf(err.Error())
    }
​
    return
}

消费者

package main
​
import (
    "github.com/streadway/amqp"
    "log"
)
​
func main() {
    // 1.尝试连接RabbitMQ,建立连接
    // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf(err.Error())
    }
    defer conn.Close()
​
    // 2.创建一个通道, 大多数API都是该通道操作的
    ch, err := conn.Channel()
    defer ch.Close()
​
    // 3.声明消息要发送的队列
    q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
    if err != nil {
        log.Fatalf(err.Error())
    }
​
    // 获取接收消息的Delivery通道
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Println(err.Error(), "Failed to register a consumer")
    }
​
    forever := make(chan bool)
​
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()
​
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

延迟队列

不应该是队列,而应该是堆。将先过期的消息排在前面

需要安装插件:有插件的支持 Community Plugins — RabbitMQ rabbitmq_delayed_message_exchange

首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中

发送者:

发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay 字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒

package main
​
import (
    "log"
    "os"
    "strings"
​
    "github.com/streadway/amqp"
)
​
func main() {
    failOnError := func (err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
​
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
​
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
​
    body := bodyFrom(os.Args)
    // 将消息发送到延时队列上
    err = ch.Publish(
        "",                 // exchange 这里为空则不选择 exchange
        "test_delay",       // routing key
        false,              // mandatory
        false,              // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Expiration: "10000",    // 设置五秒的过期时间
        })
    failOnError(err, "Failed to publish a message")
​
    log.Printf(" [x] Sent %s", body)
}
​
func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello3"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

接收者

package main
​
import (
    "log"
​
    "github.com/streadway/amqp"
)
​
func main() {
​
    failOnError := func (err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
​
    // 建立链接
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()
​
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
​
    // 声明一个主要使用的 exchange
    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")
​
    // 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
    q, err := ch.QueueDeclare(
        "test_logs",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")
​
    /**
     * 注意,这里是重点!!!!!
     * 声明一个延时队列, ß我们的延时消息就是要发送到这里
     */
    _, errDelay := ch.QueueDeclare(
        "test_delay",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        amqp.Table{
            // 当消息过期时把消息发送到 logs 这个 exchange
            "x-dead-letter-exchange":"logs",
        },   // arguments
    )
    failOnError(errDelay, "Failed to declare a delay_queue")
​
    err = ch.QueueBind(
        q.Name, // queue name, 这里指的是 test_logs
        "",     // routing key
        "logs", // exchange
        false,
        nil)
    failOnError(err, "Failed to bind a queue")
​
    // 这里监听的是 test_logs
    msgs, err := ch.Consume(
        q.Name, // queue name, 这里指的是 test_logs
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")
​
    forever := make(chan bool)
​
    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()
​
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

参考:golang 使用 rabbitmq 延迟队列-腾讯云开发者社区-腾讯云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/702150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pastebin设计之旅:从零设计网络文本存储系统

项目简介&#xff1a;Pastebin是一个在线的文本存储平台&#xff0c;让用户可以存储和分享代码片段或者其他类型的文本。它支持多种编程和标记语言的语法高亮&#xff0c;用户可以选择让他们的"paste"公开或私有。无需注册就可以使用&#xff0c;但注册用户可以更方便…

森海塞尔重磅推出TC Bars智能音视频一体机, 为中小型协作空间缔造理想解决方案

森海塞尔重磅推出TC Bars智能音视频一体机&#xff0c; 为中小型协作空间缔造理想解决方案 全球音频行业先驱森海塞尔重磅推出首款内置摄像头的可扩展一体化会议设备 德国韦德马克&#xff0c;2023年6月13日——森海塞尔作为先进音频技术的首选&#xff0c;致力于使协作与学习…

力扣 617. 合并二叉树

题目来源&#xff1a; C题解1&#xff1a;使用队列实现层序遍历。基于root1&#xff0c;遇到可覆盖部分&#xff0c;直接将该节点指向对应节点&#xff0c;遇到重复部分&#xff0c;则修改root1该节点相应的值。 /*** Definition for a binary tree node.* struct TreeNode {*…

超市零售数据可视化分析(Plotly 指南)

CSDN 上不能插入 HTML&#xff0c;可以在 GitHub Page 上查看&#xff1a; https://paradiseeee.github.io/2022/07/30/超市零售数据可视化分析/ 项目首次发布于 Kesci 上 – 超市零售数据分析。感兴趣的可以直接上去 Fork 之后自己做。由于上面只能用 Jupyter Notebook&#x…

多旋翼无人机试验系统设计与实现

摘 要 世界的航空业的大门被20世纪莱特兄弟制造的“飞行者一号”开启&#xff0c;直至今日处于飞速发展的阶段。随着时代的进步&#xff0c;各种微电子、微传感、通信技术的飞速发展&#xff0c;让无人机在时代内成为一种新型的空中力量。除了军用方面的多种用途&#xff0c;无…

《Linux操作系统编程》第九章 数据查找和筛选工具 : 了解流编辑器sed和报表生成器awk的简单使用

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

uni-app滚动分页 兼容(App 小程序 H5)

因为手机端本身屏幕空间不大 所以大家一般都会选择用滚动分页 首先 我在根目录下创建了一个 api目录 下面创建了一个bookApi.js 其中写了一个请求函数 getBookList 根据当前页 page 和 每页展示多少条 pageSize 获取数据 那么 我的组件代码是这样的 <template><scro…

MacBook Pro Apple M2 Max安装MySQL-8.0.33

文章目录 下载安装程序安装数据库配环境变量查看数据库状态 系统&#xff1a;macOS Ventura 芯片&#xff1a;M2 数据库&#xff1a;MySQL 8.0.33 下载安装程序 官网地址&#xff1a;https://www.mysql.com/ 点击页签“DOWLOADS“后将页面拖到底部&#xff0c;点MySQL Commu…

U盘的文件系统为FAT32才可以同时在苹果电脑和windows电脑中正常使用

文章目录 1.驱动器F中的磁盘未被格式化。想现在格式化吗&#xff1f;2.U盘插到苹果电脑上后无法写入 1.驱动器F中的磁盘未被格式化。想现在格式化吗&#xff1f; 我之前U盘的文件系统为exFAT&#xff0c;插入Windows Server 2003系统的电脑中&#xff0c;打开时弹出上面的提示框…

2023上半年软考系统分析师科目一整理-14

2023上半年软考系统分析师科目一整理-14 计算机系统性能评估中&#xff0c;( A )通常采用加法指令的运算速度来衡量计算机的速度。(D )首先计算出处理部件每个计算单元的有效计算率&#xff0c;再按不同字长加以调整&#xff0c;得出该计算单元的理论性能&#xff0c;所有组成该…

JavaFX学习:Observable Collections(观察集合)

JavaFX中的观察集合&#xff08;Observable Collections&#xff09;继承自Java的集合&#xff08;Collections&#xff09;。Java集合提供了List、Map、Set三种集合接口。JavaFX在Java集合基础上派生出可以监听集合内容变化的三种集合接口。接口如下&#xff1a; ObservableL…

【算法与数据结构】剑指 Offer 05、LeetCode替换空格

文章目录 一、题目二、双指针法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、双指针法 思路分析&#xff1a;这道题使用双指针法就能不用额外的辅助空间。首先计算字符串中的空格数量&#xff0c;然后重设…

函数模板和类模板 知识点总结 C++程序设计与算法笔记总结(七) 北京大学 郭炜

函数模板 交换两个整型变量的值的Swap函数&#xff1a; void Swap(int & x,int & y) { int tmp x; x y; y tmp; } 交换两个double型变量的值的Swap函数: void Swap(double & x,double & y) { double tmp x; x y; y tmp; }用函数模板解决&#xff1a; …

MAYA动力学曲线带动骨骼

例子 2 自由下落了 对比测试 尖端 太麻烦&#xff0c;使用风 nucleus1.windDirectionZ10*sin(time) 把球合成一个 删除一个解算器&#xff0c;就不动了

Redis 性能管理/优化 双一致性问题 缓存雪崩/击穿/穿透

---------------------- Redis 性能管理 ---------------------------------------- ----- 查看Redis内存使用 ----- info memoryredis-cli -a abc123 info memory ----- 内存碎片率 ----- used_memory_rss&#xff1a;是Redis向操作系统申请的内存。used_memory&#xff1a;是…

从有序顺序表中删除所有其值重复的元素(用不同的负数代替),使所有元素的值均不同。

题目要求&#xff1a;从有序顺序表中删除所有其值重复的元素&#xff08;用不同的负数代替&#xff09;&#xff0c;使所有元素的值均不同。 0&#xff1a;有序顺序表 1&#xff1a;删除所有其值重复的元素 2&#xff1a;用不同的负数代替 3&#xff1a;顺序表中所有元素的值均…

Maven中依赖使用范围

IDEA中help中show Log in Explorer可以查看idea日志 依赖使用范围 构建包含的流程&#xff1a;编译 &#xff0c;测试 &#xff0c;运行 &#xff0c;打包 &#xff0c;安装 &#xff0c;部署 comile test package install deploy 使用标签 1&#xff1a;compile 缺省值 伴随者…

OpenStack(T版)——块存储(Cinder)服务介绍与安装

文章目录 OpenStack(T版)——块存储(Cinder)服务介绍与安装安装和配置(controller)准备(1)创建数据库(2)加载admin user的环境变量(3)创建Identity服务凭据(4)创建Cinder 块存储服务组件的API endpoint 安装和配置Cinder块存储服务组件(1)安装软件包(2)编辑/etc/cinder/cinder.…

第七章:使用FileZilla搭建FTP服务器详解

目录 一、软件下载 二、服务器安装与配置 三、使用客户端 一、软件下载 到官方网站下载 FileZilla 的服务端和客户端程序 &#xff1a; FileZilla - The free FTP solution 二、服务器安装与配置 1 安装 安装的过程非常简单&#xff0c;直接下一步就可以了&#xff0c;需要…

【热部署】springboot-devtools

目录 pom idea配置 1 2 2021往后的idea版本 之前的idea版本 3 说明 注意 pom <!-- <!&ndash; 热部署 &ndash;>--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devt…