RocketMQ 消息发送源码解读

news2025/1/12 3:03:09

可靠同步发送、可靠异步发送、单向发送、批量消息发送。

RocketMQ 消息发送需要考虑以下3个问题。
1)消息队列如何进行负载?
2)消息发送如何实现高可用?
3)批量消息发送如何实现一致性?

org.apache.rocketmq.common.message.Message

public class Message implements Serializable { 
	private String topic; // 主题 
	private int flag; // 消息标记(不做处理) 
	private Map<String, String> properties; // 扩展属性 
	private byte[] body; // 消息体 
	private String transactionId; // 事务 id public Message() { } }

消息扩展属性:
org.apache.rocketmq.common.sysflag.MessageSysFlag

?这是啥意思


public class MessageSysFlag { 
public final static int COMPRESSED_FLAG = 0x1; 
public final static int MULTI_TAGS_FLAG = 0x1 << 1; 
public final static int TRANSACTION_NOT_TYPE = 0; 
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; 
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; 
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; 
public final static int BORNHOST_V6_FLAG = 0x1 << 4; 
public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5; //Mark the flag for batch to avoid conflict 
public final static int NEED_UNWRAP_FLAG = 0x1 << 6; 
public final static int INNER_BATCH_FLAG = 0x1 << 7; // COMPRESSION_TYPE 
public final static int COMPRESSION_LZ4_TYPE = 0x1 << 8; 
public final static int COMPRESSION_ZSTD_TYPE = 0x2 << 8; 
public final static int COMPRESSION_ZLIB_TYPE = 0x3 << 8; 
public final static int COMPRESSION_TYPE_COMPARATOR = 0x7 << 8; }

tags:消息 tag,用于消息过滤。
keys:消息索引键,用空格隔开,RocketMQ 可以根据这些 key(键)快速检索消息。
waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。

DefaultMQProducer

继承了 MQAdmin 接口
image.png

public interface MQAdmin {
	// 创建 topic 
	void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException; 
	// 根据时间搜索下标 
	long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; 
	// 最大下标 long maxOffset(final MessageQueue mq) throws MQClientException; 
	// 最小下标 long minOffset(final MessageQueue mq) throws MQClientException; 
	// 获取最早消息存储时间 long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; 
	// 根据 msgId 查询消息 MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; 
	// 查询消息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException; }

org.apache.rocketmq.client.producer.MQProducer
主要方法有:

public interface MQProducer extends MQAdmin { 
// 获取发布的消息队列 
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; 
// 同步发送 
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步发送 
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向消息发送 
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; 
// 同步方式发送消息,且发送到指定的消息队列。 
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步方式发送消息,且发送到指定的消息队列。 
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向方式发送消息,且发送到指定的消息队列。 
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException; 
// 同步批量消息发送。 
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步批量消息发送。 
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向批量消息发送。 
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; 
// 发送事务消息 
TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; 
// 同步批量消息发送。 
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
}

DefaultMQProducer 的核心属性

public class DefaultMQProducer extends ClientConfig implements MQProducer {
	protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // 实际的生存者调用实现 
	private String producerGroup; // 生产者组 
	private volatile int defaultTopicQueueNums = 4; // 默认 4 个 队列 
	private int sendMsgTimeout = 3000; // 发送超时时间 
	private int compressMsgBodyOverHowmuch = 1024 * 4; // 消息体最大大小 
	private int retryTimesWhenSendFailed = 2; // 同步重试次数 
	private int retryTimesWhenSendAsyncFailed = 2; // 异步重试次数 
	private boolean retryAnotherBrokerWhenNotStoreOK = false; // 发送失败是否会指定另外的 broker 
	private int maxMessageSize = 1024 * 1024 * 4; // 最大消息为 4M 
}

消息生产者启动流程

image.png

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start (boolean)

// 检查配置  
        this.checkConfig();  
        if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {  
            // 修改生产者 id  
            this.defaultMQProducer.changeInstanceNameToPID();  
        }  
  
// 创建实例  
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);  
  
// 注册实例  
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);  
        if (!registerOK) {  
            this.serviceState = ServiceState.CREATE_JUST;  
            throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()  
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),  
                    null);  
        }  
