golang rabbitMQ 生产者复用channel以及生产者组分发策略

news2025/1/15 12:54:39

引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go

在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。

预期效果:

项目初始化构建时可以自定义选择生产者开启多个connection,每个connection可以启动多少个channel【都是全局复用的】,因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的,所以这个channel很重要,也是我们想要复用的重点。

初始化创建完connection和channel后,当生产者需要发送一条消息的时候,我们可以通过一些策略去选择它发送到哪个connection和channel,我这里采用的就是随机选择,也可以采用哈希取模、轮询权重算法等,这个可以根据自身业务来做。

我简单画了一个效果图:

定义RabbitMQ结构体以及Config结构体

type Config struct {
    Host     string
    Port     int
    User     string
    Password string
}

type RabbitMQ struct {
    ctx     context.Context
    n       int
    m       *sync.Mutex
    Conn    *amqp.Connection
    Channel []*amqp.Channel
}

实例化RabbitMQ结构体

func (mq *RabbitMQ) New(config Config) (rabbitmq *RabbitMQ) {

    configString := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.User, config.Password, config.Host, config.Port)

    conn, err := amqp.Dial(configString)
    if err != nil {
        log.Panicf("amqp connect error: %v \n", err)
    }

    rabbitmq = &RabbitMQ{
        ctx:  context.Background(),
        m:    &sync.Mutex{},
        Conn: conn,
    }

    return
}

一、创建消费者

// ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel]
func (mq *RabbitMQ) ConsumeWithWork(queueName string, channelNums int) {
    for i := 0; i < channelNums; i++ {
        go func(i int) {

            ch, err := mq.Conn.Channel()
            if err != nil {
                log.Panicf("amqp open a channel error: %v \n", err)
            }

            q, err := ch.QueueDeclare(
                queueName, // name
                true,      // durable
                false,     // delete when unused
                false,     // exclusive
                false,     // no-wait
                nil,       // arguments
            )
            if err != nil {
                log.Panicf("amqp declare a queue error: %v \n", err)
            }

            err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
            )
            if err != nil {
                log.Panicf("amqp set QoS error: %v \n", err)
            }

            msg, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )
            if err != nil {
                log.Panicf("amqp register a consumer error: %v \n", err)
            }

            log.Printf(" [work-%d] Waiting for messages. To exit press CTRL+C", i)

            for d := range msg {
                time.Sleep(2 * time.Second)
                fmt.Printf("[work-%d] Received a message: %s \n", i, d.Body)
                err = d.Ack(false)
                if err != nil {
                    log.Printf("work_one Ack Err: %v", err)
                }
            }
        }(i)
    }

    var forever chan struct{}
    <-forever
}

二、创建生产者组

// NewPlusherGroups 创建生产者组
func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[int]*RabbitMQ) {

    plusherGroups = make(map[int]*RabbitMQ, connNums)

    for i := 0; i < connNums; i++ {

        var rabbitmq *RabbitMQ
        rabbitmq = rabbitmq.New(config)
        rabbitmq.n = i

        for cN := 0; cN < channelNums; cN++ {
            ch, err := rabbitmq.Conn.Channel()
            if err != nil {
                log.Panicf("amqp open a channel error: %v \n", err)
            }
            rabbitmq.Channel = append(rabbitmq.Channel, ch)
        }

        plusherGroups[i] = rabbitmq
    }
    return
}

三、将消息随机分发给不同的connection、channel

// SendMessageWithWork 生产者发送消息[work模式+(many conn and many channel)]
func SendMessageWithWork(plusherGroups map[int]*RabbitMQ, queueName, body string) bool {

    if plusherGroups == nil {
        log.Panicln("SendMessageWithWork plusherGroups params is nil!")
    }

    rand.Seed(time.Now().UnixNano())

    //获取连接个数
    connNums := len(plusherGroups)

    //随机分配一个连接对象
    randConnIndex := rand.Intn(connNums)

    //选择随机分配的连接对象
    conn := plusherGroups[randConnIndex]

    //既然采用了发布者复用conn、channel的形式那么一定要加锁处理
    //这里为每个对象的操作进行加锁(非线程安全,不加锁会报错的)
    //至于在存在并发竞争的情况下会存在一定性能损耗,但是我们配置好适量的conn和channel这个基本可以忽略
    conn.m.Lock()
    defer conn.m.Unlock()

    //获取当前对象的channel个数
    channelNums := len(conn.Channel)

    //随机分配一个channel对象
    randChannelIndex := rand.Intn(channelNums)

    //选择随机分配的channel
    ch := conn.Channel[randChannelIndex]

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        log.Panicf("amqp declare a queue error: %v \n", err)
    }

    body = fmt.Sprintf("conn[%d] channel[%d] send message : %s", randConnIndex, randChannelIndex, body)
    err = ch.PublishWithContext(conn.ctx,
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    if err != nil {
        log.Panicf("amqp publish a message error: %v \n", err)
    }

    return true
}

四、main函数调用消费者

package main

import (
    rabbitmq "go-test/rabbitmq/package"
)

