深度解析RocketMq源码-持久化组件(四) CommitLog

news2024/10/23 2:52:09

1.绪论

commitLog是rocketmq存储的核心,前面我们介绍了mappedfile、mappedfilequeue、刷盘策略,其实commitlog的核心组件我们基本上已经介绍完成。

2.commitLog的组成

commitLog的核心其实就是MqppedFilequeue,它本质上就是多个mappedFile的queue,所以可以看出commitLog和mappedFile是一对多的关系。

2.1 commitLog的基本组件

下面是commitLog的具体组成的一些组件:

public class CommitLog implements Swappable {
    // Message's MAGIC CODE daa320a7
    //commitLog的魔数
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    public final static int BLANK_MAGIC_CODE = -875286124;
    /**
     * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR]
     */
    public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
    //核心:即MqppedFilequeue是真正存储消息的地方,可以看出是由多个MappedFile组成
    protected final MappedFileQueue mappedFileQueue;
    //当前commitLog所属的MessageStore组件
    protected final DefaultMessageStore defaultMessageStore;
    //进行flush的组件
    private final FlushManager flushManager;
    //不常用的数据的检查组件
    private final ColdDataCheckService coldDataCheckService;
    //真正写入数据的组件
    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;

    protected volatile long confirmOffset = -1L;

    private volatile long beginTimeInLock = 0;

    protected final PutMessageLock putMessageLock;

    protected final TopicQueueLock topicQueueLock;

    private volatile Set<String> fullStorePaths = Collections.emptySet();
    //刷新磁盘的监视器
    private final FlushDiskWatcher flushDiskWatcher;
    //commitlog大小,默认为1gb
    protected int commitLogSize;

    private final boolean enabledAppendPropCRC;
    //多路分发器,rocketmq为了支持mutt协议的组件
    protected final MultiDispatch multiDispatch;
}

构造函数:

public CommitLog(final DefaultMessageStore defaultMessageStore) {
        String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
        //初始化messagequeue,这个时候文件会与直接内存建立映射关系,并且完成文件预热
        if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
            this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
                    defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
        } else {
            this.mappedFileQueue = new MappedFileQueue(storePath,
                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
                    defaultMessageStore.getAllocateMappedFileService());
        }

        this.defaultMessageStore = defaultMessageStore;

        //如果是同步的时候,采用GroupCommitService进行flush
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            this.flushCommitLogService = new GroupCommitService();
        } else {
            //如果是异步刷盘,采用FlushRealTimeService进行flush
            this.flushCommitLogService = new FlushRealTimeService();
        }
        //开启瞬时缓存池技术的话,采用CommitRealTimeService进行commit
        this.commitLogService = new CommitRealTimeService();

        this.appendMessageCallback = new DefaultAppendMessageCallback();
        //每个线程都有一个消息的编码器,MessageExtEncoder负责将消息进行编码到bytebuffer中
        putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
            @Override
            protected PutMessageThreadLocal initialValue() {
                return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            }
        };
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
        //rocketmq实现,mutt协议的组件,它可以让消息只有一个commitlog但是分发到多个queue中
        this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
        //当采用同步刷盘的时候,调用线程会阻塞等待刷盘结果,FlushDiskWatcher会检测每一个flush请求,如果超过完成时间
        //便会自动的唤醒调用线程,防止调用线程阻塞过久的场景出现
        flushDiskWatcher = new FlushDiskWatcher();
    }

2.2 commtLog真正存储消息的地方-MappedFileQueue

mappedfilequeue是commitLog的核心,在前面mappedfilequeue中已经仔细分析过,这里不再赘述。详情请看:深度解析RocketMq源码-持久化组件(二) MappedFileQueue-CSDN博客

2.3 commitLog是如何落盘的-FlushCommitLogService

flushmanager包含了是对commotlog罗盘进行处理的组件,它包含了commit和flush两个逻辑。在前面刷盘策略中已经仔细分析过,这里不再赘述。详情请看:

深度解析RocketMq源码-持久化组件(三) 刷盘策略-CSDN博客

