【RocketMQ】消息的存储

news2024/9/21 1:26:40

Broker对消息的处理

BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:

public class BrokerController {
    // 初始化
    public boolean initialize() throws CloneNotSupportedException {
        // ...
        // 注册处理器
        this.registerProcessor();
        // ...
    }
  
    // 注册处理器
    public void registerProcessor() {
        /**
         * 发送消息处理器
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        // ...
        // 注册消息发送处理器
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        // 省略其他注册...
    }
}

在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessorprocessRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    // 处理请求
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = null;
        try {
            // 处理请求
            response = asyncProcessRequest(ctx, request).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("process SendMessage error, request : " + request.toString(), e);
        }
        return response;
    }
  
    // 异步处理请求
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                // 解析请求头
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                // ...
                if (requestHeader.isBatch()) {
                    // 批量消息发送处理
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    // 单个消息发送处理
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }
  
    // 单个消息发送处理
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // ...
        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // 是否使用事务
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事务处理
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 消息持久化
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

}

以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:

  1. 创建MessageExtBrokerInner对象,对消息的相关内容进行封装,将主题信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中
  2. 判断是否使用了事务,如果未使用事务调用brokerControllergetMessageStore方法获取MessageStore对象,然后调用asyncPutMessage方法对消息进行持久化存储
  3. 返回消息的存储结果
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    // 单个消息发送处理
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // ...
        // 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // 设置主题
        msgInner.setTopic(requestHeader.getTopic());
        // 设置消息所在的队列ID
        msgInner.setQueueId(queueIdInt);
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }
        // 设置消息内容
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        // 设置属性
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        MessageAccessor.setProperties(msgInner, origProps);
        // 设置发送消息时间
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        // 设置发送消息的主机地址
        msgInner.setBornHost(ctx.channel().remoteAddress());
        // 设置存储消息的主机地址
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        // 属性中添加集群名称
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        // 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK
        if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
            String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
            // 设置消息属性
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
        } else {
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        }
        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // 是否使用事务
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事务处理
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 消息写入
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        // 返回消息持久化结果
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }
}

MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:

public class BrokerController {
    private MessageStore messageStore;
    public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();
        // ...
        if (result) {
            try {
                // 创建DefaultMessageStore
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                // ...
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
    }
          
    // 获取MessageStore
    public MessageStore getMessageStore() {
        return messageStore;
    }
}

消息存储

DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLogDLedgerCommitLogCommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。

DefaultMessageStoreasyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLogasyncPutMessage进行消息写入:

public class DefaultMessageStore implements MessageStore {
  
   private final CommitLog commitLog; // CommitLog
  
   public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        // ...
        // 如果启用了Dleger
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            // 使用DLedgerCommitLog
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            // 否则使用CommitLog
            this.commitLog = new CommitLog(this);
        }
        // ...
    }
    
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        // 校验存储状态
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
        }
        // 校验消息合法性
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
        }
        // 进行一系列校验
        PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
        if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
            return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
        }
        long beginTime = this.getSystemClock().now();
        // 调用CommitLog的asyncPutMessage方法写入消息
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
        putResultFuture.thenAccept((result) -> {
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().add(1);
            }
        });

        return putResultFuture;
    }
    
}

合法性校验

Broker存储检查

checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:

  • MessageStore是否已经处于关闭状态,如果处于关闭状态不再受理消息的存储
  • Broker是否是从节点,从节点只能读不能写
  • Broker是否有写权限,如果没有写入权限,不能进行写入操作
  • 操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作
   private PutMessageStatus checkStoreStatus() {
        // 是否处于停止状态
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        // 是否SLAVE角色
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("broke role is slave, so putMessage is forbidden");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        }
        // 是否可写
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                    "the broker's disk is full, write to logic queue error, write to index file error, etc");
            }
            return PutMessageStatus.SERVICE_NOT_AVAILABLE;
        } else {
            this.printTimes.set(0);
        }
        // 操作系统是否处于PAGECACHE繁忙状态
        if (this.isOSPageCacheBusy()) {
            return PutMessageStatus.OS_PAGECACHE_BUSY;
        }
        return PutMessageStatus.PUT_OK;
    }

消息长度检查

checkMessage方法主要是对主题的长度校验和消息属性的长度校验:

  private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
        // 如果主题的长度大于最大值
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return PutMessageStatus.MESSAGE_ILLEGAL;
        }
        // 如果消息属性长度大于最大值
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return PutMessageStatus.MESSAGE_ILLEGAL;
        }
        return PutMessageStatus.PUT_OK;
    }

checkLmqMessage

checkLmqMessage主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量:

  private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {
        // 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量
        if (msg.getProperties() != null
            && StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
            && this.isLmqConsumeQueueNumExceeded()) {
            return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;
        }
        return PutMessageStatus.PUT_OK;
   }

   private boolean isLmqConsumeQueueNumExceeded() {
        // 开启了LMQ && 开启了多个队列分发 && 消费数量大于了限定值
        if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch()
            && this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {
            return true;
        }
        return false;
    }

消息写入

对消息进行校验完毕之后,调用了CommitLogasyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:

  1. 首先对消息的相关属性进行了设置,主要包括以下内容

    • 存储时间
    • 消息内容的CRC校验和
    • 如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识
  2. 获取当前线程绑定的PutMessageThreadLocal对象,里面有一个MessageExtEncoder类型的成员变量,调用它的encode方法可以对消息进行编码,将数据先写入内存buffer,然后调用MessageExtBrokerInnersetEncodedBuff方法将buffer设置到encodedBuff

  3. 加锁,从mappedFileQueue中获取上一次使用的映射文件mappedFile,并更新消息的存储时间, 如果mappedFile为空或者已写满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已达到规定的大小,需要新建一个文件,如果新建文件为空打印错误日志并返回结果

    mappedFile可以看做是每一个Commitlog文件的映射对象,Commitlog文件的大小限定为1G

    mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的目录

  4. 调用mappedFileappendMessage方法向文件中追加消息数据,在调用方法时传入了回调函数appendMessageCallback,在CommitLog的构造函数中可以看到是DefaultAppendMessageCallback类型的,所以会进入到DefaultAppendMessageCallback中进行消息写入,如果写入成功,数据会留在操作系统的PAGECACHE中

  5. 调用submitFlushRequest方法执行刷盘策略,判断是否需要立刻将PAGECACHE中的数据刷到磁盘

public class CommitLog {
    // 所有mappedFile集合
    protected final MappedFileQueue mappedFileQueue;
    
    // ThreadLocal
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    // 写入消息的回调函数
    private final AppendMessageCallback appendMessageCallback;
    public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数
        //...
        // 创建回调函数
        this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        //...
    }
  
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // 设置存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 设置消息的CRC值
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // 写入结果
        AppendMessageResult result = null;
        // 获取存储统计服务
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        // 获取主题
        String topic = msg.getTopic();
        // 获取事务类型
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // 省略事务相关处理
        }
        // 获取发送消息的主机地址
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6
            msg.setBornHostV6Flag(); // 设置IPV6标识
        }
        // 获取存储消息的主机地址
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag(); // 设置IPV6标识
        }
        // 获取当前线程绑定的PutMessageThreadLocal对象
        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        // 调用encode方法对消息进行编码,并写入buffer
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        // 将存储编码消息的buffer设置到msg中
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        // 创建PutMessageContext
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        // 加锁
        putMessageLock.lock(); 
        try {
            // 获取上一次写入的文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            // 获取系统时间戳
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            // 再次更新存储时间戳,保证全局顺序
            msg.setStoreTimestamp(beginLockTimestamp);
            // 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件
            if (null == mappedFile || mappedFile.isFull()) {
                // 使用偏移量0创建一个新的文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            // 如果依旧为空
            if (null == mappedFile) {
                // 提示错误
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }
            // 写入消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            // ...

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } finally {
            beginTimeInLock = 0;
            putMessageLock.unlock();
        }
        // ...
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // 统计相关
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
        // 执行刷盘
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            // 返回结果
            return putMessageResult;
        });
    }
}

写入内存Buffer

编码消息

MessageExtEncoderCommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:

public class CommitLog {
    
    // ThreadLocal
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    
    // 添加消息的ThreadLocal对象
    static class PutMessageThreadLocal {
        private MessageExtEncoder encoder; // 引用MessageExtEncoder
        private StringBuilder keyBuilder;
        PutMessageThreadLocal(int size) {
            // 创建MessageExtEncoder,size用来指定分配内存的大小
            encoder = new MessageExtEncoder(size);
            keyBuilder = new StringBuilder();
        }
        // ...
    }
}

MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区

  1. 对消息属性数据的长度进行校验判断是否超过限定值
  2. 对总消息内容长度进行校验,判断是否超过最大的长度限制
  3. 根据总消息内容长度对buffer进行初始化,也就是根据消息需要的大小申请一块内存区域
  4. 将消息相关信息写入buffer:
    • 写入消息长度
    • 写入魔数
    • 写入消息体CRC校验和
    • 写入队列ID
    • 写入标识
    • 队列的偏移量, 需要注意这里还没达到偏移量的值,先占位稍后写入
    • 文件的物理偏移量, 先占位稍后写入
    • 写入系统标识
    • 写入发送消息的时间戳
    • 写入发送消息的主机地址
    • 写入存储时间戳
    • 写入存储消息的主机地址
    • RECONSUMETIMES
    • Prepared Transaction Offset
    • 写入消息体长度和消息内容
    • 写入主题长度
    • 写入主题
    • 写入属性长度和属性内容
public class CommitLog {
    
    // MessageExtEncoder
    public static class MessageExtEncoder {
        // 字节缓冲区,存储消息内容的buffer
        private final ByteBuffer encoderBuffer;
        
        MessageExtEncoder(final int size) {
            // 分配内存
            this.encoderBuffer = ByteBuffer.allocateDirect(size);
            this.maxMessageSize = size;
        }
        // 对消息进行编码并写入buffer
        protected PutMessageResult encode(MessageExtBrokerInner msgInner) {

            // 消息属性数据
            final byte[] propertiesData =
                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
            // 属性数据长度
            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
            // 校验长度是否超过最大值
            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
            }
            // 获取主题数据
            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;// 主题数据长度
            // 获取消息体内容长度
            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
            // 总消息内容长度
            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // 是否超过最大长度限制
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                        + ", maxMessageSize: " + this.maxMessageSize);
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            }

            // 初始化
            this.resetByteBuffer(encoderBuffer, msgLen);
            // 1 写入消息长度
            this.encoderBuffer.putInt(msgLen);
            // 2 写入魔数
            this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 写入消息体CRC校验和
            this.encoderBuffer.putInt(msgInner.getBodyCRC());
            // 4 写入队列ID
            this.encoderBuffer.putInt(msgInner.getQueueId());
            // 5 写入标识
            this.encoderBuffer.putInt(msgInner.getFlag());
            // 6 队列的偏移量, 稍后写入
            this.encoderBuffer.putLong(0);
            // 7 文件的物理偏移量, 稍后写入
            this.encoderBuffer.putLong(0);
            // 8 写入系统标识
            this.encoderBuffer.putInt(msgInner.getSysFlag());
            // 9 写入发送消息的时间戳
            this.encoderBuffer.putLong(msgInner.getBornTimestamp());
            // 10 写入发送消息的主机地址
            socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
            // 11 写入存储时间戳
            this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
            // 12 写入存储消息的主机地址
            socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
            // 13 RECONSUMETIMES
            this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
            // 15 写入消息体长度
            this.encoderBuffer.putInt(bodyLength);
            if (bodyLength > 0)
                this.encoderBuffer.put(msgInner.getBody());// 写入消息内容
            // 16 写入主题长度
            this.encoderBuffer.put((byte) topicLength);
            // 写入主题
            this.encoderBuffer.put(topicData);
            // 17 写入属性长度
            this.encoderBuffer.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.encoderBuffer.put(propertiesData); // 写入属性数据
            encoderBuffer.flip();
            return null;
        }
    }
}

写入内存映射文件

前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:

ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。

MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。

MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:

第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了暂存池对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。

第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。

综上所述,开启使用暂存池时会使用ByteBuffer,否则使用MappedByteBuffer进行内容写入。

public class MappedFile extends ReferenceResource {
    // 记录文件的写入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 字节buffer
    protected ByteBuffer writeBuffer = null;
    // 文件映射
    private MappedByteBuffer mappedByteBuffer;
    // 暂存池,类似线程池,只不过池中存放的是申请的内存
    protected TransientStorePool transientStorePool = null;
    // 初始化
    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        // 从暂存池中获取一块内存
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }
    
    // 初始化
    private void init(final String fileName, final int fileSize) throws IOException {
        // ...
        try {
            // 获取文件
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // 进行文件映射
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            // ...
        } catch (IOException e) {
            // ...
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
}

// 暂存池
public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize; // 暂存池大小
    private final int fileSize; // 申请的每一块内存大小
    private final Deque<ByteBuffer> availableBuffers; // 双端队列,存放申请的内存
    private final MessageStoreConfig storeConfig; // 存储配置

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * 初始化
     */
    public void init() {
        // 根据暂存池大小申请内存
        for (int i = 0; i < poolSize; i++) {
            // 申请直接内存
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            // 放入到暂存池中
            availableBuffers.offer(byteBuffer);
        }
    }
}

经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:

  1. MappedFile中记录了文件的写入位置,获取准备写入的位置,如果写入的位置小于文件大小,意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息

  2. 如果writeBuffer不为空,使用writeBuffer,否则使用mappedByteBuffer的slice方法创建一个与MappedFile共享的内存区byteBuffer,设置byteBuffer的写入位置,之后通过byteBuffer来进行消息写入,由于是共享内存区域,所以写入的内容会影响到writeBuffer或者mappedByteBuffer中

  3. 调用回调函数的doAppend方法进行写入,前面可知回调函数是DefaultAppendMessageCallback类型的

  4. 更新MappedFile写入位置,返回写入结果

public class MappedFile extends ReferenceResource {
    // 记录文件的写入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 字节buffer
    protected ByteBuffer writeBuffer = null;
    // 文件映射
    private MappedByteBuffer mappedByteBuffer;
    // 写入消息
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        // 调用appendMessagesInner
        return appendMessagesInner(msg, cb, putMessageContext);
    }
    
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        assert messageExt != null;
        assert cb != null;
        // 获取写入位置
        int currentPos = this.wrotePosition.get();
        // 如果写指针小于文件大小
        if (currentPos < this.fileSize) {
            // 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            // 设置共享内存区的写入位置
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理
                // 通过共享内存区byteBuffer写入数据
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBrokerInner) messageExt, putMessageContext);
            } else if (messageExt instanceof MessageExtBatch) { // 批量消息
                // 通过共享内存区byteBuffer写入数据
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBatch) messageExt, putMessageContext);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 更新MappedFile的写入位置
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
}