// 成功就放入方便管理  
        this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());  
  
// 启动  
        if (startFactory) {  
            mQClientFactory.start();  
        }

org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance (org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {  
    // 实例 id  
    String clientId = clientConfig.buildMQClientId();  
    MQClientInstance instance = this.factoryTable.get(clientId);  
    // 不存在则创建一个实例  
    if (null == instance) {  
        instance =  
                new MQClientInstance(clientConfig.cloneClientConfig(),  
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);  
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);  
        if (prev != null) {  
            instance = prev;  
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);  
        } else {  
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);  
        }  
    }  
    return instance;  
}

消息发送基本流程

发送流程图
image.png

RocketMQ 发送-存储-消费全流程。
image.png

RocketMQ 发送主代码。

private SendResult sendDefaultImpl(  
        Message msg,  
        final CommunicationMode communicationMode,  
        final SendCallback sendCallback,  
        final long timeout  
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  
    // 消息验证  
    this.makeSureStateOK();  
    Validators.checkMessage(msg, this.defaultMQProducer);  
    // 拿 topic  
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());  
    if (topicPublishInfo != null && topicPublishInfo.ok()) {  
    ...  
        // 发送主逻辑  
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);  
    }  
  
    // 远程获取 nameSrc   
validateNameServerSetting();  
  
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),  
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);  
}

查找路由信息

tryToFindTopicPublishInfo 是查找主题的路由信息的方法
获取路由信息,发哪个 Broker。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {  
    // 查找路由信息  
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {  
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());  
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    }  
  
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {  
        return topicPublishInfo;  
    } else {  
        // 从 NameSpace 中更新路由信息  
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
        return topicPublishInfo;  
    }  
}

TopicPublishInfo 的 Class 结构:

  
public class TopicPublishInfo {  
    private boolean orderTopic = false;  
    private boolean haveTopicRouterInfo = false;  
    private List<MessageQueue> messageQueueList = new ArrayList<>();  
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();  
    private TopicRouteData topicRouteData;  
}  
  
public class TopicRouteData extends RemotingSerializable {  
    private String orderTopicConf;  
    private List<QueueData> queueDatas;  
    private List<BrokerData> brokerDatas;  
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;  
    //It could be null or empty  
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;  
}

MQClientInstance#updateTopicRouteInfoFromNameServer 从 NameSpace 拿路由信息的方法。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,  
                                                  DefaultMQProducer defaultMQProducer) {  
    // 去 NameSpace 拿路由信息  
    TopicRouteData topicRouteData;  
    if (isDefault && defaultMQProducer != null) {  
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout());  
        if (topicRouteData != null) {  
            for (QueueData data : topicRouteData.getQueueDatas()) {  
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());  
                data.setReadQueueNums(queueNums);  
                data.setWriteQueueNums(queueNums);  
            }  
        }    } else {  
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());  
    }  
  
    if (topicRouteData != null) {  
        // 获取路由信息  
        TopicRouteData old = this.topicRouteTable.get(topic);  
        // 对比  
        boolean changed = topicRouteData.topicRouteDataChanged(old);  
        if (!changed) {  
            // 发生变化就改路由信息  
            changed = this.isNeedUpdateTopicRouteInfo(topic);  
        } else {  
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);  
        }  
  
        if (changed) {  
  
            // 更新 Broker 地址缓存表  
            for (BrokerData bd : topicRouteData.getBrokerDatas()) {  
                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());  
            }  
  
            // Update endpoint map  
            {  
                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);  
                if (!mqEndPoints.isEmpty()) {  
                    topicEndPointsTable.put(topic, mqEndPoints);  
                }  
            }  
            // Update Pub info  
            {  
                // 将 topicRouteData 转换为 TopicPublishInfo  
                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);  
                publishInfo.setHaveTopicRouterInfo(true);  
                for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {  
                    MQProducerInner impl = entry.getValue();  
                    if (impl != null) {  
                        impl.updateTopicPublishInfo(topic, publishInfo);  
                    }  
                }  
            }  
  
            // Update sub info  
            if (!consumerTable.isEmpty()) {  
                // 更新 consumer 信息  
                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);  
                for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {  
                    MQConsumerInner impl = entry.getValue();  
                    if (impl != null) {  
                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);  
                    }  
                }  
            }  
            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);  
            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);  
            this.topicRouteTable.put(topic, cloneTopicRouteData);  
            return true;        }  
    }