func main()  {

    queueName := "task_queue"

    config := rabbitmq.Config{
        Host: "192.168.6.103",
        Port: 5672,
        User: "root",
        Password: "root",
    }

    var mq *rabbitmq.RabbitMQ
    mq = mq.New(config)

    //开启N个消费者
    mq.ConsumeWithWork(queueName, 3)
}

五、main函数调用生产者组发送消息

package main

import (
    "fmt"
    "github.com/gin-gonic/gin"
    rabbitmq "go-test/rabbitmq/package"
    "net/http"
    "time"
)

func main()  {

    var messageNo int

    queueName := "task_queue"

    config := rabbitmq.Config{
        Host: "192.168.6.103",
        Port: 5672,
        User: "root",
        Password: "root",
    }

    //conn连接数
    connNums := 2
    //channel连接数
    channelNums := 3

    //启动N个不同conn的连接,并且每个连接对应的channel为N个的rabbitmq实例
    plusherGroup := rabbitmq.NewPlusherGroups(config, connNums, channelNums)


    e := gin.Default()
    e.GET("/", func(c *gin.Context) {

        body := fmt.Sprintf("这是第%d条消息...", messageNo)

        if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) == true {
            messageNo++
            c.JSON(200, gin.H{
                "code": 200,
                "msg": "success",
            })
        } else {
            c.JSON(200, gin.H{
                "code": 500,
                "msg": "error",
            })
        }
    })

    server := &http.Server{
        Addr:         ":18776",
        Handler:      e,
        ReadTimeout:  time.Minute,
        WriteTimeout: time.Minute,
    }
    if err := server.ListenAndServe(); err != nil {
        panic(any("HttpServer启动失败"))
    }
}

执行流程:

  1. 启动消费者进程

可以看到我们用3个协程开启了3个work,也就是对应了3个channel

  1. 启动生产者组进程

这里用的gin框架,正常启动

我们可以看到rabbitMQ的控制台中,一共3个连接,1个是消费者进程,另外2个是生产者组进程,这2个正好和我们上面配置的connNums参数匹配

我们可以看到rabbitMQ的控制台中,一共9个channel,3个是消费者进程,另外6个是生产者组进程,这6个正好和我们上面配置的channelNums参数匹配

  1. 调用发送消息

ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/

我们来看消费者日志打印情况,标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/335781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论如何用python自动下载爱的妹子视频~嘿嘿嘿~

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 又到了学Python时刻~ 现在好看的妹子真的太多啦~ 如何一次性把这些好看的视频全保存下来捏&#xff1f; 开发环境: 版 本: python 3.8 编辑器: pycharm 2022.3.2 专业版 requests >>> pip install request…

【数据结构】复杂度讲解

目录 时间复杂度与空间复杂度&#xff1a;&#xff1a; 1.算法效率 2.时间复杂度 3.空间复杂度 4.常见时间复杂度以及复杂度OJ练习 时间复杂度与空间复杂度&#xff1a;&#xff1a; 什么是数据结构? 数据结构中是计算机存储,组织数据的方式,指相互之间存在一种或多种特定关…

面向对象的设计模式

"万丈高楼平地起&#xff0c;7种模式打地基"&#xff0c;模式是一种规范&#xff0c;我们应该站在巨人的肩膀上越看越远&#xff0c;接下来&#xff0c;让我们去仔细了解了解面向对象的7种设计模式7种设计模式设计原则的核心思想&#xff1a;找出应用中可能需要变化之…

24考研|高等数学的基础概念定理(二)——第二章|导数与微分

文章目录一、基础概念定理部分1.1 导数的四则运算法则1.2 反函数的求导法则1.3 复合函数的求导法则1.4 费马引理1.5 罗尔定理1.6 拉格朗日中值定理1.7 导数为零的结论1.8 柯西中值定理1.9 洛必达法则1.10 泰勒中值定理&#xff08;定理1&#xff0c;定理2&#xff09;1.11 导数…

CRM系统能给企业带来什么? CRM系统推荐

什么是CRM系统&#xff1f; CRM系统&#xff08;又称客户关系管理系统&#xff09;是一个以客户为核心的管理软件&#xff0c;能有效改善企业与现有客户的关系&#xff0c;且帮助企业寻找新的潜在客户&#xff0c;并赢回以前老客户。 CRM系统能给企业带来什么&#xff1f; C…

计算机视觉框架OpenMMLab开源学习(五):目标检测实战

目标检测实战 前言&#xff1a;本篇主要偏向目标检测实战部分&#xff0c;使用MMDetection工具进行代码应用&#xff0c;最后对水果进行检测实战演示&#xff0c;本次环境和代码配置部分省略&#xff0c;具体内容建议参考前一篇文章&#xff1a;计算机视觉框架OpenMMLab开源学…

基于STM32设计的避障寻迹小车

一、前言 1.1 项目背景 根据美国玩具协会在一项研究中&#xff0c;过去几年全球玩具销售增长与GDP的世界平均水平大致相同。但全球玩具市场的内部结构已经占据了巨大的位置变化&#xff1a;传统玩具的市场份额正在下降&#xff0c;高科技电子玩具正在蓬勃发展。全球玩具市场的…

