vllm源码解析(二):调度策略分析

news2024/11/14 18:48:46

五 vllm调度逻辑

前面分享的知识确实遗漏了许多细节,不过不影响我们接下来的讲解,我们在第一篇文章中已经详细讲解过三个队列用途,不熟悉的同学可以回看。

本章涉及到的流程图已上传github:
https://github.com/yblir/packages/blob/main/vllm.vsdx

从下图可以看到调度系统整体运行规则:

  • vllm/core/scheduler.py:def _schedule_default(…),每次执行推理前都要走一遍调度。调度逻辑如下:
    在这里插入图片描述

我们对调度逻辑做下详细分析:

    1. 每次执行调度前都要重新初始化一个budget类对象来管理本次调度的tokens和seqs数量, 根据数量是否超过阈值,决定将本次seq_groups放入哪个队列(一个seq_groups会包含多个seqs)。budget对象在上图三个调度方法中都有用到,用以判断正在处理的seq_group的去向。
    1. 我们来思考这样一个问题:调度的目的是让vllm每次推理都保持最大的吞吐量,而running队列中seq_groups才是送去做推理的,那么为什么调度逻辑的判断入口是swapped而不是running呢?

      让我们从社会学角度来考虑这个问题:
      你有没有遇到过这种情况,某天,你老板(调度)来到你面前,跟你(running)说,亲,你的工作饱和吗(最大吞吐量),要不要给你再来点?我想你肯定没遇到过。真实的情况是,老板会直接把工作甩你脸上,工作不饱和你就干吧,没时间干(gpu资源不足或处理数量超出阈值)就先积压起来(watiing or swapped),有时间再搞。

      那么问题来了,你(running)工作的来源有两个(waiting,swapped),要从哪个队列拿数据呢?
      稍微思考下就能知道,必定先考虑积压的(swapped),因为这是已经做过一部分的工作,如果不优先处理,它们会始终占据着你的脑容量(系统资源); 而另一边,waiting队列则是完全没开始的新工作,当前也不存在任何资源占用问题,一比较就高下立判了吧!同时这也由调度原则决定:先来的请求先被服务,swapped生命历程为:waiting -> running -> swapped, 从整个时间尺度上看,swapped肯定比现在处于waiting队列的seq_group来的更早。

    1. 如果swapped有积压的工作(非空),说明你已经在高强度工作,不能向running继续塞新工作了。同时要看看你的工作情况(_schedule_running),看在接下来工作(下一次推理)是否有风险(预期资源不足或tokens,seqs数量超出阈值),毕竟压榨打工人是为了创造更多价值,但若直接把打工人累死就啥都没了!(gpu 爆显存)。如果swapped为空,只能从waiting队列拿数据了(使用_schedule_prefills方法调度)。
    1. 从waiting队列取出数据prefill后,先判断它的seq_groups数量,如果为非0,说明有取出seq_group,再验证下budget阈值(tokens和seqs数量,_schedule_prefills方法内部已经对budget作过校验,此处仅是为了代码健壮性),没问题就能加入running队列,愉快地做推理去,至此,本次调度完结。若waiting队列为空,没有seq_group可用,那么prefill的seq_group数量必定也为0。此时swapped和waiting队列都没有提供新的seq_group(此时swapped队列为空,但waiting队列不一定为空,也可能是当前系统资源紧张,因此跳过取数据的步骤,导致prefill.seq_groups为空),唯一还在活跃的队列是running。
    1. _schedule_running处理的是running队列,这个队列里面存储的是上一次执行推理的seq_groups, 以前行不代表现在也可以,如经过推理后新生成token的kv-cache会占用部分资源,导致系统资源不足以支撑running队列中的所有seq_groups再做一次推理,所以还要对running队列中的seq_group进行校验,如果确实不能支撑,就要执行抢占操作。
    1. 接下来要判断是否存在抢占现象,如果发生抢占,就根据抢占类型转移到不同队列中,对抢占的处理在(一)有讲到。如果没有发生抢占,用_schedule_swapped调度方法 转移seq_group到running队列。源码在这里没有对swapped队列是否为空做二次判断,也可以理解吧,毕竟如果队列为空,那就啥都不取呗!
    1. 现在获得seq_groups都是经过筛选的,能确保下一次推理时不会爆显存,还要做最后一次判断,确认没有超过调度器设置的推理时的阈值(tokens和seqs,基本可确定没问题,因为三个调度方法中都有对阈值做判断,觉得这里的判断只是为了增加代码的健壮性)。之后把经过检验的所有seq_groups加入到running队列,这些才是可用于下一次推理的seq_groups(成功上岸!)
 def _schedule_default(self) -> SchedulerOutputs:
     """Schedule queued requests.
     The current policy is designed to optimize the throughput. First,
     it batches as many prefill requests as possible. And it schedules
     decodes. If there's a pressure on GPU memory, decode requests can
     be swapped or preempted.

     当前策调度略旨在优化吞吐量。首先,它会批量处理尽可能多的预填充请求。然后它会安排解码。
     如果 GPU 内存有压力,则可以交换或抢占解码请求。因此会优先从swapped进行判断。
     """
     # Include running requests to the budget.
     # 每次step都要重新初始化一个budget来管理本次调度的的tokens和seqs数量, 根据数量是否超过阈值,决定将本次
     # seq_groups放入哪个队列。(一个seq_groups会包含多个seqs)
     budget = SchedulingBudget(
             token_budget=self.scheduler_config.max_num_batched_tokens,
             max_num_seqs=self.scheduler_config.max_num_seqs,
     )

     # Make sure we include num running seqs before scheduling prefill,
     # so that we don't schedule beyond max_num_seqs for prefill.
     # 先统计正在执行推理的seq_groups中seq的数量
     for seq_group in self.running:
         budget.add_num_seqs(seq_group.request_id, seq_group.get_max_num_running_seqs())
     # lora推理相关,可忽略
     curr_loras = set(
             seq_group.lora_int_id for seq_group in self.running
             if seq_group.lora_int_id > 0) if self.lora_enabled else None

     # 以下三个变量,类似于C++中的结构体。将多个变量合在一起,通过.属性访问
     # 各自保存处于不同活跃状态(wait,run,swap)的seq_groups具有的属性
     prefills = SchedulerPrefillOutputs.create_empty()
     running_scheduled = SchedulerRunningOutputs.create_empty()
     swapped_in = SchedulerSwappedInOutputs.create_empty()

     # If any requests are swapped, prioritized swapped requests.
     # 为什么要从swap开始判断?
     # 调度的任务是优化吞吐量,即保证处于running状态的seqs最多。running从wait和swap队列
     # 获得,首先积压的任务可能要比wait的优先级高,因为swap队列中的任务始终占据着系统资源,当
     # running可添加时,应该首先处理swap。
     if not self.swapped:  # 如果swapped队列为空
         # 既然不能从swap想running转移,那就只能从wait队列拿任务了。
         # wait队列中的都是原始任务,第一步要预填充
         # prefills是一个伪结构体:可以.出以下属性
         #     seq_groups: List[SequenceGroup]
         #     ignored_seq_groups: List[SequenceGroup]
         #     num_lookahead_slots: int
         prefills = self._schedule_prefills(budget, curr_loras, enable_chunking=False)

     # Don't schedule decodes if prefills are scheduled.
     # NOTE: If `_schedule_prefills` doesn't enable chunking, self.running
     # only contains decode requests, not chunked prefills.

     # self.waiting空,或 self.swapped非空,都会导致prefills.seq_groups数量为0
     # # 这个判断的意思是,prefills.seq_groups==0,说明本次调度没有安排预填充任务,那么就安排解码任务.
     # 执行推理任务的seq_group都在running队列,因此需要对这个队列进行调度。
     # 调度什么呢?
     # 是看running队列中的seq_group是否可以继续做推理任务。因为vllm动态管理,最大限度优化吞吐量,会导致blocks资源紧张
     # 上次推理生成的tokens的kv-cache需要GPU blocks去存储,导致资源消耗。那么这次准备推理时blocks数量不一定能够它完成
     # 推理,所以要对running队列中每个seq_group进行检查,看是否可以进行做推理。
     if len(prefills.seq_groups) == 0:
         running_scheduled = self._schedule_running(budget, curr_loras, enable_chunking=False)

         # If any sequence group is preempted, do not swap in any sequence
         # group. because it means there's no slot for new running requests.
         # 在对running队列调度后(从self.running队列取seq_group,准备进行推理),如果没有seq_group被
         # 抢占(退回wait队列),也没有seq_group被转移到CPU上, 说明blocks资源充足,可以把以前
         # self.swapped队列中积压的seq_group转移到gpu blocks做推理.

         # 注意这几个队列的判断逻辑. 如果self.swapped原本就非空,会进入上面的if判断分支进行self.running队列
         # 调度取值.然后根据这个过程中是否有seq_group被preempted和swapped_out获知blocks资源使用情况.
         # 如果没有被preempted和swapped_out.说明工作不饱和,就把self.swapped内容取出来,加入running队列进行推理
         # 如果有被preempted和swapped_out,说明资源紧张. self.swapped积压的任务暂时还处理不了
         # 如果下面if为False,意思是就不要再从self.swapped转移seq_group会gpu做推理了
         if len(running_scheduled.preempted) + len(running_scheduled.swapped_out) == 0:
             swapped_in = self._schedule_swapped(budget, curr_loras)

     # 最后一次判断本次推理的tokens和seqs数量是否超过阈值
     assert budget.num_batched_tokens <= self.scheduler_config.max_num_batched_tokens
     assert budget.num_curr_seqs <= self.scheduler_config.max_num_seqs

     # Update waiting requests.
     # 这个类型被抢占的seq_group,打回原型,重新加入waiting队列。
     # 幸运的是添加到了队列头部,当再次从waiting队列取数据时,会优先处理它
     self.waiting.extendleft(running_scheduled.preempted)
     # Update new running requests.
     # 将以上通过层层筛选的seq_group加入到running队列(真·running),这些seq_group才是下一步的推理对象
     self.running.extend([s.seq_group for s in prefills.seq_groups])
     self.running.extend([s.seq_group for s in running_scheduled.decode_seq_groups])
     self.running.extend([s.seq_group for s in swapped_in.decode_seq_groups])
     # Update swapped requests.
     # 没有足够资源做推理的seq_group会从running转移到swap队列(swap队列是路径之一,另一个是加入到wait队列)
     self.swapped.extend(running_scheduled.swapped_out)
     # 统计被抢占的seq_group数量
     preempted = len(running_scheduled.preempted) + len(running_scheduled.swapped_out)

     # There should be no prefill from running queue because this policy
     # doesn't allow chunked prefills.
     assert len(running_scheduled.prefill_seq_groups) == 0
     assert len(swapped_in.prefill_seq_groups) == 0

     return SchedulerOutputs(
             scheduled_seq_groups=(prefills.seq_groups +
                                   running_scheduled.decode_seq_groups +
                                   swapped_in.decode_seq_groups),
             num_prefill_groups=len(prefills.seq_groups),
             num_batched_tokens=budget.num_batched_tokens,
             blocks_to_swap_in=swapped_in.blocks_to_swap_in,
             blocks_to_swap_out=running_scheduled.blocks_to_swap_out,
             blocks_to_copy=running_scheduled.blocks_to_copy +
                            swapped_in.blocks_to_copy,
             ignored_seq_groups=prefills.ignored_seq_groups +
                                swapped_in.infeasible_seq_groups,
             num_lookahead_slots=running_scheduled.num_lookahead_slots,
             running_queue_size=len(self.running),
             preempted=preempted,
     )