进入到DefaultAppendMessageCallbackdoAppend方法中,首先来看方法的入参:

  • fileFromOffset:文件的起始位置偏移量
  • byteBuffer:缓冲区,也就是上一步中创建的共享内存区
  • maxBlank:上一步中可知传入的是文件总大小减去当前要写入的位置,也就是文件剩余空间大小
  • msgInner:消息内容的封装体
  • putMessageContext:消息写入上下文

方法的处理逻辑如下:

  1. 计算文件要写入位置偏移量:文件起始位置偏移量 + 准备写入位置的偏移量

  2. 从消息写入上下文中获取主题所属队列的KEY,根据KEY从主题队列路由表中获取队列偏移量,如果获取为空,将偏移量初始化为0并加入到路由表中

  3. 从msgInner中获取之前已经写入到内存的消息数据preEncodeBuffer,并获取消息内容的长度

  4. 校验是否有足够的空间写入数据,如果消息长度 + END_FILE_MIN_BLANK_LENGTH(预留空间大小) 大于剩余空间,说明超出了限定的文件大小,此时只将文件大小和魔数写入文件,然后返回写入结果,结果类型为END_OF_FILE(超过文件大小)。

    这里可以看出每个CommitLog文件需要预留一部分空间(8个字节)用于存储文件大小和魔数。

  5. 计算队列偏移量在preEncodeBuffer中的位置,之前在编码消息步骤时并未写入队列的偏移量值的大小,这里需要找到对应位置更新队列偏移量的值

  6. 再次更新消息的存储时间,并将preEncodeBuffer的内容写入文件共享缓冲区byteBuffer,**此时消息内容已经写入文件对应的内存buffer中,驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容保存到硬盘中。 **

