C# redis通过stream实现消息队列以及ack机制

news2025/1/12 21:04:07

redis实现

查看redis版本

redis需要>5.0
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;
  • 消息遍历;
  • 消息的阻塞和非阻塞读;
  • Consumer Groups 消费组;
  • ACK 确认机制。
  • 支持多播。

本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章

info

在这里插入图片描述

插入消息

XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
# 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male

在这里插入图片描述

读取消息

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01  0-0
# 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。

在这里插入图片描述
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。

创建消费组

# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream:指定队列的名字;
# group:指定消费组名字;
# start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。

# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM

读取群组消息

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]

XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
# >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
# BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒

在这里插入图片描述

如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

在这里插入图片描述

查看已读未确认消息

XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01 

在这里插入图片描述

1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1

查看消费者读取了哪些数据

XPENDING queue01 group01 - + 10 consumer01

在这里插入图片描述

确认消息

XACK key group-key ID [ID ...]

XACK queue01 group01 1696822787364-0

在这里插入图片描述
再次查询未读消息

XPENDING queue01 group01 
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >

在这里插入图片描述
在这里插入图片描述

C#操作redis实现

使用FreeRedis类库,熟悉了上面的流程,直接上代码

using FreeRedis;

namespace RedisMQStu01
{
    internal class Program
    {
        async static Task Main(string[] args)
        {
            var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
            var queueName = "queue01";//队列的名字
            var groupName = "group01";//读取队列的群组的名字
            var consumerName = "consumer01";//消费者的名字
            //添加数据
            await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
            await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
            await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
            await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
            //创建群组,如果数据存在则不需要执行了,第一次需要执行
            await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
            //读取群组消息
            var ids = new Dictionary<string, string>();
            ids.Add("queue01", ">");
            var result = await cli.XReadGroupAsync(groupName, consumerName,
                    1, 0, noack: false, ids);
            //查看已读未确认的消息
            var unReadResults = await cli.XPendingAsync(queueName, groupName);
            await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");
            foreach (var item in result)
            {
                await Console.Out.WriteLineAsync(item.key);//群组名字
                foreach (var entry in item.entries)
                {
                    await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                    await Console.Out.WriteAsync($"\t");
                    foreach (var field in entry.fieldValues)
                    {
                        await Console.Out.WriteAsync($"\t{field.ToString()}");
                    }
                    await Console.Out.WriteLineAsync();
                    //确认消息
                    await cli.XAckAsync(queueName,groupName, entry.id);
                }
            }
            await Console.Out.WriteLineAsync("完成");
        }
    }
}

上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费

生产者

using FreeRedis;

namespace RedisMQProductor01
{
    internal class Program
    {
        /// <summary>
        /// redis消息队列的生产者
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        async static Task Main(string[] args)
        {
            var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
            var queueName = "queue01";//队列的名字
            //添加数据
            await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
            await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
            await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
            await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
            await Console.Out.WriteLineAsync("生产者添加数据完成");
        }
    }
}

消费者

using FreeRedis;

namespace RedisMQConsumer01
{
    /// <summary>
    /// redis消息队列的消费者
    /// </summary>
    internal class Program
    {
        async static Task Main(string[] args)
        {
            var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
            var queueName = "queue01";//队列的名字
            var groupName = "group01";//读取队列的群组的名字
            var consumerName = "consumer01";//消费者的名字
            //如果数据存在则不需要执行了,第一次需要执行
            var info = await cli.XInfoGroupsAsync(queueName);
            if (info == null || info.Length < 1)
            {
                //创建群组
                await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
            }
            //读取群组消息
            var ids = new Dictionary<string, string>();
            ids.Add("queue01", ">");
            //block的值是0表示无限等待
            var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
            while (true)
            {
                if (result != null && result.Length > 0)
                {
                    foreach (var item in result)
                    {
                        await Console.Out.WriteLineAsync(item.key);//群组名字
                        foreach (var entry in item.entries)
                        {
                            await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                            await Console.Out.WriteAsync($"\t");
                            foreach (var field in entry.fieldValues)
                            {
                                await Console.Out.WriteAsync($"\t{field.ToString()}");
                            }
                            await Console.Out.WriteLineAsync();
                            //确认消息
                            await cli.XAckAsync(queueName, groupName, entry.id);
                        }
                    }
                    await Console.Out.WriteLineAsync("===============本次处理完毕===============");
                }
                //继续等待
                result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
            }
        }
    }
}

