Go实现RabbitMQ消息模式

news2024/11/18 7:46:57

【目标】

  1. go实现RabbitMQ简单模式和work工作模式

  2. go实现RabbitMQ 消息持久化和手动应答

  3. go实现RabbitMQ 发布订阅模式

  4. go使用MQ实现评论后排行榜更新

1. go实现简单模式

编写路由实现生产消息

实现生产消息

MQ消息执行为命令行执行,所以创建命令行执行函数main,用来消费消息

创建mq/demo/main.go

浏览器中访问路由,执行生产者生产消息

打开http://localhost:15672/#/queues, 查看RabbitMQ客户端查看是否消息

执行消费者,实现消息消费

进入 mq/demo/中,执行bee run

2. go实现work工作模式

在启动另一个窗口,实现第二个消费者

生产消息

打开RabbitMQ客户端,查看消费者

查看work消费

两个work时,轮询执行消费

2.1 go实现RabbitMQ消息持久化和手动应答

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化

生产者实现消息持久化

第二个参数设置为true,即durable=true.

消费者实现消息持久化

在RabbitMQ服务重启或者服务宕机的情况下,也不会丢失消息。

可以将Queue与Message都设置为可持久化(durable),这样可以保证绝大部分情况下RabbitMQ消息不会丢失。

手动应答

RabbitMQ 消息应答机制

消费者处理一个任务是需要一段时间的,如果有一个消费者正在处理一个比较耗时的任务并且只处理了一部分,突然这个时候消费者宕机了,那么会出现什么情况呢?

如果是自动应答模式,消费者在处理任务的过程中宕机了,那么消息将会丢失,而手动应答则能够保证消息不会被丢失,所以在实际的应用当中绝大多数都采用手动应答

为了保证消息从队列可靠地达到消费者并且被消费者消费处理,RabbitMQ 提供了消息应答机制,RabbitMQ 有两种应答机制,自动应答和手动应答

1、自动应答

RabbitMQ 只要将消息分发给消费者就被认为消息传递成功,就会将内存中的消息删除,而不管消费者有没有处理完消息

2、手动应答

RabbitMQ 将消息分发给了消费者,并且只有当消费者处理完成了整个消息之后才会被认为消息传递成功了,然后才会将内存中的消息删除

消息应答:

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

手动应答优点:

可以批量应答并且减少网络拥堵

消费方法中设置手动应答

效果:

关闭自动应答

RabbitMQ中查看

开启手动应答后,才返回消息执行成功,保证了消息不会被丢失

3. go实现RabbitMQ 发布订阅模式

RabbitMq消息模式的核心思想是:

一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

  实际上,生产者只能把消息发送给一个exchange(交换机),exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

有四种类型的交换器,分别是:direct、topic、headers、fanous(广播模式)

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

go实现RabbitMQ 发布订阅模式 RabbitMQ tutorial - Publish/Subscribe | RabbitMQ

实现广播模式(发布订阅模式)demo

生产者向交换机中发送消息

和简单模式、work模式相比,多了创建交换机

消费者拉取交换机中消息实现消费

和简单模式、work模式相比,多了创建交换机、创建了临时队列、绑定临时队列

临时队列

  我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

 demo中的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

  首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

  其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

  通过queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

实现发布订阅模式:

创建消息路由

控制器中实现生产者消息推送到交换机

创建mq/fanout/main.go,实现消费者从交换机中获取消息实现消费

效果:

执行生产者,实现消息生产

打开RabbitMQ客户端,查看消息状态

执行消费者,实现消费

注:因为是发布订阅模式。所以我们启动两个消费者实现多个用户消费同一消息

消费者1

消费者2

当生产者生产消息时,所订阅的消费者会执行消费

消费者1

消费者2

4. go实现RabbitMQ 路由模式


一个通过路由把One的消息取出来,另一个通过路由把two的消息取出来,一个队列打印奇数,一个队列打印偶数

生产者代码

消费者代码奇数代码


消费者代码偶数代码

运行效果

5. go实现RabbitMQ 主题模式


生产者代码

// topic主题push
// @router /mq/topic/push [*]
func (this *MqDemoController) GetTopic() {
   //创建线程执行(发送自增的数字到队列中)
   go func() {
      count := 0
      for {
         if count%2 == 0 {
            //strconv.Itoa 把count转化为字符串
            mq.PublishEx("wsyb.demo.topic", "topic", "wsyb.video", "wsyb.video"+strconv.Itoa(count))
         } else {
            mq.PublishEx("wsyb.demo.topic", "topic", "user.wsyb", "user.wsyb"+strconv.Itoa(count))
         }
         count++
         time.Sleep(1 * time.Second)

      }
   }()

   this.Ctx.WriteString("topic")
}