消息写入结果

  • PUT_OK:写入成功;
  • END_OF_FILE:超过文件大小;
  • MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度:
  • PROPERTIES_SIZE_EXCEEDED:消息、属性超过最大允许长度;
  • UNKNOWN_ERROR:未知异常;
public class CommitLog {
    class DefaultAppendMessageCallback implements AppendMessageCallback {
        // 预留空间大小,8个字节
        private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
            final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
            // 计算写入位置物理偏移量:文件起始位置 + 准备写入位置的偏移量
            long wroteOffset = fileFromOffset + byteBuffer.position();

            Supplier<String> msgIdSupplier = () -> {
                int sysflag = msgInner.getSysFlag();
                int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
                MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
                msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
                return UtilAll.bytes2string(msgIdBuffer.array());
            };

            // 获取消息队列信息
            String key = putMessageContext.getTopicQueueTableKey();
            // 从主题队列路由表中获取队列偏移量
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            // 如果偏移量为空
            if (null == queueOffset) {
                queueOffset = 0L; // 初始化为0
                // 添加到路由表中
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
            if (!multiDispatchWrapResult) {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }

            // 如果开启事务需要特殊处理
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            // ...
            // 获取之前已经写入到buffer的消息数据
            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
            // 获取数据长度
            final int msgLen = preEncodeBuffer.getInt(0);

            // 校验是否有足够的空间写入数据,如果消息长度 + 预留空间大小 大于最大值
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.msgStoreItemMemory.clear();
                // 1 设置文件大小
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 写入魔数
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 开始时间
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                // 将文件大小和魔数写入buffer
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
                // 返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为END_OF_FILE
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                        maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
                        msgIdSupplier, msgInner.getStoreTimestamp(),
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }
            // 计算队列偏移量的位置
            int pos = 4 + 4 + 4 + 4 + 4;
            // 6 写入队列偏移量
            preEncodeBuffer.putLong(pos, queueOffset);
            pos += 8;
            // 7 写入物理偏移量
            preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
            int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            // 8 系统标识, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
            pos += 8 + 4 + 8 + ipLen; // 计算存储时间戳的写入位置
            // 更新新存储时间戳
            preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());


            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // 将preEncodeBuffer的数据写入byteBuffer
            byteBuffer.put(preEncodeBuffer);
            // 清空buffer
            msgInner.setEncodedBuff(null);
            // 设置返回结果
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
                    break;
                default:
                    break;
            }
            return result;
        }
    }
}

