RocketMQ5.0消息消费<三> _ 消息消费

news2025/1/12 22:58:13

RocketMQ5.0.0消息消费<三> _ 消息消费

一、消息消费

1. 消费UML图
PUSH模式消息拉取机制参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取​》,PullMessageService负责对消息队列进行消息拉取,从Broker端拉取消息后将消息存入ProcessQueue消息处理队列中,后调用ConsumeMessageService#submitConsumeRequest方法将消息提交到线程池。使用消费线程池确保了消息拉取与消息消费的解耦。RocketMQ使用ConsumeMessageService来实现消息消费的处理逻辑。

RocketMQ支持顺序消费与并发消费,本章节介绍并发消费消息的流程。

下图所示是消费消息UML图,org.apache.rocketmq.client.impl.consumer.ConsumeMessageService维护一个消费线程池。
在这里插入图片描述
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService是PUSH模式的并发消息消费实现类,其关键属性如下。

// 消息推模式实现类
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 默认推模式消费者
private final DefaultMQPushConsumer defaultMQPushConsumer;
// 并发消息监听
private final MessageListenerConcurrently messageListener;
// 消费线程池的任务队列
private final BlockingQueue<Runnable> consumeRequestQueue;
// 消费线程池
private final ThreadPoolExecutor consumeExecutor;
// 消费组
private final String consumerGroup;
 
// 消费延迟调度器
private final ScheduledExecutorService scheduledExecutorService;
// 定时删除过期消息线程池
private final ScheduledExecutorService cleanExpireMsgExecutors;

2. 提交消息

参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取​》,成功拉取消息后,org.apache.rocketmq.client.consumer.PullCallback回调onSuccess(),把消息提交(异步提交)到ConsumeMessageService的线程池中,供消费者消费,则本次拉取消息完成。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest是提交拉取的消息到消费线程池中的核心方法,如下代码所示。注意事项:

  • consumeMessageBatchMaxSize:获取并发消费时一次消费消息条数,默认1条。

  • 判断拉取的消息条数(默认最大32条)与 consumeBatchSize大小比较:

    a. msgs.size() <= consumeBatchSize时:组装消费请求ConsumeRequest并提交消费任务到消费线程池中;

    b. msgs.size() > consumeBatchSize时:分页提交到消费线程池中。

  • submitConsumeRequestLater():消费线程池饱和拒绝时,则延迟5s再次提交消费请求。

