SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)

news2025/1/3 3:31:54

目录

    • 背景
      • 遇到的问题
    • RocketMQ 基础
      • 基础消息模型
      • 扩展后的消息模型
      • 部署模型
      • 相关概念点
    • 方案对比
      • 影子Topic的方案
      • Tag的方案
      • UserProperty的方案
      • 影子Group的方案
      • 灰度分区的方案
      • 方案对比
    • 灰度分区方案设计
      • 适配只有部分灰度的情况所做的功能扩展
      • 消费者(无灰度)
      • 消费者(有灰度)
    • 改造流程
      • 生产者改造逻辑
      • 消费者改造逻辑
        • 消费者自定义负载均衡
      • MQ注册ClientID修改
      • 常量配置类
      • 链路信息传递`BasicMDC`类
      • MessageStorage类
      • 拦截配置
        • 消费者拦截
        • 生产者拦截
        • 请求拦截
    • 验证过程
      • 消费者(未启动灰度订阅)
      • 消费者(灰度订阅)
      • 验证消息链路
        • 发送端
        • 消费端
        • 发送灰度消息

背景

  • 我们公司团队为了更好地控制版本发布的影响范围,自研了灰度发布流程,目前已经支持httpgRPC等接口调用方式进行灰度流量转发,但是消息队列基于业务实现展示不支持。
  • 参考了网上很多灰度方案博文,比较热门的vivo鲁班RocketMQ的文章,但是涉及到的改造范围都是比较大,最后在看到关于Asyncer关于灰度分区 的博文,整体比较巧妙的利用MQ的原有机制,改动相对少了很多

遇到的问题

在这里插入图片描述
如上图所示普通的业务灰度流程,保证了RPC服务之间的调用灰度,但是当消息从服务发出到消息队列后,队列分区是被均分到正常服务灰度服务监听的;这样会导致正常服务消费到灰度消息;同时灰度服务消费到正常的消息;所以MQ灰度是目前需要解决的问题,这样才能完成整个灰度链路;

RocketMQ 基础

基础消息模型

在这里插入图片描述

扩展后的消息模型

在这里插入图片描述

  • 相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
  • 在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
  • 在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。

部署模型

Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

在这里插入图片描述

RocketMQ 部署架构上主要分为四部分:

  • 生产者 Producer

    • 发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
  • 消费者 Consumer

    • 消息消费的角色
    • 支持以推(push),拉(pull)两种模式对消息进行消费。
    • 同时也支持集群方式和广播方式的消费。
    • 提供实时消息订阅机制,可以满足大多数用户的需求。
  • 名字服务器 NameServer

    • NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
      • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
      • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
  • 代理服务器 Broker

    • Broker主要负责消息的存储、投递和查询以及服务高可用保证。
    • NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
      • 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
      • Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
      • Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

相关概念点

  • CommitLog:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在commitLog上。MQ在Broker进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息,当消息数据量到达1个G后会重新生成一个新的commitLog。
  • Topic:消息主题,表示一类消息的逻辑集合。每条消息只属于一个Topic,Topic中包含多条消息,是MQ进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。
  • Tag:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个Tag,用于区分同一个Topic中的消息类型
  • Queue:实际上Topic更像是一个逻辑概念供我们使用,在源码层级看,Topic以Queue的形式分布在多个Broker上,一个topic往往包含多条Queue
  • 消费组及其ID:表示一类Producer或Consumer,这类Producer或Consumer通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。

方案对比

影子Topic的方案

  • 新建一系列新的Topic来处理隔离灰度的消息
    • 例如对于TOPIC_ORDER会创建TOPIC_ORDER_GRAY来给灰度环境使用。
  • 发送方在发送时,对灰度的消息写入到影子Topic中。消费时,灰度环境只使用灰度的分组订阅灰度的Topic。

在这里插入图片描述

Tag的方案

  • 发送方在发送时,对灰度环境产生的消息的Tag加灰度标识
  • 消费方,每次灰度版本发布时只订阅灰度Tag的消息,正常的版本订阅非灰度的Tag。

在这里插入图片描述

UserProperty的方案

  • 发送方在发送时,对灰度环境产生的消息的UserProperty加灰度标识。
  • 消费方的客户端需要进行改写,根据UserProperty来过滤,正常的版本会跳过这类消息,灰度的版本会处理灰度消息。
  • 流程与灰度Tag差不错

