go rabbitmq 操作

news2024/10/6 10:39:14

go rabbitmq 操作

go 依赖包github.com/streadway/amqp

docker快速部署

docker pull rabbitmq:management
docker run -d rabbitmq:management # 先跑一个看看监听了哪些端口
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq #5672 go 程序连接,15672是管理页面

写个最基本生产者消费者demo(headers 模式)

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj *MQOBJ
)

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		log.Fatalln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}

}
func producer() {
	_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
	if err != nil {
		return
	}
	err = obj.ExchangeDeclare("go-test-exchange2", amqp.ExchangeHeaders, true, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	//这个queue绑定,你也可以放消费者那边绑定,更灵活
	err = obj.Channel.QueueBind("go-test2", "go-test2", "go-test-exchange2", false, amqp.Table{"name": "jesko"})
	if err != nil {
		log.Fatalln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("", "go-test2", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain", Headers: amqp.Table{"x-match": "any", "name": "jesko", "age": 22}})
			if err != nil {
				log.Fatalln(err)
			}
			i++
		}
	}

}
func customer() {
	_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + string(msg.Body))
		case <-ch:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	customer()
}

请添加图片描述
这里可以看到我们创建的queue
请添加图片描述

topic模式

topic模式不用绑定headers去匹配

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj    *MQOBJ
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

func Fataln(a ...any) {
	logger.Println(a...)
	os.Exit(0)
}

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		Fataln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		Fataln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}
	fmt.Println("init success")

}
func producer() {
	err := obj.ExchangeDeclare("go-test-exchange", amqp.ExchangeTopic, true, false, false, false, nil)
	if err != nil {
		Fataln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("go-test-exchange", "go-test", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
			if err != nil {
				Fataln(err)
			}
			i++
		}
	}

}

type Empty struct{}

func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()
	_, err = ch.QueueDeclare(name, true, false, false, false, nil)
	if err != nil {
		Fataln("queue declare failed", err)
	}
	err = ch.QueueBind(name, name, "go-test-exchange", false, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	stopchanlist := make([]chan Empty, 2)
	stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-ch
		for _, c := range stopchanlist {
			c <- Empty{}
		}
	}()
	go customer("go-test", stopchanlist[0])
	customer("go-test2", stopchanlist[1])
}

go-test 有信息(topic 匹配),go-test2没信息(topic未匹配)。

direct模式

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj    *MQOBJ
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

func Fataln(a ...any) {
	logger.Println(a...)
	os.Exit(0)
}

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		Fataln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		Fataln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}
	fmt.Println("init success")

}
func producer() {
	err := obj.ExchangeDeclare("go-test-exchange3", amqp.ExchangeDirect, true, false, false, false, nil)
	if err != nil {
		Fataln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("go-test-exchange3", "", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
			if err != nil {
				Fataln(err)
			}
			i++
		}
	}

}

type Empty struct{}

func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()
	_, err = ch.QueueDeclare(name, true, false, false, false, nil)
	if err != nil {
		Fataln("queue declare failed", err)
	}
	err = ch.QueueBind(name, "", "go-test-exchange3", false, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	stopchanlist := make([]chan Empty, 2)
	stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-ch
		for _, c := range stopchanlist {
			c <- Empty{}
		}
	}()
	go customer("go-test", stopchanlist[0])
	customer("go-test2", stopchanlist[1])
}

demo测试命令

go test -v amqp_test.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1524778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux:系统初始化,内核优化,性能优化(3)

优化系统的文件句柄数&#xff08;全局&#xff09; 也就是系统的最大文件数量 查看最大数量 cat /proc/sys/fs/file-max 当我们的服务器有非常大的一个数据并发的时候十几二十万的文件需要去配置&#xff0c;可能这个是远远不够的&#xff0c;我们就要去修改 vim /etc/sy…

栈和队列(Java实现)

栈和队列&#xff08;Java实现&#xff09; 栈 栈(Stack)&#xff1a;栈是先进后出&#xff08;FILO, First In Last Out&#xff09;的数据结构。Java中实现栈有以下两种方式&#xff1a; stack类LinkedList实现&#xff08;继承了Deque接口&#xff09; &#xff08;1&am…

使用 GitHub Actions 通过 CI/CD 简化 Flutter 应用程序开发

在快节奏的移动应用程序开发世界中&#xff0c;速度、可靠性和效率是决定项目成功或失败的关键因素。持续集成和持续部署 (CI/CD) 实践已成为确保满足这些方面的强大工具。当与流行的跨平台框架 Flutter 和 GitHub Actions 的自动化功能相结合时&#xff0c;开发人员可以创建无…

【GPT-SOVITS-04】SOVITS 模块-鉴别模型解析

说明&#xff1a;该系列文章从本人知乎账号迁入&#xff0c;主要原因是知乎图片附件过于模糊。 知乎专栏地址&#xff1a; 语音生成专栏 系列文章地址&#xff1a; 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…

正则表达式与re模块

目录 正则表达式 简介 语法&#xff1a; 常用元字符&#xff1a; 量词: 贪婪匹配和惰性匹配&#xff1a; re模块 简介&#xff1a; 常用的几个模块&#xff1a; 1.findall 2.search 3.finditer 4.compile 案例展示&#xff1a; 需求&#xff1a; 思路分析&#…

Blocks —— 《Objective-C高级编程 iOS与OS X多线程和内存管理》

目录 Blocks概要什么是BlocksOC转C方法关于几种变量的特点 Blocks模式Block语法Block类型 变量截获局部变量值__block说明符截获的局部变量 Blocks的实现Block的实质 Blocks概要 什么是Blocks Blocks是C语言的扩充功能&#xff0c;即带有局部变量的匿名函数。 顾名思义&#x…