先启动生产者在启动消费者查看效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

方法改善

改善之后可以先启动消费者然后等待生产者投递数据即可

消费者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;

namespace CelueStu02
{
    /// <summary>
    /// 备份策略消费者
    /// </summary>
    internal class Program
    {
        async static Task Main(string[] args)
        {
            var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
            var queueName = "queue01";//队列的名字
            var groupName = "group01";//读取队列的群组的名字
            var consumerName = "consumer01";//消费者的名字
            try
            {
                var streamInfo = cli.XInfoStream(queueName);
            }
            catch
            {
                await cli.XAddAsync(queueName, "student", "");
            }

            //如果数据存在则不需要执行了,第一次需要执行
            var info = await cli.XInfoGroupsAsync(queueName);
            if (info == null || info.Length < 1)
            {
                //创建群组
                await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
            }
            //读取群组消息
            var ids = new Dictionary<string, string>();
            ids.Add("queue01", ">");
            //block的值是0表示无限等待
            var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
            ConnectionConfig connectionConfig = new ConnectionConfig()
            {
                ConnectionString = "",//自己写数据库链接字符串
                IsAutoCloseConnection = true,
                DbType = DbType.SqlServer
            };
            using SqlSugarClient db = new SqlSugarClient(connectionConfig);
            //初始化表格
            db.CodeFirst.InitTables(typeof(Student));

            while (true)
            {
                if (result != null && result.Length > 0)
                {
                    foreach (var item in result)
                    {
                        await Console.Out.WriteLineAsync(item.key);//群组名字
                        foreach (var entry in item.entries)
                        {
                            await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                            for (int i = 0; i < entry.fieldValues.Length; i++)
                            {
                                var field = entry.fieldValues[i];
                                if (field.ToString() == "student")
                                {
                                    var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";
                                    if (string.IsNullOrWhiteSpace(studentListJson))
                                    {
                                        continue;
                                    }
                                    var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);
                                    await db.Storageable(students).ExecuteCommandAsync();
                                }
                            }
                            //确认消息
                            await cli.XAckAsync(queueName, groupName, entry.id);
                        }
                    }
                    await Console.Out.WriteLineAsync("===============本次处理完毕===============");
                }
                //继续等待
                result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
            }
        }
    }
}

生产者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;

namespace CelueStu01
{
    /// <summary>
    /// 备份策略生产者
    /// </summary>
    internal class Program
    {
        async static Task Main(string[] args)
        {
            var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
            var queueName = "queue01";//队列的名字
            var perProcessNumber = 1000;//每次处理的数据条数
            int totalPage = 0;//总页码数
            ConnectionConfig connectionConfig = new ConnectionConfig()
            {
                ConnectionString = "",
                IsAutoCloseConnection = true,
                DbType = DbType.SqlServer
            };
            using (SqlSugarClient db = new SqlSugarClient(connectionConfig))
            {
                //初始化表格
                db.CodeFirst.InitTables(typeof(Student));
                do
                {
                    int count = await db.Queryable<Student>().CountAsync();
                    totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;
                    var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);
                    //批量发送,redis频繁写入会报rdb错误,限制一下写入频率
                    await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));
                    List<int> deleteStudents = students.Select(p => p.Id).ToList();
                    if (deleteStudents.Any())
                    {
                        //批量删除
                        await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();
                    }
                    totalPage -= 1;
                    //Thread.Sleep(2000);
                } while (totalPage > 0);
            }
            await Console.Out.WriteLineAsync("生产者添加数据完成");
        }
    }
}

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1076101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue路由进阶--VueRouter声明式导航