影子Group的方案

  • 发送消息:灰度的服务发送带有灰度标识的消息
  • 消费消息:灰度服务只消费灰度标识的消息
    在这里插入图片描述

灰度分区的方案

  • 发送者注册时会检测当前将是不是灰度节点,灰度节点MQ注册的clientId会添加标识
  • 消费者订阅时会检测当前节点是不是灰度节点,灰度节点会走灰度的Queue分配策略

在这里插入图片描述

方案对比

方案优点缺点成本
影子Topic的方案使用独立的两个TOPIC,订阅关系一致,改造比较容易有临界问题,灰度切换生产时有可能会漏掉消息;同时需要根据生产、消费者关系维护对应的订阅关系;改造数据需求需要维护两套消费组和TOPC,有维护成本
影子Group的方案使用独立的两个GROUP,订阅关系一致有临界问题;需要修改生产者、订阅者、DevOps改动范围较大;正常节点和灰度节点都需要知道当前服务的灰度状态,来做出对应的处理需要维护两套GROUP的关系,维护成本高
Tag的方案通过MQ提供的tag机制过滤,可以保证灰度的消息只会被灰度节点消费,改造简单;如果Tag参与的业务过滤,不适合该方案;如果没有灰度节点订阅关系不一致,会出现消息丢失
UserProperty的方案同上同上同上
灰度分区的方案订阅关系一致,不需要额外维护多一套TOPIC、GROUP;无(比较适合公司的一个运营模式)

灰度分区方案设计

灰度分区主要基于以下几点:

  • DevOps:灰度服务的Pod容器内部会有 CANARY_RELEASEtrue 的环境变量。
  • MQ客户端心跳上报:源码中,RocketMQ客户端启动时会想向所有产生订阅关系的broker发送心跳,心跳中带有clientId,该值主要由实例名、容器ip等组成,可以利用canary环境变量做一层额外的注入
  • MQ客户端重平衡:源码中,每隔20秒/客户端上下线,都会触发一次客户端重平衡;我们可以自定义该策略,加入灰度分区平衡逻辑,来实现灰度分区和正常分区的订阅
  • MQ客户端发送方:源码中,RocketMQ发送方每次发送消息都会轮询队列发送,同时加入重试和故障规避的策略,可以通过重写该类来做扩展。

适配只有部分灰度的情况所做的功能扩展

  • 基于ThreadLocal来实现消费和发送的链路标识传递
    • 拦截HTTP请求:通过拦截http请求识别请求是否灰度,基于ThreadLocal实现线程传递;
    • 拦截MQ消息消费:通过拦截消费者消息,识别UserProperty识别存在灰度标识;更新到ThreadLocal中
    • 拦截MQ生产者发送消息:获取ThreadLocal存储的当前线程的UserProperty标识,写入到发送消息的UserProperty
  • MQ客户端发送方:检测到环境和线程链路变量,做出对应的发送策略;从而来实现灰度消息通过正常应用后,发送MQ消息能被下级监听的灰度应用接收

消费者(无灰度)

在这里插入图片描述

消费者(有灰度)

在这里插入图片描述

改造流程

生产者改造逻辑

无论何时,只要发送方自己当前环境是灰度(CANARY_RELEASE=true)或者当前是灰度链路,则会最后两个分区作为灰度队列,否则选取其他分区发送,根据RocketMQ的源码,自定义发送策略即可实现。


import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Custom fault strategy
 */
public class CustomMQFaultStrategy {