// topic主题push
// @router /mq/topictwo/push [*]
func (this *MqDemoController) GetTopicTwo() {
   //创建线程执行(发送自增的数字到队列中)
   go func() {
      count := 0
      for {
         if count%2 == 0 {
            //strconv.Itoa 把count转化为字符串
            mq.PublishEx("wsyb.demo.topic", "topic", "a.frog.name", "a.frog.name"+strconv.Itoa(count))
         } else {
            mq.PublishEx("wsyb.demo.topic", "topic", "b.frog.uid", "b.frog.uid"+strconv.Itoa(count))
         }
         count++
         time.Sleep(1 * time.Second)

      }
   }()

   this.Ctx.WriteString("topic")
}

消费所有主题代码(#)

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费  # 代表获取所有的数据
   mq.ConsumerEx("wsyb.demo.topic", "topic", "#", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("topic all msg is :%s\n", s)
}

匹配多个规则进行消费

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费 * 匹配一个或者多个符合规则的数据
   mq.ConsumerEx("wsyb.demo.topic", "topic", "*.frog.*", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("topic frog msg is :%s\n", s)
}

匹配一个规则进行消费

// 包名必须是main否则消费不成功
package main

import (
   "fmt"
   "wsybapi/services/mq"
)

func main() {
   //执行消费
   mq.ConsumerEx("wsyb.demo.topic", "topic", "wsyb.*", callback)
}

// 回调函数
func callback(s string) {
   //打印消费结果
   fmt.Printf("tpic wsyb msg is :%s\n", s)
}

运行结果

6. rabbitmq死信队列

6.1应用场景:
  1. 发送消息规定10分钟以后发送给用户
  2. 规定消息每天固定的时间发送
    3.下了订单没有支付,30分钟以后就会取消订单
    4.订单相关的,下单以后会定时收到会系统的提示消息
6.2什么是死信队列呢:

死信队列产生的条件,不仅是ttl时间过期了,还有消息被拒绝,队列达到最大长度,都会产生死信,相信大家已经明白了

7. go使用MQ实现评论后排行榜更新

修改逻辑,新增评论时更新redis排行榜的数据

发布评论

打开MQ客户端,查看队列状态

创建mq/top/main.go,连接数据库

在消费回调函数中,编写消费者逻辑实现排行榜更新

执行消费者

效果:

先评论内容

打开redis可视化界面,查看排行榜评论数

再次评论

打开redis可视化界面,查看排行榜评论数是否实现更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2179008.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react-native-Windows配置

一:官网: React Native for Windows macOS Build native Windows & macOS apps with Javascript and React 二:安装依赖 需要以管理员身份运行powershell,然后粘贴下面代码,注意:要安装淘宝镜像,要…

JAVA线程基础二——锁的概述之乐观锁与悲观锁

乐观锁与悲观锁 乐观锁和悲观锁是在数据库中引入的名词,但是在并发包锁里面也引入了类似的思想,所以这里还是有必要讲解下。 悲观锁指对数据被外界修改持保守态度,认为数据很容易就会被其他线程修改,所以在数据被处理前先对数据进行加锁&…

[Redis][典型运用][分布式锁]详细讲解

目录 0.什么是分布式锁1.分布式锁的基础实现2.引入过期时间3.引入校验ID4.引入Lua5.引入Watch Dog(看门狗)6.引入Redlock算法7.其他功能 0.什么是分布式锁 在⼀个分布式的系统中,也会涉及到多个节点访问同⼀个公共资源的情况,此时就需要通过锁来做互斥控…

一拖二快充线:单接与双接的多场景应用

在当代社会,随着智能手机等电子设备的普及,充电问题成为了人们关注的焦点。一拖二快充线作为一种创新的充电解决方案,因其便捷性与高效性而受到广泛关注。本文将深入探讨一拖二快充线的定义、原理以及在单接与双接手机场景下的应用&#xff0…

数字图像处理:空间域滤波

1.数字图像处理:空间域滤波 1.1 滤波器核(相关核)与卷积 图像上的邻域计算 线性空间滤波的原理 滤波器核(相关核)是如何得到的? 空间域的卷积 卷积:滤波器核与window中的对应值相乘后所有…

touch命令:创建文件,更新时间戳

一、命令简介 ​touch​ 命令在 Linux 和其他类 Unix 系统中用于创建空白文件或者更新已存在文件的时间戳。如果指定的文件不存在,touch​ 命令会创建一个空白文件;如果文件已经存在,touch​ 命令会更新文件的访问时间和修改时间&#xff0c…

誉天Linux云计算课程学什么?为什么保障就业?

一个IT工程师相当于干了哪些职业? 其中置顶回答生动而形象地描绘道: 一个IT工程师宛如一个超级多面手,相当于——加班狂程序员测试工程师实施工程师网络工程师电工装卸工搬运工超人。 此中酸甜苦辣咸,相信很多小伙伴们都深有体会。除了典…

用开源软件制作出精美的短视频#视频编辑

从前,有一个叫做创意森林的地方,住着各种各样的编辑精灵。一天,视频编辑精灵们发现了一本神秘的论文,里面写满了如何利用前沿的AI技术来提升他们的工作效率。于是,精灵们开始学习使用LLM和LLaVA,像魔法一样…

《企业实战分享 · 开发技术栈选型》

📢 大家好,我是 【战神刘玉栋】,有10多年的研发经验,致力于前后端技术栈的知识沉淀和传播。 💗 🌻 CSDN入驻不久,希望大家多多支持,后续会继续提升文章质量,绝不滥竽充数…

02Cesium中常用的鼠标事件

文章目录 02Cesium中常用的鼠标事件1、左键单击事件2、左键双击事件3、左键按下事件4、左键弹起事件5、中键按下事件6、中键弹起事件7、鼠标移动事件8、右键单击事件9、右键按下事件10、右键弹起事件11、鼠标滚轮事件具体在代码中的应用如下所示 02Cesium中常用的鼠标事件 Ces…

windows下安装rabbitMQ并开通管理界面和允许远程访问

如题,在windows下安装一个rabbitMQ server;然后用浏览器访问其管理界面;由于rabbitMQ的默认账号guest默认只能本机访问,因此需要设置允许其他机器远程访问。这跟mysql的思路很像,默认只能本地访问,要远程访…

《深度学习》OpenCV 图像拼接 拼接原理、参数解析、案例实现

目录 一、图像拼接 1、直接看案例 图1与图2展示: 合并完结果: 2、什么是图像拼接 3、图像拼接步骤 1)加载图像 2)特征点检测与描述 3)特征点匹配 4)图像配准 5)图像变换和拼接 6&am…