Vue路由进阶–VueRouter声明式导航 文章目录 Vue路由进阶--VueRouter声明式导航1、声明式导航1.1、导航链接1.2、高亮类名1.3、跳转传参1.4、动态路由参数可选符 1、声明式导航 1.1、导航链接 需求&#xff1a;实现导航高亮效果 vue-router提供了一个全局组件router-link(取…

【数据库——MySQL(实战项目1)】(1)图书借阅系统

目录 1. 简述2. 功能3. 数据库结构设计3.1 绘制 E-R 图3.2 创建数据库3.3 创建表3.4 插入表数据 1. 简述 经过前期的学习&#xff0c;我们已经掌握数据库基础操作&#xff0c;因此是时候来做一个实战项目了——图书借阅系统。对于图书借阅系统&#xff0c;相信大家不难想到至少…

Git Stash:临时保存和切换工作状态的利器

Git是我们日常工作中不可或缺的版本控制系统。它提供了许多强大的功能&#xff0c;其中之一是Git Stash&#xff08;暂存&#xff09;。Git Stash可以帮助我们在切换分支或保存未完成的工作时&#xff0c;临时保存当前的修改&#xff0c;以便稍后重新应用。本文将介绍Git Stash…

零信任沙盒,加密沙盒,防泄密沙盒

场景描述 随着云计算、移动互联、物联网等新技术的发展&#xff0c;传统的安全边界变得越来越模糊&#xff0c;访问控制模式局限性也越来越明显。企业需满足员工在任意时间、地点对企业内部进行访问的需求&#xff1b;服务器之间各自为界、相互独立&#xff0c;缺乏统一的安全…

Docker-consul容器服务更新与发现

目录 一、consul简介 1、什么是服务注册与发现 2、什么是consul 3、consul的关键特性 二、consul部署 1、consul服务器部署 1.1 建立consul服务 2、查看集群信息 3、通过http api 获取集群信息 三、registrator部署 1、安装Gliderlabs/Registrator 2、测试服务发现…

QT项目打包脚本

QT项目打包脚本 项目修改频繁&#xff0c;肯定不能手动在开始&#xff0c;菜单&#xff0c;找到相关环境&#xff0c;再输入windeployqt 打包。这里提供一个脚本用于打包。 按Wins&#xff0c;输入CMD D:# 创建打包用的目录 md MyProjectcd D:\MyProject# 拷贝文件 copy D:…

Redis(四)多级缓存

文章目录 一、传统缓存存在的问题二、多级缓存方案三、JVM进程缓存案例演示&#xff1a; 四、Lua语法入门Lua语言入门 五、多级缓存&#xff08;一&#xff09;安装OpenResty&#xff08;二&#xff09;OpenResty入门&#xff08;三&#xff09;请求参数处理&#xff08;四&…

2023年中国塑形内衣市场发展概况分析:五年增长率高达56%,经济与安全兼具的塑身内衣市场不容小觑[图]

塑身内衣可以重修、重塑身体曲线。塑身内衣&#xff0c;能够马上雕塑您的身材&#xff0c;美好身段表露无遗&#xff0c;塑身内衣在呵护您的身体之下&#xff0c;恰到好处的集中胸部&#xff0c;雕塑腰部&#xff0c;以及提臀作用&#xff0c;缔造自然的性感窈窕美态。 塑身内…

4D5D影院设备发展前景7D互动影院体验馆应用

5D影院设备发展前景广阔。随着科技的不断进步&#xff0c;5D影院设备在电影行业中的应用越来越广泛。5D影院设备以其独特的沉浸式体验和互动性&#xff0c;吸引了大量观众。未来&#xff0c;随着技术的不断创新和成本的降低&#xff0c;5D影院设备将会得到更多的应用和推广。 首…

9月《中国数据库行业分析报告》已发布,47页干货带你详览 MySQL 崛起之路!

为了帮助大家及时了解中国数据库行业发展现状、梳理当前数据库市场环境和产品生态等情况&#xff0c;从2022年4月起&#xff0c;墨天轮社区行业分析研究团队出品将持续每月为大家推出最新《中国数据库行业分析报告》&#xff0c;持续传播数据技术知识、努力促进技术创新与行业生…