从整体调度逻辑中我们可以看到,核心代码是三处子调度方法,接下来我们来详细分析下这些方法的代码。

5.1 _schedule_prefills调用逻辑

对waiting队列的调度充斥着各种判断,核心只有一条:当把取出seq_group加入running队列进行推理时,系统资源是否够用
基于此,有以下几种处理方式:

  • 准备取出的seq_group 超出prompt长度阈值,标记为finish,以后不再处理
  • 准备取出的seq_group 需求的blocks数量超出阈值,两种处理:①标记为finish,以后不再处理,② 不再取出seq_group,留待以后再处理,跳出调度流程。
  • 准备取出的seq_group tokens或seq数量超出推理能承受的阈值,不再取出seq_group,留待以后再处理,跳出调度流程
    在这里插入图片描述
    def _schedule_prefills(
            self,
            budget: SchedulingBudget,
            curr_loras: Optional[Set[int]],
            enable_chunking: bool = False,
    ) -> SchedulerPrefillOutputs:
        # ignored_seq_groups:记录因太长(所需的blocks和总blocks之间的差值超过阈值了),
        # 而无法继续做生成的seq_group,这些seq_group中的seq状态都会被标记为
        # FINISHED_IGNORED,表示直接不处理他们
        ignored_seq_groups: List[SequenceGroup] = []
        # 用于装载从wait队列转移出来的seq_group
        seq_groups: List[SequenceGroup] = []

        waiting_queue = self.waiting

        leftover_waiting_sequences: Deque[SequenceGroup] = deque()
        # self._passed_delay:通过比较当前请求到达时间来确定是否要从wait队列拿任务
        # 因为拿任务也要占用时间,需要平衡调度任务与推理任务的调用时机
        while self._passed_delay(time.time()) and waiting_queue:
            seq_group = waiting_queue[0]
            # list,从当前seq_group取出准备推理的seq序列. 1个prompt可能有多个seq(多输出),但wait队列中连预填充
            # 都没进行,因此这时的seq(仅是prompt)数量必定==1
            waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
            assert len(waiting_seqs) == 1, "Waiting sequence group should have only one prompt sequence."

            # 当前待推理的seq_group需要处理,或者说准备返回的tokens数量,
            # 对于WAITING状态,只有1个seq,tokens数量为prompt长度
            num_new_tokens = self._get_num_new_tokens(seq_group,
                                                      SequenceStatus.WAITING,
                                                      enable_chunking, budget)
            if not enable_chunking:
                num_prompt_tokens = waiting_seqs[0].get_len()
                # 从waiting取出的seq_group,连预填充都没做,更不会有output token,
                # 若计算出的tokens数量不等与prompt数量,一定有问题,抛出异常吧!
                assert num_new_tokens == num_prompt_tokens

            # 如果这条seq的长度 > 每次调度能处理的最大序列长度,那么把这条seq的状态置为FINISHED_IGNORED,
            # 并将对应seq_group装入ignored_seq_groups中,然后将其从waiting列表中移除,永不再处理,完结撒花~
            prompt_limit = self._get_prompt_limit(seq_group)
            if num_new_tokens > prompt_limit:
                logger.warning(
                        "Input prompt (%d tokens) is too long"
                        " and exceeds limit of %d", num_new_tokens, prompt_limit)
                for seq in waiting_seqs:
                    seq.status = SequenceStatus.FINISHED_IGNORED
                ignored_seq_groups.append(seq_group)  # 加入失败者联盟
                waiting_queue.popleft()  # 从 waiting 队列中踢出去
                continue  # 继续从waiting拿数据处理

            # If the sequence group cannot be allocated, stop.
            # 比较当前seq需要的物理块,gpu可用物理块之间的数量关系. 决定是否能给当前seq_group分配物理块
            # can_allocate返回值可能有三种: NEVER:不分配;OK:可以分配;LATER:延迟分配
            can_allocate = self.block_manager.can_allocate(seq_group)
            if can_allocate == AllocStatus.LATER:
                break
            # 当前seq需要的blocks数量,超过gpu能提供的最大数量.加入失败者联盟,永不再处理,
            elif can_allocate == AllocStatus.NEVER:
                logger.warning(
                        f"Input prompt ({num_new_tokens} tokens) is too long"
                        " and exceeds the capacity of block_manager")
                for seq in waiting_seqs:
                    seq.status = SequenceStatus.FINISHED_IGNORED
                ignored_seq_groups.append(seq_group)
                waiting_queue.popleft()
                continue

            # lora相关,忽略
            lora_int_id = 0
            if self.lora_enabled:
                lora_int_id = seq_group.lora_int_id
                assert curr_loras is not None
                assert self.lora_config is not None
                if (self.lora_enabled and lora_int_id > 0
                        and lora_int_id not in curr_loras
                        and len(curr_loras) >= self.lora_config.max_loras):
                    # We don't have a space for another LoRA, so
                    # we ignore this request for now.
                    leftover_waiting_sequences.appendleft(seq_group)
                    waiting_queue.popleft()
                    continue

            # 当前seq_group中状态为 未执行完 的序列的数量,即seq还没推理完成的数量. 刚从wait中取出时,
            # seq数量是1,但推理生成阶段,这个seq_group中会有n个seq在并行.n是外部传入的output数量. 因此这里num_new_seqs==n
            num_new_seqs = seq_group.get_max_num_running_seqs()
            # budget.can_schedule同时判断tokens和seqs数量是否超过阈值,任一个超过单次调度能执行的总数的阈值
            # 说明这step可推理的seqs数量已经马上趋于饱和,不能再加入seq到running队列。跳出while, 结束本次waiting向running的调度
            if num_new_tokens == 0 or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                                              num_new_seqs=num_new_seqs):
                break

            # Can schedule this request.
            # lora相关,忽略
            if curr_loras is not None and lora_int_id > 0:
                curr_loras.add(lora_int_id)

            # 走到这一步时,说明当前seq_group已经通过上述种种验证,可以被加入running队列进行推理
            # 先将其从waiting队列中移出
            waiting_queue.popleft()
            # 为当前seq_group分配物理块,并将该seq_group中每条seq的status从waiting改为running
            self._allocate_and_set_running(seq_group)

            # ScheduledSequenceGroup类似于C++结构体。仅包含seq_group和token_chunk_size两个变量
            # 搞不懂vllm为什么总喜欢这种包裹操作,在各处代码中随处可见。用基本的list,或dict不好吗!
            seq_groups.append(
                    ScheduledSequenceGroup(seq_group=seq_group,
                                           token_chunk_size=num_new_tokens))

            # 当前seq_group的tokens和seqs数量增加到预算budget中
            budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
            budget.add_num_seqs(seq_group.request_id, num_new_seqs)

        # Queue requests that couldn't be scheduled.
        # 和lora相关的操作,忽略
        waiting_queue.extendleft(leftover_waiting_sequences)
        if len(seq_groups) > 0:
            self.prev_prompt = True

        return SchedulerPrefillOutputs(
                seq_groups=seq_groups,
                ignored_seq_groups=ignored_seq_groups,
                num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True))

