此框架是SQL Server增量订阅,用来监听增删改数据库数据变更

news2024/12/24 10:04:33

目前仅支持SQL Server,后续会支持MySQL和Oracle,Nuget上可以下载安装

或者使用Nuget命令添加包

dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1

 可以用来处理DB主从同步,跨库同步,数据备份,同步ES,缓存刷新等等

(一)定义需要监听表的实体类 

 /// <summary>
    /// 
    /// </summary>
    [Display(Rename = "t_oms_order_detail")]
    [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")]
    public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int>
    {
        /// <summary>
        /// 
        /// </summary>
        [Identity]
        [Display(Rename = "id")]
        [Nest.PropertyName("id")]
        public override int Id { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "name")]
        [Nest.PropertyName("name")]
        public string Name { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "trade_id")]
        [Nest.PropertyName("trade_id")]
        public int? TradeId { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "descption")]
        [Nest.PropertyName("descption")]
        public string Descption { get; set; }

        /// <summary>
        /// 
        /// </summary>
        [Display(Rename = "create_time")]
        [Nest.PropertyName("create_time")]
        public DateTime CreateTime { get; set; }
    }

[Display]和[Identity]属于Kogel.Dapper.Extension的特性如果[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,如果没用到可以忽略

(二)定义表订阅

    /// <summary>
    /// 定义表订阅
    /// </summary>
    public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail>
    {
        /// <summary>
        /// 设置连接配置
        /// </summary>
        /// <param name="builder"></param>
        public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
        {
            //此连接字符串账号需要有管理员权限
            builder.BuildConnection("数据库连接字符串");
        }
    }

如果需要此表对应多张分表可以设置

//配置所有表分片
builder.BuildShards(new List<string>
            {
                "t_oms_order_detail_1",
                "t_oms_order_detail_2",
                "t_oms_order_detail_3"
            })

(1).如果想推送订阅到RabbitMQ中

builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            })

可以通过BuildTopic设置交换机名称

builder.BuildTopic("kogel_subscribe_order_detail")

(2).如果想推送订阅到Kafka中

builder.BuildKafka(new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                Acks = Acks.None
            })

可以通过BuildTopic设置Topic名称

builder.BuildTopic("kogel_subscribe_order_detail")

(3).如果想推送订阅到Elasticsearch中

 builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")),
            })

如果有设置Basic授权

builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                    .BasicAuthentication("账号","密码")
            })

如果想根据自己定义的分片逻辑插入到多个ES索引中可以通过WriteInterceptor

/// <summary>
        /// 设置连接配置
        /// </summary>
        /// <param name="builder"></param>
        public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder)
        {
            //此连接字符串账号需要有管理员权限
            builder.BuildConnection("数据库连接字符串");
            //定义推送ES
            builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail>
            {
                Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                    .BasicAuthentication("账号", "密码"),
                WriteInterceptor = message => WriteInterceptor(message)
            });
        }

        /// <summary>
        /// 定义自己的索引逻辑
        /// </summary>
        /// <param name="messages"></param>
        /// <returns></returns>
        private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message)
        {
            string esIndexName;
            //这里写自己索引分片的业务逻辑
            if (message.Result.Id % 3 == 0)
            {
                esIndexName = $"kogel_orders_2";
            }
            else
            {
                esIndexName = $"kogel_orders_1";
            }
            return message.ToEsSubscribeMessage(esIndexName);
        }

并且ES索引不存在的时候会动态创建

(4).如果想自定义实现订阅逻辑,在可以Subscribe订阅类中重写

/// <summary>
        /// 订阅变更 (每一次sql的执行会触发一次Subscribe)
        /// </summary>
        /// <param name="messageList">消息列表表示所有影响到的数据变更(会受BuildLimit限制,没有查询完成的会在下一次查出)</param>
        public override void Subscribes(List<SubscribeMessage<T>> messageList)
        {
            foreach (var message in messageList)
            {
                Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}");
            }
        }

以上订阅的优先级:

(三)订阅启动

启动监听所有继承自Subscribe<T>的类,在应用程序启动时执行即可

ApplicationProgram.Run();

启动前需要确保DB已经开启了SQL Server Agent

windows环境可以通过cmd命令开启

net start SQLSERVERAGENT

linux或docker环境可以通过以下命令开启

/opt/mssql/bin/mssql-conf set sqlagent.enabled true