选择消息队列

首先会 broker + queue 的方式进行顺序排列。

[{“brokerName”: “broker-a”、“queueId”: 0}、
{“brokerName”:“broker-a”、“queueId”:1}、
{“brokerName”:“broker-a”、“queueId”:2}、
{“brokerName”:“broker-a”、“queueId”:3}、
{“brokerName”:“broker-b”、“queueId”:0}、
…]

Q:重试机制
由retryTimesWhenSendFailed指定同步方式重试次数
由 retryTimesWhenSend AsyncFailed 指定异常重试次数

选择消息队列有两种方式:

1)sendLatencyFaultEnable=false,默认不启用 Broker 故障延迟机制。
2)sendLatencyFaultEnable=true,启用 Broker 故障延迟机制。

如果 sendLatencyFaultEnable=false 延时故障启动机制,则调用 TopicPublishInfo #selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {  
        if (this.sendLatencyFaultEnable) {  
            try {  
                int index = tpInfo.getSendWhichQueue().incrementAndGet();  
                // 1)轮询获取一个消息队列。  
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {  
                    int pos = index++ % tpInfo.getMessageQueueList().size();  
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);  
                    // 2)验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())是关键。  
                    if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {  
                        return mq;  
                    }  
                }  
                // 如果返回的MessageQueue可用,则移除latencyFaultTolerance关于该topic的条目,表明该Broker故障已经修复。  
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();  
                int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);  
                if (writeQueueNums > 0) {  
                    // Broker故障延迟机制  
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();  
                    if (notBestBroker != null) {  
                        mq.setBrokerName(notBestBroker);  
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);  
                    }  
                    return mq;  
                } else {  
                    latencyFaultTolerance.remove(notBestBroker);  
                }  
  
                return tpInfo.selectOneMessageQueue();  
            }  
// 不启用  
            return tpInfo.selectOneMessageQueue(lastBrokerName);  
        }  
  
// 故障机制,平移到其他 Queue  
        public MessageQueue selectOneMessageQueue() {  
            int index = this.sendWhichQueue.incrementAndGet();  
            int pos = index % this.messageQueueList.size();  
  
            return this.messageQueueList.get(pos);  
        }

RocketMQ 故障延迟机制核心类
image.png

核心类解析:
1、LatenctFaultTolerance:接口
2、LatenctFaultToleranceImpl:故障容错实现类
3、FaultItem:延迟最小单位
4、MQFalutStrategy:操作 LatenctFaultTolerance 接口封装类

详解
todo

消息发送

DefaultMQProducerImpl#sendKernelImpl
参数:

1)Message msg:待发送消息。
2)MessageQueue mq:消息将发送到该消息队列上。
3)CommunicationMode communicationMode:消息发送模式,包括 SYNC、ASYNC、ONEWAY。
4)SendCallback sendCallback:异步消息回调函数。
5)TopicPublishInfo topicPublishInfo:主题路由信息。
6)long timeout:消息发送超时时间。

主发送逻辑