/**
 * 提交消息消费,供消费者消费
 * 并发消息消费入口:{@link DefaultMQPushConsumerImpl#pullMessage}中的{@link org.apache.rocketmq.client.consumer.PullCallback}
 * step1:获取并发消费时一次消费消息条数,默认1条(DefaultMQPushConsumer.consumeMessageBatchMaxSize)
 * step2:msgs.size()一次拉取消息的条数,最大32条 <= consumeBatchSize时,
 *          a. 组装消费请求并提交消费任务到消费线程池中
 *          b. 出现饱和抛出异常时,延迟5s提交{@link ConsumeMessageConcurrentlyService#submitConsumeRequestLater(ConsumeRequest)}
 * step3:msgs.size()一次拉取消息的条数,最大32条 > consumeBatchSize时,分页提交到消费线程池中
 * @param msgs 一次拉取待消费消息,最大默认32条{@link DefaultMQPushConsumer#pullBatchSize}
 * @param processQueue 待消息消费处理队列
 * @param messageQueue 消息所属消费队列
 * @param dispatchToConsume 是否转发到消费线程池,并发消费则忽略
 */
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // 并发消费时一次消费消息条数,默认1条(DefaultMQPushConsumer.consumeMessageBatchMaxSize)
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // msgs.size()一次拉取消息的条数,最大32条
    if (msgs.size() <= consumeBatchSize) {
        // 组装消费请求
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // 消费任务直接提交到消费线程池,具体消费逻辑ConsumeMessageConcurrentlyService.ConsumeRequest.run
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // 延迟5s提交到消费线程池
            this.submitConsumeRequestLater(consumeRequest);
        }
    }
    // msgs.size() > consumeBatchSize时,对拉取消息进行分页,每页有consumeBatchSize条消息
    else {
        for (int total = 0; total < msgs.size(); ) {
            // 每页有consumeBatchSize条消息
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
 
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
 
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

3. 消费消息

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest是提交线程池中的消费任务,该类是个线程。其关键属性如下。

// 提交消费请求的消息
private final List<MessageExt> msgs;
// PullRequest.messageQueue属性:待拉取消费队列(负载均衡后的分配的消息队列)
private final ProcessQueue processQueue;
// PullRequest.processQueue属性:消息处理队列(存储已拉取的消息)
private final MessageQueue messageQueue;

ConsumeMessageConcurrentlyService.ConsumeRequest#run方法执行消息消费逻辑,代码如下。注意事项:

  • processQueue.isDropped():判断拉取消息处理队列是否被丢弃(消费队列重新负载 _
    删除处理),避免消息重复消费;ACK时,也要进行是否丢弃判断。
  • MessageListenerConcurrently#consumeMessage:具体的消息消费业务逻辑,返回该批消息的消费结果。
  • ConsumeMessageConcurrentlyService#processConsumeResult():消费者消费消息后,进行消息ACK确认,下小节介绍。
  • ConsumeConcurrentlyStatus
    status:该批消息的消费结果(成功CONSUME_SUCCESS、失败RECONSUME_LATER)
/**
 * 并发消息消费的具体逻辑
 * 入口:{@link ConsumeMessageConcurrentlyService#submitConsumeRequest}
 * step1:判断当前消费者的ProcessQueue是否被丢弃,true时重新负载均衡后所属消费队列被消费组内其他消费则占用
 * step2:恢复重试消息主题名(消息重试机制决定)
 * step3:执行消费前的钩子函数
 * step4:具体的消息消费业务逻辑,返回该批消息的消费结果
 *        {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)}
 * step5:判断是否消费超时
 * step6:执行消费后钩子函数
 * step7:再次判断当前消费者的ProcessQueue是否被丢弃(消费过程中是否被修改),防止重复消费
 *        没有丢弃,则处理消费结果,ACK机制{@link ConsumeMessageConcurrentlyService#processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest)}
 */
@Override
public void run() {
    // 判断当前消费者的ProcessQueue是否被丢弃,true时重新负载均衡后所属消费队列被消费组内其他消费则占用
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
 
    // 并发消息消费监听器
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
 
    // 恢复重试消息主题名(消息重试机制决定)
    defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
 
    // 执行消费前的钩子函数
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        // 执行消费前钩子函数
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }
 
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                // 设置消费开始时间
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // 具体的消息消费业务逻辑,返回该批消息的消费结果
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue), e);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    }
    // 消费是否超时
    else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }
 
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }
 
    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
 
    // 执行消费后钩子函数
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }
 
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 
    // 再次判断当前消费者的ProcessQueue是否被丢弃(消费过程中是否被修改),防止重复消费
    if (!processQueue.isDropped()) {
        // 处理消费结果,ACK机制
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

二、消息确认(ACK)

根据上节的介绍,PUSH模式拉取消息后,拉取请求提交到消费线程池,消费者消费消息。org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus是消费者消费该批消息的结果,只有在消费结果是失败(RECONSUME_LATER)时,这批消息逐条发送消费ACK确认。

Broker处理消费ACK请求时,消息主题修改为重试主题(%RETRY% + 消费组名称),而原始主题存储到消息扩展属性。同时,判断消费次数是否超出最大重试消费(默认16次),若是则进入DLQ队列。新消息再次存储到Commitlog文件,而不是直接修改消息,原因是RocketMQ顺序写入Commitlog文件,随机读,提高消息的吞吐量。消费者重新消费时,恢复为原始主题。

1. 消费端处理消费结果

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult是处理消费结果的核心逻辑方法,如下代码。注意事项:

  • 根据消费结果计算ackIndex:

    该批消息消费成功CONSUME_SUCCESS:ackIndex为msgs.size() - 1

    该批消息消费失败RECONSUME_LATER:ackIndex为-1

  • 根据消费模式处理失败RECONSUME_LATER:

    广播:只打印日志。

    集群:该批消息逐条向Broker同步发送ACK确认,Broker返回失败时设置消费次数(+1)并再次封装成ConsumeRequest,且延迟5s重新消费。

  • 消费结果成功时:更新消费进度(移除该批消息后,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费),下小节介绍消费进度管理。

/**
 * 处理消费结果
 * step1:计算ackIndex,为消费ACK准备:
 *            成功CONSUME_SUCCESS:ackIndex为msgs.size() - 1
 *            失败RECONSUME_LATER:ackIndex为-1
 * step2:消费模式,处理业务方返回RECONSUME_LATER的消息:
 *            广播BROADCASTING:RECONSUME_LATER的消息执行,只打印日志
 *            集群CLUSTERING:本批消息只要有一个返回RECONSUME_LATER,则本批消息都需要ACK,ACK返回失败,设置消费次数
 * step3:更新消费进度:
 *            移除该批消息后,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费
 */
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();
 
    if (consumeRequest.getMsgs().isEmpty())
        return;
 
    // 计算ackIndex,为消费ACK准备
    switch (status) {
        // 成功,则ackIndex为msgs.size() - 1
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        // 失败,则ackIndex为-1
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }
 
    // 根据消费模式,处理业务方返回RECONSUME_LATER的消息
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // RECONSUME_LATER的消息执行,只打印日志
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            // 本批消息只要有一个返回RECONSUME_LATER,则本批消息都需要ACK
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 每条消息,向Broker发送ACK
                boolean result = this.sendMessageBack(msg, context);
                // ACK返回失败的消息,设置消费次数
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
 
            // ACK返回失败的消息再次封装成ConsumeRequest,且延迟5s重新消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
 
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
 
    // 移除消息后,获取剩下的msgTreeMap消息,最小的偏移量,避免重复消费
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    // 更新本地消费队列的消费进度
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack是同步发送消息ACK确认核心方法,如下代码。请求码RequestCode.CONSUMER_SEND_MSG_BACK,注意事项:

  • delayLevel(延迟级别):取ConsumeConcurrentlyContext#delayLevelWhenNextConsume属性(消费延迟重试策略),值有:

    -1:不重试,直接进入死信队列DLQ
    0(默认):Broker控制重试次数
    >0:消费端控制重试次数

/**
 * 同步向Broker发送消息消费ACK请求
 * step1:构建消费确认ACK的发送请求头{@link ConsumerSendMsgBackRequestHeader};
 * step2:同步发送ACK
 * 注意:delayLevel(延迟级别):取{@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}属性(消费延迟重试策略),值有:
 *                      -1:不重试,直接进入死信队列DLQ;0(默认):Broker控制重试次数;>0:消费端控制重试次数
 * Broker入口:{@link org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest}
 * @param addr Broker地址
 * @param msg
 * @param consumerGroup
 * @param delayLevel 延迟级别
 * @param timeoutMillis 超时时间
 * @param maxConsumeRetryTimes 最大消费重试次数
 */
public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    // 构建消费确认ACK的发送请求头
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    // 发送请求码CONSUMER_SEND_MSG_BACK
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
 
    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
 
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
 
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

2. Broker端处理消费ACK请求

SendMessageProcessor#processRequest是Broker端接收消费ACK请求方法,请求码RequestCode.CONSUMER_SEND_MSG_BACK。

org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack是消费ACK的核心方法,代码如下。注意事项:

  • 主题修改为重试主题:重试主题(%RETRY% + 消费组名称),并且从重试队列中随机选择一个队列;消息原始主题存储到消息扩展属性。
  • 判断延迟级别delayLevel:见上小节《消费端处理消费结果》,默认0是Broker控制重试次数。
  • 判定消费重试次数是否超出最大重试次数:

a. 消费次数 >= 最大重试消费次数 或 delayLevel < 0时:进入DLQ队列(只有写权限),不在进行消费,需要人工干预;

b. 消费次数 < 最大重试消费次数时:若是delayLevel = 0 (默认)则重置delayLevel,后消息进行延迟消费(设置延迟级别)。

  • 创建新消息对象:创建新消息并提交到Commitlog文件内存,而不是修改消息(重试次数)。原因是:RocketMQ顺序写入Commitlog文件,随机读,提高消息的吞吐量。
/**
 * Broker处理消息消费ACK确认
 * 消费者ACK入口:{@link MQClientAPIImpl#consumerSendMessageBack}
 * step1:获取重试主题(%RETRY% + 消费组名称),并且从重试队列中随机选择一个队列
 * step2:构建重试主题配置信息
 *        {@link TopicConfigManager#createTopicInSendMessageBackMethod}
 * step3:判断是否有写权限
 * step4:根据offset从Commitlog获取消息
 * step5:消息的原始主题存入到属性中
 * step6:获取最大重试消费次数,默认16次
 *        {@link SubscriptionGroupConfig.retryMaxTimes}
 * step7:消费次数 >= 最大重试消费次数 或 delayLevel < 0时,进入DLQ队列(只有写权限),不在进行消费,需要人工干预
 *       消费次数 < 最大重试消费次数,若是delayLevel = 0 (默认)则重置delayLevel,后消息进行延迟消费(设置延迟级别)
 * step8:创建新的消息对象,并重新提交到Commitlog文件内存
 *        topic为重试主题、新的msgId,其他属性与原消息一致;新的消息对象存储到Commitlog文件中;原来消息的topic、msgId存入新消息属性。
 */
protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
 
    // The send back requests sent to SlaveBroker will be forwarded to the master broker beside
    final BrokerController masterBroker = this.brokerController.peekMasterBroker();
    if (null == masterBroker) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("no master available along with " + brokerController.getBrokerConfig().getBrokerIP1());
        return response;
    }
 
    // The broker that received the request.
    // It may be a master broker or a slave broker
    final BrokerController currentBroker = this.brokerController;
 
    SubscriptionGroupConfig subscriptionGroupConfig =
        masterBroker.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
            + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
        return response;
    }
 
    BrokerConfig masterBrokerConfig = masterBroker.getBrokerConfig();
    if (!PermName.isWriteable(masterBrokerConfig.getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + masterBrokerConfig.getBrokerIP1() + "] sending message is forbidden");
        return response;
    }
 
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
 
    // 获取重试主题:%RETRY% + 消费组名称
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // 从重试队列中随机选择一个队列
    int queueIdInt = this.random.nextInt(subscriptionGroupConfig.getRetryQueueNums());
 
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
 
    // Create retry topic to master broker 构建重试主题配置信息
    TopicConfig topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
 
    // 是否有写权限
    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return response;
    }
 
    // 根据offset从Commitlog获取消息
    // Look message from the origin message store
    MessageExt msgExt = currentBroker.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return response;
    }
 
    //for logic queue
    if (requestHeader.getOriginTopic() != null
        && !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
        //here just do some fence in case of some unexpected offset is income
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
        return response;
    }
 
    // 消息的原始主题存入到属性中
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);
 
    // 延迟级别
    int delayLevel = requestHeader.getDelayLevel();
 
    // 获取最大重试消费次数,默认16次
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }
 
    // 消费次数 >= 最大重试消费次数时,进入DLQ队列(只有写权限),不在进行消费,需要人工干预
    boolean isDLQ = false;
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
 
        // 进入DQL队列,重置newTopic为:%DLQ% + 消费组
        isDLQ = true;
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
 
        // Create DLQ topic to master broker
        topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE | PermName.PERM_READ, 0);
 
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
        msgExt.setDelayTimeLevel(0);
    }
    // 消费次数 < 最大重试消费次数,则消息进行延迟消费
    else {
        // 根据消费次数,设置延迟级别
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
 
        msgExt.setDelayTimeLevel(delayLevel);
    }
 
    /*
        创建新的消息对象
        a. topic为重试主题、新的msgId,其他属性与原消息一致;
        b. 新的消息对象存储到Commitlog文件中;
        c. 原来消息的topic、msgId存入新消息属性。
     */
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
 
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); // 消费次数+1
 
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
 
    boolean succeeded = false;
 
    // 存储到Commitlog中
    // Put retry topic to master message store
    PutMessageResult putMessageResult = masterBroker.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
 
        switch (putMessageResult.getPutMessageStatus()) {
            case PUT_OK:
                String backTopic = msgExt.getTopic();
                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                if (correctTopic != null) {
                    backTopic = correctTopic;
                }
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                    masterBroker.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                    masterBroker.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                    masterBroker.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                    masterBroker.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
                }
                masterBroker.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
 
                if (isDLQ) {
                    masterBroker.getBrokerStatsManager().incDLQStatValue(
                        BrokerStatsManager.SNDBCK2DLQ_TIMES,
                        commercialOwner,
                        requestHeader.getGroup(),
                        requestHeader.getOriginTopic(),
                        BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ.name(),
                        1);
 
                    String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    DLQ_LOG.info("send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, storeTimestamp={}",
                        newTopic,
                        commercialOwner,
                        requestHeader.getOriginTopic(),
                        requestHeader.getGroup(),
                        uniqKey,
                        putMessageResult.getAppendMessageResult().getStoreTimestamp());
                }
 
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
 
                succeeded = true;
                break;
            default:
                break;
        }
 
        if (!succeeded) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
        }
    } else {
        if (isDLQ) {
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            DLQ_LOG.info("failed to send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, result={}",
                newTopic,
                owner,
                requestHeader.getOriginTopic(),
                requestHeader.getGroup(),
                uniqKey,
                "null");
        }
 
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
    }
 
    if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        ConsumeMessageContext context = new ConsumeMessageContext();
        context.setNamespace(namespace);
        context.setTopic(requestHeader.getOriginTopic());
        context.setConsumerGroup(requestHeader.getGroup());
        context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
        context.setCommercialRcvTimes(1);
        context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
 
        context.setAccountAuthType(request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE));
        context.setAccountOwnerParent(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT));
        context.setAccountOwnerSelf(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF));
        context.setRcvStat(isDLQ ? BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ : BrokerStatsManager.StatsType.SEND_BACK);
        context.setSuccess(succeeded);
        context.setRcvMsgNum(1);
        //Set msg body size 0 when sent back by consumer.
        context.setRcvMsgSize(0);
        context.setCommercialRcvMsgNum(succeeded ? 1 : 0);
 
        try {
            this.executeConsumeMessageHookAfter(context);
        } catch (AbortProcessException e) {
            response.setCode(e.getResponseCode());
            response.setRemark(e.getErrorMessage());
        }
    }
 
    return response;
}

