(Kafka源码五)Kafka服务端处理消息

news2025/4/28 2:04:50

Kafka 服务端(Broker)采用 Reactor 的架构思想,通过1 个 Acceptor,N 个 Processor(N默认为3),M 个 KafkaRequestHandler(M默认为8),来处理客户端请求,这种模式结合了多线程和事件驱动的设计,优点是能够有效地利用系统资源,可以实现高效地处理请求,无需为每个连接或请求创建新的线程,减少了线程上下文切换的开销,以实现高并发和高吞吐量。

文章目录

    • 服务端整体架构
      • 执行流程
    • 源码剖析
      • Acceptor 线程处理
      • Processor 线程处理
      • KafkaRequestHandlerPool 处理请求

服务端整体架构

在这里插入图片描述

Kafka 服务端的网络结构主要包含以下三层:

  • 网络连接层:Acceptor 线程接收客户端的连接请求并创建网络连接。
  • 请求转发层:Acceptor 线程以轮询的方式分发给Processor 线程,从而实现负载均衡的效果,Processor
    线程将请求放到请求队列中。
  • 请求处理层:KafkaRequestHandler线程不断地从请求队列中获取请求,解析请求,调用KafkaAPIs获取对应的操作结果,并将结果返回给客户端。

执行流程

  • Acceptor 线程在初始化的时候会往selector注册 OP_ACCEPT事件,表示可以接受客户端的连接请求,当客户端有请求连接过来时,根据selectionkey可以得到socketChannel,再将socketChannel以轮询的方式交给Processor线程(默认有3个Processor线程)处理。
  • Processor线程收到Acceptor线程分发的连接后,会先将连接放入自己的队列newConnections中,然后在selector注册OP_READ事件,表示可以读取客户端的请求,当客户端发送消息过来时,Processor线程就会处理OP_READ事件,然后Processor线程会将客户端的请求连接放入requestChannel的RequestQueue(请求队列被所有Processor线程共享)里并取消OP_READ事件的监听
  • KafkaRequestHandler线程(默认会创建8个线程)会从RequestQueue取出请求进行处理,通过KafkaApis调用得到响应结果,将处理后的响应结果放入responseQueues中(每个Processor线程对应一个responseQueues)。
  • Processor线程往selector注册OP_WRITE事件,表示可以将响应结果发送给客户端,当Processor线程检测到有OP_WRITE事件时,Processor线程就会从对应的responseQueues中取出响应结果,并通过selector.poll()方法将响应结果发送给对应的客户端且取消OP_WRITE事件的监听,最后Processor线程就会重新注册OP_READ事件,准备下一个请求的处理。

源码剖析

