RabbitMQ-默认读、写方式介绍

news2024/10/6 12:21:07

1、RabbitMQ简介

rabbitmq是一个开源的消息中间件,主要有以下用途,分别是:

  1. 应用解耦:通过使用RabbitMQ,不同的应用程序之间可以通过消息进行通信,从而降低应用程序之间的直接依赖性,提高系统的可维护性、扩展性和容错性。
  2. 异步提速:通过将耗时的操作转化为异步执行,可以提高系统的响应速度和吞吐量,提升用户体验。
  3. 削峰填谷:在高峰时段,RabbitMQ可以缓存大量的消息,从而避免系统崩溃,并在低峰时段处理这些消息,提高系统的稳定性。
  4. 消息分发:RabbitMQ可以将消息分发到多个消费者进行处理,从而提高系统的灵活性和处理能力。

了解rabbitmq的设计架构,对理解mq如何使用有很大的帮助。

一个非常重要的点,mq中的生产者从来不是直接将消息发送到队列中的,而是将消息发送到了mq的交换机中(上图中的exchange为交换机), 甚至生产者都不知道这条消息将被发送到哪个队列中。

交换机是个怎样的设计呢,他的一侧连接生产者,从生产者接收消息,另外一侧连接队列,将消息push进队列中,将消息push进一个队列,还是多个队列,还是抛弃,这些策略是由交换机的类型决定的,对于交换机的使用,后面详细介绍。

2、RabbitMQ安装

rabiitmq的安装,最简单的一种方式为运行mq的docker镜像,一行命令搞定:

# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

执行命令后,可以看到如下打印,则代表RabbitMQ启动成功:

镜像启动成功后,可以通过ip:15672打开mq控制台:

http://xxx.xx.xxx.xx:15672/#/

 

mq安装完成后,下面就可以进行实践啦。

3、默认模式读、写mq

 rabbitmq官方的库:github.com/rabbitmq/amqp091-go

生产者侧代码:

package main

import (
	"context"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func Send(msg string) error {
	// 连接rabbitmq
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("connect error:", err)
		return err
	}
	defer conn.Close()
	// 创建通道
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("channel error:", err)
		return err
	}
	defer ch.Close()
	// 创建队列,使用默认的交换机
	q, err := ch.QueueDeclare(
		"lp_default", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // noWait
		nil,          // arguments
	)
	if err != nil {
		fmt.Println("queue declare error:", err)
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	fmt.Println(q.Name)
	// body := "Hello World!"
	err = ch.PublishWithContext(ctx,
		"",     // exchange,默认交换机
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
	if err != nil {
		fmt.Println("publish error:", err)
		return err
	}
	return nil
}

func main() {
	Send("Hello world")
}

 运行上面代码后,可以在rabbitmq的客户端 看到这个队列:

 点击队列,进入队列详情:

第一个框中,显式了队列详情,可以看出,这个队列绑定的是默认的交换机。

第二个框,点击后,可以看到队列中的消息详情。

消费者:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("connect error:", err)
		return
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Channel error:", err)
		return
	}
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"lp_default", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	if err != nil {
		fmt.Println("Queue Declare error:", err)
		return
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		fmt.Println("Consume error:", err)
		return
	}

	var forever chan struct{}

	go func() {
		for d := range msgs {
			fmt.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

代码运行记录:

liupeng@192 default % go run recive.go
 [*] Waiting for messages. To exit press CTRL+CReceived a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world

在以上消费端代码中,如果代码在处理消息的过程中出现异常导致了程序退出,这样正在处理的这条消息就会丢失,为了避免这种情况的发生,rabbitmq设计了消息应答的机制,我们修改上面程序,将auto-ack参数设置为false,当处理完消息后,使用d.Ack(false)发送消息应答。

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,   // auto-ack,设置为false,取消自动应答
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		fmt.Println("Consume error:", err)
		return
	}

	var forever chan struct{}

	go func() {
		for d := range msgs {
			fmt.Printf("Received a message: %s\n", d.Body)
			d.Ack(false)  // 手动应答
		}
	}()

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever

如果忘记了进行消息应答,消息会被重新发入调度队列,这样就会吃掉越来越多的内存。

但是,当rabbitmq的服务down掉后,队列中的消息仍然会丢失,为了保证在这种情况下,消息仍然能够不丢失,我们需要做两件事:队列不丢失+消息不丢失,代码如下:

队列持久化:

q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable,设置队列持久化
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

消息持久化:

将DeliveryMode设置为amqp.Persistent

err = ch.PublishWithContext(ctx,
			"",     // exchange,默认交换机
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(msg),
				DeliveryMode: amqp.Persistent,
			})

