RocketMQ5.0 生产者

news2025/1/5 8:47:18

生产者消息类型:

延迟队列的生产者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
	errgroup2 "golang.org/x/sync/errgroup"
	"log"
	"os"
	"strconv"
	"time"
)

const (
	Topic     = "DelayTopic"
	GroupName = "testG"
	Endpoint  = "localhost:8081"
	Region    = "xxxxxx"
	AccessKey = "xxxxxx"
	SecretKey = "xxxxxx"
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	golang.ResetLogger()
	// new producer instance

	producer, err := golang.NewProducer(&golang.Config{
		Endpoint:    Endpoint,
		Credentials: &credentials.SessionCredentials{},
	},
		golang.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// gracefule stop producer
	defer producer.GracefulStop()
	var wg = errgroup2.Group{}
	wg.SetLimit(10)
	for i := 0; i < 1000; i++ {
		wg.Go(func() error {
			msg := &golang.Message{
				Topic: Topic,
				Body:  []byte("this is a message : " + strconv.Itoa(i) + time.Now().Format(time.DateTime)),
			}
			// set keys and tag
			msg.SetKeys("a", "b")
			msg.SetTag("ab")
			msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))

			// send message in sync
			resp, err := producer.Send(context.TODO(), msg)
			if err != nil {
				log.Fatal(err)
			}

			for i := 0; i < len(resp); i++ {
				fmt.Printf("%#v\n", resp[i])
			}
			return nil
		})

		// wait a moment
		time.Sleep(time.Second * 1)
	}
	wg.Wait()
	time.Sleep(time.Minute * 10)
}

设置topic的。message.type                                                                                                                                     docker exec -it rmqnamesrv /bin/bash       

sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY


sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY

   消费者

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

