RocketMQ Broker消息处理流程剩余源码解析

news2025/1/12 16:11:43

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月4日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • DefaultMessageStore
  • CommitLog
  • MappedFileQueue
  • MappedFile
  • submitFlushRequest

DefaultMessageStore

接上文,SendMessageProcessor对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage实例:
在这里插入图片描述
DefaultMessageStore的默认存储消息的方法asyncPutMessage如下:

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }

    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });

    return putResultFuture;
}

整个存储流程的代码还是比较清晰的:

  1. 先判断消息是否投放成功 及 检查消息的格式
  2. 上述检查都没问题时就会把消息存储在CommitLog
CompletableFuture<PutMessageResult> putResultFuturethis.commitLog.asyncPutMessage(msg);

CommitLog

Broker接收到消息后,最终消息存储在Commitlog对象中,调用的是CommitLogputMessage方法

 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
		
		// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
				// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 消息队列编号 = 延迟级别 - 1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;

		// 加锁,同一时刻只能有一个线程put消息
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
			// 只有不存在映射文件或文件已存满,才进行创建
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }
			// 追加消息至MappedFile的缓存中,更新写入位置wrotePosition
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                // 当文件剩余空间不足时,创建新的MappedFile并写入
                case END_OF_FILE: 
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
        	// 释放锁
            putMessageLock.unlock();
        }

        /** log **/

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

		// 异步刷盘流程
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });
    }

消息在CommitLog文件中是顺序存储的

RocketMQ消息存储在CommitLog文件里,最终落盘,对应的类为MappedFile,它是从MappedFileQueue中获取的,如果对象不存在,就会创建:

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}

创建(获取)完成对象之后就会把消息插入到mappedFile里,如果文件放不下了,则会重新创建一个mappedFile来对其进行写入,最后就是使用异步Future的方式把消息持久化到磁盘上。


MappedFileQueue

上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象:

public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

MappedFile对象为空时,表示MappedFile对象不存在,此时就需要重新创建一个MappedFile对象,相应的方法在MappedFileQueue

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
	
	// 批量删除文件上限
    private static final int DELETE_FILES_BATCH_MAX = 10;
	// 目录
    private final String storePath;
	// 每个映射文件的大小
    private final int mappedFileSize;
	// 映射文件数组
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
	// 分配MappedFile服务
    private final AllocateMappedFileService allocateMappedFileService;
	// 最后flush到的offset
    private long flushedWhere = 0;
    // 最后commit到的offset
    private long committedWhere = 0;
	// 最后保存的时间戳
	private volatile long storeTimestamp = 0;
	
	/**
	*	获取最后一个可写入的映射文件
	*	当最后一个文件已经满的时候,创建一个新的文件
	*/
	public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();
		
		// 不存在映射文件
        if (mappedFileLast == null) {
        	// 计算从哪个offset开始
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }
		
		// 最后一个文件已满
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
		
		// 创建文件
        if (createOffset != -1 && needCreate) {
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

    public MappedFile getLastMappedFile(final long startOffset) {
        return getLastMappedFile(startOffset, true);
    }
}

当获取到的“最后一个”MappedFile不存在或文件已满时,则新建一个,并计算新文件的createOffset

MappedFile

CommitLog的代码中,最终把消息追加到MappedFile文件的缓冲区中,同时更新其写入位置writePosition,但是还没有刷盘:

result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);

其中调用了MappedFileappendMessage方法实现添加消息到消息映射文件:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
        	// 缓冲区
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // 判断是否是批量操作
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBrokerInner) messageExt, putMessageContext);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                        (MessageExtBatch) messageExt, putMessageContext);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // 更新写入位置 + 写入偏移量
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

通过源码可以看到,实际上是吧消息放入ByteBuffer,同时更新写入位置和偏移量


submitFlushRequest

提交刷盘请求的源码如下:

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 异步flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                service.putRequest(request);
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // 同步flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();	// 异步,使用MappedByteBuffer(默认策略)
            } else  {
                commitLogService.wakeup();	// 异步,使用字节缓冲区 + FileChannel
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }

CommitLog文件刷盘是由FlushCommitLogService服务具体执行,一共有两种刷盘策略:

  1. FlushDiskType.SYNC_FLUSH:同步刷盘
  2. FlushDiskType.ASYNC_FLUSH:异步刷盘