private SendResult sendKernelImpl() {  
    // broker 地址 
    if (null == brokerAddr) {  
        tryToFindTopicPublishInfo(mq.getTopic());  
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);  
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);  
    }  
  
    SendMessageContext context = null;  
    if (brokerAddr != null) {  
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);  
  
        byte[] prevBody = msg.getBody();  
        try {  
            //for MessageBatch,ID has been set in the generating process  
            // 对于 MsgBatch,在生成过程中已设置 ID            
            if (!(msg instanceof MessageBatch)) {  
                MessageClientIDSetter.setUniqID(msg);  
            }  
  
            // 标记压缩 FLAG  
            int sysFlag = 0;  
            boolean msgBodyCompressed = false;  
            if (this.tryToCompressMessage(msg)) {  
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;  
                sysFlag |= compressType.getCompressionFlag();  
                msgBodyCompressed = true;  
            }  
  
            // 标记事务消息  
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
            if (Boolean.parseBoolean(tranMsg)) {  
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;  
            }  
  
            // 注册消息发送钩子函数  
            if (hasCheckForbiddenHook()) {  
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();  
				...
                this.executeCheckForbiddenHook(checkForbiddenContext);  
            }  
  
            if (this.hasSendMessageHook()) {  
                context = new SendMessageContext();  
                ... 
                // 发送之前的增强逻辑  
                this.executeSendMessageHookBefore(context);  
            }  
  
            // 构建消息发送请求包  
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();  
            ...
  
            // 发送消息 同步、异步、单向  
            SendResult sendResult = null;  
            switch (communicationMode) {  
                case ASYNC:  
                    break;                
                case ONEWAY:  
	                break;
                case SYNC:  
                    break;            
            }  
  
            // 发送之后的处理逻辑  
            if (this.hasSendMessageHook()) {  
                context.setSendResult(sendResult);  
                this.executeSendMessageHookAfter(context);  
            }  
  
            return sendResult;  
        } catch (RemotingException | InterruptedException | MQBrokerException e) {  
            // 如果注册了钩子函数,就执行 after 逻辑  
            if (this.hasSendMessageHook()) {  
                context.setException(e);  
                this.executeSendMessageHookAfter(context);  
            }  
            throw e;  
        } finally {  
            msg.setBody(prevBody);  
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));  
        }  
    }  
  
    throw new MQClientException("The broker[" + brokerName + "] not exist", null);  
}

三种发送方式 :

1、同步
2、异步
3、OneWay

// 发送方式  
switch (communicationMode) {  
    case ONEWAY:  
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);  
        return null;    
    case ASYNC:  
        final AtomicInteger times = new AtomicInteger();  
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;  
        if (timeoutMillis < costTimeAsync) {  
            throw new RemotingTooMuchRequestException("sendMessage call timeout");  
        }  
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,  
            retryTimesWhenSendFailed, times, context, producer);  
        return null;   
    case SYNC:  
        long costTimeSync = System.currentTimeMillis() - beginStartTime;  
        if (timeoutMillis < costTimeSync) {  
            throw new RemotingTooMuchRequestException("sendMessage call timeout");  
        }  
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);  
    default:  
        assert false;  
        break;}

其核心重试逻辑就是先记录到容错 Item 中,然后在通过回调方法就行发送。
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync 。

try {  
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {  
        @Override  
        public void operationComplete(ResponseFuture responseFuture) {  
            long cost = System.currentTimeMillis() - beginStartTime;  
            RemotingCommand response = responseFuture.getResponseCommand();  
            if (null == sendCallback && response != null) {  
  
                try {  
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);  
                    if (context != null && sendResult != null) {  
                        context.setSendResult(sendResult);  
                        context.getProducer().executeSendMessageHookAfter(context);  
                    }  
                } catch (Throwable e) {  
                }  
  
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);  
                return;            }  
  
            if (response != null) {  
                try {  
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);  
                    assert sendResult != null;  
                    if (context != null) {  
                        context.setSendResult(sendResult);  
                        context.getProducer().executeSendMessageHookAfter(context);  
                    }  
  
                    try {  
                        sendCallback.onSuccess(sendResult);  
                    } catch (Throwable e) {  
                    }  
  
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);  
                } catch (Exception e) {  
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);  
                    // 重试  
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  
                        retryTimesWhenSendFailed, times, e, context, false, producer);  
                }  
            } else {  
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);  
                if (!responseFuture.isSendRequestOK()) {  
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());  
                    // 重试  
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  
                        retryTimesWhenSendFailed, times, ex, context, true, producer);  
                } else if (responseFuture.isTimeout()) {  
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",  
                        responseFuture.getCause());  
                    // 重试  
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  
                        retryTimesWhenSendFailed, times, ex, context, true, producer);  
                } else {  
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());  
                    // 重试  
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  
                        retryTimesWhenSendFailed, times, ex, context, true, producer);  
                }  
            }  
        }  
    });  
} catch (Exception ex) {  
    long cost = System.currentTimeMillis() - beginStartTime;  
    producer.updateFaultItem(brokerName, cost, true);  
    // 重试  
    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  
        retryTimesWhenSendFailed, times, ex, context, true, producer);  
}