重试新消息的延迟时间达到后,则允许再次拉取消息,进而重新消费。拉取消息提交到消费线程消费时,重试主题恢复到原始主题重新消费。ConsumeRequest#run()消费消息任务,详细见上节,如下代码所示。

@Override
public void run() {
    ......
 
    // 恢复重试消息主题名(消息重试机制决定)
    defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
 
    ......
}

三、消费进度管理

上面章节介绍,消费一批消息成功后,则ProceeQueue移除该批消息,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费,同时更新消费偏移量。那么消费进度存储在哪里呢?

  • 广播模式:同一个消费组的所有消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,所以消费进度存储在本地消费者。

  • 集群模式:同一个消费组的所有消费者共享主题下的所有消息,同一个消息消费队列在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度存储在Broker。

    org.apache.rocketmq.client.consumer.store.OffsetStore是消费进度接口,其UML图及接口方法如下。
    在这里插入图片描述
    在这里插入图片描述
    消费者启动时,会初始化消费进度。如:根据集群模式创建OffsetStore接口的具体实现类;执行load()加载消费进度到内存中(offsetTable维护);执行5s定时周期任务,持久化消费进度。

1. 广播模式消费进度存储

org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore是广播模式消费进度存储的实现类,其关键属性如下。

// 本地存储目录,默认用户主目录/.rocketmq_offsets,通过-Drocketmq.client.localOffsetStoreDir配置
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
    "rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
