【消息中间件】RocketMQ底层如何实现生产者发送消息

news2024/11/25 19:40:29

目录

一、前言

二、实现生产者发送消息

1、启动生产者

1.1、RocketMQTemplate消息发送模板

1.2、afterPropertiesSet()逻辑

1.3、DefaultMQProducer#start()逻辑

2、DefaultMQProducer#start()启动逻辑

2.1、更新路由信息到本地

2.2、从本地获取主题Topic信息

2.3、数据更新维护

3、生产者发送消息

3.1、消息发送默认实现与重试

3.2、异步发送


一、前言

    前面的文章我们聊了RocketMQ如何集成到SpringBoot,基于SpringBoot的自动装配机制;还聊了消费者的启动流程,其实跟我们下面将要聊的生产者启动流程是类似的。本篇文章我们补充一下对RocketMQTemplate消息发送模板介绍,它里面的方法笔者在另一个专栏《Java基础及实战》已经对其接口详细描述了,代码层面简单就不介绍了,感兴趣的可以去看看。本篇文章我们重点讨论生产者是如何实现消息发送的、消息发送失败如何重试、重试次数、如何检测超时、异步发送?如何进行数据维护的、数据结构、缓存本地?下面我们来探究探究:

二、实现生产者发送消息

1、启动生产者

1.1、RocketMQTemplate消息发送模板

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
    /**
     * 默认消息生产者,在RocketMQAutoConfiguration进行SpringBoot自动装配时先初始化
     * 然后在RocketMQTemplate初始化的过程中,进行setter注入
     */
    private DefaultMQProducer producer;
    /**
     * 处理json数据
     */
    private ObjectMapper objectMapper;
    private String charset = "UTF-8";
    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap();

    public RocketMQTemplate() {
    }
    // .....此处省略n行代码.......
}

RocketMQTemplate消息发送模板扩展了Spring的AbstractMessageSendingTemplate功能,同时实现了InitializingBean接口的afterPropertiesSet()方法逻辑下面分析(上一篇也分析了)、DisposableBean接口的destroy()方法(完成资源回收,感兴趣的读者可以去看下)。RocketMQTemplate消息发送模板只有一个单纯构造方法,有几个字段,之前在前面文章也分析了是通过setter注入的,其它几个字段直接new初始化了。

1.2、afterPropertiesSet()逻辑

    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.producer != null) {
            this.producer.start();
        }

    }

这里逻辑不多,比消费者的少了很多逻辑,这里就预校验以防空指针异常,然后就调用setter注入的生产者producer的start()方法启动生产者。

1.3、DefaultMQProducer#start()逻辑

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

这里的逻辑并不多,跟消费者的启动流程类似。DefaultMQProducer是发送消息的应用程序的入口点。优化公开 getter/setter 方法的字段是可以的,但是请记住,对于大多数场景,所有这些字段都应该能够很好地开箱即用。这个类聚合了各种send方法来向代理传递消息。它们各有优缺点; 在实际编写代码之前,最好了解它们的优缺点。线程安全: 在配置和启动进程之后,这个类可以被认为是线程安全的,可以在多线程上下文中使用。

2、DefaultMQProducer#start()启动逻辑

    public void start() throws MQClientException {
        this.start(true);
    }

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // 置为失败状态,ServiceState封装的代码可重用
                this.serviceState = ServiceState.START_FAILED;
                // 校验配置,逻辑较少
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance()
                        .getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory
                        .registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please."
                            + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                // 缓存路由信息
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        // 发送心跳到所有的Broker并且加锁
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 启动定时任务
        RequestFutureHolder.getInstance().startScheduledTask(this);

    }

主要逻辑:

  1. 置为失败状态,ServiceState封装的代码可重用
  2. 校验配置,逻辑较少
  3. 注册生产者,维护生产者信息到MQClientInstance的(ConcurrentMap<String/* group */, MQProducerInner> 类型)producerTable字段,从中可以获取主题等信息。
  4. 缓存路由信息,里面维护了messageQueueList逻辑消息队列列表、TopicRouteData 路由表数据等几个字段。
  5. 启动MQClientInstance,跟消费者启动逻辑同理,下面补充一下
  6. 发送心跳到所有的Broker并且加锁
  7. 启动定时任务,延迟3秒执行,周期性间隔1秒执行扫描过期请求并移除