服务端的核心代码都在kafka.scala这个类,首先是main入口方法,该方法主要设置参数,然后调用启动方法

  def main(args: Array[String]): Unit = {
    try {
      //启动服务端的时候,在这里解析参数
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
      //启动的核心代码方法
      kafkaServerStartable.startup
 		//...
  }

kafka的服务端核心方法都在startup()里面

  def startup() {
      //启动服务
      server.startup()
    //...
  }

创建SocketServer,startup启动后,会创建Acceptor线程和三个Processor线程并启动

//Kafka 服务端的功能 都是在这里面实现
def startup() {
 	 //创建NIO的服务端
        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()
}
    
 def startup() {
    this.synchronized {
      // 设置发送和接收的缓冲区大小
      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      //获取当前broker主机id
      val brokerId = config.brokerId
      var processorBeginIndex = 0
      //endpoints表示Kafka配置文件config/server.properties中的信息
      //正常情况下,只有一个服务实例
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        //processorEndIndex = 0 + 3
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        //创建了三个Processor的线程
        for (i <- processorBeginIndex until processorEndIndex)
         //默认新建3个Processor线程
          processors(i) = newProcessor(i, connectionQuotas, protocol)
         
        //Acceptor类的主构造函数会启动3个Processor线程
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

        acceptors.put(endpoint, acceptor)
        // Utils是线程工具类,启动acceptor线程,
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }
  }

Acceptor 线程处理

Utils.newThread启动acceptor线程的start()方法后,就会执行该线程的run方法

  • 首先 serverChannelnioSelector 注册 OP_ACCEPT 事件,nioSelector就会监听serverChannel是否有新的连接请求

  • 若有新的连接请求到来,根据该连接的key创建 SocketChannel,然后通过轮询的方式分发给不同的 processors线程处理,从而保证processor线程的负载均衡。

  def run() {
      //ServerChannel往Selector注册OP_ACCEPT事件,表示可以接收客户端的请求,
      //Selector就会检查ServerChannel是否有新的请求到达
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

    startupComplete()
    try {
      var currentProcessor = 0
      //死循环,不断轮询
      while (isRunning) {
        try {
          //selecotr 查看是否有新的注册事件
          val ready = nioSelector.select(500)
         //大于0,说明有新事件到来
          if (ready > 0) {
            //获取事件的key
            val keys = nioSelector.selectedKeys()
            //遍历注册的所有key
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                //遍历完就删除
                iter.remove()
                //如果事件是OP_ACCEPT,就会调用accept()方法接收请求
                if (key.isAcceptable)
                 // 创建SocketChannel,将其分发给Processor线程处理
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                 // 轮询遍历下一个Processor线程
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }

  }

根据key封装socketChannel,分发给processor线程处理,processor线程将socketChannel放入自己的队列newConnections中,该队列是由ConcurrentLinkedQueue实现的队列,然后唤醒processorselector处理

def accept(key: SelectionKey, processor: Processor) {
    //根据SelectionKey获取到serverSocketChannel
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    //获取到一个socketChannel
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      // processor调用accept方法对socketChannel进行处理
      processor.accept(socketChannel)
    }
  }
 def accept(socketChannel: SocketChannel) {
    //将接收的 SocketChannel放入到自己的队列
    newConnections.add(socketChannel)
    // 唤醒 Processor 的 selector 进行处理
    wakeup()
  }

Processor 线程处理

在上面的startup()中已经创建了3个Processor线程,然后在Acceptor的主构造函数中进行启动

//主构造函数,new出来的时候会被运行
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas)
  extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  this.synchronized {
    //启动在startup()创建的3个Processor线程
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

Processor启动之后就会执行run方法

  override def run() {
    startupComplete()
    while (isRunning) {
      try {
        //读取队列中的每个SocketChannel,都往Selector上面注册OP_READ事件
        configureNewConnections()
        //处理响应,并注册OP_WRITE事件
        processNewResponses()
        //读取和发送请求的代码应该都是在这个方法完成,用于处理OP_READ事件与OP_WRITE事件
        poll()
        //处理接收到的新请求,将这些请求放入requestChannel请求队列中并取消OP_READ事件
        processCompletedReceives()
        //处理已经发送出去的响应并重新监听OP_READ事件
        processCompletedSends()
        processDisconnected()
      } 
    swallowError(closeAll())
    shutdownComplete()
  }

不断获取连接队列里所有的SocketChannel,解析参数得到ConnectionId,再往selector注册OP_READ事件,注册之后就可以读取客户端的请求。

private def configureNewConnections() {
//当连接队列不为空
    while (!newConnections.isEmpty) {
      //不断获取连接队列里面的SocketChannel
      val channel = newConnections.poll()
      try {
         //解析SocketChannel,获取对应的参数
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        //往selector注册OP_READ事件
        selector.register(connectionId, channel)
      } 
    }
  }

从这段代码可以知道kafka对SocketChannel进行了封装,封装成KakaChannel,并将SelectionKey和KakaChannel进行二者的绑定,除此之外,Kafka还实现了channel复用,将connectionId和KakaChannel放入map中,避免每次发起请求都新建channel,减少了资源的消耗

  public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        //往自己的Selector上面注册OP_READ事件
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        //kafka对SocketChannel进行了封装,封装成KakaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        //将key和channel进行绑定
        key.attach(channel);
        //channels护了多个网络连接,实现channel复用
        this.channels.put(id, channel);
    }

将客户端的请求放入请求队列中,并取消OP_READ事件

  private def processCompletedReceives() {
    //遍历每一个请求
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val channel = selector.channel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
          channel.socketAddress)
        //对于获取到的请求进行解析,得到request
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
        //将request放入请求队列中
        requestChannel.sendRequest(req)
        //取消OP_READ事件
        selector.mute(receive.source)
      } 
    }
  }

KafkaRequestHandlerPool 处理请求

接下来就会通过KafkaRequestHandler线程去处理请求队列中的请求。回到最开始的 startup(),

  def startup() {
	//主要用于处理请求队列里面的请求
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
	//...
}