5.2 _schedule_running调度方法

running队列存储时上次做推理的seq_group,下次推理前, 要判断下是否还可以继续做推理,如果不能,就要从队列移出来。
会转移到waiting或swapped队列,转移规则我们第一篇博客中已经做了详细说明。
在这里插入图片描述

    def _schedule_running(
            self,
            budget: SchedulingBudget,
            curr_loras: Optional[Set[int]],
            enable_chunking: bool = False,
    ) -> SchedulerRunningOutputs:
        # Blocks that need to be swapped or copied before model execution.
        # todo 类型变了
        # blocks_to_swap_out:{gpu物理块id: cpu物理块id}
        # blocks_to_copy: {旧物理块id:[由旧物理块copy-on-write而来的新物理块id]}
        blocks_to_swap_out: List[Tuple[int, int]] = []
        blocks_to_copy: List[Tuple[int, int]] = []

        decode_seq_groups: List[ScheduledSequenceGroup] = []
        prefill_seq_groups: List[ScheduledSequenceGroup] = []
        preempted: List[SequenceGroup] = []
        swapped_out: List[SequenceGroup] = []

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.

        running_queue = self.running

        while running_queue:
            seq_group = running_queue[0]
            # 当前待推理的seq_group需要处理,或者说准备返回的tokens数量,对于RUNNING状态,每个seq返回1个token
            num_running_tokens = self._get_num_new_tokens(
                    seq_group, SequenceStatus.RUNNING, enable_chunking, budget)

            # todo 觉得这个判断有点多余,因为处于RUNNING状态的seq,必定有tokens返回,prompt总不能为空吧,num_running_tokens
            # todo 不可能为0, 再说,如果为0, 方法self._get_num_new_tokens内部就会抛出异常,因为做了assert断言
            if num_running_tokens == 0:
                break

            # 经过num_running_tokens检验没问题后, 将该seq_group从running_queue中取出来
            running_queue.popleft()

            # 对于这个seq_group,检查对于其中的每一个seq,是否能至少分配一个物理块给它,如果不能的话
            # (说明要执行抢占操作了,否则马上会没有资源让这个最早到达的seq_group做完推理):
            # 这里用了while...else,如果while条件正常结束,则进入else内容;如果被break,则不会执行else
            while not self._can_append_slots(seq_group):  # 如果不能为当前seq_group的每个seq都分配一个block
                # 这个seq_group本来是要送去做推理的,但没有足够的gpu物理blocks分配给它
                # 根据vllm的调度原则,这个seq_group要被优先处理,没有足够资源,就把running队列最后位置的
                # seq_group踢出去,释放gpu blocks给当前seq_group使用。

                # seq_group准备返回的tokens数量已经加到budget属性上,现在不处理它, 要把数量再减回来
                # budget会记录每次+-数量的seq_group.request_id,如果以前没被+过,现在就不会被-,就像下面的调用一样
                budget.subtract_num_batched_tokens(seq_group.request_id, num_running_tokens)
                # 在外层总调度中,已经在budget汇总了所有正在活跃的seqs数量,现在要减去属于该seq_group的seqs数量
                num_running_seqs = seq_group.get_max_num_running_seqs()
                budget.subtract_num_seqs(seq_group.request_id, num_running_seqs)

                # lora相关,忽略
                if (curr_loras is not None and seq_group.lora_int_id > 0
                        and seq_group.lora_int_id in curr_loras):
                    curr_loras.remove(seq_group.lora_int_id)

                # ------------------------------------------------------------------------------------------------------
                # 经过以下释放gpu blocks工作后,再次进入while循环判断gpu blocks数量是否够用,
                # 如果够用,进入到与while对应的else分支,如果不够用,继续释放gpu blocks,直到够用或running_queue全部取完.
                # ------------------------------------------------------------------------------------------------------

                # 如果此时running_queue队列不为空,把最后一个seq_group踢出去放入swap队列,给
                # 上面这个seq_group腾位置(释放最后一个seq_group对应的gpu blocks)
                if running_queue:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = running_queue.pop()

                    # 有两种swap方式,RECOMPUTE:删除所有,回炉到waiting队列. SWAP:将blocks全部转移到CPU blocks上
                    preempted_mode = self._preempt(victim_seq_group, blocks_to_swap_out)
                    if preempted_mode == PreemptionMode.RECOMPUTE:
                        preempted.append(victim_seq_group)
                    else:
                        swapped_out.append(victim_seq_group)
                # 如果running_queue队列已经空了,没有替罪的羊,只能把自己放入swap队列了.
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    preempted_mode = self._preempt(seq_group, blocks_to_swap_out)
                    if preempted_mode == PreemptionMode.RECOMPUTE:
                        preempted.append(seq_group)
                    else:
                        swapped_out.append(seq_group)
                    # 此时running_queue队列已空,已经没有seq_group可处理了,使用break中断
                    # while循环, 不走后面的else分支,直接return,而且本次调度没有指定任何待推理的seq_group
                    break
            else:
                # 为当前seq_group分配gpu 物理blocks. 这里只分配了逻辑blocks与物理blocks的映射关系
                # blocks_to_copy:[旧物理块id, copy - on - write而来的新物理块id]
                self._append_slots(seq_group, blocks_to_copy)
                is_prefill = seq_group.is_prefill()
                if is_prefill:
                    prefill_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,
                                                                     token_chunk_size=num_running_tokens))
                else:
                    decode_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,
                                                                    token_chunk_size=1))
                # todo 这似乎是个bug, 如果_can_append_slots为True,会跳过while直接走当前else分支
                # todo seqs在外层_schedule_default已经更新过,所以这里只更新tokens就好
                # todo 但是,如果_can_append_slots为False,budget会同时减去seq_group的tokens和seqs数量
                # todo 下面把tokens再加回来,逻辑没问题,但没有更新seqs!
                budget.add_num_batched_tokens(seq_group.request_id, num_running_tokens)
                # OPTIMIZATION:  Note that get_max_num_running_seqs is
                # expensive. For the default scheduling chase where
                # enable_chunking is False, num_seqs are updated before running
                # this method, so we don't have to update it again here.
                # 默认情况下, 以下两个if都走不到
                if enable_chunking:
                    num_running_seqs = seq_group.get_max_num_running_seqs()
                    budget.add_num_seqs(seq_group.request_id, num_running_seqs)
                if curr_loras is not None and seq_group.lora_int_id > 0:
                    curr_loras.add(seq_group.lora_int_id)

        return SchedulerRunningOutputs(
                decode_seq_groups=decode_seq_groups,
                prefill_seq_groups=prefill_seq_groups,
                preempted=preempted,
                swapped_out=swapped_out,
                blocks_to_swap_out=blocks_to_swap_out,
                blocks_to_copy=blocks_to_copy,
                num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=False))