以上就是默认读写rabbitmq的方法,后面再介绍其他几种使用方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1698069.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

有什么普通人可以做的赚钱软件?盘点9个适合普通人长期做的软件

在这个互联网高速发展的时代&#xff0c;智能手机已经成为我们生活中不可分割的一部分。众多APP的涌现&#xff0c;使得许多朋友都在寻求通过手机赚钱的方法。 然而&#xff0c;面对市面上琳琅满目的网上赚钱APP&#xff0c;我们该如何挑选呢&#xff1f;别担心&#xff0c;今…

python web自动化(验证码处理)

1.解决验证码问题的常⻅⼏种⽅式 1&#xff09; Debug模式启动浏览器&#xff08;浏览器复⽤&#xff09;&#xff1a; 原理&#xff1a;浏览器是有缓存记录的&#xff0c;只需要 沿⽤已经保存有登录记录的浏览器 进⾏后续的操作就⾏ 2&#xff09;识别法&#xff1a; 原理…

pycharm中,出现SyntaxError: Non-ASCII character ‘\xe4‘ in file... 的问题以及解决方法

文章目录 一、问题描述二、解决方法 一、问题描述 在pycharm中&#xff0c;使用python中编写中文字符时&#xff0c;会提示如下错误信息&#xff1a; SyntaxError: Non-ASCII character \xe4 in file ...... on line 8, but no encoding declared; see http://python.org/dev…

网上比较受认可的赚钱软件有哪些?众多兼职选择中总有一个适合你

在这个互联网高速发展的时代&#xff0c;网上赚钱似乎成了一种潮流。但是&#xff0c;你是否还在靠运气寻找赚钱的机会&#xff1f;是否还在为找不到靠谱的兼职平台而苦恼&#xff1f; 今天&#xff0c;就为你揭秘那些真正靠谱的网上赚钱平台&#xff0c;让你的赚钱之路不再迷…

MySQL--InnoDB体系结构

目录 一、物理存储结构 二、表空间 1.数据表空间介绍 2.数据表空间迁移 3.共享表空间 4.临时表空间 5.undo表空间 三、InnoDB内存结构 1.innodb_buffer_pool 2.innodb_log_buffer 四、InnoDB 8.0结构图例 五、InnoDB重要参数 1.redo log刷新磁盘策略 2.刷盘方式&…

S1E45:单链表1 课后作业

测试题&#xff1a;0. 相比起数组来说&#xff0c;单链表具有哪些优势呢&#xff1f; 答&#xff1a;长度非固定&#xff0c;可以申请添加长度 答案&#xff1a;对于数组来说&#xff0c;随机插入或者删除其中间的某一个元素&#xff0c;都是需要大量的移动操作&#xff0c;而…

基于tcp实现自定义应用层协议

认识协议 协议&#xff08;Protocol&#xff09; 是一种通信规则或标准&#xff0c;用于定义通信双方或多方之间如何交互和传输数据。在计算机网络和通信系统中&#xff0c;协议规定了通信实体之间信息交换的格式、顺序、定时以及有关同步等事宜的约定。简易来说协议就是通信…

网络工程师---第三十八天

ISIS&#xff1a; ISIS含义&#xff1a;中间系统到中间系统IS-IS。 ISIS特点&#xff1a;①内部网关协议IGP&#xff08;Interior Gateway Protocol&#xff09;&#xff0c;用于自治系统内部&#xff1b; ②IS-IS也是一种链路状态协议&#xff0c;使用最短路径优先SPF算法进…

电子阅览室在管理时需注意什么

关于如今的绝大多数人来说&#xff0c;想必都听说过“电子阅览室”这一概念。它首要运用在校园中&#xff0c;给学生们供给愈加丰厚的常识储藏。它也是一个独立的局域网&#xff0c;在校园网络中作为重要的一个组成部分而存在。但是&#xff0c;一个好的电子阅览室是需求满意运…

python文件IO基础知识