u盘文件损坏怎么恢复数据?分享三个数据恢复方法

随着科技的飞速发展&#xff0c;U盘已成为我们日常生活和工作中不可或缺的数据存储工具。然而&#xff0c;由于各种原因&#xff0c;如不当操作、病毒感染或硬件故障等&#xff0c;U盘中的文件可能会受到损坏。那么&#xff0c;当U盘文件损坏时&#xff0c;我们该如何恢复数据呢…

mac下Appuim环境安装

参考资料 Mac安装Appium_mac电脑安装appium-CSDN博客 安卓测试工具&#xff1a;Appium 环境安装&#xff08;mac版本&#xff09;_安卓自动化测试mac环境搭建-CSDN博客 1. 基本环境依赖 1 node.js 2 JDK&#xff08;Java JDK&#xff09; 3 Android SDK 4 Appium&#x…

深度学习-基于机器学习的语音情感识别系统的设计

概要 语音识别在现实中有着极为重要的应用&#xff0c;现在语音内容的识别技术已日趋成熟。当前语音情感识别是研究热点之一&#xff0c;它可以帮助AI和人更好地互动、可以帮助心理医生临床诊断、帮助随时随地高效测谎等。本文采用了中科院自动化所的CASIA语料库作为样本&#…

Qt文件以及文件夹相关类(QDir、QFile、QFileInfo)的使用

关于Qt相关文件读写操作以及文件夹的一些知识&#xff0c;之前也写过一些博客&#xff1a; Qt关于路径的处理&#xff08;绝对路径、相对路径、路径拼接、工作目录、运行目录&#xff09;_qt 相对路径-CSDN博客 C/Qt 读写文件_qt c 读取文本文件-CSDN博客 C/Qt读写ini文件_…

【GPT-SOVITS-01】源码梳理

说明&#xff1a;该系列文章从本人知乎账号迁入&#xff0c;主要原因是知乎图片附件过于模糊。 知乎专栏地址&#xff1a; 语音生成专栏 系列文章地址&#xff1a; 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…

react中hooks使用限制

只能在最顶层使用Hook 不要在循环、条件中调用hook&#xff0c;确保总是在React函数最顶层使用它们 只能React函数中调用Hook 不要在普通的js函数中调用 在React的函数组件中调用Hook 在自定义hook中调用其他hook 原因&#xff1a; 我们每次的状态值或者依赖项存在哪里&…

Unity触发器的使用

1.首先建立两个静态精灵&#xff08;并给其中一个物体添加"jj"标签&#xff09; 2.添加触发器 3.给其中一个物体添加刚体组件&#xff08;如果这里是静态的碰撞的时候将不会触发效果&#xff0c;如果另一个物体有刚体可以将它移除&#xff0c;或者将它的刚体属性设置…

Jest:JavaScript的单元测试利器

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

挑战杯 机器视觉目标检测 - opencv 深度学习

文章目录 0 前言2 目标检测概念3 目标分类、定位、检测示例4 传统目标检测5 两类目标检测算法5.1 相关研究5.1.1 选择性搜索5.1.2 OverFeat 5.2 基于区域提名的方法5.2.1 R-CNN5.2.2 SPP-net5.2.3 Fast R-CNN 5.3 端到端的方法YOLOSSD 6 人体检测结果7 最后 0 前言 &#x1f5…

YOLOV9训练自己的数据集

1.代码下载地址GitHub - WongKinYiu/yolov9: Implementation of paper - YOLOv9: Learning What You Want to Learn Using Programmable Gradient Information 2.准备自己的数据集 这里数据集我以SAR数据集为例 具体的下载链接如下所示&#xff1a; 链接&#xff1a;https:/…

软件测试 自动化测试selenium 基础篇

文章目录 1. 什么是自动化测试&#xff1f;1.1 自动化分类 2. 什么是 Selenium &#xff1f;3. 为什么使用 Selenium &#xff1f;4. Selenium 工作原理5. Selenium 环境搭建 1. 什么是自动化测试&#xff1f; 将人工要做的测试工作进行转换&#xff0c;让代码去执行测试工作 …

netlogo 羊-草生态系统模型的系统动力学搭建

to setupclear-allsystem-dynamics-setupendto gosystem-dynamics-gosystem-dynamics-do-plot enda 羊的净出生率 a 0.001sheep_birth a * sheep * grass羊 10 sheep 10b 羊的死亡率 0.01 b 0.01death 羊的死亡流 羊x 羊的死亡率 death b * sheep草 200 grass 200R 草的净…

2024最新PHP彩虹网盘与外链分享程序,支持所有格式文件的上传

彩虹外链网盘是一款基于PHP的在线存储和分享平台&#xff0c;它允许用户上传各种类型的文件&#xff0c;并提供了生成文件链接、图片链接、音乐和视频链接的功能。同时&#xff0c;它还会自动生成相应的UBB代码和HTML代码&#xff0c;支持文本、图片、音乐和视频的在线预览。这…

基于深度学习LSTM+NLP情感分析电影数据爬虫可视化分析推荐系统(深度学习LSTM+机器学习双推荐算法+scrapy爬虫+NLP情感分析+数据分析可视化)

文章目录 基于深度学习LSTMNLP情感分析电影数据爬虫可视化分析推荐系统&#xff08;深度学习LSTM机器学习双推荐算法scrapy爬虫NLP情感分析数据分析可视化&#xff09;项目概述深度学习长短时记忆网络&#xff08;Long Short-Term Memory&#xff0c;LSTM&#xff09;机器学习协…