目前仅支持SQL Server,后续会支持MySQL和Oracle,Nuget上可以下载安装
或者使用Nuget命令添加包
dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1
可以用来处理DB主从同步,跨库同步,数据备份,同步ES,缓存刷新等等
(一)定义需要监听表的实体类
/// <summary> /// /// </summary> [Display(Rename = "t_oms_order_detail")] [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")] public class OmsOrderDetail : IBaseEntity<OmsOrderDetail, int> { /// <summary> /// /// </summary> [Identity] [Display(Rename = "id")] [Nest.PropertyName("id")] public override int Id { get; set; } /// <summary> /// /// </summary> [Display(Rename = "name")] [Nest.PropertyName("name")] public string Name { get; set; } /// <summary> /// /// </summary> [Display(Rename = "trade_id")] [Nest.PropertyName("trade_id")] public int? TradeId { get; set; } /// <summary> /// /// </summary> [Display(Rename = "descption")] [Nest.PropertyName("descption")] public string Descption { get; set; } /// <summary> /// /// </summary> [Display(Rename = "create_time")] [Nest.PropertyName("create_time")] public DateTime CreateTime { get; set; } }
[Display]和[Identity]属于Kogel.Dapper.Extension的特性如果[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,如果没用到可以忽略
(二)定义表订阅
/// <summary> /// 定义表订阅 /// </summary> public class OmsOrderDetailSubscribe : Subscribe<OmsOrderDetail> { /// <summary> /// 设置连接配置 /// </summary> /// <param name="builder"></param> public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder) { //此连接字符串账号需要有管理员权限 builder.BuildConnection("数据库连接字符串"); } }
如果需要此表对应多张分表可以设置
//配置所有表分片 builder.BuildShards(new List<string> { "t_oms_order_detail_1", "t_oms_order_detail_2", "t_oms_order_detail_3" })
(1).如果想推送订阅到RabbitMQ中
builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest" })
可以通过BuildTopic设置交换机名称
builder.BuildTopic("kogel_subscribe_order_detail")
(2).如果想推送订阅到Kafka中
builder.BuildKafka(new ProducerConfig { BootstrapServers = "localhost:9092", Acks = Acks.None })
可以通过BuildTopic设置Topic名称
builder.BuildTopic("kogel_subscribe_order_detail")
(3).如果想推送订阅到Elasticsearch中
builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")), })
如果有设置Basic授权
builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")) .BasicAuthentication("账号","密码") })
如果想根据自己定义的分片逻辑插入到多个ES索引中可以通过WriteInterceptor
/// <summary> /// 设置连接配置 /// </summary> /// <param name="builder"></param> public override void OnConfiguring(OptionsBuilder<OmsOrderDetail> builder) { //此连接字符串账号需要有管理员权限 builder.BuildConnection("数据库连接字符串"); //定义推送ES builder.BuildElasticsearch(new ElasticsearchConfig<OmsOrderDetail> { Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")) .BasicAuthentication("账号", "密码"), WriteInterceptor = message => WriteInterceptor(message) }); } /// <summary> /// 定义自己的索引逻辑 /// </summary> /// <param name="messages"></param> /// <returns></returns> private EsSubscribeMessage<OmsOrderDetail> WriteInterceptor(SubscribeMessage<OmsOrderDetail> message) { string esIndexName; //这里写自己索引分片的业务逻辑 if (message.Result.Id % 3 == 0) { esIndexName = $"kogel_orders_2"; } else { esIndexName = $"kogel_orders_1"; } return message.ToEsSubscribeMessage(esIndexName); }
并且ES索引不存在的时候会动态创建
(4).如果想自定义实现订阅逻辑,在可以Subscribe订阅类中重写
/// <summary> /// 订阅变更 (每一次sql的执行会触发一次Subscribe) /// </summary> /// <param name="messageList">消息列表表示所有影响到的数据变更(会受BuildLimit限制,没有查询完成的会在下一次查出)</param> public override void Subscribes(List<SubscribeMessage<T>> messageList) { foreach (var message in messageList) { Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}"); } }
以上订阅的优先级:
(三)订阅启动
启动监听所有继承自Subscribe<T>的类,在应用程序启动时执行即可
ApplicationProgram.Run();
启动前需要确保DB已经开启了SQL Server Agent
windows环境可以通过cmd命令开启
net start SQLSERVERAGENT
linux或docker环境可以通过以下命令开启
/opt/mssql/bin/mssql-conf set sqlagent.enabled true
如果是基础BaseSubscribe<T>中间基类需要定义成abstract,例如
/// <summary> /// 基础配置类需要定义成abstract /// </summary> /// <typeparam name="T"></typeparam> public abstract class BaseSubscribe<T> : Subscribe<T> where T : class, IBaseEntity { }
关闭监听,在应用程序退出时执行即可
ApplicationProgram.Close();
(四)其他配置
builder.BuildCdcConfig(new CdcConfig { //扫描间隔(每次扫描变更表的间隔,单位毫秒) 默认10000毫秒/10秒 ScanInterval = 10000, //变更捕捉文件在DB保存的时间(默认三天) Retention = 60 * 24 * 3, //是否首次扫描表全部数据再监听变更(默认false) IsFirstScanFull = false, //每次检索的变更量(默认10条) Limit = 10, //变更扫描的偏移量位置(默认从最后中止处开始) OffsetPosition = OffsetPositionEnum.Abort })