5.3 _schedule_swapped调度方法

swapped -> running, 这个调度比较简单,原则就一条:系统是否有足够的资源让它回来做推理
在这里插入图片描述

    def _schedule_swapped(
            self,
            budget: SchedulingBudget,
            curr_loras: Optional[Set[int]],
            enable_chunking: bool = False,
    ) -> SchedulerSwappedInOutputs:
        # Blocks that need to be swapped or copied before model execution.
        # [(cpu物理块id, gpu物理块id)]
        blocks_to_swap_in: List[Tuple[int, int]] = []
        # [(旧物理块,copy - on - write而来的新物理块id)]
        blocks_to_copy: List[Tuple[int, int]] = []
        # 准备解码的seq_group
        decode_seq_groups: List[ScheduledSequenceGroup] = []
        # 准备预填充的seq_group
        prefill_seq_groups: List[ScheduledSequenceGroup] = []
        # 因各种原因,被标记为不再处理的seq_group,如预填充序列太长了...
        infeasible_seq_groups: List[SequenceGroup] = []

        swapped_queue = self.swapped

        leftover_swapped: Deque[SequenceGroup] = deque()
        while swapped_queue:
            # 取出swap队列中最早被抢占的seq_group
            seq_group = swapped_queue[0]

            # ----------------------------------------------------------------------------------------------------------
            # If the sequence group cannot be swapped in, stop.
            # 对被抢占seq_group有两种处理方式,1. 清空放入waiting队列,这时is_prefill为True
            # 2.blocks全部转移到CPU上,这时is_prefill为False
            # self._get_num_lookahead_slots(is_prefill)必定为0,否则抛出异常,block_manager_v1不支持非0情况
            is_prefill = seq_group.is_prefill()
            # 根据需要的,与可用的物理blocks数量判断,是否可以把当前seq_group从swap队列转移到running队列
            alloc_status = self.block_manager.can_swap_in(
                    seq_group, self._get_num_lookahead_slots(is_prefill))
            if alloc_status == AllocStatus.LATER:    # 稍后,资源多时再处理
                break
            elif alloc_status == AllocStatus.NEVER:  # 不合格,永不再处理
                logger.warning(
                        "Failing the request %s because there's not enough kv "
                        "cache blocks to run the entire sequence.",
                        seq_group.request_id)
                for seq in seq_group.get_seqs():
                    seq.status = SequenceStatus.FINISHED_IGNORED
                infeasible_seq_groups.append(seq_group)
                swapped_queue.popleft()
                continue
            # ----------------------------------------------------------------------------------------------------------

            # lora相关,忽略
            lora_int_id = 0
            if self.lora_enabled:
                lora_int_id = seq_group.lora_int_id
                assert curr_loras is not None
                assert self.lora_config is not None
                if (lora_int_id > 0 and (lora_int_id not in curr_loras)
                        and len(curr_loras) >= self.lora_config.max_loras):
                    # We don't have a space for another LoRA, so
                    # we ignore this request for now.
                    leftover_swapped.appendleft(seq_group)
                    swapped_queue.popleft()
                    continue

            # The total number of sequences in the RUNNING state should not
            # exceed the maximum number of sequences.
            # 取出这个seq_group在剩余生命周期内将并行运行的最大seq数量
            num_new_seqs = seq_group.get_max_num_running_seqs()
            # 当前准备转移的seq_group,需要处理,或者说准备返回的tokens数量,
            # decode模式:每个seq num_token=1,其他模式则遵循各自的状态
            num_new_tokens = self._get_num_new_tokens(seq_group,
                                                      SequenceStatus.SWAPPED,
                                                      enable_chunking, budget)
            # 感觉num_new_tokens==0的判断有点多余,基本不可能为0
            # budget.can_schedule 会判断加上当前seq_group的num_new_tokens和num_new_seqs后
            # 总数是否会超标,如果超标,说明不能再添加任何seq_group到running队列,直接结束本次调度
            if (num_new_tokens == 0
                    or not budget.can_schedule(num_new_tokens=num_new_tokens, num_new_seqs=num_new_seqs)):
                break

            if lora_int_id > 0 and curr_loras is not None:
                curr_loras.add(lora_int_id)

            # 如果能走到这步,说明可向running队列转移了。先把当前seq_group从swap队列踢出来
            # 再把CPU上的blocks转移到GPU block上
            swapped_queue.popleft()
            self._swap_in(seq_group, blocks_to_swap_in)
            self._append_slots(seq_group, blocks_to_copy)
            # 判断是不是预填充,将这个seq_group加入不同的分组
            is_prefill = seq_group.is_prefill()
            if is_prefill:
                prefill_seq_groups.append(ScheduledSequenceGroup(seq_group, token_chunk_size=num_new_tokens))
            else:
                decode_seq_groups.append(ScheduledSequenceGroup(seq_group, token_chunk_size=1))

            # 将这个马上上岸的seq_group的tokens和seqs数量更新到budget中
            budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
            budget.add_num_seqs(seq_group.request_id, num_new_seqs)

        swapped_queue.extendleft(leftover_swapped)

        return SchedulerSwappedInOutputs(
                decode_seq_groups=decode_seq_groups,
                prefill_seq_groups=prefill_seq_groups,
                blocks_to_swap_in=blocks_to_swap_in,
                blocks_to_copy=blocks_to_copy,
                num_lookahead_slots=self._get_num_lookahead_slots(
                        is_prefill=False),
                infeasible_seq_groups=infeasible_seq_groups,
        )

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2102669.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣-968监控二叉树(Java贪心详细题解)