2.4 同步刷盘失败怎么办 -FlushDiskWatcher

当采用同步刷盘的时候,调用线程会阻塞等待刷盘结果,FlushDiskWatcher会检测每一个flush请求,如果超过完成时间便会自动的唤醒调用线程,防止调用线程阻塞过久的场景出现。由此可以看出,就算是同步刷盘策略,也有可能因为超过3s没有刷盘成功导致数据丢失。
 @Override
    public void run() {
        while (!isStopped()) {
            GroupCommitRequest request = null;
            try {
                //获取提交的flush请求
                request = commitRequests.take();
            } catch (InterruptedException e) {
                log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
                continue;
            }
            //
            while (!request.future().isDone()) {
                long now = System.nanoTime();
                //如果超过了请求的超时时间
                if (now - request.getDeadLine() >= 0) {
                    //返回刷盘超时
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                // To avoid frequent thread switching, replace future.get with sleep here,
                long sleepTime = (request.getDeadLine() - now) / 1_000_000;
                sleepTime = Math.min(10, sleepTime);
                if (sleepTime == 0) {
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    log.warn(
                            "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
                    break;
                }
            }
        }
    }

3. commitLog的常见方法

3.1 commitLog是如何写入消息的 - asyncPutMessage

commitLog的核心方法就是写入消息,本质上就是在写入消息前对消息设置一些参数,比如存储时间戳,或者顺序消息的话,会转移topic,会把消息真正的topic覆盖原来的topic。然后获取到最后一个commitLog,并调用commitLog的appendMessage方法写入消息。所以commitLog其实是将消息顺序写入的。

  public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        //设置存储时间戳
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        //将消息体进行crc编码,并存储到bodyCrc中
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        //获取到对应的topic
        String topic = msg.getTopic();
//        int queueId msg.getQueueId();
        //如果是事务消息,获取到事务类型prepared或者commit或者rollback
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //如果是非事务消息或者commit消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                //如果是延迟消息的话,需要校验延迟消息的最大延迟是否在mq的要求的范围内
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
            
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //并且将消息的延迟级别获取到延迟消息需要放入的queueid
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                //并且设置消息真正放入的地方也就是SCHEDULE_TOPIC_XXXX中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        //设置存储的时间戳
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        updateMaxMessageSize(putMessageThreadLocal);
        if (!multiDispatch.isMultiDispatchMsg(msg)) {
            //获取到消息的编码器,并且编码,因为encode为一个公共类,为了防止锁竞争,所以放入到threadLocal中
            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
            if (encodeResult != null) {
                return CompletableFuture.completedFuture(encodeResult);
            }
            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
        }
        //构建消息上下文中
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        //开始写入消息
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                //获取到最后一个mappedfile
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }
            //调用mappedfile的appendMessage方法写入消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } finally {
            beginTimeInLock = 0;
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        //设置返回结果
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

3.2 commitLog是如何flush消息的-submitFlushRequest

commitLog如果要flush磁盘的话,其实是提交一个flush请求,然后根据同步刷盘还是异步刷盘,最后交由FlushCommitLogService来消费请求,最后调用mappedFile的flush方法,将数据从buffer中flush到磁盘中去的。

  /**
     * @param result 向mappedfile中追加消息的结果
     * @param messageExt 具体的消息内容已经协议头等
     * @return
     */
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // Synchronization flush
        //如果采用同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            //采用的是GroupCommitService进行刷盘
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            //默认是需要等待消息刷盘成功的
            if (messageExt.isWaitStoreMsgOK()) {
                //构建刷盘请求,包括当前消息的在buffer中的写指针
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                flushDiskWatcher.add(request);
                //调用GroupCommitService的putRequest推送刷盘请求
                service.putRequest(request);
                //当前线程阻塞等待刷盘结果
                return request.future();
            } else {
                //如果不需要等待刷盘线程成功,便直接唤醒刷盘线程,并且返回成功(此时也可能有丢失数据的风险)
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush
        else {
            //如果采用异步刷盘策略
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                //如果未采用了瞬时缓存池技术,便唤醒flush服务线程
                flushCommitLogService.wakeup();
            } else  {
                //如果未采用了瞬时缓存池技术,便唤醒commit服务线程
                commitLogService.wakeup();
            }
            //返回成功
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

3.3 rocketMq是如何实现主从同步的

rocketmq以前采用的方式是通过主从的方式,主节点负责写入,然后并且commitLog同步到从节点的方式实现高可用的,但是这样有个问题,就是主节点宕机过后,需要手动的修改某个从节点为主节点。现在这种架构基本上没有使用,而换用的是后面要讲解的用raft协议实现dledger高可用架构。我们接下来大致了解一下主从架构是如何同步的。

3.3.1 提交复制请求 -submitReplicaRequest

public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
        //如果节点为主节点,并且为同步复制
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            //如果消息已经同步完毕
            if (messageExt.isWaitStoreMsgOK()) {
                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                            this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                    //会调用HAserver的putRequest向里面发送复制请求
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                }
                else {
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }

3.3.2 rocketmq的主从同步组件-HAService

1.主节点监听从节点的连接请求,并且建立socket连接

下面的代码是nio的监听并且建立连接的标准写法。

   public void run() {
            log.info(this.getServiceName() + " service started");
            
            while (!this.isStopped()) {
                try {
                    this.selector.select(1000);
                    Set<SelectionKey> selected = this.selector.selectedKeys();

                    if (selected != null) {
                        for (SelectionKey k : selected) {
                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                //监听多路复用的请求
                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                                if (sc != null) {
                                    HAService.log.info("HAService receive new connection, "
                                        + sc.socket().getRemoteSocketAddress());

                                    try {
                                        //拿到与从节点的channel过后,并且凑早HAconnection
                                        HAConnection conn = new HAConnection(HAService.this, sc);
                                        conn.start();
                                        //加入到链接池connectionList中
                                        HAService.this.addConnection(conn);
                                    } catch (Exception e) {
                                        log.error("new HAConnection exception", e);
                                        sc.close();
                                    }
                                }
                            } else {
                                log.warn("Unexpected ops in select " + k.readyOps());
                            }
                        }

                        selected.clear();
                    }
                } catch (Exception e) {
                    log.error(this.getServiceName() + " service has exception.", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }
2.将commitLog通过网络连接发送至从节点-GroupTransferService
 public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doWaitTransfer();
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

3.从节点读取消息并且将其加入到自己的commitLog中-HAClient

 private boolean processReadEvent() {
            
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    //从网络中读取消息到byteBufferRead中
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        //将bytebuffer中的数据写入到磁盘中
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }
 private boolean dispatchReadRequest() {
            //获取到消息头大小
            final int msgHeaderSize = 8 + 4; // phyoffset + size

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                //获取到完整的消息内容
                if (diff >= msgHeaderSize) {
                    //获取到master的物理偏移量
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                    //获取到消息体
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                    //当前同步的最大偏移量
                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        //如果从节点偏移量和主节点偏移量不想当便直接返回
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }

                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = byteBufferRead.array();
                        //获取到消息体开始的指针
                        int dataStart = this.dispatchPosition + msgHeaderSize;
                        //调用mappedfile的appendToCommitLog方法直接写入待commitLog中
                        HAService.this.defaultMessageStore.appendToCommitLog(
                                masterPhyOffset, bodyData, dataStart, bodySize);
                        //更新同步偏移量
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

4.总结

至此,rocketmq的核心持久化主键commitLog我们便已经全部分析完成,简单而言,commitLog持有一个messageFileQueue,而mappedFileQueue对应不同的mappedFile文件,而mappedFile通过mmap技术与磁盘中的文件建立映射。而磁盘中的物理文件的每个文件的名称都是以它当前的文件的起始偏移量命名的,比如第一个文件为0.log,1个mappedfile的大小为1gb,假设第一个文件的内容刚好全部写满,所以其第二个文件的名称为1024*1024*1024.log。一有消息到来,就会向commitLog中写入文件,进行持久化,其实就是获取到最后一个mappedFile,并且调用他的appendMessage方法完成消息的写入。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1843541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图像数字化基础

一、像素 1、获取图像指定位置的像素 import cv2 image cv2.imread("E:\\images\\2.png") px image[291,218] print("坐标(291,218)上的像素的BGR值是&#xff1a;",px) &#xff08;1&#xff09;RGB色彩空间 R通道&#xff1a;红色通道 G通道&…

LVS(Linux Virtual Server)集群,(1)NAT模式

Cluster&#xff1a;集群&#xff0c;为了解决某个特定问题将多台计算机组合起来形成的单个系统。 集群分为三种类型&#xff1a; LB(Load Balancing)&#xff0c;负载均衡&#xff0c;多个主机组成&#xff0c;每个主机只承担一部分访问请求 HA(High Availiablity)&#xf…

网工内推 | H3C工程师,大专可投,无责底薪加提成

01 内蒙古华贸信息科技有限公司 &#x1f537;招聘岗位&#xff1a;H3C工程师 &#x1f537;岗位职责&#xff1a; 1、负责核心网络的7*24小时网络运维&#xff0c;可持续对网络进行优化&#xff0c;提供高质量的网络服务&#xff1b; 2、能够独立运维高端核心设备和楼层接入网…

Premiere Pro 关键帧的运用(光盘滚入盘盒)

制作“光盘滚入盘盒”效果&#xff0c;步骤如下&#xff1a; 1.新建项目>新建序列&#xff0c;项目名称为“光盘滚入盘盒”&#xff0c;序列设置如下图所示。 2.导入素材到项目面板中。 3.新建“颜色遮罩 ”。在项目面板中&#xff0c;右键【新建项目】-【颜色遮罩】-【视…

idea2023开发插件入门

idea2023开发插件入门 创建工程 通过 idea plugin 来创建工程 修改 开发语言 默认创建的工程是用scala开发的&#xff0c;但是我不会&#xff0c;就会java,所以改成java创建 build.gradle.kt 为 build.gradlesettings.gradle.kt 为 settings.gradle build.gradle修改为以…

css实现多行文本的展开收起

背景 在我们写需求时可能会遇到类似于这样的多行文本展开与收起的场景&#xff1a; 那么&#xff0c;如何通过纯css实现这样的效果呢&#xff1f; 实现的难点 &#xff08;1&#xff09;位于多行文本右下角的 展开收起按钮。 &#xff08;2&#xff09;展开和收起两种状态的…

Linux--视频推流及问题

方案一&#xff1a; mjpg-streamer,它运行在ARM板上 在手机上使用浏览器直接观看视频 方案二&#xff1a; 推流端&#xff08;Fmpeg&#xff09;--rtmp-->Nginx&#xff08;流媒体服务器&#xff09;--rtmp/httpflv/hls-->浏览器、播放器 此篇文章记录方案二的具体细…

理解HTTP请求格式

HTTP概念 HTTP全称HyperTextTransfer Protocol(超文本传输协议)是一种用于分布式、协作式和超媒体信息系统的应用层协议&#xff1b;HTTP是一个客户端&#xff08;用户&#xff09;和服务端&#xff08;网站&#xff09;之间请求和响应的标准。 HTTP 协议是以 ASCII 码传输&…

IDEA 运行 ‘xxx‘ 时出错. 命令行过长. 通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行.

IDEA 运行 ‘xxx’ 时出错. 命令行过长. 通过 JAR 清单或通过类路径文件缩短命令行&#xff0c;然后重新运行. 目录 IDEA 运行 xxx 时出错. 命令行过长. 通过 JAR 清单或通过类路径文件缩短命令行&#xff0c;然后重新运行.报错解决方案jar清单jar清单 报错原因&#xff1a;1.使…

可预约上门服务的在线DIY预约小程序源码系统 带完整的安装代码包以及搭建教程

系统概述 这款可预约上门服务的在线 DIY 预约小程序源码系统是为满足各类上门服务需求而设计的。它允许用户通过小程序方便地预约各种服务&#xff0c;如家政服务、维修服务、美容美发服务等。同时&#xff0c;商家可以在后台管理系统中方便地管理预约信息、服务项目、员工信息…

数据链路层知识分享【计算机网络】【以太网帧 | MTU的影响 | ARP技术】

博客主页&#xff1a;花果山~程序猿-CSDN博客 文章分栏&#xff1a;Linux_花果山~程序猿的博客-CSDN博客 关注我一起学习&#xff0c;一起进步&#xff0c;一起探索编程的无限可能吧&#xff01;让我们一起努力&#xff0c;一起成长&#xff01; 目录 前文 一&#xff0c; 以…

贝叶斯优化、高斯过程相关概念总结

目录 贝叶斯优化 高斯过程 采集函数 贝叶斯优化 贝叶斯优化 &#xff5c; 黑盒优化全局最优方法 &#xff5c; Bayesian Optimization_哔哩哔哩_bilibili 贝叶斯优化用于解决寻找某个函数的最大值/最小值&#xff0c;在自变量维度比较小时(<20)表现的非常好。 适用…

嵌入式系统软件开发环境_2.一般架构

1.Eclipse框架 嵌入式系统软件开发环境是可帮助用户开发嵌入式软件的一组工具的集合&#xff0c;其架构的主要特征离不开“集成”问题&#xff0c;采用什么样的架构框架是决定开发环境优劣主要因素。Eclipse框架是当前嵌入式系统软件开发环境被普遍公认的一种基础环境框架。目…

OCC介绍及框架分析

1.OCC介绍 Open CASCADE &#xff08;简称OCC&#xff09;是一开源的几何造型引擎&#xff0c;OCCT库是由Open CASCADE公司开发和市场运作的。它是为开源社区比较成熟的基于BREP结构的建模引擎&#xff0c;能够满足二维三维实体造型和曲面造型&#xff0c;国内研究和使用它的单…

NodeJs 连接本地 mySql 数据库获取数据

写在前面 今天把 nodejs 连接本地数据库的坑简单的踩一下&#xff0c;为后续写接口做个铺垫 安装 mySql &#xff08;mac举例子&#xff09; 安装地址 安装完成大概这个样子&#xff0c;起动起来就行 安装本地数据库连接工具&#xff08;navicat举例子&#xff09; 安装地…

RISC_CPU模块的调试

代码&#xff1a; cpu.v include "clk_gen.v" include "accum.v" include "adr.v" include "alu.v" include "machine.v" include "counter.v" include "machinectl.v" include "register.v&quo…

探索Linux命令的新利器:linux-command

在Linux操作系统中&#xff0c;熟练掌握各种命令是成为一名高效开发者或管理员的关键。然而&#xff0c;即使是经验丰富的用户&#xff0c;有时也会遇到命令用法不熟悉或者记忆模糊的情况。这时&#xff0c;一个功能强大的命令搜索工具就显得格外重要。最近在逛github的时候正好…

Qemu虚拟机在线迁移到VMware

libvirt版本&#xff1a;libvirt-10.0.0qemu版本&#xff1a;qemu-8.2.0 在生产环境中&#xff0c;大多数的场景是 vmware 虚拟机迁移到 qemu 环境&#xff0c;一般是通过关机然后导出、导入磁盘镜像来实现。 如果要将 qemu 环境虚拟机迁移到 vmware 怎么办呢&#xff1f;要求…

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…

go语言对接S3存储的SDK(支持minio和OSS)

背景 在某个项目中&#xff0c;客户要求支持S3协议的存储&#xff0c;因为之前的项目是go来开发的支持的oss和minio 。 但并不一定支持S3的协议&#xff0c;而且使用了二种SDK&#xff0c;感觉比较麻烦。 既然客户提出来了要求。那我们改一下就是了。 操作 引入 go语言中有对…