2.1、更新路由信息到本地

        // 延迟10毫秒,周期性间隔30秒定时执行
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 更新路由信息到本地,在消息发送的时候需用到路由信息
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

MQClientInstance的底层实现涉及到远程通信模块,也维护了好几个定时任务。这里延迟10毫秒,周期性间隔30秒定时执行远程获取名字服务NameServer路由信息并更新路由信息到本地缓存起来,在消息发送的时候需用到路由信息

2.2、从本地获取主题Topic信息

    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();

        // Consumer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            // 遍历已经注册的消费者表
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    // 获取消费者订阅数据
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        // 遍历订阅数据获取主题
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }

        // Producer
        {
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            // 遍历已经注册的消生产者表
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    // 从路由表获取主题
                    Set<String> lst = impl.getPublishTopicList();
                    topicList.addAll(lst);
                }
            }
        }

        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }
  • 该方法主要逻辑就是从本地获取主题信息,然后遍历主题列表拉取名字服务NameServer服务端中TopicRouteInfo信息然后更新到客户端本地。
  • consumerTable(ConcurrentMap<String/* group */, MQConsumerInner>类型)维护了消费者注册的信息,MQConsumerInner是内部消息消费接口,实现类有push推模式的DefaultMQPushConsumerImpl,另一个是pull拉模式的DefaultLitePullConsumerImpl(2022版本)。消费者的订阅数据维护在Consumer端实现负载平衡的核心类RebalanceImpl的(ConcurrentMap<String /* topic */, SubscriptionData>类型) subscriptionInner字段
  • 从本地producerTable(ConcurrentMap<String/* group */, MQProducerInner>类型)获取已经注册的生产者信息,MQProducerInner的实现类目前只有DefaultMQProducerImpl,其维护了(ConcurrentMap<String/* topic */, TopicPublishInfo> 类型)topicPublishInfoTable字段,这里获取主题列表就是从该字段通过keySet()方法转换过来的。