刷盘

由于篇幅原因,刷盘机制将另写一篇文章。

总结

参考
丁威、周继锋《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md

RocketMQ版本:4.9.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哨声吹响,与世界杯相关的欺诈也在激增

2022 年世界杯已经开始&#xff0c;通过虚假流媒体网站与彩票针对足球迷的诈骗激增。近日&#xff0c;Zscaler 发现与世界杯相关的新注册域名有所增加&#xff0c;尽管并非都是恶意的&#xff0c;也是值得警惕的。 流量趋势 随着世界杯的开赛&#xff0c;从 11 月 21 日流媒体…

Python set集合全部操作方法

文章目录一. 介绍1. 创建set集合2. 判断元素是否在集合内3. 集合推导式(Set comprehension)二. 集合基本操作1. add&#xff08;&#xff09;添加单个元素2. update&#xff08;&#xff09;添加列表&#xff0c;元组&#xff0c;字典等整体数据3. remove ( ) 移除元素&#xf…

PACS三维影像后处理系统源码 PACS源码

PACS源码 PACS3D影像后处理系统源码 一、系统概述&#xff1a; ​基于VC MSSQL开发的一套三甲医院医学影像PACS系统源码&#xff0c;集成3D影像后处理功能&#xff0c;包括三维多平面重建、三维容积重建、三维表面重建、三维虚拟内窥镜、最大/小密度投影、心脏动脉钙化分析等…