目录 1.open函数打开文件 2.文件对象读写数据和关闭 3.文本文件和二进制文件的区别 4.编码和解码 读写文本文件时 读写二进制文件时 5.文件指针位置 6.文件缓存区与flush()方法 1.open函数打开文件 使用 open 函数创建一个文件对象&#xff0c;read 方法来读取数据&…

Docker学习(4):部署web项目

一、部署vue项目 在home目录下创建项目目录 将打包好的vue项目放入该目录下&#xff0c;dist是打包好的vue项目 在项目目录下&#xff0c;编辑default.conf 内容如下&#xff1a; server {listen 80;server_name localhost; # 修改为docker服务宿主机的iplocation / {r…

[JAVASE] 类和对象(六) -- 接口(续篇)

目录 一. Comparable接口 与 compareTo方法 1.1 Comparable接口 1.2 compareTo方法的重写 1.2.1 根据年龄进行比较 1.2.2 根据姓名进行比较 1.4 compareTo 方法 的使用 1.3 compareTo方法的缺点(重点) 二. Comparator接口 与 compare方法 2.1 Comparator接口 2.2 compare 方法…

使用AWR对电路进行交流仿真---以整流器仿真为例

使用AWR对电路进行交流仿真—以整流器仿真为例 生活不易&#xff0c;喵喵叹气。马上就要上班了&#xff0c;公司的ADS的版权紧缺&#xff0c;主要用的软件都是NI 的AWR&#xff0c;只能趁着现在没事做先学习一下子了&#xff0c;希望不要裁我。 本AWR专栏只是学习的小小记录而…

2024.5.25期末测试总结

成绩&#xff1a; 配置&#xff1a; 可能与实际有些出入 题目&#xff1a; 第一题&#xff1a; 代码思路&#xff1a; 一道模拟题&#xff0c;按照公式计算出sumpow(2,i)&#xff0c;判断sum>H&#xff0c;输出 代码&#xff1a; #include<bits/stdc.h> using name…

LiveGBS流媒体平台GB/T28181用户手册-基础配置:信令服务配置、流媒体服务配置、白名单、黑名单、更多配置

LiveGBS流媒体平台GB/T28181用户手册-基础配置:信令服务配置、流媒体服务配置、白名单、黑名单、更多配置 1、基础配置1.1、信令服务配置1.2、白名单1.3、黑名单1.4、流媒体服务配置 2、搭建GB28181视频直播平台 1、基础配置 LiveGBS相关信令服务配置和流媒体服务配置都在这里…

Spark运行模式详解

Spark概述 Spark 可以在多种不同的运行模式下执行&#xff0c;每种模式都有其自身的特点和适用场景。 部署Spark集群大体上分为两种模式&#xff1a;单机模式与集群模式。大多数分布式框架都支持单机模式&#xff0c;方便开发者调试框架的运行环境。但是在生产环境中&#xff…

机器人支持回调接口配置(详细教程)

大家伙&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。 一、前言 今天&#xff0c;给大家介绍一下&#xff0c;如何在机器人中配置回调地址和接口编写。很多时候我们可能有这样的场景&#xff0c;收到消息后&#xff0c;想自己处理一下消息的内…

【微服务】springboot 构建镜像多种模式使用详解

目录 一、前言 二、微服务常用的镜像构建方案 3.1 使用Dockerfile 3.2 使用docker plugin插件 3.3 使用docker compose 编排文件 三、环境准备 3.1 服务器 3.2 安装JDK环境 3.2.1 创建目录 3.2.2 下载安装包 3.2.3 配置环境变量 2.2.4 查看java版本 3.3 安装maven …

MySQL中, 自增主键和UUID作为主键有什么区别?

首先我们来看看, 存储自增主键和uuid的数据类型 我们知道, mysql中作为主键的通常是int类型的数据, 这个 数据从第一条记录开始, 从1开始主键往后递增, 例如我有100条数据, 那么根据主键排序后, 里面的记录从上往下一次就是1, 2, 3 ... 100, 但是UUID就不一样了, UUID是根据特殊…

HTTP协议、URL、HTTPS协议 ----- 讲解很详细

本章重点 理解应用层的作用, 初识HTTP协议 了解HTTPS协议 一、HTTP协议 1.认识url 虽然我们说&#xff0c;应用层协议是我们程序猿自己定的&#xff0c;但实际上&#xff0c;已经有大佬们定义了一些现成的&#xff0c;又非常好用的应用层协议&#xff0c;供我们直接参考使…