2.3、数据更新维护

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            // 尝试加锁,最多3秒
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 从名字服务NameServer拉取路由表信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        // 从名字服务NameServer拉取路由表信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                    }
                    if (topicRouteData != null) {
                        // 根据此主题从本地路由表获取旧数据
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        // 判断是否发生了变化,通过构建、排序以及equals()比对是否相同(!old.equals(now))
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            // 主要判断此主题下数据是否存在
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {

                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update endpoint map更新端点映射到本地缓存
                            {
                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                                if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
                                    topicEndPointsTable.put(topic, mqEndPoints);
                                }
                            }

                            // Update Pub info更新生产者的TopicPublishInfo信息
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        // 更新路由信息
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info更新消费者的订阅数据
                            if (!consumerTable.isEmpty()) {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            // 消费者路由表
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }
  1. 尝试加锁,最多3秒,可以避免死锁
  2. 根据主题topic从名字服务NameServer拉取路由表信息
  3. 根据此主题从本地路由表topicRouteTable获取旧数据TopicRouteData
  4. 判断是否发生了变化,通过构建、排序以及equals()比对是否相同(!old.equals(now))
  5. 再次判断主要判断此主题下数据是否存在:判断此主题下生产者本地路由表信息topicPublishInfoTable是否存在或路由表内部消息队列信息是否存在,不存在return true;否则继续判断此主题下消费者下rebalanceImpl.topicSubscribeInfoTable是否包含此主题
  6. 如果发生了变更,维护本地(ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>>类型)brokerAddrTable字段
  7. 如果发生了变更,更新端点映射到本地缓存(ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, String/*brokerName*/>>类型)topicEndPointsTable
  8. 如果发生了变更,本地producerTable获取已注册生产者信息列表,更新生产者的TopicPublishInfo信息
  9. 如果发生了变更,更新消费者的订阅数据,维护到rebalanceImpl.topicSubscribeInfoTable
  10. 更新本地topicRouteTable路由表

3、生产者发送消息

3.1、消息发送默认实现与重试

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确保服务为运行状态
        this.makeSureStateOK();
        // 校验消息是否为空,消息体是否空,消息大小不能超过4M(默认可配置);否则抛异常
        Validators.checkMessage(msg, this.defaultMQProducer);
        // 随机调用ID
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 先根据Topic找到指定的TopicPublishInfo路由信息,首先尝试从本地topicPublishInfoTable获取,没有则从名字服务NameServer获取并缓存到本地
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // TopicPublishInfo路由信息不为空,消息队列可用
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 重试次数,同步才有可能重试默认重试2次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer
                    .getRetryTimesWhenSendFailed() : 1;
            // 由0开始,那么同步发送默认可执行3次如果发送失败
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 同样的重试机制借助循环体
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 根据容错策略选取一条消息队列投递消息,topicPublishInfo中维护了逻辑消费队列列表等信息
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.在重发期间使用命名空间重置主题。
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        // 请求超时,选取队列也花费了一部分时间?获取路由信息可能花费一定时间,可能从名字服务端远程获取
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo
                                , timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 更新,以便生产者端负载均衡选择逻辑消费队列
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev
                                , false);
                        // 通讯模式
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            // 同步发送才需要返回结果    
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        // 更新可容错项,以便下次负载均衡
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        // 打印警告日志,continue重试
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker:" +
                                " %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s"
                                , invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s"
                                , invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        // 注意这里与上面不同,如果返回的响应编码在重试编码里面,则继续重试;否则抛异常结束这方法体
                        if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                            continue;
                        } else {
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                        }
                    } catch (InterruptedException e) {
                        // 中断异常情况下不会存在重试机会,直接抛异常结束方法体
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s"
                                , invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            // 发送成功,返回发送响应结果
            if (sendResult != null) {
                return sendResult;
            }

            // 重试了依然发送失败,抛异常处理
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
  1. 确保服务为运行状态
  2. 校验消息是否为空,消息体是否空,消息大小不能超过4M(默认可配置);否则抛异常
  3. 先根据Topic找到指定的TopicPublishInfo路由信息,首先尝试从本地topicPublishInfoTable获取,没有则从名字服务NameServer获取并缓存到本地
  4. TopicPublishInfo路由信息不为空,逻辑消息队列可用
  5. 重试次数,同步才有可能重试默认重试2次;由0开始,那么同步发送默认可执行3次如果发送失败
  6. 同样的重试机制借助循环体边界就是重试次数
  7. 根据容错策略选取一条消息队列投递消息,topicPublishInfo中维护了逻辑消费队列列表messageQueueList(List<MessageQueue> 类型)等信息
  8. 如果重试次数大于0,在重发期间使用命名空间重置主题。
  9. 远程调用前检查请求是否超时,选取队列也花费了一部分时间?获取路由信息可能花费一定时间,可能从名字服务端远程获取
  10. this.sendKernelImpl()预备远程调用,将消息发送到代理服务器Broker以被持久化等,其中有一部分逻辑,感兴趣可以看看
  11. 远程调用后更新,以便生产者端(客户端)负载均衡选择逻辑消费队列
  12. switch通讯模式,如果是同步模式,那么需要返回结果
  13. 异常场景处理都需要更新可容错项,以便下次负载均衡,同时打印日志:1)远程通信网络异常,会continue重试;2)消息客户端异常,可continue重试;3)远程消息代理服务器异常,注意这里与上面不同,如果返回的响应编码在重试编码里面,则继续重试;否则抛异常结束这方法体;4)中断异常情况下不会存在重试机会,直接抛异常结束方法体
  14. 如果发送成功返回结果;否则重试了依然发送失败,抛异常处理;如果请求超时,抛超时异常,否则封装为MQClientException并抛出。

3.2、异步发送

    @Deprecated
    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        ExecutorService executor = this.getAsyncSenderExecutor();
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                }

            });
        } catch (RejectedExecutionException e) {
            throw new MQClientException("executor rejected ", e);
        }

    }