mysql数据库之视图

视图&#xff08;view&#xff09;是一种虚拟的存在&#xff0c;视图中的数据并不在数据库中实际存在&#xff0c;行和列数据来自定义视图的查询中使用的表&#xff0c;并且是在使用视图时动态生成的。 通俗的讲&#xff0c;视图之保存了查询的sql逻辑&#xff0c;不保存查询结…

【C语言】函数指针和指针函数

文章目录[TOC](文章目录)前言概述函数指针定义&#xff1a;使用&#xff1a;回调函数指针函数前言 今天学一下函数指针 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 概述 函数指针&#xff1a;是一个指向函数的指针&#xff0c;在内存空间中存放的…

关于小程序内存在违规内容的处理警告

遇到了问题&#xff0c;就记录一下解决过程吧 小程序涉及提供在线观看视频服务&#xff0c;请补充文娱-视频广场类目&#xff1b; 我们的小程序做了一个类似抖音上下滑动观看视频的页面&#xff0c;被警告了&#xff0c;查看了文档&#xff0c; https://developers.weixin.qq.…

C语言程序-计算闰年平年

#include <stdio.h> int main() { int year ,month,day,i1; printf("欢迎使用本程序&#xff0c;使用愉快\n") ; while(i<10){ printf("请输入您需要计算的年\n"); scanf("%d",&year); printf("…

安全高效 | AIRIOT智慧工地管理解决方案

建筑工地施工材料、机械设备、工程车、人员各个环节管理相对复杂、建筑业也是安全事故频发的高危行业&#xff0c;安全管控尤为重要。建筑施工单位想要保障安全生产&#xff0c;做好能源消耗管控降低生产成本&#xff0c;需要解决掉很多现状问题&#xff1a;1、工地施工作业人员…

【Leetcode 剑指Offer】第 11 天 双指针(简单)

双指针剑指 Offer 18. 删除链表的节点剑指 Offer 18. 删除链表的节点 给定单向链表的头指针和一个要删除的节点的值&#xff0c;定义一个函数删除该节点。 返回删除后的链表的头节点。 示例 1: 输入: head [4,5,1,9], val 5 输出: [4,1,9] 解释: 给定你链表中值为 5 的第二…

服务器(centos7.6)已经安装了宝塔面板,想在里面安装一个SVN工具(subversion),应该如何操作呢?

首先&#xff0c;在登录进入宝塔面板&#xff0c;然后点击左侧终端&#xff0c;进入终端界面&#xff0c;如下图&#xff1a;------------------------------------------如果是第一次使用会弹出输入服务器用户名和密码&#xff0c;此时输入root账号和密码&#xff0c;即可进入…

2.基于Label studio的训练数据标注指南:(智能文档)文档抽取任务、PDF、表格、图片抽取标注等

