Asp .Net Core 系列:集成 CAP + RabbitMQ + MySQL(含幂等性)

news2024/11/10 18:44:35

简介

官网:https://cap.dotnetcore.xyz/

CAP 是什么?

是一个 EventBus,同时也是一个在微服务或者 SOA 系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。

什么是 EventBus?

事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到 Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听 Eventbus 上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

CAP 支持的运输器

  • RabbitMQ
  • Kafka
  • Azure Service Bus
  • Amazon SQS
  • NATS
  • In-Memory Queue
  • Redis Streams
  • Apache Pulsar

CAP 支持的持久化数据库

  • SQL Server
  • MySQL
  • PostgreSql
  • MongoDB
  • In-Memory Storage

集成 CAP + RabbitMQ + MySQL

安装 CAP NuGet 包

在你的.NET Core 项目中,通过 NuGet 包管理器安装 CAP。

dotnet add package DotNetCore.CAP
dotnet add package DotNetCore.CAP.RabbitMQ
dotnet add package DotNetCore.CAP.MySql
dotnet add package DotNetCore.CAP.Dashboard #Dashboard
dotnet add package Pomelo.EntityFrameworkCore.MySql #这个之后主要用于幂等性判断,可以不要

配置 CAP

        /// <summary>
        /// 添加分布式事务服务
        /// </summary>
        /// <param name="services">服务集合</param>
        /// <param name="capSection">cap链接项</param>
        /// <param name="rabbitMQSection">rabbitmq配置项</param>
        /// <param name="expiredTime">成功消息过期时间</param>
        /// <returns></returns>
        public static IServiceCollection AddMCodeCap(this IServiceCollection services, Action<CapOptions> configure = null, string capSection = "cap", string rabbitMQSection = "rabbitmq")
        {
            var rabbitMQOptions = ServiceProviderServiceExtensions.GetRequiredService<IConfiguration>(services.BuildServiceProvider()).GetSection(rabbitMQSection).Get<RabbitMQOptions>();

            var logger = ServiceProviderServiceExtensions.GetRequiredService<ILogger<CapContext>>(services.BuildServiceProvider());

            if (rabbitMQOptions == null)
            {
                throw new ArgumentNullException("rabbitmq not config.");
            }

            var capJson = ServiceProviderServiceExtensions.GetRequiredService<IConfiguration>(services.BuildServiceProvider()).GetValue<string>(capSection);


            if (string.IsNullOrEmpty(capJson))
            {
                throw new ArgumentException("cap未设置");
            }

            //services.AddDbContext<CapContext>(options => options.UseMySql(capJson, ServerVersion.AutoDetect(capJson)));

            services.AddCap(x =>
            {
                //使用RabbitMQ传输
                x.UseRabbitMQ(opt => { opt = rabbitMQOptions; });

                使用MySQL持久化
                x.UseMySql(capJson);

                //x.UseEntityFramework<CapContext>();

                x.UseDashboard();

                //成功消息的过期时间(秒)
                x.SucceedMessageExpiredAfter = 10 * 24 * 3600;

                x.FailedRetryCount = 5;

                //失败回调,通过企业微信,短信通知人工干预
                x.FailedThresholdCallback = (e) =>
                {
                    if (e.MessageType == MessageType.Publish)
                    {
                        logger.LogError("Cap发送消息失败;" + JsonExtension.Serialize(e.Message));
                    }
                    else if (e.MessageType == MessageType.Subscribe)
                    {
                        logger.LogError("Cap接收消息失败;" + JsonExtension.Serialize(e.Message));
                    }
                };

                configure?.Invoke(x);
            });

            return services;
        }


internal class JsonExtension
{
    private static readonly JsonSerializerSettings _jsonSerializerSettings;

    internal static JsonSerializerSettings CustomSerializerSettings;

    static JsonExtension()
    {
        _jsonSerializerSettings = DefaultSerializerSettings;
    }