迁移至其他美国主机商时需要考虑的因素

网站的可访问性是关系业务的关键因素之一。一个稳定、快速且优化良好的主机上的网站更有可能享受不间断的流量&#xff0c;并在谷歌的SERP中获得更好的排名。因此&#xff0c;在构建企业网站时&#xff0c;选择合适的主机商相当重要。不过就以美国主机为例&#xff0c;由于每个…

three.js学习笔记(一):THREE.Materail五种基础材质的使用

MeshBasicMaterial&#xff08;网格基础材质&#xff09;&#xff1a;基础材质&#xff0c;用于给几何体赋予一种简单的颜色&#xff0c;或者显示几何体的线框。MeshDepthMaterial&#xff08;网格深度材质&#xff09;&#xff1a; 这个材质使用从摄像机到网格的距离来决定如何…

企业舆情监测多少钱,TOOM舆情监测专业服务平台

企业舆情监测的费用因公司不同而异&#xff0c;具体需要多少钱取决于您的需求和舆情监测公司的收费标准。一些因素&#xff0c;如舆情监测范围、数据收集方式、舆情分析报告等细节&#xff0c;都可能影响最终的费用。最好联系舆情监测公司询问具体收费标准。企业舆情监测多少钱…

Linux_用户和权限

一、认识root用户 --超级管理员 root用户拥有最大的系统操作权限&#xff0c;而普通用户在许多地方都是受限的 普通用户的权限&#xff0c;一般在其home目录内是不受限的 一旦出了home目录&#xff0c;大多数地方普通用户仅有只读和执行权限&#xff0c;无修改权限 1、su和…

linux配置密码过期的安全策略(/etc/login.defs的解读)

长期不更换密码很容易导致密码被破解&#xff0c;而linux的密码过期安全策略主要在/etc/login.defs中配置。一、/etc/login.defs文件的参数解读1、/etc/login.defs文件的内容示例[rootlocalhost ~]# cat /etc/login.defs # # Please note that the parameters in this configur…

10.jQuery中请求预处理 $.ajaxPrefilter()

在使用jQuery发起请求的时候($.get(),$.post().$ajax()都可以)会默认在请求前调用$.ajaxPrefilter()这个函数&#xff0c;我们可以利用这个来做一些事情 目录 1 定义API根路径 2 添加请求头 3 添加请求结束的回调函数 1 定义API根路径 这样后面每次请求就不用再写根路…

什么样的帮助文档才能帮助用户?

尼尔森十大交互原则的最后一个原则——“人性化帮助原则”提出&#xff0c;我们应该给系统提供一份帮助文档&#xff0c;让用户能够尽快了解系统&#xff0c;熟悉操作。 互联网时代&#xff0c;很多软件都把用户习惯培养起来了&#xff0c;对于大部分软件都有一种无师自通的感…

MySQL安装配置(Windows和 Linux )

MySQL安装配置&#xff08;Windows和 Linux &#xff09; 文章目录MySQL安装配置&#xff08;Windows和 Linux &#xff09;一、MySQL 下载1. 1 点击 **DOWNLOADS**1.2 点击 **MySQL Community (GPL) Downloads **1.3 点击 **MySQL Community Server**1.4 此时跳转到最新版本的…

2023年Q1业绩增长背后,迪士尼亟待扭转流媒体亏损困局

重新执掌迪士尼后&#xff0c;鲍勃伊格尔交出了一份表现尚可的“答卷”。 图源:迪士尼 美东时间2023年2月8日&#xff0c;迪士尼披露了2023财年Q1财报&#xff0c;营收为235.1亿美元&#xff0c;同比增长8%&#xff1b;持续经营净利润13亿美元&#xff0c;同比增长11%。受此利…

红米9a手动root方法

简介 已知红米6A/6/9/9A/9C/10A机器都可以快速解锁BL&#xff0c;无任何变砖风险 并且秒解锁BL后和官方解锁一样&#xff0c;无任何其他不良影响。推荐大家使用官网解锁&#xff0c;需要等待7天。 ​ BootLoader BootLoader是在操作系统内核运行之前运行的一段小程序。其实…

【虚拟仿真】Unity3D实现从浏览器拉起本地exe程序并传参数

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 最近有项目需求&#xff0c;从浏览器调起来本地的exe程序&…

常见HTTP请求错误码大全

响应码由三位十进制数字组成&#xff0c;它们出现在由HTTP服务器发送的响应的第一行。 响应码分五种类型&#xff0c;由它们的第一位数字表示&#xff1a; 1xx&#xff1a;信息&#xff0c;请求收到&#xff0c;继续处理 2xx&#xff1a;成功&#xff0c;行为被成功地接受、…

【Nacos】Nacos配置中心服务端源码分析

上文说了Nacos配置中心客户端的源码流程&#xff0c;这篇介绍下Nacos配置中心服务端的源码。 服务端的启动 先来看服务启动时干了啥&#xff1f; init()方法上面有PostConstruct&#xff0c;该方法会在ExternalDumpService实例化后执行。 com.alibaba.nacos.config.server.s…