异步发送的底层实现就是多线程编程了,通过开启子线程去异步执行远程调用。具体实现细节就是上面讲的消息发送与重试,不过异步是不会存在重试的,想想到底为什么。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flink on yarn

文章目录flink sql client on yarnsession 模式Per-Job Cluster 模式flink run安装完hadoop 3.3.4之后&#xff0c;启动hadoop、yarn 将flink 1.14.6上传到各个服务器节点&#xff0c;解压 flink sql client on yarn https://nightlies.apache.org/flink/flink-docs-release…

SQL注入

目录 一、SQL注入原理 二、SQL注入的危害 三、SQL注入的分类 四、SQL注入的流程 五、总结 一、SQL注入原理 1.SQL注入产生的原因&#xff1a; 当Web应用向后台数据库传递SQL语句进行数据库操作时。如果对用户输入的参数没有经过严格的过滤处理&#xff0c;那么攻击者就可以构造…

面试真题 | 需求评审中从几个方面发现问题

面试官问题 在需求评审会议中&#xff0c;你会发现什么问题&#xff1f; 在需求评审时&#xff0c;是通过哪几个角度来进行考虑及发现问题的&#xff1f; 考察点 是否参加过需求评审 在需求评审过程中是否能提出有效的问题 4个角度发现问题 在需求评审的过程中通过以下4个…

【Vue 快速入门系列】一文透彻vue中使用axios及跨域问题的解决

文章目录一、什么是Axios&#xff1f;1.前置知识2.vue中使用axios3.Axios两种请求方式①.调用接口②.传入对象3.Axios支持的请求类型①.get请求②.post请求③.put请求④.patch请求⑤.delete请求二、跨域问题解决方案1.什么是跨域问题&#xff1f;2.解决方案一&#xff1a;在Vue…

基于微信小程序的社区心理健康服务-计算机毕业设计

项目介绍 社区心理健康服务平台小程序采用java开发语言、以及Mysql数据库等技术。系统主要分为管理员和用户、咨询师三部分&#xff0c;管理员服务端&#xff1a;首页、个人中心、用户管理、咨询师管理、心理书籍管理、相关资源管理、试卷管理、试题管理、系统管理、订单管理&…

希沃 API 网关架构演进之路

网关往期迭代与痛点 希沃网关的发展经历了四个版本的迭代。2013 年公司开始尝试互联网业务&#xff0c;那时候采用了 OpenRestyNGINX 静态配置的方式搭建了最初的网关&#xff0c;开发人员通过 SCP 来发布。与此同时一个比较严重的问题就是&#xff0c;每次上线发布都需要运维…

喜讯+1!袋鼠云数栈技术团队获“2022年度优秀开源技术团队”

近日&#xff0c;在“开源中国&#xff08;OSCHINA&#xff09;”开展的年度评选中&#xff0c;袋鼠云数栈技术团队凭借在2022年间的技术分享频率及质量、运营积极性等多方面的表现&#xff0c;荣获“2022年度优秀开源技术团队”的称号&#xff0c;这也是袋鼠云数栈技术团队连续…

umi学习总结

文章目录umi介绍umi是什么&#xff1f;umi的特性开发环境Node.js依赖管理工具目录结构路由配置路由页面跳转Link组件路由组件参数&#xff1a;路由动态参数query信息样式使用css样式dva为什么需要状态管理umi如何管理状态umi介绍 umi是什么&#xff1f; Umi&#xff0c;中文发…

自定义委托类

setItemDelegete();该函数可以自定义委托类 该例子为Qt官网的一个例子&#xff1a;使用QSpinBox来提供编辑功能 首先创建一个项目&#xff1a;名为object在项目中添加一个c类&#xff0c;类名为SpinBoxDelegate 修改该类的基类&#xff1a;更改为QImageDelegate,然后需要添加重…

12/15历史上的今天

宜找代驾 星期四 农历十一月廿二 今夜无人拥你入怀不如喝完杯中酒走入夜色中踏上回家的归途 *约翰-梅尔西藏墨脱公路嘎隆拉隧道顺利贯通 2010年12月15日&#xff0c;西藏墨脱公路控制性工程——嘎隆拉隧道顺利贯通。   2010年12月15日西藏墨脱公路控制性工程——嘎隆拉隧道…