鸿蒙harmonyos next flutter通信之BasicMessageChannel获取app版本号

本文将通过BasicMessageChannel获取app版本号,以此来演练BasicMessageChannel用法。 建立channel flutter代码: //建立通道 BasicMessageChannel basicMessageChannel BasicMessageChannel("com.xmg.basicMessageChannel",StringCodec());…

系统工程 > 霍尔三维结构

简介 霍尔三维结构模型是由美国系统工程专家霍尔(A.D.Hall)在1969年提出的一种系统工程方法论,它集中体现了系统工程方法的系统化、综合化、最优化、程序化和标准化等特点 。该模型将系统工程整个活动过程分为前后紧密衔接的七个阶段和七个步…

MySQL的驱动安装

1、下载并安装MySQL 下载地址: 建议在下列框中选择LTS长期支持版本,下载对应的MSI安装文件。 安装完成后,将MySQL的环境bin路径添加到环境变量中。 可以运行MySQL Configurator进行配置,主要设置密码,并初始化。其余…

机器学习课程学习周报十四

机器学习课程学习周报十四 文章目录 机器学习课程学习周报十四摘要Abstract一、机器学习部分1. EM算法与高斯混合模型2. 概率论复习(三) 总结 摘要 本周的学习重点是EM算法与高斯混合模型的应用。单高斯模型无法有效拟合多峰数据分布,因此引…

论文精读:拓扑超导体PdBi2Te4和PdBi2Te5计算

npj Computational Materials (2023) 9:188 ; https://doi.org/10.1038/s41524-023-01144-y 摘要节选 超导拓扑金属(SCTMs)近年来成为一种很有前途的量子计算拓扑超导(TSC)和马约拉纳零模式平台。 本文提出了一种通过将超导单元嵌入到拓扑绝缘体中来设计sctm的策略。还编制了…

二叉树的中序遍历(java)

概述 关于二叉树,我们都不陌生,许多基于递归的问题发起点都是一个二叉树的root节点。对于各种二叉树的问题,我们也是通过dfs进行求解。例如求二叉树的深度、最近公共祖先等 算法分析 关于二叉树的中序遍历,我们都知道应该先访…

无人机之集群路径规划篇

无人机的集群路径规划是一个复杂而重要的任务,它要求为一群无人机设计出既安全又高效的飞行路径,同时考虑到多种约束条件和目标。 一、路径规划的重要性 无人机集群路径规划对于确保无人机能够安全、高效地完成任务至关重要。通过合理的路径规划&#x…

Word办公自动化的一些方法

1.Word部分内容介绍 word本身是带有格式的一种文档,有人说它本质是XML,所以一定要充分利用标记了【样式】的特性来迅速调整【格式】,从而专心编辑文档内容本身。 样式(集) 编号(多级关联样式编号&#xff…