go对rabbitmq基本操作

news2024/11/28 13:30:08

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit01 \
     --hostname rabbit01 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit02 \
     --hostname rabbit02 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func main() {
    	// 连接rabbitmq
        // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	defer conn.Close()
    
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utils
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func RabbitmqUtils() *amqp.Channel {
    	// 连接rabbitmq
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	//defer conn.Close()
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	//defer ch.Close()
    	return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {
    	// 创建一个队列
        // durable, autoDelete, exclusive, noWait bool
    	queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)
    	fmt.Println(queue.Name, err)
    }
    

    在这里插入图片描述

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    在这里插入图片描述

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {
        /**
    	第一个参数是交换机名称
    	第二个参数是队列名称
    	第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃
    	第四个参数是 路由的时候
    	第五个参数是消息体
    	*/
    	err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    	fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {
        fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 3、生产多条消息

    func main() {
    	for i := 0; i < 10; i++ {
    		_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{
    			Body: []byte(fmt.Sprintf("hello word %d", i)),
    		})
    	}
    }
    
  • 4、消费结果

    在这里插入图片描述

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)
    	// 2.创建一个交换机
    	_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {
    	_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)
    	// 2.创建一个交换机
    	err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、定义生产者

    func main() {
        // 消费者会根据绑定的路由key来获取消息
    	_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utils
    
    import (
    	"errors"
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"
    
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	MQUrl   string
    }
    
    // NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {
    	rabbitMQ := &RabbitMQ{
    		MQUrl: MQURL,
    	}
    	var err error
    	// 创建rabbitMQ连接
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	if err != nil {
    		rabbitMQ.failOnErr(err, "创建连接错误")
    	}
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		rabbitMQ.failOnErr(err, "获取channel失败")
    	}
    	return rabbitMQ
    }
    
    // Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {
    	// 1.创建1个队列
    	queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)
    	if err != nil {
    		r.failOnErr(err, "创建队列失败")
    	}
    	if exchange != "" && key == "" {
    		r.failOnErr(errors.New("错误"), "请传递交换机链接类型")
    	}
    	if exchange != "" {
    		// 2.创建一个交换机
    		err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)
    		if err1 != nil {
    			r.failOnErr(err, "创建交换机失败")
    		}
    		// 3.队列和交换机绑定在一起
    		if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {
    			fmt.Println("1111")
    			r.failOnErr(err, "交换机和队列绑定失败")
    		}
    	}
    	fmt.Println("创建成功")
    }
    
    // failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		log.Fatalf("%s:%s", message, err)
    		panic(fmt.Sprintf("%s:%s", message, err))
    	}
    }
    
    func (r *RabbitMQ) Close() {
    	defer func(Conn *amqp.Connection) {
    		err := Conn.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭链接失败")
    		}
    	}(r.conn)
    	defer func(Channel *amqp.Channel) {
    		err := Channel.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭通道失败")
    		}
    	}(r.channel)
    }
    
    func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {
    	err := r.channel.Qos(prefetchCount, prefetchSize, global)
    	if err != nil {
    		r.failOnErr(err, "限流失败")
    	}
    }
    
    // Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {
    	// 2.发送数据到队列中
    	if err := r.channel.Publish(
    		exchange,
    		routerKey,
    		false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者
    		false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者
    		amqp.Publishing{
    			Body: []byte(message),
    		},
    	); err != nil {
    		r.failOnErr(err, "发送消息失败")
    	}
    	fmt.Println("恭喜你,消息发送成功")
    }
    
    // Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {
    	// 2.接收消息
    	message, err := r.channel.Consume(
    		queueName,
    		"",    // 区分多个消费者
    		true,  // 是否自动应答
    		false, // 是否具有排他性
    		false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者
    		false, // 队列消费是否阻塞
    		nil,
    	)
    	if err != nil {
    		r.failOnErr(err, "接收消息失败")
    	}
    	fmt.Println("消费者等待消费...")
    	forever := make(chan bool)
    	// 使用协程处理消息
    	go func() {
    		for d := range message {
    			log.Printf("接收到的消息:%s", d.Body)
    			callback(d.Body)
    		}
    	}()
    	<-forever
    }
    
  • 2、简单模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("simple_queue1", func(message []byte) {
    		fmt.Println(string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("simple_queue1", "", "", "")
    	defer mq.Close()
    	mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("work_queue1", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	for i := 0; i < 10; i++ {
    		mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))
    	}
    }
    
  • 4、交换机带路由的时候

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")
    	mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("first_queue2", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1254287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么高斯核是实现尺度空间变换的唯一变换核,并且是唯一的线性核?再研究

请先看&#xff0c;我们前面一篇&#xff0c;尺度为什么是sigma。 下面要说的是&#xff0c;我们研究的是:g&#xff08;x,y,sigma&#xff09;和g&#xff08;x,y,k*sigma&#xff09;的关系 而不是&#xff1a;I(x,y)和g&#xff08;x,y,sigma&#xff09;之间的关系 也不…

十分钟让你搞懂JVM中的GC垃圾回收机制(分代回收)

文章目录 0. 为什么要有垃圾回收?1. 垃圾回收哪个内存区域?2. 如何找到垃圾(死亡对象的判断)2.1 引用计数法2.2 可达性分析法2.3 两种算法的差别 3. 如何清理垃圾(死亡对象的回收)3.1 标记-清楚法3.2 复制法3.3 标记-整理法 4. JVM使用的回收方法4.1 什么是分代回收4.2 哪些对…

数据结构 / day04 作业

1. 单链表任意位置删除, 单链表任意位置修改, 单链表任意位置查找, 单链表任意元素查找, 单链表任意元素修改, 单链表任意元素删除, 单链表逆置 // main.c#include "head.h"int main(int argc, const char *argv[]) {Linklist headNULL; //head 是头指针// printf(&q…

基于opencv+ImageAI+tensorflow的智能动漫人物识别系统——深度学习算法应用(含python、JS、模型源码)+数据集(四)

目录 前言总体设计系统整体结构图系统流程图 运行环境爬虫模型训练实际应用 模块实现1. 数据准备1&#xff09;爬虫下载原始图片2&#xff09;手动筛选图片 2. 数据处理3. 模型训练及保存4. 模型测试1&#xff09;前端2&#xff09;后端 系统测试1. 测试效果2. 模型应用1&#…

极兔快递查询,极兔快递单号查询,对需要的单号记录进行备注

批量查询极兔快递单号的物流信息&#xff0c;对需要的快递单号记录进行备注。 所需工具&#xff1a; 一个【快递批量查询高手】软件 极兔快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;并登录 步骤2&#xff1a;点击主界面左…

redis(Remote Dictionary Service) 底层数据结构

redis 底层数据结构 动态字符串SDS 优点 获取字符串长度的时间复杂度O(1) 支持动态扩容&#xff0c;减少内存分配次数 新字符串小于1M – 新空间为扩展后字符串长度的两倍 1 新字符串大于1M – 新空间为扩展后字符串长度 1M 1. 内存预分配 二进制安全&#xff08;记录了…

java springboot中使用 AOP监听方法执行周期

首先 我们在 pom.xml 中 dependencies标签中加入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId> </dependency>然后 我们随便创建一个类 编写代码如下 package com.ex…

pytorch导出rot90算子至onnx

如何导出rot90算子至onnx 1 背景描述2 等价替换2.1 rot90替换(NCHW)2.2 rot180替换(NCHW)2.3 rot270替换(NCHW) 3 rot导出ONNX 1 背景描述 在部署模型时&#xff0c;如果某些模型中或者前后处理中含有rot90算子&#xff0c;但又希望一起和模型导出onnx时&#xff0c;可能会遇到…

YOLOv5轻量化改进之mobilenetv3,更换mobilenetv3中的注意力机制。

目录 一、原理 二、代码 三、YOLOv5改进 一、原理 我们提出了基于互补搜索技术和新颖架构设计相结合的下一代mobilenet。MobileNetV3通过硬件网络架构搜索(NAS)和NetAdapt算法的结合来调整到移动电话cpu,然后通过新的架构进步进行改进。本文开始探索自动搜索算法和网络设计如…

Java新建项目如何整理项目结构,没有src文件夹

现在IDEA2023中新建项目时, 不会有src文件夹。这时需要自己创建一个src的包&#xff0c;然后将这个包设置为source root。 可能出现没有这个选项的情况&#xff0c;这是需要把设置的当前项目首先Unmark了&#xff0c;然后再对src文件夹mark一下。 src: 这是源代码的根目录。 …

Self Distillation 自蒸馏论文解读

paper&#xff1a;Be Your Own Teacher: Improve the Performance of Convolutional Neural Networks via Self Distillation official implementation&#xff1a; https://github.com/luanyunteng/pytorch-be-your-own-teacher 前言 知识蒸馏作为一种流行的压缩方法&#…

与Windows 10更新大同小异!一步一步教你如何更新Windows 11

如果你想让你的Windows 11设备获得最佳性能&#xff0c;那么定期更新是至关重要的。即使是最好的电脑如果不更新也会受到影响&#xff0c;因为更新会应用软件调整&#xff0c;帮助你的设备更快、更平稳地运行。它还提高了安全性&#xff0c;意味着你可以从Microsoft的最新功能中…

自动驾驶学习笔记(十一)——高精地图

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo Beta宣讲和线下沙龙》免费报名—>传送门 文章目录 前言 高精地图 地图采集 底图制作 地图…

OpenFeign入门

OpenFeign是Spring Cloud OpenFeign&#xff0c;是Spring Cloud团队开发的基于Feign的框架 1、OpenFeign功能升级 OpenFeign在Feign的基础上提供了以下增强和扩展功能 &#xff08;1&#xff09;便于集成Spring Cloud组件&#xff1a;OpenFeign与Spring Cloud其他组件&#…

TCP/IP协议、三次握手、四次挥手

TCP/IP TCP/IP协议分层TCP头部三次握手TCP四次挥手常见问题1、什么是TCP网络分层2、TCP为什么是三次握手&#xff0c;不是两次或者四次&#xff1f;3、TCP为什么是四次挥手&#xff0c;为什么不能是三次挥手将第二次挥手和第三次挥手合并&#xff1f;4、四次挥手时为什么TIME_W…

汽车电子 - UDS

汽车电子 - UDS 概念基本概念分类请求与响应寻址信息物理寻址功能寻址 协议格式&#xff1f;&#xff1f;&#xff1f;750/758厂家自定义的吗&#xff1f;&#xff1f;&#xff1f;&#xff0c; 所有的UDS服务都在这里边吗&#xff1f;&#xff1f;&#xff1f;&#xff0c;代码…

Redis-缓存设计

缓存穿透 缓存穿透是指查询一个根本不存在的数据&#xff0c; 缓存层和存储层都不会命中&#xff0c; 通常出于容错的考虑&#xff0c; 如果从存储层查不到数据则不写入缓存层。 缓存穿透将导致不存在的数据每次请求都要到存储层去查询&#xff0c; 失去了缓存保护后端存储的…

Linux:docker容器操作(4)

docker的基础操作 Linux&#xff1a;docker基础操作&#xff08;3&#xff09;-CSDN博客https://blog.csdn.net/w14768855/article/details/134616198?spm1001.2014.3001.5501 我这里准备了两个镜像 镜像加载到容器 docker create [选项] 镜像 运行的程序 -i 让容器的标准输…

基于单片机的可升降助眠婴儿床(论文+源码)

1.系统设计 本课题为基于单片机的可升降助眠婴儿床系统&#xff0c;在设计目标上确定如下&#xff1a; 1. 可以实现婴儿床的升降&#xff0c;摇床功能控制&#xff1b; 2. 具有音乐播放功能&#xff0c;并且有多首曲目&#xff1b; 3. 用户可以通过按键或者红外遥控&#x…