Go微服务: 基于rocketmq:server和rocketmq:broker搭建RocketMQ环境,以及生产消息和延迟消费消息的实现

news2024/12/24 4:28:42

RocketMQ 的搭建


1 ) 配置 docker-compose.yaml 文件

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "8080:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

2 ) 配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
listenPort = 10911
deleteWhen = 04
fileReservedTime = 120
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
  • 注意,需要指定 brokerIP1 且不能使用 0.0.0.0 也不能不指定,否则无法通信
  • fileReservedTime 默认是 48h

3 ) 拉取镜像

  • $ docker pull foxiswho/rocketmq:server
  • $ docker pull foxiswho/rocketmq:broker
  • $ docker pull styletang/rocketmq-console-ng

4 )启动和检查

  • 启动 $ docker compose up -d
  • 检查状态 $ docker compose ps

打开 UI 界面验证

  • 访问:http://127.0.0.1:8080

上面这个就是和上面的 brokerIP1 对应

编写程序验证生产和消费消息

  • 现在简述下场景
    • 生产5条消息
    • 10s 后进行消费

代码实现

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

const groupName = "BBS_SHOP_GROUP_123"

func GetMqAddr() string {
	mqAddr := "127.0.0.1:9876" // 这里填入你的 NameServer 端口
	return mqAddr
}

func ProduceMsg(mqAddr string, topic string) {
	p, err := rocketmq.NewProducer( // 普通消息生产者
		producer.WithGroupName(groupName),
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
		producer.WithRetry(2),
	)
	if err != nil {
		panic(err)
	}
	err = p.Start()
	if err != nil {
		log.Fatal()
		fmt.Println("生产者错误: %v", err.Error())
		os.Exit(1)
	}
	for i := 0; i < 5; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello XProjectOrder " + strconv.Itoa(i)),
		}
		msg.WithDelayTimeLevel(3)
		r, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Println("发送消息错误: %v", err.Error())
		} else {
			fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Println("生产者shutdown: %v", err.Error())
		os.Exit(1)
	}
}

func ComsumeMsg(mqAddr string, topic string) {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(groupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
	)
	if err != nil {
		panic(err)
	}
	err = c.Subscribe(topic, consumer.MessageSelector{},
		func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for i := range msgList {
				fmt.Printf("订阅消息,消费%v \n", msgList[i])
			}
			return consumer.ConsumeSuccess, nil
		})
	if err != nil {
		fmt.Println("消费消息错误: %v", err.Error())
	}
	err = c.Start()
	if err != nil {
		fmt.Println("开启消费这错误: %v", err.Error())
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Println("shutdown消费者错误: %v", err.Error())
	}
}

func main() {
	topic := "BBS_SHOP_TOPIC_123"
	mqAddr := GetMqAddr()
	ProduceMsg(mqAddr, topic)
	ComsumeMsg(mqAddr, topic)
}
  • 以上是一个 demo,在真实场景,自行进行封装处理

  • 这里定义了一个 主题 BBS_SHOP_TOPIC_123 和 一个订阅组 BBS_SHOP_GROUP_123

  • 查看生产消息输出

    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
    
  • 查看消费 (10s 之后)

    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
    
  • 以上就是生产和消费的主要过程

效果

总结

  • 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
  • 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1792711.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用System-Verilog实现FPGA基于DE2-115开发板驱动HC_SR04超声波测距模块|集成蜂鸣器,led和vga提示功能

文章目录 前言一、SystemVerilog——下一代硬件设计语言1.1 语言基础2.2 面向对象编程1.3 接口&#xff08;Interfaces&#xff09;1.4 程序包&#xff08;Packages&#xff09;1.5 数据结构1.6 随机化&#xff08;Randomization&#xff09;1.7 并发性和时序控制1.8 功能增强1…

小程序丨最大填表限制如何开启?

老师在新建填表时&#xff0c;希望设置最大数量限制&#xff0c;若填表达到限制&#xff0c;后续的学生将不能继续提交填表。 通过开启【表格最大限制】功能即可实现&#xff0c;下面就来教大家如何制作吧。 &#x1f50e;如何开启表格最大限制功能&#xff1f; 按照常规流程…

Mac OS 用户开启 8080 端口

开启端口 sudo vim /etc/pf.conf # 开放对应端口 pass out proto tcp from any to any port 8080 # 刷新配置文件 sudo pfctl -f /etc/pf.conf sudo pfctl -e获取本机ip地址 ifconfig en0 | grep inet | grep -v inet6 | awk {print $2}访问指定端口

【UML用户指南】-03-UML的14种图

目录 1、结构图 1、类图&#xff08;class diagram&#xff09; 2、对象图&#xff08;object diagram&#xff09; 3、构件图 &#xff08;component diagram&#xff09; 4、组合结构图 5、包图&#xff08;package diagram&#xff09; 6、部署图&#xff08;deploym…

【LeetCode算法】第100题:相同的树

目录 一、题目描述 二、初次解答 三、官方解法 四、总结 一、题目描述 二、初次解答 1. 思路&#xff1a;二叉树的先序遍历。采用递归的先序遍历方法&#xff0c;首先访问根节点若不同则返回false&#xff0c;其次访问左子树和右子树。在访问左右子树时&#xff0c;需要注意…

gkuubibiih

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…

Kubernetes集群安装部署(Anolis OS 8)