算法题:摆动序列(贪心算法解决序列问题)

这道题是一道贪心算法题&#xff0c;如果前两个数是递增&#xff0c;则后面要递减&#xff0c;如果不符合则往后遍历&#xff0c;直到找到符合的。&#xff08;完整题目附在了最后&#xff09; 代码如下&#xff1a; class Solution(object):def wiggleMaxLength(self, nums):…

获得京东商品描述 API (商品详情图)

电商卖货&#xff0c;所有产品信息都是通过商品的主图、详情图、商品描述展示出来的。做一个简洁大方又能把产品特点介绍清楚的商品详情信息非常重要。在现有的淘宝京东拼多多等平台&#xff0c;有大量的商品数据&#xff0c;基本你做什么产品都能找到同类的产品信息。如果能够…

Spring5应用之整合MyBatis

作者简介&#xff1a;☕️大家好&#xff0c;我是Aomsir&#xff0c;一个爱折腾的开发者&#xff01; 个人主页&#xff1a;Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客 当前专栏&#xff1a;Spring5应用专栏_Aomsir的博客-CSDN博客 文章目录 参考文献前言为什…

腾讯云2023年双十一优惠活动整理汇总

随着双十一的到来&#xff0c;各大云服务厂商也开始纷纷推出优惠活动&#xff0c;其中腾讯云作为国内知名的云服务提供商&#xff0c;自然也不例外。那么&#xff0c;腾讯云双十一优惠活动都有哪些呢&#xff1f;本文将会为大家整理汇总&#xff0c;方便大家了解和选择。 一、腾…

Java拆装箱

拆装箱 基本数据类型和包装类的区别 包装类是对象&#xff0c;基本数据类型不是。包装类型是引用的传递&#xff0c;基本数据类型是值的传递声明方式不同&#xff1a;包装类通过new关键字&#xff0c;基本数据类型不需要通过new存储位置不同&#xff1a;包装类型存储在堆内存…

Flutter 打包 windows桌面端可执行文件

简单一说 因为个人兴趣爱好&#xff0c;在写一个跨平台工具。为了省事没去官网看文档&#xff0c;直接翻阅各大博客网站&#xff0c;一个简单的命令&#xff0c;博客写的内容比较复杂。为了方便自己和有需要同学&#xff0c;简单做一个记录。 Flutter提供了一种方便命令行的方…

将网站域名访问从http升级到https(腾讯云/阿里云)

文章目录 1.前提说明2.服务器安装 docker 与 nginx2.1 安装 docker&#x1f340; 基于 centos 的安装&#x1f340; 基于ubuntu 2.2 配置阿里云国内加速器&#x1f340; 找到相应页面&#x1f340; 创建 docker 目录&#x1f340; 创建 daemon.json 文件&#x1f340; 重新加载…

安全与隐私:直播购物App开发中的重要考虑因素

随着直播购物App的崭露头角&#xff0c;开发者需要特别关注安全性和隐私问题。本文将介绍在直播购物App开发中的一些重要安全和隐私考虑因素&#xff0c;并提供相关的代码示例。 1. 数据加密 在直播购物App中&#xff0c;用户的个人信息和支付信息是极为敏感的数据。为了保护…

【K8S系列】深入解析k8s 网络插件—kube-router

序言 做一件事并不难&#xff0c;难的是在于坚持。坚持一下也不难&#xff0c;难的是坚持到底。 文章标记颜色说明&#xff1a; 黄色&#xff1a;重要标题红色&#xff1a;用来标记结论绿色&#xff1a;用来标记论点蓝色&#xff1a;用来标记论点 在现代容器化应用程序的世界中…

【揭秘】那些你可能没发现的高质量免费学习资源网站

在我们的日常生活和工作中&#xff0c;我们总会遇到需要学习和提升自己的时候&#xff0c;但往往在寻找学习资源的过程中&#xff0c;我们会遇到一些问题。比如&#xff0c;有些网站的资源都是付费才能观看&#xff0c;让我们感到困扰。其实&#xff0c;只是你还没找对资源网站…