华为开源自研AI框架昇思MindSpore应用实践:RNN实现情感分类

目录一、环境准备1.进入ModelArts官网2.使用CodeLab体验Notebook实例二、数据准备1.数据下载模块2.加载IMDB数据集2.加载预训练词向量三、数据集预处理四、模型构建1.Embedding2.RNN(循环神经网络)3.Dense4.损失函数与优化器5.训练逻辑6.评估指标和逻辑五、模型训练与保存六、模…

电脑重装系统后卡顿怎么办?教你快速解决电脑卡顿问题

​Win10电脑卡顿怎么办&#xff1f;许多用户在使用电脑的过程中发现&#xff0c;随着使用时间的增加&#xff0c;电脑会越来越卡顿。有些小伙伴就会选择重装电脑系统&#xff0c;那么我们在重装电脑之后要进行什么操作才能让电脑不卡顿呢&#xff1f; 操作方法&#xff1a; 优化…

java学生成绩管理系统源码swing(GUI) MySQL带开发教程永久学习

今天给大家演示一款由Java swing即GUI和mysql数据库实现的&#xff0c;学生成绩管理系统&#xff0c;系统采用了MVC的设计模式&#xff0c;结构层次非常清晰&#xff0c;此外&#xff0c;该项目有手把手的开发教程&#xff0c;适合刚入门Java的学生学习&#xff0c;下面我们来看…

Pr:导出设置

◆ ◆ ◆导出设置&#xff08;媒体文件&#xff09;Export Settings&#xff08;Media File&#xff09;基本设置文件名File Name指定导出的文件名。位置Location可以点击蓝色字更改导出的文件的存放位置。预设Preset选择导出预设。匹配源 Match Source预设会将大多数设置与源…

[附源码]Python计算机毕业设计高校贫困生信息管理系统Django(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程 项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等…

Mysql 查询获取 为数字的 字符串

先看示例数据: test_value 字段 为 VARVCHAR 类型 数据样例&#xff1a; 包含 纯数字&#xff0c; 带小数点的数字&#xff0c; 字符串 获取里面的纯数字 &#xff1a;使用正则匹配 函数 REGEXP &#xff0c;返回 1代表不匹配&#xff0c; 返回 0 代表匹配 包含小数点 [^0-…

两步开启研发团队专属ChatOps|极狐GitLab ChatOps 的设计与实践

本文来自&#xff1a; 彭亮 极狐(GitLab) 高级产品经理 郭旭东 极狐(GitLab) 资深创新架构师 舒文斌 极狐(GitLab) 高级网站可靠性工程师 最近几天&#xff0c;ChatGPT 真是杀疯了 &#xff01; 相信大家的朋友圈&#xff0c;已经被调戏、询问或探讨 ChatGPT 的贴子刷屏。 看到…

虹科案例 | 风电机组的预测性维护应该如何进行?

虹科预测性维护方案 在风能领域的应用 虹科案例 01 应用背景 风能是最重要的清洁能源之一&#xff0c;大力发展风电等清洁能源是实现国家可持续发展战略的必然选择。发展风电、光伏等新能源的高效运维技术已成为当前电力系统面临的重要问题之一。在风电机组单机容量较大、机组…

在Azure上设置存储账户

目录 &#xff08;一&#xff09;前言 &#xff08;二&#xff09;正文 1. 搜索存储账户类型资源 2. 开始创建新存储账户 &#xff08;1&#xff09;基本信息 &#xff08;2&#xff09;高级选项 &#xff08;3&#xff09;网络配置 &#xff08;4&#xff09;数据保护…

怎么看电脑是32位还是64位?2个方法,快速查看

熟悉计算机操作系统的朋友应该知道&#xff0c;电脑系统分为32位和64位。不同系统位数的兼容软件也会有所不同。怎么看电脑是32位还是64位&#xff1f;这里小编分享2个方法&#xff0c;快速查看自己的电脑系统位数。 方法一&#xff1a;电脑属性查看法 很多小伙伴不知道怎么看…