题目链接&#xff1a;968. 监控二叉树 - 力扣&#xff08;LeetCode&#xff09; 前情提要&#xff1a; 本题是一道名副其实的hard题目&#xff0c;他考察二叉树和贪心的综合运用能力。 所以我们不仅要会贪心还要会二叉树的一些知识&#xff0c;如果没有写二叉树类型的题目&a…

实战项目-快速实战-springboot dataway

最后附项目源码, 开箱即用 访问地址 http://127.0.0.1:8101/interface-ui/#/ 效果图 具体怎么用, 大家还是看官网,中文文档 https://www.dataql.net/docs/dataway/ui/ui-list 项目结构 代码 DataWayApplication package com.zero.dataway;import net.hasor.spring.boot…

集合框架,List常用API,栈和队列初识

回顾 集合框架 两个重点——ArrayList和HashSet. Vector/ArraysList/LinkedList区别 VectorArraysListLinkedList底层实现数组数组链表线程安全安全不安全不安全增删效率较低较低高扩容*2*1.5-------- &#xff08;>>&#xff09;运算级最低&#xff0c;记得加括号。 常…

窖藏之秘:白酒在窖藏过程中经历了哪些变化?

在中华五千年的文明史中&#xff0c;白酒一直扮演着举足轻重的角色。它不仅是文人墨客笔下的灵感源泉&#xff0c;更是亲朋好友间传递情感的桥梁。在众多白酒品牌中&#xff0c;豪迈白酒&#xff08;HOMANLISM&#xff09;以其不同的酿造工艺和窖藏技艺&#xff0c;成为了酒中翘…