// 存储文件,路径:LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json
private final String storePath;
// 内存中消费队列的消费进度
private ConcurrentMap<MessageQueue/* 消费队列 */, AtomicLong/* 消费进度 */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();

消费者启动时,会初始化消费进度。若是广播模式,则从文件中加载消费进度,LocalFileOffsetStore#load加载消费进度,如下代码所示。

/**
 * 广播模式下加载消费进度文件(消费者启动时,会初始化消费进度)
 * 消费者启动入口:{@link DefaultMQPushConsumerImpl#start()}
 * step1:读取本地磁盘消费队列的消费进度到内存
 *        {@link LocalFileOffsetStore#readLocalOffset()}
 * step2:遍历,消费进度存储到内存中消费队列的消费进度{@link LocalFileOffsetStore#offsetTable}
 */
@Override
public void load() throws MQClientException {
    // 读取本地磁盘消费队列的消费进度到内存
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
 
        // 打印日志
        for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
            AtomicLong offset = mqEntry.getValue();
            log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mqEntry.getKey(),
                    offset.get());
        }
    }
}

消费者定时5s周期执行持久化消费进度到磁盘,LocalFileOffsetStore#persistAll,代码如下。

/**
 * 持久化所有消费进度到磁盘
 * 什么时候持久化呢?  MQ的客户端实例启动定时任务,每5s持久化一次
 * 定时任务{@link MQClientInstance#startScheduledTask()}5s执行persistAllConsumerOffset()方法
 */