    private final InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    private boolean sendLatencyFaultEnable = false;
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    private final ConcurrentHashMap<TopicPublishInfo, TopicPublishInfoCache> topicPublishInfoCacheTable = new ConcurrentHashMap<>();


    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }


    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }


    public long[] getLatencyMax() {
        return latencyMax;
    }


    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }


    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }


    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }


    private TopicPublishInfoCache checkCacheChanged(TopicPublishInfo topicPublishInfo) {
        if (topicPublishInfoCacheTable.containsKey(topicPublishInfo)) {
            return topicPublishInfoCacheTable.get(topicPublishInfo);
        }
        synchronized (this) {
            TopicPublishInfoCache cache = new TopicPublishInfoCache();
            List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(topicPublishInfo.getMessageQueueList());
            List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(topicPublishInfo.getMessageQueueList());
            Collections.sort(canaryQueues);
            Collections.sort(normalQueues);
            cache.setCanaryQueueList(canaryQueues);
            cache.setNormalQueueList(normalQueues);
            topicPublishInfoCacheTable.putIfAbsent(topicPublishInfo, cache);
        }
        return topicPublishInfoCacheTable.get(topicPublishInfo);
    }


    /**
     * Queue selection strategy
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        List<MessageQueue> messageQueueList = tpInfo.getMessageQueueList();
        TopicPublishInfoCache topicPublishInfoCache = checkCacheChanged(tpInfo);
        if (MessageStorage.isCanaryRelease() || MessageStorage.isThreadCanaryRelease()) {
            MessageQueue messageQueue = selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getCanaryQueueList());
            log.debug("canary context,send message to canary queue:{}", messageQueue.getBrokerName() + messageQueue.getQueueId());
            return messageQueue;
        } else {
            if (this.sendLatencyFaultEnable) {
                try {
                    int index = tpInfo.getSendWhichQueue().incrementAndGet();
                    int size = topicPublishInfoCache.getNormalQueueList().size();
                    for (int i = 0; i < size; i++) {
                        int pos = Math.max(Math.abs(index++) % size, 0);
                        MessageQueue mq = topicPublishInfoCache.getNormalQueueList().get(pos);
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            return mq;
                        }
                    }
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (!topicPublishInfoCache.getCanaryQueueList().contains(mq)) {
                            if (notBestBroker != null) {
                                mq.setBrokerName(notBestBroker);
                                mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                            }
                            return mq;
                        }
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
            }

            return selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getNormalQueueList());
        }

    }


    /**
     * Default message queue selection strategy
     *
     * @param topicPublishInfo
     * @param lastBrokerName
     * @param queues
     * @return {@link MessageQueue }
     */
    private MessageQueue selectDefaultMessageQueue(final TopicPublishInfo topicPublishInfo, final String lastBrokerName,
                                                   List<MessageQueue> queues) {
        ThreadLocalIndex sendWhichQueue = topicPublishInfo.getSendWhichQueue();
        int size = queues.size();
        if (lastBrokerName != null) {
            for (int i = 0; i < size; i++) {
                int index = sendWhichQueue.incrementAndGet();
                int pos = Math.max(Math.abs(index) % size, 0);
                MessageQueue mq = queues.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
        }
        int i = sendWhichQueue.incrementAndGet();
        int res = Math.max(Math.abs(i) % size, 0);
        log.debug("selectDefaultMessageQueue, lastBrokerName:{}, res:{}", lastBrokerName, topicPublishInfo.getMessageQueueList().get(res));
        return queues.get(res);
    }


    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i]) {
                return this.notAvailableDuration[i];
            }
        }

        return 0;
    }


    /**
     * Cache of topic publish info
     */
    private static class TopicPublishInfoCache {

        /**
         * Grayscale message queue list
         */
        private List<MessageQueue> canaryQueueList;


        private List<MessageQueue> normalQueueList;


        public List<MessageQueue> getCanaryQueueList() {
            return canaryQueueList;
        }

        public void setCanaryQueueList(List<MessageQueue> canaryQueueList) {
            this.canaryQueueList = canaryQueueList;
        }

        public List<MessageQueue> getNormalQueueList() {
            return normalQueueList;
        }

        public void setNormalQueueList(List<MessageQueue> normalQueueList) {
            this.normalQueueList = normalQueueList;
        }

    }
}

消费者改造逻辑

  • 其实最大的问题在于消费方如何动态的感知灰度的状态流转,这也是产生之前灰度分区方案的临界问题的根本原因。但是通过源码的深入探索,发现其实我们可以通过改造ClientId和自定义负载均衡策略来实现;

  • RocketMQ客户端启动的时候,会构建本地客户端id(包括实例名、ip名等),然后向broker注册自己。我们可以通过DevOps注入的环境变量CANARY_RELEASE来做改造,即灰度服务clientId后面追加canary表示,default服务后面追加default标识;

消费者自定义负载均衡

import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