新建的KafkaRequestHandlerPool,会在主构造函数创建8个KafkaRequestHandler

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  //默认启动8个线程,一般情况下可以根据消息的吞吐量去设置这个参数
  for(i <- 0 until numThreads) {
    //创建线程
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    //线程启动,就会执行run方法
    threads(i).start()
  }

}

KafkaRequestHandler启动之后就会执行run方法,将客户端的请求交由KafkaAPIs进行最终的处理。

def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          val startSelectTime = SystemTime.nanoseconds
          //获取request对象
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        //将请求交给KafkaApis进行处理
        apis.handle(req)
      } 
    }
  }
  def handle(request: RequestChannel.Request) {
            //处理生产者发送过来的请求
        case ApiKeys.PRODUCE => handleProducerRequest(request)
  }
def handleProducerRequest(request: RequestChannel.Request) {
    //获取到生产发送过来的请求信息
    val produceRequest = request.body.asInstanceOf[ProduceRequest]
    val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
    //按照分区的方式去遍历数据
    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
      //对发送过来的数据进行权限等判断。
      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
    }
    //判断是否有写权限。
    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
      case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
    }
       //把接收的数据追加到磁盘上
      replicaManager.appendMessages(
        produceRequest.timeout.toLong,
        produceRequest.acks,
        internalTopicsAllowed,
        authorizedMessagesPerPartition,
        sendResponseCallback)
    }

数据存储到磁盘后,调用sendResponseCallback()回调函数处理响应。

  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
   //...
   quotas.produce.recordAndMaybeThrottle(
        request.session.sanitizedUser,
        request.header.clientId,
        numBytesAppended,
        produceResponseCallback)
    }
  }

继续调用回调函数produceResponseCallback(),根据ack的值进行处理

  1. acks=0:生产者不会等待任何来自broker的确认。
  2. acks=1(默认):生产者会等待leader broker接收到消息并确认(但不保证所有副本都已同步)。
  3. acks=all 或 acks=-1:生产者会等待所有同步副本都确认接收到消息。
 def produceResponseCallback(delayTimeMs: Int) {
      //acks = 0,表示生产者不关心数据的处理结果,所以不需要返回响应信息
        if (produceRequest.acks == 0) {
         //...
        } else {
        //acks不为0,表明生产者需要响应消息
          //封装请求头
          val respHeader = new ResponseHeader(request.header.correlationId)
          //封装请求体,也就是响应消息
          val respBody = request.header.apiVersion match {
            case 0 => new ProduceResponse(mergedResponseStatus.asJava)
            case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
          }
        	//将响应消息发送给客户端
          requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
        }
      }