同步刷盘的情况下,必须要等到数据刷盘成功后才会有返回结果;如果是异步刷盘,只需要把消息放入内存之后即可返回。

上述策略可以在broker.conf配置文件中进行配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/388162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CCNP350-401学习笔记(补充题目1-100)

1、wireless client to roam --->> wireless client 2、Cisco aWIPS policies on the WLC 3、 import json -->> while true -->>except -->> File open -->>File.close() -->> File.open() 4、 login console group radius 5、undesir…

大数据框架之Hadoop:MapReduce(七)MapReduce扩展案例

一、倒排索引案例&#xff08;多job串联&#xff09; 1、需求 有大量的文本&#xff08;文档、网页&#xff09;&#xff0c;需要建立搜索索引&#xff0c;如图4-31所示。 &#xff08;1&#xff09;数据输入 &#xff08;2&#xff09;期望输出数据 atguigu c.txt–>2…

VS2022+Qt5.14.2成功编译MITK2022.10

目录 一 编译结果 二 编译问题解决 三 参考链接 一 编译结果 二 编译问题解决 error C2220错误 1> mitkSlicedGeometry3D.cpp 1>D:\MITK\src\MITK-2022.10\Modules\Core\src\DataManagement\mitkSlicedGeometry3D.cpp(1,1): error C2220: 以下警告被视为错误 [D:\MI…

能代替try catch处理异常的优雅方式

前言软件开发过程中&#xff0c;不可避免的是需要处理各种异常&#xff0c;就我自己来说&#xff0c;至少有一半以上的时间都是在处理各种异常情况&#xff0c;所以代码中就会出现大量的try {…} catch {…} finally {…} 代码块&#xff0c;不仅有大量的冗余代码&#xff0c;而…

【办公类-19-03】办公中的思考——Python批量统一文件名的序号(保教主任整理打印文件)

背景需求&#xff1a;为迎接督导检查&#xff0c;保教主任从各条线收集文本资料。并在每个文件名称前手动编号。但是她嘀咕道&#xff1a;”为什么两套资料放在一个文件里就不是按照数字序号排序&#xff1f;&#xff0c;有的是1X-&#xff0c;有的是40X&#xff0c;看起来很乱…

20230304 CF855 div3 vp

Dashboard - Codeforces Round 855 (Div. 3) - Codeforces呃呃&#xff0c;评价是&#xff0c;毫无进步呃呃呃呃呃呃呃呃呃呃呃呃呃呃呃呃呃该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训了该加训…

有关平方或高次方的公式整理一元高次方程的求解

Part.I Introduction 这篇博文记录一下数学中常用的有关平方或高次方的一些公式。 Chap.I 一些结论 下面一部分汇总了一些重要的结论 完全平方公式&#xff1a;(ab)2a22abb2(ab)^2a^22abb^2(ab)2a22abb2平方差公式&#xff1a;a2−b2(ab)(a−b)a^2-b^2(ab)(a-b)a2−b2(ab)(…

Spring——Bean管理-注解方式进行属性注入

Spring针对Bean管理中创建对象提供的注解有哪些&#xff1f;Component&#xff1a;普通Service&#xff1a;业务逻辑层Controller&#xff1a;controller层Repository&#xff1a;dao层用注解的方式是为什么&#xff1f;简化xml方式开发&#xff0c;只需要注解就可以完成在配置…

【Redis场景5】集群秒杀优化-分布式锁

集群环境下的秒杀问题 前序 【Redis场景1】用户登录注册 【Redis场景2】缓存更新策略(双写一致) 【Redis场景3】缓存穿透、击穿问题 【Redis场景拓展】秒杀问题-全局唯一ID生成策略 【Redis场景4】单机环境下秒杀问题 在单机环境下的并发问题&#xff0c;我们可以使用相关…

39. 实战:基于api接口实现视频解析播放(32接口,窗口化操作,可导出exe,附源码)

目录 前言 目的 思路 代码实现 需要导入的模块 1. 导入解析网站列表&#xff0c;实现解析过程 2. 设计UI界面 3. 设置窗口居中和循环执行 4. 注意事项 完整源码 运行效果 总结 前言 本节将类似34. 实战&#xff1a;基于某api实现歌曲检索与下载&#xff08;附完整…

SpringCloud:Nacos的安装(Windows,Linux)

目录 一、认识和安装Nacos 1、下载 2、点击进入Github&#xff0c;进入Releases 3、点击Tags 4、解压&#xff08;Windows版&#xff09; 5、端口配置 6、启动 7、访问 二、Linux系统安装Nacos 1、打开虚拟机&#xff0c;使用xshell连接虚拟机&#xff0c;Nacos依赖于…

JVM内置锁synchronized关键字详解

目录 JVM内置锁synchronized关键字详解 设计同步器的意义 如何解决线程并发安全问题&#xff1f; synchronized原理详解 synchronized底层原理 synchronized在jdk1.6前后的变化【重点】 jdk小于1.6时 jdk>1.6时 轻量级锁何时升级为重量级锁&#xff1f;&#xff1f;…

【ROS学习笔记10】ROS中配置自定义Cpp头文件和导入自定义Python库

【ROS学习笔记10】ROS中配置自定义Cpp头文件和导入自定义Python库 文章目录【ROS学习笔记10】ROS中配置自定义Cpp头文件和导入自定义Python库一、ROS中的头文件和源文件1.1 自定义头文件调用1.2 自定义源文件调用二、Python模块的导入Reference写在前面&#xff0c;本系列笔记参…

springBoot 启动指定配置文件环境多种方案

springBoot 启动指定配置文件环境理论上是有多种方案的&#xff0c;一般都是结合我们的实际业务选择不同的方案&#xff0c;比如&#xff0c;有pom.xml文件指定、maven命令行指定、配置文件指定、启动jar包时指定等方案&#xff0c;今天我们一一分享一下&#xff0c;以供参考&a…

Java知识复习(十二)Docker

1、容器 一句话概括容器&#xff1a;容器就是将软件打包成标准化单元&#xff0c;以用于开发、交付和部署容器镜像是轻量的、可执行的独立软件包 &#xff0c;包含软件运行所需的所有内容&#xff1a;代码、运行时环境、系统工具、系统库和设置。容器化软件适用于基于 Linux 和…

Redis学习(一):NoSQL概述

为什么要使用Nosql 现在是大数据时代&#xff0c;过大的数据一般的数据库无法进行分析处理了。 单机MySQL的年代 90年代&#xff0c;一个基本的网站访问量一般不会太大&#xff0c;单个数据库完全足够&#xff01; 那个时候&#xff0c;更多的去使用静态网站&#xff0c;服务器…

TD算法超详细解释,一篇文章看透彻!

【已解决】TD算法超详细解释和实现&#xff08;Sarsa&#xff0c;n-step Sarsa&#xff0c;Q-learning&#xff09;一篇文章看透彻&#xff01; 郑重声明&#xff1a;本系列内容来源 赵世钰(Shiyu Zhao)教授的强化学习数学原理系列&#xff0c;本推文出于非商业目的分享个人学习…

DockerFile创建及案例

DockerFile dockerfile是用来构建docker镜像的文件&#xff0c;命令脚本参数脚本&#xff01; 构建步骤 编写一个dockerfile文件docker build 构建成为一个对象docker run 运行镜像docker push 发布镜像&#xff08;DockerHub、阿里云镜像仓库&#xff09; 去官网Docker-Hub…

51单片机——串口通信,小白讲解,相互学习

通讯的基本概念 51单片机不仅可以实现串口通信&#xff0c;还可以通过IO口模拟实现多种其他通信&#xff0c;比如 SPI&#xff0c;IIC等&#xff0c;学习这些通信前&#xff0c;我们很有必要了解下通信的基本概念。通信的方式可以分为多种&#xff0c;按照数据传输方式可分为串…

MySQL——复合查询+表的内外连接

文章目录复合查询基本查询多表查询自连接子查询1、单行子查询2、多行子查询3、多列子查询4、在from子句中使用子查询&#x1f60a;5、合并查询结果unionunion all表的内连和外连内连接外连接左外连接右外连接复合查询 接下来我们就要进行的是多张表里进行查询操作&#xff0c;…