public class CanaryAllocateMessageQueueStrategyImpl implements AllocateMessageQueueStrategy {

    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {

        log.info("consumerGroup:{} currentCID:{} cidAll:{}",consumerGroup,currentCID, JSON.toJSONString(cidAll));
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return Collections.emptyList();
        }

        if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
            return allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);
        }

        if (!MessageStorage.hasCanaryRelease(cidAll)) {
            List<MessageQueue> allocate = allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);
            return allocate;
        }

        if (MessageStorage.allCanaryRelease(cidAll)) {
            List<MessageQueue> messageQueues = this.balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
            log.info("[canary allocate]: group:{} sub topic:{} has all canary release client,maybe the sub is new,use the default avg strategy.\n" +
                            "current cid:{}\n" +
                            "allocate total {} message queue\n" +
                            "result:\n{}",
                    consumerGroup,
                    mqAll.get(0).getTopic(),
                    messageQueues.size(),
                    currentCID,
                    MessageStorage.joinMessageQueue(messageQueues));
            return messageQueues;
        }

        List<String> canaryCids = MessageStorage.getCanaryCids(cidAll);
        List<String> normalCids = MessageStorage.getNormalCids(cidAll);
        List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(mqAll);
        List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(mqAll);
        Collections.sort(canaryCids);
        Collections.sort(normalCids);
        Collections.sort(normalQueues);
        Collections.sort(canaryQueues);
        List<MessageQueue> result = null;
        if (canaryCids.contains(currentCID)) {
            result = allocateByAvg(consumerGroup, currentCID, canaryQueues, canaryCids);
        } else {
            result = allocateByAvg(consumerGroup, currentCID, normalQueues, normalCids);
        }
        return result;

    }


    /**
     * @param consumerGroup
     * @param currentCID
     * @param mqAll
     * @param cidAll
     * @return {@link List }<{@link MessageQueue }>
     */
    private List<MessageQueue> allocateByAvg(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                             List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "CANARY";
    }


    public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                         List<String> cidAll) {
        if (StringUtils.isEmpty(currentCID)) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (CollectionUtils.isEmpty(mqAll)) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (CollectionUtils.isEmpty(cidAll)) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
            return false;
        }

        return true;
    }

    public List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

MQ注册ClientID修改

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }
        //The key is here
        if (MessageStorage.isCanaryRelease()) {
            sb.append(CustomCommonConstant.CANARY_RELEASE_PREFIX);
        } else {
            sb.append("@default");
        }

        if (this.enableStreamRequestType) {
            sb.append("@");
            sb.append(RequestType.STREAM);
        }
        return sb.toString();
    }

常量配置类

public class CustomCommonConstant {

    /**
     * Number of grayscale queues
     */
    public static final Integer GRAYSCALE_QUEUE_SIZE = 2;

    /**
     * Grayscale client identification
     */
    public static final String CANARY_RELEASE_PREFIX = "@canary";

    /**
     * Thread identification
     */
    public static final String THREAD_CANARY_RELEASE = "canary.release";
}

链路信息传递BasicMDC


import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BasicMDC {

    final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<>();
    private static final int WRITE_OPERATION = 1;
    private static final int MAP_COPY_OPERATION = 2;
    final ThreadLocal<Integer> lastOperation = new ThreadLocal();
    private Integer getAndSetLastOperation(int op) {
        Integer lastOp = (Integer)this.lastOperation.get();
        this.lastOperation.set(op);
        return lastOp;
    }

    private boolean wasLastOpReadOrNull(Integer lastOp) {
        return lastOp == null || lastOp == 2;
    }

    private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
        Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
        if (oldMap != null) {
            synchronized(oldMap) {
                newMap.putAll(oldMap);
            }
        }

        this.copyOnThreadLocal.set(newMap);
        return newMap;
    }

    public void put(String key, String val) throws IllegalArgumentException {
        if (key == null) {
            throw new IllegalArgumentException("key cannot be null");
        } else {
            Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
            Integer lastOp = this.getAndSetLastOperation(1);
            if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
                oldMap.put(key, val);
            } else {
                Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
                newMap.put(key, val);
            }

        }
    }

    public void remove(String key) {
        if (key != null) {
            Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
            if (oldMap != null) {
                Integer lastOp = this.getAndSetLastOperation(1);
                if (this.wasLastOpReadOrNull(lastOp)) {
                    Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
                    newMap.remove(key);
                } else {
                    oldMap.remove(key);
                }

            }
        }
    }

    public void clear() {
        this.lastOperation.set(1);
        this.copyOnThreadLocal.remove();
    }

    public String get(String key) {
        Map<String, String> map = (Map)this.copyOnThreadLocal.get();
        return map != null && key != null ? (String)map.get(key) : null;
    }

    public Map<String, String> getPropertyMap() {
        this.lastOperation.set(2);
        return (Map)this.copyOnThreadLocal.get();
    }

    public Set<String> getKeys() {
        Map<String, String> map = this.getPropertyMap();
        return map != null ? map.keySet() : null;
    }

    public Map<String, String> getCopyOfContextMap() {
        Map<String, String> hashMap = (Map)this.copyOnThreadLocal.get();
        return hashMap == null ? null : new HashMap(hashMap);
    }

    public void setContextMap(Map<String, String> contextMap) {
        this.lastOperation.set(1);
        Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
        newMap.putAll(contextMap);
        this.copyOnThreadLocal.set(newMap);
    }
}