const (
	Topic     = "DelayTopic"
	GroupName = "testG"
	Endpoint  = "localhost:8081"
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	golang.ResetLogger()
	// new simpleConsumer instance
	simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
		Endpoint:      Endpoint,
		Credentials:   &credentials.SessionCredentials{},
		ConsumerGroup: "string",
	},
		golang.WithAwaitDuration(awaitDuration),
		golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
			Topic: golang.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// gracefule stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		defer func() {
			if err := recover(); err != nil {
				fmt.Println(err)
			}
		}()
		for {
			fmt.Println("start recevie message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				simpleConsumer.Ack(context.TODO(), mv)
				fmt.Println(string(mv.GetBody()) + "  " + time.Now().Format(time.DateTime))
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	// run for a while
	time.Sleep(time.Minute * 20)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1971946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实现函数返回字符的种类

文章目录 一、题目二、思路三、代码实现 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、题目 例如&#xff1a; 分别输入&#xff1a;a B 6分别输出&#xff1a;# * &#xff1f; 二、思路 第一步 获取键盘输入的字符&#xff0c;键盘输入的话…

服务器 Linux 的文件系统初探

好久没更新文章了&#xff0c;最近心血来潮&#xff0c;重新开始知识的累计&#xff0c;做出知识的沉淀~ 万事万物皆文件 文件系统&#xff1a;操作系统如何管理文件&#xff0c;内部定义了一些规则或者定义所以在 Linux 中所有的东西都是以文件的方式进行操作在 Linux 中&am…

Linux中yum、rpm、apt-get、wget的区别,yum、rpm、apt-get常用命令,CentOS、Ubuntu中安装wget

文章目录 一、常见Linux发行版本二、Linux中yum、rpm、apt-get、wget的区别2.1 yum2.2 rpm2.3 apt-get2.4 wget2.5 总结 三、CentOS中yum的作用3.1 yum清空缓存列表3.2 yum显示信息3.3 yum搜索、查看3.4 yum安装3.5 yum删除、卸载程序3.6 yum包的升级、降级 四、Ubuntu中apt-ge…

线段树、贪心与推销员

[NOIP 2015] 推销员 - 洛谷 核心&#xff1a;利用线段树处理贪心内容。建两个线段树维护两端。 #include<bits/stdc.h> using namespace std; int n; int d[100100]; int t[1000100]; int deep;//当前最深 int ans; struct node{int id,mx; }; struct sgt{int a[10001…

8.1-java+tomcat环境的配置+代理

一、回顾 1.安装nodejs&#xff0c;这是一个jdk一样的软件运行环境 yum -y list installed|grep epel yum -y install nodejs node -v 2.下载对应的nodejs软件npm yum -y install npm npm -v npm set config .....淘宝镜像 3.安装vue/cli command line interface 命令行…

技术详解:互联网医院系统源码与医保购药APP的整合开发策略

本篇文章&#xff0c;小编将从系统架构、数据安全、用户体验和技术实现等方面详细探讨互联网医院系统与医保购药APP的整合开发策略。 一、系统架构 1.模块化设计 互联网医院系统与医保购药APP的整合需要采用模块化设计。 2.微服务架构 每个功能模块作为一个独立的微服务&am…

AI大模型需要什么样的数据?

数据将是未来AI大模型竞争的关键要素 人工智能发展的突破得益于高质量数据的发展。例如&#xff0c;大型语言模型的最新进展依赖于更高质量、更丰富的训练数据集&#xff1a;与GPT-2相比&#xff0c;GPT-3对模型架构只进行了微小的修改&#xff0c;但花费精力收集更大的高质量…

【pyhton】Python中zip用法详细解析与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

请你学习:前端布局1 - CSS盒模型(Box Model)是CSS布局的核心概念之一

CSS盒模型&#xff08;Box Model&#xff09;是CSS布局的核心概念之一&#xff0c;它描述了如何对文档中的元素进行布局和尺寸计算。每个元素都会生成一个矩形的盒子&#xff0c;这个盒子由几个部分组成&#xff0c;包括内容&#xff08;content&#xff09;、内边距&#xff0…

解决Tomcat控制台打印日志出现乱码

1.进入安装目录&#xff0c;找到conf这个文件夹并打开 2.找到logging.properties&#xff0c;用记事本打开 3.找到java.util.logging.ConsoleHandler.encoding&#xff0c;将UTF-8修改为GBK

红酒与时尚秀场:品味潮流与风尚

在光影交织的时尚秀场上&#xff0c;每一道流光溢彩都诠释着潮流与风尚的碰撞。当定制红酒洒派红酒&#xff08;Bold & Generous&#xff09;与时尚秀场相遇&#xff0c;两者交织出一段优雅而充满魅力的故事&#xff0c;领着我们品味潮流之巅的风尚。 一、红酒与时尚的初遇…

【MySQL进阶篇】管理

1、系统数据库 MySQL数据库安装完成之后&#xff0c;自带以下四个数据库&#xff0c;具体作用如下&#xff1a; 数据库含义mysql存储MySQL服务器正常运行所需要的各种信息&#xff08;时区、主从、用户、权限等&#xff09;information_schema提供了访问数据库元数据的各种表…

摄像头防抖中的IMU传感器是什么吗?

摄像头防抖中的IMU传感器是什么吗&#xff1f; 在现代摄影与摄像技术的飞速发展中&#xff0c;防抖功能作为提升画质与用户体验的关键技术之一&#xff0c;得到了广泛的应用与重视。IMU&#xff08;Inertial Measurement Unit&#xff0c;惯性测量单元&#xff09;传感器作为防…

豆包大模型视觉、语音能力升级!文生图更懂“国风”,TTS“拿捏”情绪

2024 火山引擎 AI 创新巡展・成都站于近日正式举办。活动现场发布了豆包・图生图模型&#xff0c;以及升级版的豆包・文生图模型、豆包・语音合成模型、豆包・声音复刻模型。 本文介绍了升级版文生图、语音合成、声音复刻模型特征&#xff0c;包括图像生成方面更深刻理解主客体…

3千米以上音视频键鼠延长解决方案:KVM光纤延长器

KVM光纤延长器​​​​​​​是什么&#xff1f; KVM光纤延长器是一种使用光纤来传输键盘、视频和鼠标&#xff08;KVM&#xff09;信号的设备&#xff0c;由发送端和接收端组成&#xff0c;一般成对使用。它可以让用户在远离电脑的地方如同在本地一样方便快捷的操作电脑。 KV…

Java码农人生使用手册——类和对象

一、类的定义和使用 类是用来对一个实体&#xff08;对象&#xff09;来进行描述的。 注意&#xff1a; 类名采用大驼峰定义 二、类的实例化 定义了一个类&#xff0c;就相当于在计算机中定义了一个新的类型&#xff0c;用类的类型创建对象的过程&#xff0c;称为类的实例化。 …

《从U-Net到Transformer:深度模型在医学图像分割中的应用综述》论文阅读

网络首发地址&#xff1a;https://link.cnki.net/urlid/51.1307.tp.20231026.1648.002 摘要&#xff1a; U-Net以卷积神经网络&#xff08;CNN&#xff09;为主干&#xff0c;其易于优化促使在医学图像分割领域的发展&#xff0c; 但只擅长获取局部特征&#xff0c;缺乏长期相…

大模型三种模式Embedding、copilot、Agent

大模型的三种应用模式——Embedding、Copilot、Agent——代表了不同级别的智能化和自动化程度&#xff0c;以及与人类用户的交互方式。下面是每种模式的具体解释&#xff1a; 嵌入模式&#xff08;Embedding Mode&#xff09; 定义&#xff1a;在嵌入模式中&#xff0c;大模型…

Tomcat部署——个人笔记

Tomcat部署——个人笔记 文章目录 [toc]简介安装配置文件WEB项目的标准结构WEB项目部署IDEA中开发并部署运行WEB项目 本学习笔记参考尚硅谷等教程。 简介 Apache Tomcat 官网 Tomcat是Apache 软件基金会&#xff08;Apache Software Foundation&#xff09;的Jakarta 项目中…

虚拟机Windows server忘记密码解决方法

原理 utilman.exe是Windows辅助工具管理器程序&#xff0c;‌虽然它本身不是一个关键的系统进程&#xff0c;‌但通过修改这个文件&#xff0c;‌用户可以访问一些有用的UI设置。‌在某些情况下&#xff0c;‌比如忘记密码需要重置时&#xff0c;‌通过修改utilman.exe文件为c…