批量发送

DefaultMQProducer #send (Collection msgs)
和单条发送差不多。

总结

关键点
1、延迟容错机制
2、重试机制

问题:

  • 消息队列如何进行负载?
  • 消息发送如何实现高可用?
  • 批量消息发送如何实现一致性?
  • 消息发送流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/420000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot+微信小程序的失物招领小程序

基于SpringBoot微信小程序的失物招领小程序 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目…

通达信欧奈尔RPS指标公式编写和设置方法(完全版)

通达信欧奈尔RPS指标公式的编写和设置较为复杂&#xff0c;对于初学者来说可能具有一定挑战性。在编写口袋支点公式时&#xff0c;需要使用RPS指标公式作为基础条件&#xff0c;因此有必要先了解其编写和设置方法。 一、上市一年以上选股 首先选出上市一年以上的股票&#xff…

python实现折线图和条形图

画图的部分函数 pandas 读取数据 相关包&#xff1a;import pandas as pd 函数&#xff1a;dfpd.read_excel(“文件名”) #读取excel文件 df.head(n)#查看前n行 df.tail(n)#查看后n行 df.shape #查看行数和列数 df.columns # 查看列索引 df.index #查看行索引 df.info() #查看…

怎样恢复删除的视频?视频恢复,4个方法!

案例&#xff1a;怎样恢复删除的视频 【谁懂啊&#xff01;电脑里视频太多了太占内存&#xff0c;本想删除一些不太重要的&#xff0c;但却删错了&#xff01;有朋友知道怎样恢复删除的视频吗&#xff1f;】 在数字化时代&#xff0c;我们经常使用电脑来存储和管理各种视频文…

股票量比实时筛选报警

一.什么是股票的量比 量比是短线投资一个参考指标&#xff0c;是衡量相对成交量的一个数值和指标&#xff0c;用于反映股票交易相对于以往的活跃程度&#xff1b;是指在股市开盘以后&#xff0c;平均每一分钟的成交量与过去五个交易日平均每分钟交易量的比。反映股票相对最近5…

自动驾驶定位模块的作用是什么?为什么会有多种坐标系?