    internal static JsonSerializerSettings DefaultSerializerSettings
    {
        get
        {
            var settings = new JsonSerializerSettings();

            // 设置如何将日期写入JSON文本。默认值为“IsoDateFormat”
            //settings.DateFormatHandling = DateFormatHandling.IsoDateFormat;
            // 设置在序列化和反序列化期间如何处理DateTime时区。默认值为 “RoundtripKind”
            //settings.DateTimeZoneHandling = DateTimeZoneHandling.RoundtripKind;
            // 设置在序列化和反序列化期间如何处理默认值。默认值为“Include”
            //settings.DefaultValueHandling = DefaultValueHandling.Include;
            // 设置写入JSON文本时DateTime和DateTimeOffset值的格式,以及读取JSON文本时预期的日期格式。默认值为“ yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK ”。
            settings.DateFormatString = "yyyy-MM-dd HH:mm:ss";
            // 设置在序列化和反序列化期间如何处理空值。默认值为“Include”
            //settings.NullValueHandling = NullValueHandling.Include;
            // 设置序列化程序在将.net对象序列化为JSON时使用的契约解析器
            settings.ContractResolver = new CamelCasePropertyNamesContractResolver();
            // 设置如何处理引用循环(例如,类引用自身)。默认值为“Error”。
            settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
            // 是否格式化文本
            settings.Formatting = Formatting.Indented;
            //支持将Enum 由默认 Number类型 转换为String
            //settings.SerializerSettings.Converters.Add(new StringEnumConverter());
            //将long类型转为string
            //settings.SerializerSettings.Converters.Add(new NumberConverter(NumberConverterShip.Int64));

            return settings;
        }
    }

    public static T Deserialize<T>(string json, JsonSerializerSettings serializerSettings = null)
    {
        if (string.IsNullOrEmpty(json)) return default;

        if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;

        //值类型和String类型
        if (typeof(T).IsValueType || typeof(T) == typeof(string))
        {
            return (T)Convert.ChangeType(json, typeof(T));
        }

        return JsonConvert.DeserializeObject<T>(json, CustomSerializerSettings ?? serializerSettings);
    }

    public static object Deserialize(string json, Type type, JsonSerializerSettings serializerSettings = null)
    {
        if (string.IsNullOrEmpty(json)) return default;

        if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;

        return JsonConvert.DeserializeObject(json,type, CustomSerializerSettings ?? serializerSettings);
    }

    public static string Serialize<T>(T obj, JsonSerializerSettings serializerSettings = null)
    {
        if (obj is null) return string.Empty;
        if (obj is string) return obj.ToString();
        if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;
        return JsonConvert.SerializeObject(obj, CustomSerializerSettings ?? serializerSettings);
    }
}

appsettings.json

{
    "cap": "Server=127.0.0.1;Port=3306;Database=spring;Uid=root;Pwd=123456;Allow User Variables=true;Pooling=true;Min Pool Size=0;Max Pool Size=100;Connection Lifetime=0;",
    "rabbitmq": {
        "HostName": "127.0.0.1",
        "Port": 5672,
        "UserName": "guest",
        "Password": "guest",
        "VirtualHost": "/"
    }
}

使用 CAP 发布事件

public class YourService
{
    private readonly ICapPublisher _capPublisher;

    public YourService(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }

    public async Task DoSomethingAsync()
    {
        // ... 业务逻辑 ...

        await _capPublisher.PublishAsync("your.event.name", new YourEventData { /* ... */ },"callback.name");
    }
}

订阅事件

你需要实现一个事件处理器来订阅并处理事件。这通常是通过继承 ICapSubscribe 接口或使用 CAP 的[CapSubscribe]属性来实现的

public class YourEventHandler : ICapSubscribe
{
    [CapSubscribe("your.event.name")]
    public async Task Handle(YourEventData eventData)
    {
        // 处理事件逻辑
    }
}

或者,使用特性:

[CapSubscribe("your.event.name")]
public class YourEventHandler
{
    public async Task Handle(YourEventData eventData)
    {
        // 处理事件逻辑
    }
}

其它说明

配置

DefaultGroupName

默认值:cap.queue.{程序集名称}

默认的消费者组的名字,在不同的 Transports 中对应不同的名字,可以通过自定义此值来自定义不同 Transports 中的名字,以便于查看。