@Override
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;
 
    // 构建对象,把指定的MessageQueue添加到offsetTable中,进行持久化
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }
 
    // 转为json字符串
    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

2. 集群模式消费进度存储

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore是集群模式消费进度存储的实现类,其关键属性如下。

// 内存中消费队列的消费进度
private ConcurrentMap<MessageQueue/* 消费队列 */, AtomicLong/* 消费进度 */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();

消费者启动时,会初始化消费进度。执行load()方法为空,即:集群模式下无需加载操作。成功消费该批消息后,执行OffsetStore#updateOffset方法更新本地消费队列的消费进度(内存消费进度)。而消费者定时5s周期执行持久化消费进度到磁盘,LocalFileOffsetStore#persistAll(发送到Broker更新内存消费进度)。同时,Broker端默认10s周期执行持久化一次消费进度,如下所示是集群模式下消费进度管理图 。
在这里插入图片描述
org.apache.rocketmq.broker.offset.ConsumerOffsetManager是Broker的消费进度更新管理器,其关键属性如下。存储文件名为${RocketMQ_HOME}/store/config/consumerOffset.json。

// 消费进度列表
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/* 消费队列ID */, Long/* 消费进度 */>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

RemoteBrokerOffsetStore#persist持久化指定消费队列的消费进度,其核心方法是updateConsumeOffsetToBroker(),其代码如下。