如果是基础BaseSubscribe<T>中间基类需要定义成abstract,例如

  /// <summary>
    /// 基础配置类需要定义成abstract
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class BaseSubscribe<T> : Subscribe<T>
        where T : class, IBaseEntity
    {
    }

关闭监听,在应用程序退出时执行即可

ApplicationProgram.Close();

(四)其他配置

builder.BuildCdcConfig(new CdcConfig
            {
                //扫描间隔(每次扫描变更表的间隔,单位毫秒) 默认10000毫秒/10秒
                ScanInterval = 10000,

                //变更捕捉文件在DB保存的时间(默认三天)
                Retention = 60 * 24 * 3,

                //是否首次扫描表全部数据再监听变更(默认false)
                IsFirstScanFull = false,

                //每次检索的变更量(默认10条)
                Limit = 10,

                //变更扫描的偏移量位置(默认从最后中止处开始)
                OffsetPosition = OffsetPositionEnum.Abort
            })

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51215.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI绘画软件汇总

AI绘画软件汇总 AI绘图在线体验 二次元绘图 在线体验地址:Stable Diffusion 模型包括&#xff1a; NovelAI&#xff0c;NovelAI的模型训练使用了数千个网站的数十亿张图片&#xff0c;包括 Pixiv、Twitter、DeviantArt、Tumblr等网站的作品。 Waifu&#xff0c;waifu的模型…

ShareSDK for Unity

本文档使用Unity2019进行演示 下载unitypackage 从Mob的github地址下载ShareSDK.unitypackage&#xff1a;Git地址&#xff0c;如下图所示 )![image.png]//download.sdk.mob.com/2022/06/22/15/165588252810937.61.png) 下载完成后得到一个.unitypackage结尾的文件&#xf…

2022年12月全国DAMA-CDGA/CDGP数据治理认证招生简章

20DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职…

R语言stan进行基于贝叶斯推断的回归模型

可以从许多统计软件包中运行Stan。到目前为止&#xff0c;我一直在从R运行Stan。 我们围绕stan进行一些咨询&#xff0c;帮助客户解决独特的业务问题。 简单线性回归 第一步是为Stan模型编写文件。这包含一个文件linreg.stan&#xff1a; 视频&#xff1a;线性回归中的贝叶斯…

新闻舆情管理平台开发,监控舆情发展趋势

打造企业良好声誉可能需要几年、十几年甚至更久&#xff0c;而毁掉它只需要短短几分钟。尤其是互联网时代下&#xff0c;人们接收信息的速度越来越快&#xff0c;在新闻发出去的几分钟内就能迅速占据热搜榜。而且网络上每天都会产生上亿条信息&#xff0c;单纯的依靠人工进行监…

openEuler 通过 手工方式 安装 ceph 步骤 Cephadm无法应用到openEuler 提醒不支持

ceph集群在openEuler手工安装过程Cephadm安装步骤前置要求1.openEuler版本2. Python 33. Systemd4. Time synchronization (such as chrony or NTP)5. LVM2 for provisioning storage devices安装1. 创建用户ceph2. 安装 ceph3. 生成配置项3.1 机器及组件规划列表3.2 ceph.conf…

Python第三方库之nibabel

1.nibabel简介 NiBabel提供对一些常见医学和神经影像文件格式的读/写访问&#xff0c;包括ANALYZE&#xff08;plain&#xff0c;SPM99&#xff0c;SPM2及更高版本&#xff09;&#xff0c;GIFTI&#xff0c;NIfTI1&#xff0c;NIfTI2&#xff0c;CIFTI-2&#xff0c;MINC1&am…

[附源码]SSM计算机毕业设计疫情防控期间人员档案追寻系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

RocketMQ系列——搭建Namesrv源码调试环境整理

目录 RocketMQ系列-搭建Namesrv源码调试环境 Namesrv源码调试环境搭建 导入项目到IDEA 创建所需目录 环境配置 启动Namesrv 总结 RocketMQ系列-搭建Namesrv源码调试环境 在学习任何一个技术框架的时候&#xff0c;我们通常都是先了解是什么&#xff0c;有什么作用、解决…

Java流程控制语句

流程控制语句 在一个程序执行的过程中&#xff0c;各条语句的执行顺序对程序的结果是有直接影响的。所以&#xff0c;我们必须清楚每条语句的执行流程。而且&#xff0c;很多时候要通过控制语句的执行顺序来实现我们想要的功能。 流程控制语句分类 顺序结构、分支结构&#…