本次 Kubernetes 集群是基于 kubeadm 进行部署的&#xff0c;操作系统采用的 Anolis OS 8.9。 主机IP配置k8s192.168.211.112核&#xff0c;4G&#xff0c;20G硬盘k8s2192.168.211.122核&#xff0c;2G&#xff0c;20G硬盘k8s3192.168.211.132核&#xff0c;2G&#xff0c;20G…

【Vue】异步更新 $nextTick

文章目录 一、引出问题二、解决方案三、代码实现 一、引出问题 需求 编辑标题, 编辑框自动聚焦 点击编辑&#xff0c;显示编辑框让编辑框&#xff0c;立刻获取焦点 即下图上面结构隐藏&#xff0c;下面结构显示&#xff0c;并且显示的时候让它自动聚焦。 代码如下 问题 “…

带Tkinter界面的小验证加密程序——Python课程作业案例分析

Python课程作业案例分析 作业题目要求实现结果动图题目分析主要库介绍和说明实现源码及注释作业题目要求 某个公司采用公用电话传递数据,数据是四位的整数,在传递过程中是加密的。加密规则如下:每位数字都加上5,然后用和除以10的余数代替该数字,再将第一位和第四位交换,…

持续监控和优化的简单介绍

DevOps 监控提供了有关生产环境状况的全面且最新的信息&#xff0c;以及有关其服务、基础设施和应用程序的详细信息。通过从日志和指标中收集数据&#xff0c;您可以在软件开发生命周期的每个步骤中监控合规性和性能。 监控不仅仅针对生产问题&#xff0c;它涵盖了规划、开发、…

python --对象属性、类属性、类方法和静态方法

对象属性和类属性 --掌握--对象属性 目标掌握对象属性的使用 对象属性&#xff0c;有时也称实例属性、普通属性、公有属性 、或者直接叫属性 在类内部&#xff0c;访问对象属性语法&#xff1a; self.对象属性名 在类外部&#xff0c;访问对象属性语法&#xff1a; 对象名.对…

关系代数与规范化

本文是根据自己的理解&#xff0c;结合实践整理所得&#xff0c;有兴趣的可以参考学习。

掌握Django文件处理:一步步构建上传功能

创建模型 首先先进入我们的testsite项目下&#xff0c;打开members/models.py文件&#xff0c;先添加我们保存文件的数据模型&#xff1a; class Document(models.Model):name models.CharField(max_length255)file models.FileField(upload_touploads/) # uploads/ 是文件…

大模型基架:Transformer如何做优化?

大模型的基础模式是transformer&#xff0c;所以很多芯片都实现先专门的transformer引擎来加速模型训练或者推理。本文将拆解Transformer的算子组成&#xff0c;展开具体的数据流分析&#xff0c;结合不同的芯片架构实现&#xff0c;分析如何做性能优化。 Transformer结构 tr…

CTF本地靶场搭建——GZ:CTF安装

GZ:CTF 项目地址:https://gitcode.com/GZTimeWalker/GZCTF GZCTF 是一款开源的网络安全竞技平台&#xff0c;由开发者GZTimeWalker维护。该项目旨在提供一个环境&#xff0c;让网络安全爱好者和专业人士能够实践他们的技能&#xff0c;通过解决各种安全问题&#xff08;即“…

流量分析——一、蚁剑流量特征

君衍. 一、Webshell特征流量分析二、环境介绍三、使用Wireshark进行流量分析1、环境说明2、HTTP追踪流分析3、蚁剑请求体中代码块解读 四、使用BurpSurite进行流量分析1、环境配置2、抓包分析 六、总结 一、Webshell特征流量分析 对于重保、护网等攻防演练的防守方来说&#x…

Python03:python代码初体验2

1、变量命名规范 1&#xff09;字母&#xff08;Unicode字符&#xff09;、数字、下划线&#xff0c;不能使用特殊字符&#xff0c;数字不能开头 2&#xff09;变量名是区分大小写的&#xff08;大小写敏感&#xff0c;x和X是两个不同的变量名&#xff09; 3&#xff09;不能使…

D-Day 上海站回顾丨以科技赋能量化机构业务

5月31日下午&#xff0c;DolphinDB 携手光大证券&#xff0c;在上海成功举办 D-Day 行业交流会。三十余位来自私募机构的核心策略研发、量化交易员、数据分析专家们齐聚现场&#xff0c;深入交流量化投研交易过程中的经验、挑战及解决方案。 DolphinDB 赋能机构业务平台 来自光…

【一百】【算法分析与设计】N皇后问题常规解法+位运算解法

N皇后问题 链接&#xff1a;登录—专业IT笔试面试备考平台_牛客网 来源&#xff1a;牛客网 题目描述 给出一个nnn\times nnn的国际象棋棋盘&#xff0c;你需要在棋盘中摆放nnn个皇后&#xff0c;使得任意两个皇后之间不能互相攻击。具体来说&#xff0c;不能存在两个皇后位于同…

K8s Pod的QoS类

文章目录 OverviewPod的QoS分类Guaranteed1.如何将 Pod 设置为保证Guaranteed2. Kubernetes 调度器如何管理Guaranteed类的Pod Burstable1. 如何将 Pod 设置为Burstable2.b. Kubernetes 调度程序如何管理 Burstable Pod BestEffort1. 如何将 Pod 设置为 BestEffort2. Kubernete…