/**
 * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // 获取消费队列的Broker
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
    }
 
    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);
        requestHeader.setBname(mq.getBrokerName());
 
        // 单向更新,无更新响应结果
        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
        // 不是单向,则返回是否更新成功
        else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset是读取Broker端的消费进度的核心方法,代码如下。

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            // 先从内存中读取,再从磁盘读取
            case MEMORY_FIRST_THEN_STORE:
            // 从内存读取
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            // 从磁盘读取
            case READ_FROM_STORE: {
                try {
                    // 从broker获取指定消费队列的消费偏移量
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (OffsetNotFoundException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }
 
    return -3;
}

四、参考资料

  1. https://blog.csdn.net/qq_21040559/article/details/122775470
  2. https://www.cnblogs.com/shanml/p/16989785.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Cocktail mac版-Cocktail 苹果版(清理维护优化工具)安装教程

Cocktail for Mac是一款Mac OS X系统清理、修复和优化的常规实用工具。它不仅可以一键清理系统中的残余垃圾。还可以帮助用户修改系统的隐藏属性、隐藏文件&#xff0c;对优化mac系统有很大的帮助。CocktailV11.4破解版增加了清除macOS内容缓存的功能。 内容缓存保留了各种Appl…

如何查看自己windows电脑的ip地址

可能有些小伙伴对查看自己电脑的ip地址不太熟悉&#xff0c;今天这里介绍几种方式&#xff1a; 我自己的电脑是Win11&#xff0c;就直接展示截图了。 一、命令行方式&#xff1a; windowsR打开 CMD(命令行窗口)或者windows PowerShell窗口&#xff0c;输入以下命令&#xff1…

企业信息化可以为企业带来什么效益?

一、什么是信息化 在具体谈信息化前我们先来谈一谈信息化和数字化 信息化&#xff1a;信息化是指培养、发展以计算机为主的智能化工具为代表的新生产力&#xff0c;并使之造福于社会的历史过程。与智能化工具相适应的生产力&#xff0c;称为信息化生产力。 数字化&#xff1a;…

MySQL基础篇第6章(多表查询)

文章目录 1、一个案例引发的多表连接1.1 案例说明1.2 笛卡儿积 2、多表查询分类讲解2.1 等值连接2.2 非等值连接2.3 自连接2.4 内连接2.5 外连接(OUTER JOIN)2.5.1 左外连接(LEFT OUTER JOIN)2.5.2 右外连接2.5.3 满外连接(FULL OUTER JOIN) 3、UNION的使用3.1 UNION操作符3.2 …

Linux 命令大全(下)

Linux 命令大全&#xff08;上&#xff09; 本文目录 6. 网络通讯 常用命令6.1 ssh 命令 – 安全的远程连接服务器6.1.1 含义6.1.2 语法格式6.1.3 常用参数6.1.4 参考示例 6.2 netstat 命令 – 显示网络状态6.2.1 含义6.2.2 语法格式6.2.3 常用参数6.2.4 参考示例 6.3 dhclient…

IP协议【图解TCP/IP(笔记九)】

文章目录 IP即网际协议IP相当于OSI参考模型的第3层网络层与数据链路层的关系 IP基础知识IP地址属于网络层地址路由控制■ 发送数据至最终目标地址■ 路由控制表 数据链路的抽象化IP属于面向无连接型 IP即网际协议 TCP/IP的心脏是互联网层。这一层主要由IP&#xff08;Internet…

【netty】TCP 粘包和拆包及解决方案

TCP 粘包和拆包基本介绍 TCP是面向连接的&#xff0c;面向流的&#xff0c;提供高可靠性服务。收发两端&#xff08;客户端和服务器端&#xff09;都要有一一成对的socket&#xff0c;因此&#xff0c;发送端为了将多个发给接收端的包&#xff0c;更有效的发给对方&#xff0c…

电商API接口商品页面数据(详情数据,销量数据,sku数据,视频数据,优惠券数据)接口代码示例

有探讨稳定采集电商等多平台整站实时商品详情历史价格数据接口&#xff0c;通过该接口开发者可以更好地了解商品的情况&#xff0c;商品详情数据详细信息查询&#xff0c;数据参数包括&#xff1a;商品链接&#xff0c;商品列表主图、价格、标题&#xff0c;sku&#xff0c;库存…

技术流 | 使用eBPF增强kubernetes可观测性的实践分享

本文作者&#xff1a;擎创科技某大拿 01 背景与问题 当前&#xff0c;云原生技术主要是以容器技术为基础围绕着 Kubernetes的标准化技术生态&#xff0c;通过标准可扩展的调度、网络、存储、容器运行时接口来提供基础设施&#xff0c;同时通过标准可扩展的声明式资源和控制器来…

narak靶机详解

narak靶机复盘 首先对靶机进行扫描&#xff0c;找到靶机的真实ip地址。 然后dirb进行目录扫描&#xff0c;扫描到一个目录&#xff0c;我们打开发现是一个登陆界面。 并没有用户名和密码&#xff0c;我们就用cewl扫描这个网站&#xff0c;扫出一个字典&#xff0c;用来暴力破…

2、JDk、JRE、JVM三者区别和联系

JDK JRE JVM 含义 JDK: Java Develpment Kit java 开发工具 JRE: Java Runtime Environment java 运行时环境 JVM: java Virtual Machine java 虚拟机 一张图来解释&#xff1a; 联系&#xff1a; JVM不能单独搞定class的执行&#xff0c;解释class的时候JVM需要调用解…

Openlayers实战:非4326,3857的投影

Proj4js 是一个 JavaScript 库,用于将点坐标从一个坐标系转换到另一个坐标系,包括基准转换。Openlayers地图上,除了默认的4326和3857投影方式外,可以通过Proj4js的拓展,可以显示其他的投影。 本实战中,将ESRI:53009投射到Openlayers地图上。 安装依赖 npm install proj4…

java高级语法笔记

Java ArrayList java泛型语法介绍 https://www.runoob.com/java/java-generics.html 匿名函数->&#xff08;Lambda 表达式 &#xff0c;java8的新特性&#xff09;

使用openKylin操作系统下载VMware Tools教程(超详细图文教程)

目录 前言操作步骤验证使用 前言 VMware Tools作为一个VMware十分有用的工具&#xff0c;下载它也经常作为配置VMware的一个常有环节。本篇文章&#xff0c;我将用国产操作系统openKylin为大家演示如何下载安装VMware Tools。 操作步骤 1.点击 虚拟机----安装VMware Tools…

Kubernets与Docker的故事

在 2016 年底的 1.5 版里&#xff0c;Kubernetes 引入了一个新的接口标准&#xff1a;CRI &#xff0c;Container Runtime Interface。 CRI 采用了 ProtoBuffer 和 gPRC&#xff0c;规定 kubelet 该如何调用容器运行时去管理容器和镜像&#xff0c;但这是一套全新的接口&#…

diffusion model(四)文生图diffusion model(classifier-free guided)

文章目录 系列阅读 文生图diffusion model&#xff08;classifier-free guided&#xff09;背景方法大意模型如何融入类别信息&#xff08;或语义信息&#xff09;采用交叉注意力机制融入基于channel-wise attention融入 如何训练 ϵ θ ( x t , y , t ) \epsilon_{\theta}(x_t…

P2 第一章 电路模型与电路定律

1、电源为什么可以等效为负值电阻&#xff1f; 当然电源可以等效为负值电阻&#xff0c;但是它不是真实电阻。 思考&#xff1a;阻碍反义词有推动&#xff0c;电动势&#xff1a;即电子运动的趋势&#xff0c;能够克服导体电阻对电流的阻力&#xff0c;使电荷在闭合的导体回路…

【Linux后端服务器开发】信号量与信号

目录 一、信号量概述 二、信号概述 三、信号产生 1、终端按键产生信号 2、调用系统函数产生信号 3、硬件异常产生信号 4、软件条件 四、信号保存 1、信号阻塞 2、信号捕捉流程 五、信号递达 一、信号量概述 信号量&#xff1a;一个计数器&#xff0c;通常用来表示公…

【档案专题】二、电子档案管理

导读&#xff1a;主要针对电子档案管理相关内容介绍。对从事电子档案管理信息化的职业而言&#xff0c;不断夯实电子档案管理相关理论基础是十分重要。只有通过不断梳理相关知识体系和在实际工作当中应用实践&#xff0c;才能走出一条专业化加职业化的道路&#xff0c;从而增强…

《黑马头条》 ElectricSearch 分词器 联想词 MangoDB day08-平台管理[实战]作业

07 app端文章搜索 1) 今日内容介绍 1.1)App端搜索-效果图 1.2)今日内容 2) 搭建ElasticSearch环境 2.1) 拉取镜像 docker pull elasticsearch:7.4.0 2.2) 创建容器 docker run -id --name elasticsearch -d --restartalways -p 9200:9200 -p 9300:9300 -v /usr/share/elasticse…