GroupNamePrefix

默认值:Null

为订阅 Group 统一添加前缀。 https://github.com/dotnetcore/CAP/pull/780

TopicNamePrefix

默认值: Null

为 Topic 统一添加前缀。 https://github.com/dotnetcore/CAP/pull/780

Version

默认值:v1

用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景:

FailedRetryInterval *

默认值:60 秒

在消息发送的时候,如果发送失败,CAP将会对消息进行重试,此配置项用来配置每次重试的间隔时间。

在消息消费的过程中,如果消费失败,CAP将会对消息进行重试消费,此配置项用来配置每次重试的间隔时间。

ConsumerThreadCount *

默认值:1

消费者线程并行处理消息的线程数,当这个值大于1时,将不能保证消息执行的顺序。

FailedRetryCount *

默认值:50

重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。

SucceedMessageExpiredAfter

默认值:24*3600 秒(1天后)

成功消息的过期时间(秒)。 当消息发送或者消费成功时候,在时间达到 SucceedMessageExpiredAfter 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。

FailedMessageExpiredAfter *

默认值:15243600 秒(15天后)

失败消息的过期时间(秒)。 当消息发送或者消费失败时候,在时间达到 FailedMessageExpiredAfter 秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。

EnablePublishParallelSend

默认值: false

默认情况下,发送的消息都先放置到内存同一个Channel中,然后线性处理。 如果设置为 true,则发送消息的任务将由.NET线程池并行处理,这会大大提高发送的速度。

补偿事务

某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。

你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 callbackName 来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。

例如,在一个电商程序中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。

序列化

意味着你可以调整序列化配置

自定义序列化

 public class MessageSerializer : ISerializer
 {
     public Message Deserialize(string json)
     {
         return JsonExtension.Deserialize<Message>(json);
     }

     public object Deserialize(object value, Type valueType)
     {
         if (value is JToken jToken)
         {
             return jToken.ToObject(valueType);
         }
         throw new NotSupportedException("Type is not of type JToken");
     }

     public ValueTask<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
     {
         if (valueType == null || transportMessage.Body.IsEmpty)
         {
             return ValueTask.FromResult(new DotNetCore.CAP.Messages.Message(transportMessage.Headers, null));
         }
         var json = Encoding.UTF8.GetString(transportMessage.Body.ToArray());
         return ValueTask.FromResult(new DotNetCore.CAP.Messages.Message(transportMessage.Headers, JsonExtension.Deserialize(json, valueType)));
     }

     public bool IsJsonType(object jsonObject)
     {
         return jsonObject is JsonToken || jsonObject is JToken;
     }

     public string Serialize(Message message)
     {
         return JsonExtension.Serialize(message);
     }

     public ValueTask<TransportMessage> SerializeAsync(Message message)
     {
         if (message == null)
         {
             throw new ArgumentNullException(nameof(message));
         }
         if (message.Value == null)
         {
             return ValueTask.FromResult(new TransportMessage(message.Headers, null));
         }
         var json = JsonExtension.Serialize(message.Value);
         return ValueTask.FromResult(new TransportMessage(message.Headers, Encoding.UTF8.GetBytes(json)));
     }
 }

然后将你的实现注册到容器中:

services.AddSingleton<DotNetCore.CAP.Serialization.ISerializer, MessageSerializer>();
services.AddCap(x =>
{ xxx
}

事务

CAP 不直接提供开箱即用的基于 DTC 或者 2PC 的分布式事务,相反我们提供一种可以用于解决在分布式事务遇到的问题的一种解决方案。

在分布式环境中,由于涉及通讯的开销,使用基于2PC或DTC的分布式事务将非常昂贵,在性能方面也同样如此。另外由于基于2PC或DTC的分布式事务同样受CAP定理的约束,当发生网络分区时它将不得不放弃可用性(CAP中的A)。

针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。类似于Java中Seata的Saga模式

幂等性

在说幂等性之前,我们先来说下关于消费端的消息交付。

由于CAP不是使用的 MS DTC 或其他类型的2PC分布式事务机制,所以存在至少消息严格交付一次的问题,具体的说在基于消息的系统中,存在以下三种可能:

  • Exactly Once() (仅有一次)
  • At Most Once (最多一次)
  • At Least Once (最少一次)

在CAP中,我们采用的交付保证为 At Least Once。

由于我们具有临时存储介质(数据库表),也许可以做到 At Most Once, 但是为了严格保证消息不会丢失,我们没有提供相关功能或配置。

以自然的方式处理幂等消息

通常情况下,保证消息被执行多次而不会产生意外结果是很自然的一种方式是采用操作对象自带的一些幂等功能。比如:

数据库提供的 INSERT ON DUPLICATE KEY UPDATE 或者是采取类型的程序判断行为。

显式处理幂等消息

另外一种处理幂等性的方式就是在消息传递的过程中传递ID,然后由单独的消息跟踪器来处理。

下面我们基于MySql和Redis实现显式处理幂等消息

    public interface IMessageTracker
    {
        Task<bool> HasProcessedAsync(string msgId);

        bool HasProcessed(string msgId);

        Task MarkAsProcessedAsync(string msgId);
       
        void MarkAsProcessed(string msgId);
    }
    

    internal class MessageTrackLog
    {
        public MessageTrackLog(string messageId)
        {
            MessageId = messageId;
            CreatedTime = DateTime.Now;
        }

        public string MessageId { get; set; }

        public DateTime CreatedTime { get; set; }

    }


    public class MessageData<T>
    {
        public string Id { get; set; }

        public T MessageBody { get; set; }

        public DateTime CreatedTime { get; set; }

        public MessageData(T messageBody)
        {
            MessageBody = messageBody;
            CreatedTime = DateTime.Now;
            Id = SnowflakeGenerator.Instance().GetId().ToString();
        }
    }


    internal class SnowflakeGenerator
    {
        private static long machineId;//机器ID
        private static long datacenterId = 0L;//数据ID
        private static long sequence = 0L;//计数从零开始

        private static long twepoch = 687888001020L; //惟一时间随机量

        private static long machineIdBits = 5L; //机器码字节数
        private static long datacenterIdBits = 5L;//数据字节数
        public static long maxMachineId = -1L ^ -1L << (int)machineIdBits; //最大机器ID
        private static long maxDatacenterId = -1L ^ (-1L << (int)datacenterIdBits);//最大数据ID

        private static long sequenceBits = 12L; //计数器字节数,12个字节用来保存计数码        
        private static long machineIdShift = sequenceBits; //机器码数据左移位数,就是后面计数器占用的位数
        private static long datacenterIdShift = sequenceBits + machineIdBits;
        private static long timestampLeftShift = sequenceBits + machineIdBits + datacenterIdBits; //时间戳左移动位数就是机器码+计数器总字节数+数据字节数
        public static long sequenceMask = -1L ^ -1L << (int)sequenceBits; //一微秒内能够产生计数,若是达到该值则等到下一微妙在进行生成
        private static long lastTimestamp = -1L;//最后时间戳

        private static object syncRoot = new object();//加锁对象
        static SnowflakeGenerator snowflake;

        static SnowflakeGenerator()
        {
            snowflake = new SnowflakeGenerator();
        }

        public static SnowflakeGenerator Instance()
        {
            if (snowflake == null)
                snowflake = new SnowflakeGenerator();
            return snowflake;
        }

        public SnowflakeGenerator()
        {
            Snowflakes(0L, -1);
        }

        public SnowflakeGenerator(long machineId)
        {
            Snowflakes(machineId, -1);
        }

        public SnowflakeGenerator(long machineId, long datacenterId)
        {
            Snowflakes(machineId, datacenterId);
        }

        private void Snowflakes(long machineId, long datacenterId)
        {
            if (machineId >= 0)
            {
                if (machineId > maxMachineId)
                {
                    throw new Exception("机器码ID非法");
                }
                SnowflakeGenerator.machineId = machineId;
            }
            if (datacenterId >= 0)
            {
                if (datacenterId > maxDatacenterId)
                {
                    throw new Exception("数据中心ID非法");
                }
                SnowflakeGenerator.datacenterId = datacenterId;
            }
        }

        /// <summary>
        /// 生成当前时间戳
        /// </summary>
        /// <returns>毫秒</returns>
        private static long GetTimestamp()
        {
            //让他2000年开始
            return (long)(DateTime.UtcNow - new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
        }

        /// <summary>
        /// 获取下一微秒时间戳
        /// </summary>
        /// <param name="lastTimestamp"></param>
        /// <returns></returns>
        private static long GetNextTimestamp(long lastTimestamp)
        {
            long timestamp = GetTimestamp();
            int count = 0;
            while (timestamp <= lastTimestamp)//这里获取新的时间,可能会有错,这算法与comb同样对机器时间的要求很严格
            {
                count++;
                if (count > 10)
                    throw new Exception("机器的时间可能不对");
                System.Threading.Thread.Sleep(1);
                timestamp = GetTimestamp();
            }
            return timestamp;
        }

        /// <summary>
        /// 获取长整形的ID
        /// </summary>
        /// <returns></returns>
        public long GetId()
        {
            lock (syncRoot)
            {
                long timestamp = GetTimestamp();
                if (SnowflakeGenerator.lastTimestamp == timestamp)
                { //同一微妙中生成ID
                    sequence = (sequence + 1) & sequenceMask; //用&运算计算该微秒内产生的计数是否已经到达上限
                    if (sequence == 0)
                    {
                        //一微妙内产生的ID计数已达上限,等待下一微妙
                        timestamp = GetNextTimestamp(SnowflakeGenerator.lastTimestamp);
                    }
                }
                else
                {
                    //不一样微秒生成ID
                    sequence = 0L;
                }
                if (timestamp < lastTimestamp)
                {
                    throw new Exception("时间戳比上一次生成ID时时间戳还小,故异常");
                }
                SnowflakeGenerator.lastTimestamp = timestamp; //把当前时间戳保存为最后生成ID的时间戳
                long Id = ((timestamp - twepoch) << (int)timestampLeftShift)
                    | (datacenterId << (int)datacenterIdShift)
                    | (machineId << (int)machineIdShift)
                    | sequence;
                return Id;
            }
        }
    }
基于Redis显式处理幂等消息

   internal class RedisMessageTracker : IMessageTracker
{
    #region 属性和字段
    private const string KEY_PREFIX = "msgtracker:"; // 默认Key前缀
    private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默认缓存时间为3天,单位为秒

    private readonly IDatabase _redisDatabase;
    #endregion

     //依赖StackExchange.Redis;
    public RedisMessageTracker(ConnectionMultiplexer multiplexer)
    {
        _redisDatabase = multiplexer.GetDatabase();
    }

    public bool HasProcessed(string msgId)
    {
        return _redisDatabase.KeyExists(KEY_PREFIX + msgId);
    }

    public async Task<bool> HasProcessedAsync(string msgId)
    {
        return await _redisDatabase.KeyExistsAsync(KEY_PREFIX + msgId);
    }

    public void MarkAsProcessed(string msgId)
    {
        var msgRecord = new MessageTrackLog(msgId);
        _redisDatabase.StringSet($"{KEY_PREFIX}{msgId}", JsonExtension.Serialize(msgRecord), TimeSpan.FromMinutes(DEFAULT_CACHE_TIME));
    }

    public async Task MarkAsProcessedAsync(string msgId)
    {
        var msgRecord = new MessageTrackLog(msgId);
        await _redisDatabase.StringSetAsync($"{KEY_PREFIX}{msgId}", JsonExtension.Serialize(msgRecord), TimeSpan.FromMinutes(DEFAULT_CACHE_TIME));
    }
}

        public static IServiceCollection AddRedisMessageTracker(this IServiceCollection services)
        {
            services.AddScoped<IMessageTracker, RedisMessageTracker>();

            return services;
        }
基于Mysql显式处理幂等消息
   internal class MySqlMessageTracker : IMessageTracker
   {
       private readonly CapContext _capContext;

       public MySqlMessageTracker(CapContext capContext)
       {
           _capContext = capContext;
       }

       public bool HasProcessed(string msgId)
       {
           return _capContext.MessageTrackLogs.Any(x => x.MessageId == msgId);
       }

       public Task<bool> HasProcessedAsync(string msgId)
       {
           return _capContext.MessageTrackLogs.AnyAsync(x => x.MessageId == msgId);
       }

       public void MarkAsProcessed(string msgId)
       {
           MessageTrackLog messageTrackLog = new MessageTrackLog(msgId);
           _capContext.MessageTrackLogs.Add(messageTrackLog);
           _capContext.SaveChanges();
       }

       public async Task MarkAsProcessedAsync(string msgId)
       {
           MessageTrackLog messageTrackLog = new MessageTrackLog(msgId);
           await _capContext.MessageTrackLogs.AddAsync(messageTrackLog);
           await _capContext.SaveChangesAsync();
       }
   }

    internal class CapContext : DbContext
    {
        public CapContext(DbContextOptions<CapContext> options)
       : base(options)
        {

        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            // 可以在这里进行模型配置
            modelBuilder.Entity<MessageTrackLog>().ToTable("message_track_log");
            modelBuilder.Entity<MessageTrackLog>().HasKey(b => b.MessageId);
        }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            base.OnConfiguring(optionsBuilder);

        }

        public DbSet<MessageTrackLog> MessageTrackLogs { get; set; }
    }


        public static IServiceCollection AddMySqlMessageTracker(this IServiceCollection services)
        {
            services.AddScoped<IMessageTracker, MySqlMessageTracker>();

            var serviceProvider = services.BuildServiceProvider();

            using (var context = serviceProvider.GetService<CapContext>())
            {
                context.Database.ExecuteSqlRaw(@"
                                CREATE TABLE IF NOT EXISTS  `message_track_log` (
                               `MessageId` varchar(255) CHARACTER SET utf8mb4 NOT NULL,
                               `CreatedTime` datetime NOT NULL,
                                CONSTRAINT `PK_message_track_log` PRIMARY KEY (`MessageId`)
                                ) CHARACTER SET=utf8mb4;
                ");
            }
            return services;
        }


使用

    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {       
       private readonly IMessageTracker _messageTracker;

        public WeatherForecastController(IMessageTracker messageTracker)
        {
            _messageTracker = messageTracker;
        }
        [CapSubscribe("order.test")]
        [NonAction]
        public void OrderTest(MessageData<string> messageData)
        {
            try
            {
                if (_messageTracker.HasProcessed(messageData.Id))
                    return;
       
                 Console.WriteLine("业务逻辑:"+messageData.MessageBody);
                
                 //xxxx
       
                _messageTracker.MarkAsProcessed(messageData.Id);
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
   }    

监控

Consul

CAP的 Dashboard 使用 Consul 作为服务发现来显示其他节点的数据,然后你就在任意节点的 Dashboard 中切换到 Servers 页面看到其他的节点。

通过点击 Switch 按钮来切换到其他的节点看到其他节点的数据,而不必访问很多地址来分别查看。

在这里插入图片描述

以下是一个配置示例, 你需要在每个节点分别配置:

services.AddCap(x =>
{
    x.UseMySql(Configuration.GetValue<string>("ConnectionString"));
    x.UseRabbitMQ("localhost");
    x.UseDashboard();
    x.UseConsulDiscovery(_ =>
    {
        _.DiscoveryServerHostName = "localhost";
        _.DiscoveryServerPort = 8500;
        _.CurrentNodeHostName = Configuration.GetValue<string>("ASPNETCORE_HOSTNAME");
        _.CurrentNodePort = Configuration.GetValue<int>("ASPNETCORE_PORT");
        _.NodeId = Configuration.GetValue<string>("NodeId");
        _.NodeName = Configuration.GetValue<string>("NodeName");
    });
});
启用 Dashboard

首先,你需要安装Dashboard的 NuGet 包。

PM> Install-Package DotNetCore.CAP.Dashboard

然后,在配置中添加如下代码:

services.AddCap(x =>
{
    x.UseDashboard();
});

默认情况下,你可以访问 http://localhost:xxx/cap 这个地址打开Dashboard。

Dashboard 配置项
  • PathBase

默认值:N/A

当位于代理后时,通过配置此参数可以指定代理请求前缀。

  • PathMatch *

默认值:‘/cap’

你可以通过修改此配置项来更改Dashboard的访问路径。

  • StatsPollingInterval

默认值:2000 毫秒

此配置项用来配置Dashboard 前端 获取状态接口(/stats)的轮询时间

  • AllowAnonymousExplicit

Default: true

显式允许对 CAP 仪表板 API 进行匿名访问,当启用ASP.NET Core 全局授权筛选器请启用 AllowAnonymous。

  • AuthorizationPolicy

Default: null.

Dashboard 的授权策略。 需设置 AllowAnonymousExplicit为 false。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1697100.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pytorch深度学习实践笔记6(b站刘二大人)

&#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;pytorch深度学习 &#x1f380;CSDN主页 发狂的小花 &#x1f304;人生秘诀&#xff1a;学习的本质就是极致重复! 《PyTorch深度学习实践》完结合集_哔哩哔哩_bilibi…

项目管理-人力资源管理

目录 一、概述 二、人力资源计划编制 2.1 概述 2.2 层次结构图 2.3 分配任务矩阵 三、组建项目团队 3.1 概述 3.2 内部谈判 3.3 事先分派 3.4 外部招聘 3.5 虚拟团队 3.6 总结 四、项目团队建设 4.1 概述 4.2 团队发展过程 4.2.1 概述 4.2.2 形成期 4.2.3 震…

【每日力扣】84. 柱状图中最大的矩形 与 295. 数据流的中位数

&#x1f525; 个人主页: 黑洞晓威 &#x1f600;你不必等到非常厉害&#xff0c;才敢开始&#xff0c;你需要开始&#xff0c;才会变的非常厉害 84. 柱状图中最大的矩形 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为…

我用LLaMA-Factory微调大模型来实现商品评论情感分析,准确率高达91.70%

大家好&#xff0c;我是程序锅。 最近在modelscope上闲逛的时候&#xff0c;在数据集板块发现有一个商品评论情感预测数据集。这个数据集源自一个比赛&#xff0c;它的目的是为了预测电商平台顾客的评论是好评还是差评。 数据示例如下所示&#xff08;其中0代表差评&#xff…

电子电器架构 - AUTOSAR软件架构介绍

电子电器架构 - AUTOSAR软件架构介绍 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自己…

山东大学软件学院项目实训-创新实训-基于大模型的旅游平台(二十)- JUC(6)

目录 wait , notify wait vs sleep 正确使用方法 同步保护性暂停 join的源码 Future 异步生产者/消费者模型 定义 Park & Unpark 原理 wait , notify 小故事小南需要烟才能工作&#xff0c;但它又要占这锁让别人无法进来。那么这个时候开一个waitSet相当于就是休…

解决vue3项目vite打包忽略.vue扩展名

项目打包时报could not relolve “...”&#xff0c;因为vite已不再默认忽略.vue扩展名。 解决方法如下&#xff1a; 在vite.config.js中配置vite使其忽略 .vue 扩展名&#xff08;不建议忽略&#xff09; 注意&#xff1a;即使忽略了.vue文件&#xff0c;在实际写的时候也要加…

【Linux】为 VMware 的 Linux 系统(CentOS 7)设置静态IP地址

文章目录 准备工作查看 子网掩码 和 网关IP确认准备设置的虚拟机端口没有被占用 调整设置编辑配置文件配置文件说明 完成配置&#xff0c;准备测试使用命令终端连接服务器 我是一名立志把细节说清楚的博主&#xff0c;欢迎【关注】&#x1f389; ~ 原创不易&#xff0c; 如果有…

OS复习笔记ch6-1

死锁的原理 定义 一组进程中&#xff0c;其中每个进程因等待事件而阻塞&#xff0c;且所等待的事件只能被这组进程中的另一阻塞进程激发称之为死锁。 举例如下 四个车辆希望紧迫的希望能很快通过&#xff0c;每辆车需要两个象限的资源&#xff0c;然而四个车都只得到一个象…

使用 Django Rest Framework 构建强大的 Web API

文章目录 安装 Django Rest Framework创建序列化器创建视图和 URL 路由配置认证和权限测试 API Django Rest Framework&#xff08;DRF&#xff09;是一个强大的工具&#xff0c;用于在 Django Web 框架中构建灵活且功能丰富的 Web API。它提供了许多功能&#xff0c;包括序列化…

揭秘Kafka从入门到精通,架构最全详解

Kafka架构最全详解 Kafka&#xff0c;作为关键消息中间件&#xff0c;广泛应用于大型架构与顶尖企业。本篇深入解析Kafka架构&#xff0c;掌握其核心技术要点。 Kafka Apache Kafka 是一个分布式发布-订阅消息系统&#xff0c;由LinkedIn开创的分布式发布-订阅消息系统&#x…

长安链使用Golang编写智能合约教程(一)

编写前的注意事项&#xff1a; 1、运行一条带有Doker_GoVM的链 2、建议直接用官方的在线IDE去写合约&#xff0c;因为写完可以直接测&#xff0c;缺点只是调试不方便。 3、自己拉环境在本地写合约&#xff0c;编译时注意编译环境&#xff0c;官方有提醒你去Linux下去编译。 …

010-Linux磁盘介绍

文章目录 1、名词 2、类型 3、尺寸 4、接口/协议/总线 5、命名 6、分区方式 MBR分区 GPT分区 1、名词 磁盘是计算机主要的存储介质&#xff0c;可以存储大量的二进制数据&#xff0c;并且断电后也能保持数据不丢失。早期计算机使用的磁盘是软磁盘&#xff08;Floppy D…

牛客网刷题 | BC99 正方形图案

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 KiKi学习了循环&am…

CST初级教程 六

本篇教程将以差分线为例&#xff0c;实例讲解参数化建模及参数扫描。 一 Project创建 点击New and Recent&#xff0c;再点击New Template 点击MICROVAVES & RF/OTICAL&#xff0c;然后在选中Circuit & Components。 点击对话框中Next按钮&#xff0c;在弹出对话框…

C语言中的七种常用排序

今天&#xff0c;为大家整理了C语言中几种常用的排序&#xff0c;以及他们在实际中的运用&#xff08;有Bug请在下方评论&#xff09;&#xff1a; 一.桶排序 #include <stdio.h> int main() {int book[1001],i,j,t,n;for(i0;i<1000;i)book[i]0;scanf("%d"…

B树与B+树区别

B树和B树是常见的数据库索引结构&#xff0c;都具有相较于二叉树层级较少&#xff0c;查找效率高的特点&#xff0c;它们之间有以下几个主要区别&#xff1a; 1.节点存储数据的方式不同 B树的叶子结点和非叶子节点都会存储数据&#xff0c;指针和数据共同保存在同一节点中B树…

MySQL的索引, 到底怎么创建?

目录 前言 MySQL的数据结构 索引是一把双刃剑 索引创建原则 如何给一个列挑选索引? 索引列的基数, 要尽量小 索引列的类型尽量小 索引长字符串的前缀 不要对索引列进行计算操作或者函数计算. 不要老想着查询, 想想插入该怎么办? 避免索引冗余和重复 前言 今天在…

【二叉树】:LeetCode:100.相同的数(分治)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;初阶初阶结构刷题 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 1.问题描述&#xff1a; 2.问题分析&#xff1a; 二叉树是区分结构的&#xff0c;即左右子树是不一…

上5个B端系统的设计规范,让你的开发比着葫芦画瓢。

B端系统设计规范在企业级系统开发中起着重要的作用&#xff0c;具体包括以下几个方面&#xff1a; 统一风格和布局&#xff1a;设计规范能够统一系统的风格和布局&#xff0c;使不同功能模块的界面看起来一致&#xff0c;提升用户的使用体验和学习成本。通过统一的设计规范&am…