【毕业设计】深度学习社交安全距离检测系统 - python opencv

文章目录0 前言1 课题背景2 实现效果3 相关技术3.1 YOLOV43.2 基于 DeepSort 算法的行人跟踪4 最后0 前言 &#x1f525; Hi&#xff0c;大家好&#xff0c;这里是丹成学长的毕设系列文章&#xff01; &#x1f525; 对毕设有任何疑问都可以问学长哦! 这两年开始&#xff0c…

鲜花商城|基于Springboot实现鲜花商城系统

作者主页&#xff1a;编程千纸鹤 作者简介&#xff1a;Java、前端、Pythone开发多年&#xff0c;做过高程&#xff0c;项目经理&#xff0c;架构师 主要内容&#xff1a;Java项目开发、毕业设计开发、面试技术整理、最新技术分享 收藏点赞不迷路 关注作者有好处 文末获得源码 …

xgboost 为什么拟合残差能获得更好的效果(思考)

以时序预测为例&#xff1a; 现在要 预测2022年之后的值&#xff0c;可以预测下降幅度&#xff08;和预测残差的步骤一样&#xff09;。 假设有一个隐藏的规律&#xff1a;对于21年的高峰&#xff0c;22年的下降幅度会更大&#xff08;如time3是&#xff0c;下降幅度会比其他的…

Spring依赖注入源码解析(下)

文章目录前言本章目标resolveDependency—解决依赖查找1、doResolveDependency2、Autowreid寻找依赖流程图依赖注入完整流程图前言 在上一篇文章Spring依赖注入源码解析&#xff08;上&#xff09;中&#xff0c;主要介绍了寻找注入点、以及注入源码分析 本章目标 这一篇主要…

市面上最适合跑步用的耳机有哪些、分享五款最优秀的跑步耳机

随着人们日益对健康的重视&#xff0c;”全民健身“正在全国&#xff0c;乃至全世界蔓延开来&#xff0c;其中跑步锻炼凭借着门槛低&#xff0c;益处多成为了大部分人的健身的首选。而随着跑步大军的壮大&#xff0c;国内蓝牙耳机市场也是一片火热。其中蓝牙无线运动耳机凭借着…

【python小项目】用python写一个小工具——番茄钟

用python写一个小工具——番茄钟 最近听到朋友说在用番茄钟&#xff0c;有点兴趣也想下载一个来用用&#xff0c;后面仔细一想这玩意做起来也不难&#xff0c;索性自己顺手写一个算了&#xff0c;在这里也分享给大家了 一、功能简述 番茄钟即番茄工作法&#xff0c;番茄工作法…

产品经理必备的能力有哪些?

从一名普通的产品经理到一名优秀的产品经理要经历什么&#xff1f;哪些又是产品经理必备的能力&#xff1f;产品经理对能力的需求也不尽相同&#xff0c;在不同的团队合作模式下&#xff0c;还必须懂得各种能力。 一、业务分析能力 数据分析能力该是什么样的呢 1、有数据意识…

indexDB 本地数据库

indexDB 本地数据库 IndexedDB是一种使用浏览器存储大量数据的方法&#xff0c;它创造的数据可以被查询&#xff0c;并且可以离线使用。 优点&#xff1a;空间大小&#xff0c;大于250M&#xff1b;支持二进制&#xff1a;IndexedDB不但可以存储对象&#xff0c;字符串等&#…

利用MS11_003 IE漏洞攻击win7主机

利用MS11_003 IE漏洞攻击win7主机 微软2011年2月9日发布12个安全补丁,其中3个最高级别为严重等级,9个为重要等级,共计修复了影响 Windows、Office、IE 和 IIS 的22个漏洞。 MS11-003、MS11-006 和 MS11-007 为严重等级,需要优先部署。其中,MS11-003 的最高利用指数为1它修…

基于web的课程管理系统设计与实现(java+SqlServer)

目 录 摘 要 I Abstract II 1 绪论 1 1.1 课题背景 1 1.2 本课题研究的意义 2 1.3 主要研究内容 3 2 开发环境与相关技术 4 2.1 JSP技术 4 2.1.1 JAVA简介 4 2.1.2 JSP简介 4 2.1.3 SSH2框架介绍 5 2.2 Myeclipse介绍 6 2.3 SQL2008 数据库 7 2.4 Browser/Server&#xff08;B…