MessageStorage类


import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Message storage
 */
public class MessageStorage {

    private static final InternalLogger log = ClientLogger.getLog();

    private static BasicMDC mdcUtils;

    static {
        mdcUtils = new BasicMDC();
    }

    /**
     * Determine whether there is a grayscale client
     *
     * @param cidAll
     * @return
     */
    public static boolean hasCanaryRelease(List<String> cidAll) {
        return !getCanaryCids(cidAll).isEmpty();
    }

    /**
     * Determine if all are grayscale clients
     *
     * @param cidAll
     * @return
     */
    public static boolean allCanaryRelease(List<String> cidAll) {
        List<String> canaryCids = getCanaryCids(cidAll);
        return canaryCids.size() == cidAll.size();
    }

    /**
     * Connect the message queue into a string
     *
     * @param messageQueues
     * @return
     */
    public static String joinMessageQueue(List<MessageQueue> messageQueues) {
        return messageQueues.stream()
                .map(mq -> mq.getBrokerName() + ":" + mq.getQueueId())
                .collect(Collectors.joining(", "));
    }


    /**
     * Get the list of grayscale clients
     *
     * @param cidAll
     * @return
     */
    public static List<String> getCanaryCids(List<String> cidAll) {
        return cidAll.stream()
                .filter(cid -> cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX))
                .collect(Collectors.toList());
    }

    /**
     * Get a list of non grayscale clients
     *
     * @param cidAll
     * @return
     */
    public static List<String> getNormalCids(List<String> cidAll) {
        return cidAll.stream()
                .filter(cid -> !cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX))
                .collect(Collectors.toList());
    }


    /**
     * Get the list of grayscale queues
     *
     * @param mqAll
     * @return
     */
    public static List<MessageQueue> getCanaryQueues(List<MessageQueue> mqAll) {
        Collections.sort(mqAll);
        log.info("topic:{} reBalance, has canary release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));
        int size = mqAll.size();
        if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {
            List<MessageQueue> lastTwo = mqAll.subList(size - CustomCommonConstant.GRAYSCALE_QUEUE_SIZE, size);
            return new ArrayList<>(lastTwo);
        } else {
            return new ArrayList<>();
        }
    }

    /**
     * Get non grayscale queue list
     *
     * @param mqAll
     * @return
     */
    public static List<MessageQueue> getNormalQueues(List<MessageQueue> mqAll) {
        Collections.sort(mqAll);
        log.info("topic:{} reBalance, has normal release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));
        int size = mqAll.size();
        if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {
            List<MessageQueue> lastTwo = mqAll.subList(0, size - 2);
            return new ArrayList<>(lastTwo);
        } else {
            return new ArrayList<>(mqAll);
        }
    }

    private static volatile String canaryRelease = null;

    /**
     * Determine whether it is a grayscale client
     *
     * @return
     */
    public static boolean isCanaryRelease() {
        return Boolean.parseBoolean(getCanaryRelease());
    }

    /**
     * Determine whether it is a grayscale client
     *
     * @return boolean
     */
    public static boolean isThreadCanaryRelease() {
        String data = mdcUtils.get(CustomCommonConstant.THREAD_CANARY_RELEASE);
        return Boolean.parseBoolean(data);
    }

    /**
     * Set MDC
     *
     * @param key
     * @param value
     */
    public static void setMDC(String key, String value) {
        mdcUtils.put(key, value);
    }

    /**
     * @param key
     * @return {@link String }
     */
    public static String getMdcKey(String key) {
        return mdcUtils.get(key);
    }

    /**
     * Clear MDC
     */
    public static void clearMDC() {
        mdcUtils.clear();
    }

    /**
     * Replacement strategy
     *
     * @return {@link AllocateMessageQueueStrategy }
     */
    public static AllocateMessageQueueStrategy getAllocateMessageQueueAveragely(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
//        if (isCanaryRelease()) {
//            return new CanaryAllocateMessageQueueStrategyImpl();
//        } else {
//            return allocateMessageQueueStrategy;
//        }
        return new CanaryAllocateMessageQueueStrategyImpl();
    }

    private static String getCanaryRelease() {
        if (Objects.isNull(canaryRelease)) {
            synchronized (MessageStorage.class) {
                if (Objects.nonNull(canaryRelease)) {
                    return canaryRelease;
                }
                //CANARY_RELEASE
                String tmpCanaryRelease = System.getProperty("canary.release");
                if (Objects.isNull(tmpCanaryRelease)) {
                    tmpCanaryRelease = "false";
                }
                canaryRelease = tmpCanaryRelease;
                return canaryRelease;
            }
        }
        return canaryRelease;
    }
}

拦截配置

消费者拦截
@Component
@Aspect
public class ConsumerAOP {

    @Pointcut("execution(public * org.apache.rocketmq.client.consumer.listener.*.*(..))")
    public void aspectConsumer() {
    }

    @Before("aspectConsumer()")
    public void doBefore(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof List<?>) {
                List<MessageExt> messageExtList = (List<MessageExt>) arg;
                for (MessageExt messageExt : messageExtList) {
                    String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);
                    if(StringUtils.isEmpty(threadCanaryRelease)){
                        threadCanaryRelease = "false";
                    }
                    MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
                }
            }
        }
    }
}
@Component
@Aspect
public class ListenerAOP {