EasyExcel模板导出与公式计算(上)

目录 环境要求 功能预览 需求分析 源码跟踪 自定义数据处理器 ​总结 最近做项目时遇到这样一个需求&#xff0c;将数据库表的含有公式的信息导出为Excel文件并且需要计算其结果&#xff0c;由于上网查询资料后未能完美解决&#xff0c;故将此踩坑过程记录下来以供参考。…

Datawhale X 李宏毅苹果书 AI夏令营第五期 DL进阶方向 Task3笔记

Datawhale X 李宏毅苹果书 向李宏毅学深度学习&#xff08;进阶&#xff09; 是 Datawhale 2024 年 AI 夏令营第五期的学习活动&#xff08;“深度学习 进阶”方向&#xff09; 往期task1链接&#xff1a;深度学习进阶-Task1 往期task2链接&#xff1a;深度学习进阶-Task2 我做…

惠中科技RDS自清洁膜层:光伏领域的绿色革命

惠中科技RDS自清洁膜层&#xff1a;光伏领域的绿色革命 在全球能源转型和光伏产业蓬勃发展的背景下&#xff0c;光伏电站的运营维护面临着诸多挑战&#xff0c;其中灰尘污染问题尤为突出。灰尘的堆积不仅降低了光伏板的透光率&#xff0c;还直接影响了电站的发电效率和经济效益…