文档抽取任务Label Studio使用指南 1.基于Label studio的训练数据标注指南&#xff1a;信息抽取&#xff08;实体关系抽取&#xff09;、文本分类等 2.基于Label studio的训练数据标注指南&#xff1a;&#xff08;智能文档&#xff09;文档抽取任务、PDF、表格、图片抽取标注等…

Python3-集合

Python3 集合 集合&#xff08;set&#xff09;是一个无序的不重复元素序列。 可以使用大括号 { } 或者 set() 函数创建集合&#xff0c;注意&#xff1a;创建一个空集合必须用 set() 而不是 { }&#xff0c;因为 { } 是用来创建一个空字典。 创建格式&#xff1a; parame …

奇淫技巧:熟练使用Fetch一个干翻PostMan的顶级技巧

一&#xff1a;如何使用fetch重新发送一个接口请求&#xff1f; 按照如下的请求进行复制 fetch("https://bip-test.yyuap.com/mdf-node/uniform/user/checkAuthByCode?terminalType1&serviceCodeorderList&codeuserdef_filterItem,userdef_schemaSetting"…

《C++ Primer》第十章 泛型算法

《C Primer》第十章 泛型算法 10.1 概述 大多数算法定义在头文件algorithm中&#xff0c;还有一些算法在numeric中。例如标准库算法find: int val 42;//即将查找的值 //如果在vec中找到想要的元素&#xff0c;则返回结果指向它&#xff0c;否则返回vec.cend() auto result …

小样本学习--学习记录

之前在做课题的时候&#xff0c;把数据不均衡和小样本的概念混淆了&#xff0c;昨天看了一篇论文&#xff1a;《 面向小样本数据的机器学习方法研究综述 &#xff08;陈良臣&#xff0c;傅德印&#xff09;》 &#xff0c;这篇论文写的非常清晰。推荐阅读。 网上的一些综述整理…

2月VR大数据:硬件份额变化不大,PS VR2首发游戏超50款

Hello大家好&#xff0c;每月一期的VR内容/硬件大数据统计又和大家见面了。 想了解VR软硬件行情么&#xff1f;关注这里就对了。我们会统计Steam平台的用户及内容等数据&#xff0c;每月初准时为你推送&#xff0c;不要错过喔&#xff01;本数据报告包含&#xff1a;Steam VR硬…

擅长捉弄的内存马同学:Valve内存马

前言 内存马的文章已经很久没有更新过了&#xff0c; 这篇文章不太适合想直接学习利用Valve内存马的师傅 &#xff0c;因为我这篇文章可能会有大篇笔墨去说Tomcat容器&#xff0c;至于原因就是我想更深入的了解一些Tomcat&#xff0c;而Valve内存马属于已经被师傅们玩烂了的一…

现场设备发生故障,如何第一时间通知相关人员?

一、前言 虹科物联网HMI作为一站式物联网解决方案&#xff0c;致力于解决用户在数据采集和可视化、远程监控、边缘计算、软PLC、数据存储&#xff08;SQL数据库和CSV文件&#xff09;、数据上云&#xff08;OPC UA、MQTT&#xff09;等方面的需求&#xff0c;帮助企业快速实现…

运营数据分析模型—用户画像

用户画像 伴随着大数据应用的讨论、创新,个性化技术成为了一个重要落地点。相比传统的线下会员管理、问卷调查、购物篮分析,大数据第一次使得企业能够通过互联网便利地获取用户更为广泛的反馈信息,为进一步精准、快速地分析用户行为习惯、消费习惯等重要商业信息,提供了足…

SpringCloud系列(十四)[分布式搜索引擎篇] - 索引库及文档的增删改查操作

本文主要介绍一下索引库及文档的一些增删改查操作, 以下都是一些常用的操作, 无需死记硬背, 只需要用到的时候常翻阅即可;   当然学习索引库和文档的一些基本操作还是要先在虚拟机启动一下 elasticsearch 及 kibana, 启动成功后输入 172.16.xx.xxx:5601 后出现以下界面即启动成…