    @Pointcut("execution(public * org.apache.rocketmq.spring.core.*.*(..))")
    public void aspectListener(){}


    @Before("aspectListener()")
    public void doBefore(JoinPoint joinPoint){
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof MessageExt){
                MessageExt messageExt = (MessageExt) arg;
                String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);
                if(StringUtils.isEmpty(threadCanaryRelease)){
                    threadCanaryRelease = "false";
                }
                MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
            }
        }
    }
}
生产者拦截
@Component
@Aspect
public class ProducerAOP {

    @Pointcut("execution(public * org.apache.rocketmq.client.producer.*.*(..))")
    public void aspectProducer() {
    }

    @Before("aspectProducer()")
    public void doBefore(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof Message) {
                Message message = (Message) arg;
                String threadCanaryRelease = MessageStorage.getMdcKey(CustomCommonConstant.THREAD_CANARY_RELEASE);
                if(StringUtils.isEmpty(threadCanaryRelease)){
                    threadCanaryRelease = "false";
                }
                message.putUserProperty(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
            }
        }
    }
}
请求拦截
@Slf4j
@Component
@WebFilter(urlPatterns = "/*", filterName = "requestFilter")
@Order(Integer.MIN_VALUE)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ForwardFilter implements Filter {

    @SneakyThrows
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String threadCanaryRelease = req.getHeader("THREAD_CANARY_RELEASE");
        log.info("attributeNames:{}", JSON.toJSONString(req.getHeaderNames()));
        if(Objects.nonNull(threadCanaryRelease)){
            MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, "true");
        }
        filterChain.doFilter(servletRequest, servletResponse);
    }
}

验证过程

消费者(未启动灰度订阅)

在这里插入图片描述

消费者(灰度订阅)

在这里插入图片描述

验证消息链路