算法复盘——Leetcode hot100: 双指针算法

双指针算法 11. 盛最多水的容器 - 力扣&#xff08;LeetCode&#xff09; 优化解法&#xff1a;用 left 和 right 两个指针从两端向中心收缩&#xff0c;一边收缩一边计算 [left, right] 之间的矩形面积&#xff0c;取最大的面积值即是答案。 其实是优化区间选择 直接丢弃体…

51单片机-串口通信(单片机和PC互发数据)

作者&#xff1a;Whappy 时间&#xff1a;2024.9.3 关于串口的疑问&#xff1f; 根据我的代码是不是初始化完成串口之后&#xff0c;只要我们使用串口发送数据就会触发中断&#xff1f; &#xff08;在文章下面&#xff09; ChatGPT said: ChatGPT 是的&#xff0c;根据…

IDEA项目启动在不同端口的方法,服务多端口启动

前言 在本地测试分布式事务以及分布式锁的过程中&#xff0c;在IDEA中多端口启动服务&#xff0c;可以高效方便开展测试调试工作。 开启流程 1.打开Edit Configurations 2.选中要复制的服务&#xff0c;点击复制小图标 3.配置启动端口号&#xff0c;点击保存 --server.port1…

解密Docker核心:深入理解Docker基础架构

随着云计算技术的普及&#xff0c;Docker容器技术在现代应用开发和部署中占据了重要地位。要充分理解Docker的优势与运用&#xff0c;深入掌握其基础架构是关键。本文将深入探讨Docker的核心组成部分及其在容器化平台中的角色和作用。 一、Docker的基础架构概述 Docker的基础…