将响应放入processor对应的responseQueue中,默认情况下有3个responseQueue。

  def sendResponse(response: RequestChannel.Response) {
    //先从数组获取Processor对应的队列,再将响应放到这个队列
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

接着服务端需要响应客户端,回到processor的run方法

 override def run() {
        //处理响应,并注册OP_WRITE事件
        processNewResponses()
          //处理已经发送出去的响应并重新监听OP_READ事件
        processCompletedSends()
  }

处理responseQueues中的响应可以分为三种类型:

  • NoOpAction:对于不需要返回响应的请求,重新注册OP_READ监听事件。
  • SendAction:需要发送响应的情况,接下来注册OP_WRITE监听事件,并最终通过selector.poll()方法将响应结果发送给客户端。
  • CloseConnectionAction:需要关闭的响应。
private def processNewResponses() {
    //通过Process线程的id获取Response对象
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        curr.responseAction match {
          //对于不需要返回响应的请求
          case RequestChannel.NoOpAction =>
            curr.request.updateRequestMetrics
            //重新监听OP_READ事件
            selector.unmute(curr.request.connectionId)
            //需要发送响应的情况
          case RequestChannel.SendAction =>
            //注册OP_WRITE事件,发送响应
            sendResponse(curr)
           // 需要关闭的响应,关闭连接
          case RequestChannel.CloseConnectionAction =>
            curr.request.updateRequestMetrics
            close(selector, curr.request.connectionId)
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }

正常情况下处理已经发送出去的响应,将响应从响应队列中移除,并重新监听OP_READ事件,准备处理客户端的下一个请求。

  private def processCompletedSends() {
    selector.completedSends.asScala.foreach { send =>
      //移除响应队列的响应
      val resp = inflightResponses.remove(send.destination).getOrElse {
       //...
      }
      resp.request.updateRequestMetrics()
      selector.unmute(send.destination)
    }
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2204998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

眼镜店会员积分卡系统软件试用版下载 佳易王配镜顾客信息管理系统使用操作教程

一、前言 【软件试用版资源文件下载可以点文章最后卡片了解】 眼镜店会员积分软件是专门为眼镜店设计的管理工具&#xff0c;旨在通过积分活动提升顾客的忠诚度&#xff0c;促进复购率&#xff0c;并优化顾客体验。 眼镜店会员积分卡系统软件试用版下载 佳易王配镜顾客信息管…

WEB攻防-python考点CTF与CMS-SSTI模板注入PYC反编译

知识点&#xff1a; 1、PYC&#xff08;python编译后的文件&#xff09;文件反编译&#xff1b; 2、Python-Web-SSTI&#xff1b; 3、SSTI模板注入利用分析&#xff1b; &#xff08;Server-Side Template Injection&#xff09; SSTI 就是服务器端模板注入 当前使用的一…

高级算法设计与分析-MaxFlow网络流基础知识

MaxFlow网络流 1 网络流基础概念 source:源点 sink:终点 Flow:流量 capacity:容量 Residual:残量 Residual Network:残量网络 Augmenting path:增广路径,表示从源点 s 到终点 t 不包含环的路径 Bottleneck capacity:瓶颈容量 2 最大流 2.1 基础概念 2.2 增广路算法 …

C++刷怪笼(7)string类

目录 1.前言 2.正文 2.1标准库中的string类 2.1.1string类 2.1.2auto和范围for 2.1.3string类的常用接口说明 2.2string类的模拟实现 2.2.1经典的string类问题 2.2.2浅拷贝 2.2.3深拷贝 ​编辑 2.2.4写时拷贝 3.小结 1.前言 前面我们对C的封装这一大特性进行了详细…

Win10电脑录屏全攻略:轻松掌握屏幕录制技巧

在日常生活和工作中&#xff0c;我们经常需要录制屏幕来展示操作步骤、制作教程视频或记录重要内容。如果你正在使用Win10系统&#xff0c;那么有多种方法可以轻松实现屏幕录制。下面就给大家分享五种不同的录屏工具&#xff0c;让你轻松掌握屏幕录制技巧。 一、嗨格式录屏大师…

从数据到资产,数据资产管理的风口能撑多久?

从数据到资产&#xff0c;数据资产管理的风口能撑多久&#xff1f; 前言从数据到资产 前言 在今天的大数据和人工智能世界里&#xff0c;数据的价值已被推到了前所未有的高度。作为一家企业&#xff0c;如何有效管理这些数据并从中提取出最具价值的部分&#xff0c;成了各行各…

AI产品经理指南| 面试了100位AI产品经理后的心得总结

AI正深刻地变革着各行各业&#xff0c;而在这场技术革命中&#xff0c;首先受到影响的或许正是那些与AI紧密相连的专业人士——产品经理。 我们本篇中的对话始于Vanessa对「面试了100位AI产品经理」的心得总结。 &#x1f469;&#x1f3fb; Vanessa&#xff1a; 字节跳动AI产…

NTAG_8654

NTAG_8654 产品型号 NTAG_8654 标签尺寸 85.5*54*4.3MM 芯片类型 NFC Ntag 213/215/216 工作频率 13.56MHZ 工作温度 -30C~100C 标签材质 ABS 支持协议 14443A 协议 标签材质 黑/白 安装方式 背胶粘贴或柳钉安装 应用领域 物业巡更&#xff0c;设备巡检&…

工业智能化的安全护盾,CodeMeter如何实现高效授权与防护

工业自动化的快速发展对技术创新提出了越来越高的要求。为了在全球竞争中保持领先&#xff0c;制造企业不仅需要提高生产效率&#xff0c;还必须确保软件的安全性和可靠性。如何在保证高效运营的同时&#xff0c;保护核心知识产权和防止软件盗用&#xff0c;成为了行业内的重要…

拓扑排序基础及应用案例

文章目录 基础内容应用案例&#xff1a;软件构建系统的依赖管理背景描述解决方案具体步骤示例代码 基础内容 拓扑排序&#xff08;Topological Sort&#xff09;是一种针对有向无环图&#xff08;Directed Acyclic Graphs, DAG&#xff09;的排序方法。它的目的是找出一种图中…

【树莓派5B】IO串口通信使用

超级简单的串口使用 前言零、检查准备&#xff08;可略&#xff09;0.1 查看UART引脚&#xff1a;0.2 扩展一下引脚查看的方法 一、配置准备1.1 检查端口配置1.2 查看串口映射1.3 下载minicom串口调试工具1.4 通过命令获取串口上的数据 二、python的serial进行收发测试总结 前言…

Llama-3.2-3B-Instruct PyTorch模型微调最佳实践

1 引言 Meta Llama 3.2多语言大型语言模型集合&#xff08;LM&#xff09;是一个1B和3B大小&#xff08;文本输入/文本输出&#xff09;的预训练和指令微调模型集合。Llama 3.2指令调整的纯文本模型针对多语言对话用例进行了优化&#xff0c;包括智能检索和总结任务。它们在常…

【用Java学习数据结构系列】HashMap与TreeMap的区别,以及Map与Set的关系

看到这句话的时候证明&#xff1a;此刻你我都在努力 加油陌生人 个人主页&#xff1a;Gu Gu Study 专栏&#xff1a;用Java学习数据结构系列 喜欢的一句话&#xff1a; 常常会回顾努力的自己&#xff0c;所以要为自己的努力留下足迹 喜欢的话可以点个赞谢谢了。 作者&#xff…

js逆向——2024最新有道翻译过控制台反调试

今日受害者网址&#xff1a;https://fanyi.youdao.com/ 最近收到粉丝反馈&#xff0c;有道翻译网站添加了反调试功能&#xff0c;出现了打不开控制台的情况 那么&#xff0c;今天就加餐讲一下如何过掉有道的反调试吧~ 我进去测试了一下&#xff0c;发现就是右键检查/f12打开控…

Java-基础

1. 导入模块不能纯粹的复制粘贴&#xff0c;要从new里导入&#xff0c;因为前者建立不了关联 2. 数组 String[] name{"张三","李四","王五"};int[] numsnew int[]{1,2,3};//二维String[][] names{{"张三","李四"},{"…

算法设计课程简介

算法设计课程简介 1. 课程概述 算法设计是一门计算机科学的核心课程&#xff0c;旨在教授学生如何设计、分析和优化各种算法&#xff0c;以解决实际问题。该课程不仅涉及具体算法的实现&#xff0c;更注重算法在时间复杂度和空间复杂度上的优化&#xff0c;帮助学生培养编写高…

map系列的使用

map和multimap参考文档 map和multimap参考文档https://legacy.cplusplus.com/reference/map/ map类的介绍 map的声明如下&#xff0c;Key就是map底层关键字的类型&#xff0c;T是map底层T的类型。但要注意&#xff1a;map的 key 与 T 是封装在std::pair<Key&#xff0c;…

第二十一篇:你知道直播,小区视频点播等是怎么实现的吗?(组播协议)

你知道直播&#xff0c;小区视频点播等是怎么实现的吗&#xff1f;其实现就是运用了组播&#xff01; 信息从信息源发送给组播成员&#xff0c;肯定不能全网无选择的传播&#xff0c;那不是组播&#xff0c;那就是广播了&#xff0c;路由器不支持广播&#xff0c;却支持组播&a…

Kylin系统根分区扩容步骤

问题背景 工作中&#xff0c;有时候Linux操作系统的根分区空间不足&#xff0c;我们需要将已安装的系统根分区进行扩容。那么该如何进行操作呢&#xff1f;我以VMware虚拟机上的Kylin系统为例&#xff0c;进行了详细的实验操作。具体详情请见下面的文章。 虚拟机扩容硬盘 首…

【高等代数笔记】线性空间(二十四下半部分-二十六)

3.23 子空间的运算 【推论1】 dim ⁡ ( V 1 V 2 ) dim ⁡ V 1 dim ⁡ V 2 ⇔ V 1 ∩ V 2 0 \dim(\textbf{V}_1\textbf{V}_2 )\dim\textbf{V}_1\dim\textbf{V}_2\Leftrightarrow\textbf{V}_1\cap\textbf{V}_2\textbf{0} dim(V1​V2​)dimV1​dimV2​⇔V1​∩V2​0 3.24 子…