发送端
@Slf4j
@RestController
@RequestMapping("/producer")
public class ProducerMessageContrlller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/send")
    public void send()
    {
        boolean canaryRelease = MessageStorage.isCanaryRelease();
        String body = String.format("发送消息,环境:%s", canaryRelease ? "灰度" : "正式");
//        for (Integer i = 0; i < 100; i++) {
            log.info("发送消息,环境:{}", canaryRelease ? "灰度" : "正式");
            rocketMQTemplate.convertAndSend("TEST-DATA-MSG", body);
//        }
    }
}
消费端
@Slf4j
@Service
@RocketMQMessageListener(topic = "TEST-DATA-MSG", consumerGroup = "test-consumer-group")
public class MyConsumer1 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        boolean canaryRelease = MessageStorage.isCanaryRelease();
        Map<String, String> properties = message.getProperties();
        log.info("received message: 【{}】 环境:【{}】 配置参数:【{}】 ", new String(message.getBody()),canaryRelease ? "灰度" : "正式",JSON.toJSONString(properties));
    }
}

发送灰度消息

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2240868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YOLOv8改进,YOLOv8结合DynamicConv(动态卷积),CVPR2024,二次创新C2f结构

摘要 大规模视觉预训练显著提高了大规模视觉模型的性能。现有的低 FLOPs 模型无法从大规模预训练中受益。在本文中,作者提出了一种新的设计原则,称为 ParameterNet,旨在通过最小化FLOPs的增加来增加大规模视觉预训练模型中的参数数量。利用 DynamicConv 动态卷积将额外的参…

【C++】在windows下配置一个小巧实用的C/C++调试环境

目录 1.准备环境 2.cgdb 3. gdb-dashboard 4.常用命令 4.1 cgdb命令 4.2 gdb常用命令 虽然在大部分常用的C/C编辑器中&#xff0c;调试功能已经很方便且完善&#xff0c;但是&#xff0c;如果你还需要一个小巧一点&#xff0c;调试信息还完善的调试环境的&#xff0c;可以…

Dolby TrueHD和Dolby Digital Plus (E-AC-3)编码介绍

文章目录 1. Dolby TrueHD特点总结 2. Dolby Digital Plus (E-AC-3)特点总结 Dolby TrueHD 与 Dolby Digital Plus (E-AC-3) 的对比 Dolby TrueHD和Dolby Digital Plus (E-AC-3) 是两种高级的杜比音频编码格式&#xff0c;常用于蓝光影碟、流媒体、影院等高品质音频传输场景。它…

k8s上部署redis高可用集群

介绍&#xff1a; Redis Cluster通过分片&#xff08;sharding&#xff09;来实现数据的分布式存储&#xff0c;每个master节点都负责一部分数据槽&#xff08;slot&#xff09;。 当一个master节点出现故障时&#xff0c;Redis Cluster能够自动将故障节点的数据槽转移到其他健…

计算机网络——路由选择算法

路由算法 路由的计算都是以子网为单位计算的——找到从原子网到目标子网的路径 链路状态算法

4.3 Java JNI 机制

1 绪论 JNI 是一个原生编程接口。它允许在 Java 虚拟机&#xff08;JVM&#xff09;内运行的 Java 代码与用其它编程语言&#xff08;如 C、C 和汇编&#xff09;编写的应用程序和库进行互操作。 JNI 最重要的好处是它对底层 JVM 的实现没有限制。因此&#xff0c;JVM 供应商可…

influxDB 时序数据库安装 flux语法 restful接口 nodjsAPI

安装 Install InfluxDB | InfluxDB OSS v2 Documentation Debian和Ubuntu用户可以用apt-get包管理来安装最新版本的InfluxDB。 对于Ubuntu用户&#xff0c;可以用下面的命令添加InfluxDB的仓库&#xff0c;添加之后即可apt-get 安装influxdb2 wget -q https://repos.influx…

7z 解压器手机版与解压专家:安卓解压工具对决

7z 解压器手机版和解压专家都是在安卓设备上广受欢迎的解压软件。7z 解压器手机版由深圳乡里云网络科技有限公司开发&#xff0c;大小为 32.8M&#xff0c;支持多种常见的压缩文件格式&#xff0c;如.zip、.rar、.7z 等。 它对安卓操作系统的特性和用户习惯进行了优化&#xf…

亮数据——助力全球数据抓取的高效代理平台