leetcode 2816.翻倍以链表形式表示的数字

1.题目要求: 给你一个 非空 链表的头节点 head &#xff0c;表示一个不含前导零的非负数整数。将链表 翻倍 后&#xff0c;返回头节点 head 。2.题目代码: /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ str…

AI建模——文/图生模型产品介绍与模型免费下载

说明&#xff1a; 记录AI文生3D模型、图生3D模型的相关产品&#xff1b;记录其性能、功能、收费与免费方法 1.AI建模产品 Robin MeshAnything Meshy 生成效果比较&#xff1a; 2. Rodin 官网&#xff1a;gHyperHuman 支持&#xff1a;文生模型、图生模型 模型生成与下载…

自动控制:模糊控制器的原理及设计

自动控制&#xff1a;模糊控制器的原理及设计 引言 随着控制技术的不断发展&#xff0c;模糊控制器&#xff08;Fuzzy Controller&#xff09;作为一种智能控制技术&#xff0c;广泛应用于许多复杂系统中。与传统的线性控制器不同&#xff0c;模糊控制器无需精确的数学模型&a…

IOS17.0安装巨魔:TrollRestore巨魔发布

&#x1f47b; TrollRestore 17.0 巨魔发布 15.0 - 16.7 RC&#xff08;20H18&#xff09;和17.0。 官网&#xff1a;https://trollrestore.com/ 下载&#xff1a;https://pan.metanetdisk.com/IOS/%E5%B7%A8%E9%AD%94%E7%8E%A9%E5%AE%B6/TrollRestore.com 使用&#xff1a;ht…

《OpenCV计算机视觉》—— 图像边缘检测

文章目录 一、图像边缘检测概述二、常见的图像边缘检测算法&#xff08;简单介绍&#xff09;1.sobel算子2.Scharr算子3.Laplacian算子4.Canny算子 三、代码实现 一、图像边缘检测概述 图像边缘检测是一种重要的图像处理技术&#xff0c;用于定位二维或三维图像中对象的边缘。…

【运维监控】prometheus+node exporter+grafana 监控linux机器运行情况(1)

本示例是通过prometheus的node exporter收集主机的信息&#xff0c;然后在grafana的dashborad进行展示。本示例使用到的组件均是最新的&#xff0c;下文中会有具体版本说明&#xff0c;linux环境是centos。本示例分为四个部分&#xff0c;即prometheus、grafana、node exporter…

南京网站建设自己网站

南京是一座古老而又现代化的城市&#xff0c;拥有悠久的历史和文化底蕴。在这个信息时代&#xff0c;网站已经成为了企业和个人宣传推广的重要途径之一。南京网站建设作为一种推广方式&#xff0c;不仅能够展示企业形象&#xff0c;还能够传递信息、吸引客户、增加销售。 南京网…

Spring Boot-自定义banner

在 Spring Boot 应用中&#xff0c;你可以自定义启动时显示的 banner。这些 banner 可以包括图形、文字或者其他形式的标识。如图所示&#xff1a; 1. 使用 banner.txt 文件 默认情况下&#xff0c;Spring Boot 使用项目的 banner.txt 文件中的内容作为启动时的 banner。你可以…

计算机岗位(面试)

计算机岗位&#xff08;面试&#xff09; 计算机主要有哪几部分构成&#xff1f;计算机组成原理的内容&#xff1f; 计算机主要由‌硬件和软件‌两大部分构成。‌硬件部分包括五大基本组件&#xff1a;‌‌运算器、‌控制器、‌存储器、‌输入设备和输出设备‌‌。‌具体来说&a…