无人车,要实现自动驾驶,首先要知道自己的的位置。更准确的说法是:相对某个坐标系,确定车辆的位置和姿态。 这个坐标系可以是局部的: 也可以是全局坐标系: 这是更大维度上的坐标系。 坐标系确定之后,相对坐标原点和坐标轴,车上坐标系(本地坐标系),平移得到位置(x…

SQL语句要点一文速览

以下内容参考《SQL必知必会&#xff08;第4版&#xff09;》 了解 SQL 数据库&#xff08;database&#xff09;&#xff1a;保存有组织的数据的容器&#xff08;通常是一个文件或一组文件&#xff09;。最简单的办法是将数据库想象为一个文件柜。这个文件柜是一个存放数据的…

【数据结构】算法的时间复杂度和空间复杂度(下)(附leetcode练习题)

☃️个人主页&#xff1a;fighting小泽 &#x1f338;作者简介&#xff1a;目前正在学习C语言和数据结构 &#x1f33c;博客专栏&#xff1a;数据结构 &#x1f3f5;️欢迎关注&#xff1a;评论&#x1f44a;&#x1f3fb;点赞&#x1f44d;&#x1f3fb;留言&#x1f4aa;&…

【Linux】system V 共享内存

文章目录system V1. 共享内存原理第一阶段原理第二阶段原理2. 直接写代码--编写代码进行原理介绍shmget函数ftok函数key值用法1. 创建key值2. 创建共享内存 获取共享内存3. 将自己和共享内存关联起来4. 将自己和共享内存取消关联5. 删除共享内存用指令删除调用系统调用完整代码…

数据库管理-第六十六期 SQL Domain(20230413)

数据库管理 2023-04-13第六十六期 SQL Domain1 基本介绍2 Domain的表达式和条件3 语法4 语义5 示例总结第六十六期 SQL Domain 上一期一笔带过了部分Oracle 23c的新特性&#xff0c;这一期重点讲一下SQL Domain新特性。 【https://docs.oracle.com/en/database/oracle/oracle-…

【提升效率神器】Python简单批量生成PDF文档(详细做法)

文章目录前言一、准备二、基本使用三、批量生成PDF总结前言 日常办公中&#xff0c;经常会使用PDF文档&#xff0c;难免需要对PDF文档进行编辑&#xff0c;有时候PDF文档中的大部分内容都是一样的&#xff0c;只是发送对象不同。 这种模板套用的场景下&#xff0c;使用Python…

BI 知识大全,值得收藏的干货

01、什么是商业智能BI&#xff1f; 商业智能BI可以实现业务流程和业务数据的规范化、流程化、标准化&#xff0c;打通ERP、OA、CRM等不同业务信息系统&#xff0c;整合归纳企业数据&#xff0c;利用数据可视化满足企业不同人群对数据查询、分析和探索的需求&#xff0c;从而为…

OpenCV实例(三)答题卡识别

OpenCV实例&#xff08;三&#xff09;答题卡识别1.答题卡识别概述2.单道题目的识别2.1基本流程及原理2.2代码实例&#xff1a;作者&#xff1a;Xiou 1.答题卡识别概述 随着信息化的发展&#xff0c;计算机阅卷已经成为一种常规操作。在大型考试中&#xff0c;客观题基本不再…

重整网站。。。。。。。。。

重整网站 写好回复的人 “ xxxxxxxx”通知栏&#xff0c;并且快速跳转到需要的页面。个人页面&#xff0c;记录自己发送的消息与回复的信息。以css 上传的图片防止被拉伸拉坏。 下拉的选择下拉的分页的好处。 评论功能的那一栏中的一个小的评论&#xff0c;如果手机端的话&a…

RabbitMQ 保证消息不丢失的几种手段

文章目录1.RabbitMQ消息丢失的三种情况2.RabbitMQ消息丢失解决方案2.1 针对生产者2.1.1 方案1 &#xff1a;开启RabbitMQ事务2.1.2 方案2&#xff1a;使用confirm机制2.2 Exchange路由到队列失败2.3 RabbitMq自身问题导致的消息丢失问题解决方案2.3.1 消息持久化2.3.2 设置集群…

无废话硬核分享:Linux 基础知识点总结很详细,全的很,吐血奉献

Linux 的学习对于一个程序员的重要性是不言而喻的。前端开发相比后端开发&#xff0c;接触 Linux 机会相对较少&#xff0c;因此往往容易忽视它。但是学好它却是程序员必备修养之一。 Linux 基础 操作系统 操作系统Operating System简称OS&#xff0c;是软件的一部分&#x…

【0基础学爬虫】爬虫基础之数据存储

大数据时代&#xff0c;各行各业对数据采集的需求日益增多&#xff0c;网络爬虫的运用也更为广泛&#xff0c;越来越多的人开始学习网络爬虫这项技术&#xff0c;K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章&#xff0c;为实现从易到难全方位覆盖&#xff0c;特设【0基础学…

物联网时代的网络安全

近年来&#xff0c;物联网 (IoT) 彻底改变了我们的生活和工作方式。从智能家居到自动驾驶汽车&#xff0c;物联网设备在我们的日常生活中变得越来越普遍。 根据 Statista 的一份报告&#xff0c;到 2025 年将有超过 750 亿个物联网 (IoT) 设备投入使用。 然而&#xff0c;这…

c++之STl容器-string

目录 容器的分类 string string的概念 string的初始化 string的遍历 string的一些基本操作 char*类型和string类型互转 字符串的连接 字符串的查找和替换 string的截断和删除 容器的分类 在实际的开发过程中&#xff0c;数据结构本身的重要性不会逊于操作于数据结构的算…

SpringMVC03-文件上传、异常处理、拦截器

SpringMVC03 SpringMVC的文件上传 一 、文件上传的前端必要前提 form 表单的 entcype取值必须是&#xff1a;multipart/form-data。默认值&#xff1a;application/x-www-form-urlencoded&#xff0c;是表单请求正文的类型method 属性取值必须是 post提供一个文件选择域 二…