目录 实际案例&#xff1a;利用代理服务抓取企业信息完整代码运行结果 亮数据的技术优势与应用场景产品更新&#xff1a;简化注册流程与智能助手升级立即注册&#xff0c;开启您的数据抓取之旅&#xff01; 在如今的大数据时代&#xff0c;企业决策越来越依赖于数据分析&#x…

设计模式之责任链模式(Chain Of Responsibility)

一、责任链模式介绍 1、责任链模式介绍 职责链模式(chain of responsibility pattern) 定义: 避免将一个请求的发送者与接收者耦合在 一起&#xff0c;让多个对象都有机会处理请求。将接收请求的对象连接成一条链&#xff0c;并且沿着这条链 传递请求&#xff0c;直到有一个对…

【月之暗面kimi-注册/登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…

低代码牵手 AI 接口:开启智能化开发新征程

一、低代码与 AI 接口的结合趋势 低代码开发平台近年来在软件开发领域迅速崛起。随着企业数字化转型的需求不断增长&#xff0c;低代码开发平台以其快速构建应用程序的优势&#xff0c;满足了企业对高效开发的需求。例如&#xff0c;启效云低代码平台通过范式化和高颗粒度的可配…

安培环路定理

回忆 静电场中的回路定理&#xff1a;→静电场是保守场 安培环路定理 1、圆形回路包围无限长载流直导线 &#xff08;1&#xff09;回路逆时针 &#xff08;2&#xff09;回路顺时针 规定&#xff1a; 回路正向由右手螺旋定则判断&#xff08;根据回路绕行方向&#xff0c;…

IDEA 2024.3正式版发布,速览新功能!

0 前言 IntelliJ IDEA 2024.3 引入了一系列可以提升您的开发体验的强大新功能。 IDE 现在提供代码逻辑结构的表示&#xff0c;简化了 Kubernetes 应用程序的调试体验&#xff0c;引入了集群范围的 Kubernetes 日志访问。 1 关键亮点 1.1 Structure工具窗口中的 Logical代码结…

LabVIEW 实现 find_nearest_neighbors 功能(二维平面上的最近邻查找)

1. 背景介绍 在数据分析和图像处理领域&#xff0c;经常需要查找给定点的最近邻居点。在LabVIEW中&#xff0c;计算二维平面上多个点之间的欧氏距离&#xff0c;并返回距离最近的几个点是一种常见操作。find_nearest_neighbors 函数用于实现这个功能。 2. 欧氏距离计算 在二维…

LeetCode 单调栈 下一个更大元素 I

下一个更大元素 I nums1 中数字 x 的 下一个更大元素 是指 x 在 nums2 中对应位置 右侧 的 第一个 比 x 大的元素。 给你两个 没有重复元素 的数组 nums1 和 nums2 &#xff0c;下标从 0 开始计数&#xff0c;其中nums1 是 nums2 的子集。 对于每个 0 < i < nums1.length…

vue的组件使用

1.安装element plus组件库 npm install element-plus --save

2024-11-14 算法学习及论文辅导(每日更新,随时联系)

看看学习小群的学习氛围&#x1f447;&#x1f3fb; 很多同学自己学习遇到问题没人解决&#xff0c;最终消耗了时间&#xff0c;精力同时大大消耗了自己对学习的信心&#x1f627; &#x1f973;来看看跟班学习&#xff0c;大家遇到问题的时候是怎么解决的&#xff1a; 首先…

开源三代示波器的高速波形刷新方案开源,支持VNC远程桌面,手机,Pad,电脑均可访问(2024-11-11)

说明&#xff1a; 1、本来这段时间是一年一度Hackaday硬件设计开源盛宴&#xff0c;但hackaday电子大赛在去年终结了。所以我开源个我的吧。 2、三代示波器的高速波形刷新方案&#xff0c;前两年就做好了&#xff0c;这两年忙H7-TOOL的更新比较多&#xff0c;三代示波器的更新…

B-树特点以及插入、删除数据过程

B树&#xff08;B-Tree&#xff09;是一种自平衡的多路查找树&#xff0c;它广泛应用于数据库索引和文件系统中&#xff0c;尤其适用于外部存储设备&#xff08;如磁盘&#xff09;。B树的设计使得它能够高效地存储大量数据并支持高效的插入、删除和查询操作。以下是B树的主要特…