Series Table of Contents
vLLM (1) - Qwen2 Inference & Deployment
vLLM (2) - Architecture Overview
vLLM (3) - Sequence & SequenceGroup
vLLM (4) - LLMEngine, Part 1
vLLM (5) - LLMEngine, Part 2
vLLM (6) - Scheduler & BlockSpaceManager
Table of Contents
- Series Table of Contents
- Preface
- I. Scheduler
- 1. Overview
- 2. Scheduler._schedule_default() recap
- 3. Scheduler._schedule_prefills()
- 4. Scheduler._schedule_running()
- 5. Scheduler._schedule_swapped()
- 6. Scheduler._preempt()
- 7. A scheduling example
- 8. The relationship between Scheduler and BlockSpaceManager
- II. BlockSpaceManager
- 1. Initialization
- 2. can_allocate() & allocate()
- 3. can_append_slots() & append_slots()
- 4. Other methods
- III. CacheEngine
- Summary
Preface
The previous two posts walked through LLMEngine and its core components, including Worker, CacheEngine, ModelRunner and Scheduler, so you should by now have a fairly clear picture of how it works and how the pieces fit together. Due to length constraints, the last post did not fully unpack Scheduler; that is the focus of this one.
Recall what problem inference frameworks such as vLLM are solving. Once a large model is opened up to users it faces a large volume of requests, so the framework's goal is to increase throughput and reduce latency by making efficient use of the available resources (GPU memory in particular), delivering responses to users quickly. vLLM achieves this through the Scheduler's scheduling policy together with the improved GPU utilization brought by PagedAttention. As an important component of the Scheduler, the BlockSpaceManager takes care of managing memory blocks. This post therefore revolves around two things: the Scheduler (the scheduling logic) and the BlockSpaceManager.
I. Scheduler
1. Overview
Increasing throughput through smarter scheduling in an LLM inference framework was not invented by vLLM. If you read the code, you will find that vLLM's scheduling is based on the iteration-level scheduling proposed by the ORCA team, commonly known in the industry as continuous batching.
LLM inference usually wants the batch to be as large as possible so that the GPU's parallelism is fully exploited. Earlier inference frameworks used a batching strategy called naive batching or static batching. It is request-level scheduling: a batch of inputs is fed to the model, and only after every sequence in the batch has finished decoding are the resources released and the next batch started. The figure below (from Anyscale) illustrates this process: yellow blocks are prompt tokens, blue blocks are generated tokens; the left panel shows the first iteration and the right panel the point at which all sequences in the batch have finished. Because the sequences do not all generate the same number of tokens, the ones that finish early (the shorter ones) have to wait for the others before resources are released, so the GPU is not fully utilized, as shown by the blank area in the right panel.
Continuous batching instead refines scheduling to the iteration level, meaning a scheduling decision is made after every iteration (every generated token); in the vLLM code this corresponds to the LLMEngine.step() method discussed earlier. This scheduling style avoids naive batching's problem of waiting for all sequences to finish before releasing resources. As shown in the figure below, sequence S3 finishes at time T5; at T6 (the next iteration) the GPU memory occupied by S3 is released and S5 joins the batch for its prefill and decode. This strategy greatly increases throughput and reduces latency.
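To make the contrast concrete, here is a tiny self-contained sketch (toy numbers, not vLLM code) comparing how many batch-slot iterations the two strategies tie up for the same amount of work:

# Toy comparison of static vs. continuous batching (illustrative only).
need = [2, 5, 3, 4]  # hypothetical number of decode iterations each sequence needs

# Static (request-level) batching: every slot stays occupied until the
# longest sequence in the batch finishes.
static_slot_steps = max(need) * len(need)   # 5 * 4 = 20 slot-iterations reserved
useful_steps = sum(need)                    # 14 slot-iterations of real work

# Continuous (iteration-level) batching: a finished sequence frees its slot
# immediately, so a waiting sequence can join at the very next iteration.
continuous_slot_steps = useful_steps        # 14 (ignoring prefill cost)

print(static_slot_steps, continuous_slot_steps)  # 20 vs. 14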
2. Scheduler._schedule_default() recap
As discussed in the previous post, the Scheduler's plain scheduling logic lives in the self._schedule_default() method, and we have already walked through how it executes. Briefly (see the previous post for the code and comments):
1) Schedule self.waiting, self.running and self.swapped according to the scheduling budget budget;
2) As long as self.waiting still contains requests and the budget allows it, self.waiting is scheduled first, to run prefill;
3) When 2) does not hold, self.running and self.swapped are scheduled to run decode, with a priority order between the two. So at any given step, vLLM is either doing prefill or doing decode (see the sketch right after this list).
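A condensed, runnable sketch of that decision order (toy stand-ins for the queues; the real method works on SequenceGroups and a SchedulingBudget):

# Simplified sketch of the _schedule_default() priority order (not the real code).
from collections import deque

def schedule_step(waiting: deque, running: deque, swapped: deque,
                  token_budget: int) -> str:
    """Return which phase this engine step will run: 'prefill' or 'decode'."""
    if not swapped and waiting and len(waiting[0]) <= token_budget:
        # Prefill has priority whenever nothing is swapped out, the waiting
        # queue is non-empty, and the budget can take the next prompt
        # (len() stands in for the prompt's token count here).
        return "prefill"
    # Otherwise this step decodes the running (and possibly swapped-in) groups.
    return "decode"

print(schedule_step(deque([[1] * 27]), deque(), deque(), token_budget=2048))  # prefill
print(schedule_step(deque(), deque(["seq0"]), deque(), token_budget=2048))    # decode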
Next, let's take a closer look at the key methods self._schedule_prefills(), self._schedule_running() and self._schedule_swapped().
3. Scheduler._schedule_prefills()
The main logic of this method is fairly simple (code and comments below): it tries to schedule every request in the waiting queue for prefill, subject to the following constraints:
- prompt_limit: two parameters in the scheduler config bound the prompt length of a single seq: 1) max_num_batched_tokens, the maximum number of tokens that can be processed in one iteration (across all scheduled requests); and 2) max_model_len, the maximum length of a single seq the model can handle, prompt plus generated text. prompt_limit is the smaller of the two.
- gpu_blocks: the gpu_blocks store the KV caches; without enough free gpu_blocks there is no room to cache the corresponding KV caches, so self.block_manager.can_allocate(seq_group) is used to decide whether the current request can be scheduled.
- budget: the scheduling budget consists mainly of 1) the total token budget token_budget, i.e. the max_num_batched_tokens mentioned above, and 2) max_num_seqs, the maximum number of seqs that can be processed.
The method returns two parts: the scheduled requests, which will go through one forward pass of the model (the prefill), and the requests that could not be scheduled because of the limits above, which wait for the next scheduling iteration.
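A tiny numeric illustration of the prompt_limit check (the numbers are made up):

# How prompt_limit caps a single prompt in _schedule_prefills() (illustrative values).
max_num_batched_tokens = 2048   # max tokens processed in one iteration
max_model_len = 1024            # max total length (prompt + generation) per seq

prompt_limit = min(max_num_batched_tokens, max_model_len)   # 1024

num_prompt_tokens = 1500        # a hypothetical, overly long prompt
if num_prompt_tokens > prompt_limit:
    # The seq_group is moved to ignored_seq_groups and marked FINISHED_IGNORED.
    print("ignored: prompt exceeds limit of", prompt_limit)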
# vllm/core/scheduler.py
class Scheduler:
    def _schedule_prefills(
        self,
        waiting_queue: deque,  # the waiting queue
        budget: SchedulingBudget,  # the scheduling budget
        curr_loras: Optional[Set[int]],
        enable_chunking: bool = False,  # whether chunked prefill is used -> off by default here
    ) -> Tuple[deque, SchedulerPrefillOutputs]:
        """
        Schedule sequence groups that are in the prefill stage.
        A preempted (recomputed) group is treated as a new prefill.
        Schedule as many as possible, until the budget runs out.
        budget is updated in place as groups get scheduled.
        """
        # Ignored seq_groups, e.g. prompts that are too long; they are treated as
        # scheduled and finished, with status set to FINISHED_IGNORED
        ignored_seq_groups: List[SequenceGroup] = []
        # Successfully scheduled seq_groups
        seq_groups: List[SequenceGroup] = []
        # Copy, to avoid modifying the queue in place
        waiting_queue = deque([s for s in waiting_queue])
        # Requests left unscheduled; LoRA-related, ignore
        leftover_waiting_sequences: Deque[SequenceGroup] = deque()
        while self._passed_delay(time.time()) and waiting_queue:
            # Take the first seq_group in the waiting queue; it has exactly one seq,
            # which contains only the prompt
            seq_group = waiting_queue[0]
            waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
            assert len(waiting_seqs) == 1, (
                "Waiting sequence group should have only one prompt "
                "sequence.")
            # Number of tokens to compute next, which should be the whole prompt length
            num_new_tokens = self._get_num_new_tokens(seq_group,
                                                      SequenceStatus.WAITING,
                                                      enable_chunking, budget)
            # Check that the number of tokens to compute equals the number of prompt tokens
            if not enable_chunking:
                num_prompt_tokens = waiting_seqs[0].get_len()
                assert num_new_tokens == num_prompt_tokens

            # The prompt exceeds the length limit: add the group to ignored_seq_groups
            # and set the seq status to FINISHED_IGNORED
            prompt_limit = self._get_prompt_limit(seq_group)
            if num_new_tokens > prompt_limit:
                logger.warning(
                    "Input prompt (%d tokens) is too long"
                    " and exceeds limit of %d", num_new_tokens, prompt_limit)
                for seq in waiting_seqs:
                    seq.status = SequenceStatus.FINISHED_IGNORED
                ignored_seq_groups.append(seq_group)
                waiting_queue.popleft()
                continue

            # Ask the block_manager whether resources can be allocated for this seq_group
            can_allocate = self.block_manager.can_allocate(seq_group)
            if can_allocate == AllocStatus.LATER:
                break
            elif can_allocate == AllocStatus.NEVER:
                logger.warning(
                    "Input prompt (%d tokens) is too long"
                    " and exceeds the capacity of block_manager",
                    num_new_tokens)
                for seq in waiting_seqs:
                    seq.status = SequenceStatus.FINISHED_IGNORED
                ignored_seq_groups.append(seq_group)
                waiting_queue.popleft()
                continue

            # LoRA-related, ignore
            lora_int_id = 0
            if self.lora_enabled:
                lora_int_id = seq_group.lora_int_id
                assert curr_loras is not None
                assert self.lora_config is not None
                if (self.lora_enabled and lora_int_id > 0
                        and lora_int_id not in curr_loras
                        and len(curr_loras) >= self.lora_config.max_loras):
                    # We don't have a space for another LoRA, so
                    # we ignore this request for now.
                    leftover_waiting_sequences.appendleft(seq_group)
                    waiting_queue.popleft()
                    continue

            # Budget exhausted: no more seq_groups can be pulled from waiting for prefill, stop
            num_new_seqs = seq_group.get_max_num_running_seqs()
            if (num_new_tokens == 0  # == 0 does not seem to occur in practice
                    or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                               num_new_seqs=num_new_seqs)):
                break

            # Schedule it and flip the sequence status from WAITING to RUNNING
            if curr_loras is not None and lora_int_id > 0:
                curr_loras.add(lora_int_id)
            waiting_queue.popleft()
            self._allocate_and_set_running(seq_group)  # block_manager
            seq_groups.append(
                ScheduledSequenceGroup(seq_group=seq_group,
                                       token_chunk_size=num_new_tokens))
            # Update the budget in place
            budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
            budget.add_num_seqs(seq_group.request_id, num_new_seqs)

        # Requests left unscheduled
        waiting_queue.extendleft(leftover_waiting_sequences)
        if len(seq_groups) > 0:
            self.prev_prompt = True

        # Returns: the requests still waiting, and the prefill scheduling result
        return waiting_queue, SchedulerPrefillOutputs(
            seq_groups=seq_groups,
            ignored_seq_groups=ignored_seq_groups,
            num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True))
            # num_lookahead_slots = 0, lookahead is not used here
4. Scheduler._schedule_running()
This method's scheduling falls into two cases:
- Enough free gpu_blocks: the groups can be scheduled directly.
- Not enough free gpu_blocks: if the number of free gpu_blocks is smaller than the number of seqs, they cannot all be scheduled directly (the usual assumption is that every seq needs at least one block). This is where preemption (preempt) comes in: starting from the lowest-priority entries, seq_groups are removed from running_queue one after another, and depending on the preemption mode the evicted groups end up in preempted or swapped_out. The former means recomputation; the latter temporarily moves the request's blocks to the CPU, waiting for a chance to move back to the GPU and continue.
The method also keeps two bookkeeping lists (see the copy-on-write sketch right after this list):
- blocks_to_swap_out: for blocks swapped out because of preemption, records the gpu_block and the cpu_block it moves to.
- blocks_to_copy: in the parallel sampling scenario, one input is sampled into several outputs, so a single seq_group holds several seqs. At first these seqs share the same prompt and can therefore share the same memory (blocks); once the sampled tokens start to differ, a copy is triggered on the current block: a new physical_block is requested and the tokens of the current block are copied over. This is the copy-on-write mechanism, and blocks_to_copy records those (source, destination) block pairs.
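A minimal copy-on-write sketch (toy classes, not the vLLM ones) showing what ends up in blocks_to_copy:

# Toy copy-on-write bookkeeping (illustrative only).
class Block:
    def __init__(self, number: int):
        self.number = number
        self.ref_count = 1

def append_to_last_block(last_block: Block, free_blocks: list,
                         blocks_to_copy: list) -> Block:
    """If the last block is shared (ref_count > 1), split off a private copy."""
    if last_block.ref_count == 1:
        return last_block                     # exclusive owner: write in place
    last_block.ref_count -= 1                 # this seq leaves the shared block
    new_block = free_blocks.pop()             # grab a fresh physical block
    blocks_to_copy.append((last_block.number, new_block.number))  # (src, dst)
    return new_block

shared = Block(0)
shared.ref_count = 2                          # e.g. two samples still sharing a prompt block
to_copy = []
mine = append_to_last_block(shared, [Block(7)], to_copy)
print(to_copy)                                # [(0, 7)]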
class Scheduler:
    # ...
    def _schedule_running(
        self,
        running_queue: deque,  # the running queue
        budget: SchedulingBudget,  # the scheduling budget
        curr_loras: Optional[Set[int]],
        policy: Policy,  # ordering policy for the running queue, fcfs by default
        enable_chunking: bool = False,  # whether chunked prefill is used, off by default
    ) -> Tuple[deque, SchedulerRunningOutputs]:
        """
        Schedule seq_groups that are in the RUNNING state.
        """
        # Blocks to swap out after preemption: gpu block number -> cpu block number
        blocks_to_swap_out: List[Tuple[int, int]] = []
        # When the last block is shared with other seqs, a new block must be
        # allocated and its tokens copied (copy-on-write)
        blocks_to_copy: List[Tuple[int, int]] = []

        # Usually the group has finished prefill and is decoding; that is also our case
        decode_seq_groups: List[ScheduledSequenceGroup] = []
        # Corresponds to chunked prefill; not involved here, stays empty
        prefill_seq_groups: List[ScheduledSequenceGroup] = []

        # Preempted vs. swapped-out seq_groups, one list per preemption mode
        preempted: List[SequenceGroup] = []
        swapped_out: List[SequenceGroup] = []

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        now = time.time()
        running_queue = policy.sort_by_priority(now, running_queue)  # sort by priority according to the policy
        while running_queue:
            seq_group = running_queue[0]
            num_running_tokens = self._get_num_new_tokens(
                seq_group, SequenceStatus.RUNNING, enable_chunking, budget)
            if num_running_tokens == 0:
                # Token budget exhausted, break out of the loop
                break

            running_queue.popleft()
            while not self._can_append_slots(seq_group):  # checked via block_manager
                # Not enough slots, i.e. no room to store the KV cache of the new token.
                # Remove this seq_group from the budget, keeping the budget up to date
                budget.subtract_num_batched_tokens(seq_group.request_id,
                                                   num_running_tokens)
                num_running_seqs = seq_group.get_max_num_running_seqs()
                budget.subtract_num_seqs(seq_group.request_id,
                                         num_running_seqs)
                # LoRA, ignore
                if (curr_loras is not None and seq_group.lora_int_id > 0
                        and seq_group.lora_int_id in curr_loras):
                    curr_loras.remove(seq_group.lora_int_id)

                if running_queue:
                    # Preempt the lowest-priority seq_group
                    victim_seq_group = running_queue.pop()
                    preempted_mode = self._preempt(victim_seq_group,
                                                   blocks_to_swap_out)
                    # Depending on the preemption mode: recompute or move to CPU
                    if preempted_mode == PreemptionMode.RECOMPUTE:
                        preempted.append(victim_seq_group)
                    else:
                        swapped_out.append(victim_seq_group)
                else:
                    # No victim left: preempt the current seq_group itself and stop
                    preempted_mode = self._preempt(seq_group,
                                                   blocks_to_swap_out)
                    if preempted_mode == PreemptionMode.RECOMPUTE:
                        preempted.append(seq_group)
                    else:
                        swapped_out.append(seq_group)
                    break
            else:
                # Resources are usually sufficient at the start, so execution lands here directly:
                # there are slots, so assign new slots to this seq_group's seqs and
                # update blocks_to_copy along the way
                self._append_slots(seq_group, blocks_to_copy)
                is_prefill = seq_group.is_prefill()
                if is_prefill:
                    # chunked prefill case, ignore
                    prefill_seq_groups.append(
                        ScheduledSequenceGroup(
                            seq_group=seq_group,
                            token_chunk_size=num_running_tokens))
                else:
                    # regular decode
                    decode_seq_groups.append(
                        ScheduledSequenceGroup(seq_group=seq_group,
                                               token_chunk_size=1))
                # Update the budget
                budget.add_num_batched_tokens(seq_group.request_id,
                                              num_running_tokens)
                # This part can simply be ignored
                # OPTIMIZATION: Note that get_max_num_running_seqs is
                # expensive. For the default scheduling chase where
                # enable_chunking is False, num_seqs are updated before running
                # this method, so we don't have to update it again here.
                if enable_chunking:
                    num_running_seqs = seq_group.get_max_num_running_seqs()
                    budget.add_num_seqs(seq_group.request_id, num_running_seqs)
                if curr_loras is not None and seq_group.lora_int_id > 0:
                    curr_loras.add(seq_group.lora_int_id)

        return running_queue, SchedulerRunningOutputs(
            decode_seq_groups=decode_seq_groups,
            prefill_seq_groups=prefill_seq_groups,
            preempted=preempted,
            swapped_out=swapped_out,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=self._get_num_lookahead_slots(
                is_prefill=False))
5. Scheduler._schedule_swapped()
This method is very similar to Scheduler._schedule_running(), because the two are essentially doing the same thing; the only difference is that here the blocks have to be swapped in first. Code and comments below.
class Scheduler:
    # ...
    def _schedule_swapped(
        self,
        swapped_queue: deque,  # the swapped queue, holding requests that were swapped out
        budget: SchedulingBudget,  # the scheduling budget
        curr_loras: Optional[Set[int]],
        policy: Policy,  # ordering policy for the swapped queue
        enable_chunking: bool = False,  # whether chunked prefill is used, off by default
    ) -> Tuple[deque, SchedulerSwappedInOutputs]:
        """
        Schedule sequence groups that were swapped out.
        """
        # Blocks to swap in: cpu block number -> gpu block number
        blocks_to_swap_in: List[Tuple[int, int]] = []
        # When the last block is shared with other seqs, a new block must be
        # allocated and its tokens copied (copy-on-write)
        blocks_to_copy: List[Tuple[int, int]] = []
        # Same as in _schedule_running()
        decode_seq_groups: List[ScheduledSequenceGroup] = []
        prefill_seq_groups: List[ScheduledSequenceGroup] = []
        now = time.time()
        swapped_queue = policy.sort_by_priority(now, swapped_queue)
        # Infeasible seq_groups, similar to ignored_seq_groups above
        infeasible_seq_groups: List[SequenceGroup] = []
        # Requests left unscheduled
        leftover_swapped: Deque[SequenceGroup] = deque()

        while swapped_queue:
            seq_group = swapped_queue[0]

            # Ask the block_manager whether resources can be allocated to swap this seq_group in
            is_prefill = seq_group.is_prefill()
            alloc_status = self.block_manager.can_swap_in(
                seq_group, self._get_num_lookahead_slots(is_prefill))
            if alloc_status == AllocStatus.LATER:
                break
            elif alloc_status == AllocStatus.NEVER:
                logger.warning(
                    "Failing the request %s because there's not enough kv "
                    "cache blocks to run the entire sequence.",
                    seq_group.request_id)
                for seq in seq_group.get_seqs():
                    seq.status = SequenceStatus.FINISHED_IGNORED
                infeasible_seq_groups.append(seq_group)
                swapped_queue.popleft()
                continue

            # LoRA, ignore
            lora_int_id = 0
            if self.lora_enabled:
                lora_int_id = seq_group.lora_int_id
                assert curr_loras is not None
                assert self.lora_config is not None
                if (lora_int_id > 0 and (lora_int_id not in curr_loras)
                        and len(curr_loras) >= self.lora_config.max_loras):
                    # We don't have a space for another LoRA, so
                    # we ignore this request for now.
                    leftover_swapped.appendleft(seq_group)
                    swapped_queue.popleft()
                    continue

            # The total number of sequences in the RUNNING state should not
            # exceed the maximum number of sequences.
            num_new_seqs = seq_group.get_max_num_running_seqs()
            num_new_tokens = self._get_num_new_tokens(seq_group,
                                                      SequenceStatus.SWAPPED,
                                                      enable_chunking, budget)
            # The budget cannot take it; same as in _schedule_prefills
            if (num_new_tokens == 0
                    or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                               num_new_seqs=num_new_seqs)):
                break

            if lora_int_id > 0 and curr_loras is not None:
                curr_loras.add(lora_int_id)
            # Schedule it: swap the blocks in and append slots
            swapped_queue.popleft()
            self._swap_in(seq_group, blocks_to_swap_in)
            self._append_slots(seq_group, blocks_to_copy)
            # Same as in _schedule_running()
            is_prefill = seq_group.is_prefill()
            if is_prefill:  # chunked prefill, ignore
                prefill_seq_groups.append(
                    ScheduledSequenceGroup(seq_group,
                                           token_chunk_size=num_new_tokens))
            else:
                decode_seq_groups.append(
                    ScheduledSequenceGroup(seq_group, token_chunk_size=1))
            # Update the budget
            budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
            budget.add_num_seqs(seq_group.request_id, num_new_seqs)

        # Whatever is left of swapped_queue could not be scheduled
        swapped_queue.extendleft(leftover_swapped)

        return swapped_queue, SchedulerSwappedInOutputs(
            decode_seq_groups=decode_seq_groups,
            prefill_seq_groups=prefill_seq_groups,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=self._get_num_lookahead_slots(
                is_prefill=False),
            infeasible_seq_groups=infeasible_seq_groups,
        )
6. Scheduler._preempt()
As mentioned earlier, preemption (preempt) is an important step inside self._schedule_running(). Its purpose is to temporarily give up low-priority seqs so that the higher-priority ones have enough blocks or slots to work with. Depending on the preemption mode (recompute, RECOMPUTE, or swap, SWAP) a different preemption path is taken; the block swaps have to be recorded and the seq statuses updated.
def _preempt(
    self,
    seq_group: SequenceGroup,
    blocks_to_swap_out: List[Tuple[int, int]],
    preemption_mode: Optional[PreemptionMode] = None,
) -> PreemptionMode:
    # If preemption mode is not specified, we determine the mode as follows:
    # We use recomputation by default since it incurs lower overhead than
    # swapping. However, when the sequence group has multiple sequences
    # (e.g., beam search), recomputation is not currently supported. In
    # such a case, we use swapping instead.
    # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
    # As swapped sequences are prioritized over waiting sequences,
    # sequence groups with multiple sequences are implicitly prioritized
    # over sequence groups with a single sequence.
    # TODO(woosuk): Support recomputation for sequence groups with multiple
    # sequences. This may require a more sophisticated CUDA kernel.
    # Determine the preemption mode, as described above
    if self.user_specified_preemption_mode is None:
        if seq_group.get_max_num_running_seqs() == 1:
            preemption_mode = PreemptionMode.RECOMPUTE
        else:
            preemption_mode = PreemptionMode.SWAP
    elif self.user_specified_preemption_mode == "swap":
        preemption_mode = PreemptionMode.SWAP
    else:
        preemption_mode = PreemptionMode.RECOMPUTE

    # Warn when preemption happens frequently
    if self.num_cumulative_preemption % 50 == 0:
        logger.warning(
            "Sequence group %s is preempted by %s mode because there is "
            "not enough KV cache space. This can affect the end-to-end "
            "performance. Increase gpu_memory_utilization or "
            "tensor_parallel_size to provide more KV cache memory. "
            "total_num_cumulative_preemption=%d", seq_group.request_id,
            preemption_mode, self.num_cumulative_preemption + 1)
    self.num_cumulative_preemption += 1

    # Run the preemption logic that matches the chosen mode
    if preemption_mode == PreemptionMode.RECOMPUTE:
        self._preempt_by_recompute(seq_group)
    elif preemption_mode == PreemptionMode.SWAP:
        self._preempt_by_swap(seq_group, blocks_to_swap_out)
    else:
        raise AssertionError("Invalid preemption mode.")
    return preemption_mode

def _preempt_by_recompute(
    self,
    seq_group: SequenceGroup,
) -> None:
    """Recomputation."""
    seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
    assert len(seqs) == 1
    # Restore the seqs/seq_group to their waiting-time state and free their blocks
    for seq in seqs:
        seq.status = SequenceStatus.WAITING
        self.free_seq(seq)
        seq.reset_state_for_recompute()

def _preempt_by_swap(
    self,
    seq_group: SequenceGroup,
    blocks_to_swap_out: List[Tuple[int, int]],
) -> None:
    """Swap out to CPU."""
    self._swap_out(seq_group, blocks_to_swap_out)

def _swap_in(
    self,
    seq_group: SequenceGroup,
    blocks_to_swap_in: List[Tuple[int, int]],
) -> None:
    """
    CPU -> GPU, called from self._schedule_swapped().
    """
    # Record the swap-in mapping and set the seq status to RUNNING
    mapping = self.block_manager.swap_in(seq_group)
    blocks_to_swap_in.extend(mapping)
    for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
        seq.status = SequenceStatus.RUNNING

def _swap_out(
    self,
    seq_group: SequenceGroup,
    blocks_to_swap_out: List[Tuple[int, int]],
) -> None:
    """
    GPU -> CPU.
    """
    if not self.block_manager.can_swap_out(seq_group):
        # FIXME(woosuk): Abort the sequence group instead of aborting the
        # entire engine.
        raise RuntimeError(
            "Aborted due to the lack of CPU swap space. Please increase "
            "the swap space to avoid this error.")
    # Record the swap-out mapping and set the seq status to SWAPPED
    mapping = self.block_manager.swap_out(seq_group)
    blocks_to_swap_out.extend(mapping)
    for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
        seq.status = SequenceStatus.SWAPPED
7. A scheduling example
To make the Scheduler's behavior more concrete, here is a small example. Suppose there are 300 user requests, and their prompt token counts are [27, 30, 24, 27, 30, 24, ...]. The scheduling budget budget has max_num_seqs = 256 and max_num_batched_tokens = 2048.
1) self._schedule_prefills() schedules self.waiting: out of the 300 requests, the 75 oldest ones (FCFS order) are scheduled for prefill. Why 75? Their total token count is 27 + 30 + 24 + ... = 2025 < 2048, and a 76th request would push the sum past max_num_batched_tokens.
2) Because the budget's max_num_seqs is 256 while 1) only scheduled 75 requests, step 1) is repeated over the next three iterations, so 75 + 75 + 75 + 31 = 256 requests get scheduled in total. Once their prefill is done their status becomes RUNNING, and the remaining 44 requests stay in self.waiting.
3) self._schedule_running() schedules self.running (256 groups); self.swapped is still empty at this point and needs no scheduling.
4) Step 3) repeats. If free gpu blocks run short, some groups are swapped out; and once some requests finish, i.e. the number of seqs in the RUNNING state drops below max_num_seqs, step 1) can run again to prefill more waiting requests.
To sum up: whenever there are requests that can be prefilled, prefill as many as possible; otherwise decode.
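The numbers above can be re-derived in a few lines (same assumptions as in the example):

# Re-deriving the scheduling example.
prompt_lens = [27, 30, 24] * 100        # 300 requests with repeating prompt lengths
max_num_batched_tokens = 2048
max_num_seqs = 256

# How many prompts fit into a single prefill iteration?
total, count = 0, 0
for n in prompt_lens:
    if total + n > max_num_batched_tokens:
        break
    total, count = total + n, count + 1
print(count, total)          # 75 prompts, 2025 tokens (a 76th would give 2052 > 2048)

# max_num_seqs then caps the running set across the first prefill iterations:
print(75 + 75 + 75 + 31)     # 256, leaving 300 - 256 = 44 requests in self.waiting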
8. The relationship between Scheduler and BlockSpaceManager
The Scheduler's scheduling logic is essentially covered by the methods above, but throughout it relies on self.block_manager to manage memory. To show the relationship between Scheduler and BlockSpaceManager more directly, I put their methods side by side in a table. The first column holds the Scheduler's top-level methods; the second column holds the Scheduler's lower-level methods, usually called by the ones in the first column; the third column holds the BlockSpaceManager methods, which correspond closely to the second column. That correspondence is why block_manager is such an important part of the scheduler.
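The original table does not survive in this text version, so the correspondence below is reconstructed from the methods discussed in this post (it may differ in layout from the original):

| Scheduler (top-level) | Scheduler (lower-level) | BlockSpaceManager |
| --- | --- | --- |
| _schedule_prefills() | _allocate_and_set_running() | can_allocate() / allocate() |
| _schedule_running() | _can_append_slots() / _append_slots() | can_append_slots() / append_slots() |
| _schedule_running() | _preempt() -> _preempt_by_recompute() / _preempt_by_swap() | free() / can_swap_out() / swap_out() |
| _schedule_swapped() | _swap_in() / _append_slots() | can_swap_in() / swap_in() / append_slots() |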
II. BlockSpaceManager
BlockSpaceManager is the class used to manage logical and physical blocks; BlockSpaceManagerV1 is one of its subclasses and the one used in our case. Going through its methods will also round out our understanding of the whole Scheduler flow.
1. Initialization
The initialization of BlockSpaceManagerV1 is shown below. It creates two block allocators, self.gpu_allocator and self.cpu_allocator; since our scenario is relatively simple, both are instances of UncachedBlockAllocator. What does self.gpu_allocator actually do? At startup it creates physical blocks according to the available GPU memory; then, as scheduling happens, it hands out physical blocks, releases them, and keeps track of how many are idle. That is all.
# vllm/core/block_manager_v1.py
class BlockSpaceManagerV1(BlockSpaceManager):
    """
    Manages the mapping between logical and physical token blocks.
    """
    def __init__(
        self,
        block_size: int,  # block size, 16 by default
        num_gpu_blocks: int,  # number of gpu blocks, obtained earlier
        num_cpu_blocks: int,  # number of cpu blocks, obtained earlier
        watermark: float = 0.01,  # watermark, keeps a fraction of blocks in reserve
        sliding_window: Optional[int] = None,  # whether a sliding window is used
        enable_caching: bool = False,  # whether automatic prefix caching is enabled
    ) -> None:
        self.block_size = block_size
        self.num_total_gpu_blocks = num_gpu_blocks
        self.num_total_cpu_blocks = num_cpu_blocks

        # Ignore
        if enable_caching and sliding_window is not None:
            raise NotImplementedError(
                "Sliding window is not allowed with prefix caching enabled!")

        self.block_sliding_window = None
        if sliding_window is not None:
            # Round up to nearest block size to regularize sliding window
            # allocation sizes.
            self.block_sliding_window = math.ceil(sliding_window / block_size)

        self.watermark = watermark
        assert watermark >= 0.0
        self.enable_caching = enable_caching
        self.watermark_blocks = int(watermark * num_gpu_blocks)

        if self.enable_caching:
            # In this case a CachedBlockAllocator is used to allocate blocks; not our case here
            logger.info("Automatic prefix caching is enabled.")
            self.gpu_allocator: BlockAllocatorBase = CachedBlockAllocator(
                Device.GPU, block_size, num_gpu_blocks)
            self.cpu_allocator: BlockAllocatorBase = CachedBlockAllocator(
                Device.CPU, block_size, num_cpu_blocks)
        else:
            # The usual case: UncachedBlockAllocator
            self.gpu_allocator = UncachedBlockAllocator(
                Device.GPU, block_size, num_gpu_blocks)
            self.cpu_allocator = UncachedBlockAllocator(
                Device.CPU, block_size, num_cpu_blocks)
        # Maps each seq_id to its BlockTable, i.e. which blocks that seq uses
        self.block_tables: Dict[int, BlockTable] = {}
        # Maps seq_group.request_id to a BlockTable for cross-attention;
        # only used by encoder-decoder models
        # Note that each SequenceGroup has a unique request ID
        self.cross_block_tables: Dict[str, BlockTable] = {}
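The allocator itself is not shown in this post; the following is a minimal free-list sketch of what an UncachedBlockAllocator-style allocator needs to provide (simplified, class and field names are illustrative):

# Minimal free-list block allocator sketch (not the actual vLLM class).
from typing import List

class PhysicalBlock:
    def __init__(self, device: str, block_number: int, block_size: int):
        self.device = device
        self.block_number = block_number
        self.block_size = block_size
        self.ref_count = 0

class SimpleBlockAllocator:
    """Pre-creates num_blocks physical blocks and hands them out on demand."""

    def __init__(self, device: str, block_size: int, num_blocks: int):
        self.free_blocks: List[PhysicalBlock] = [
            PhysicalBlock(device, i, block_size) for i in range(num_blocks)
        ]

    def allocate(self) -> PhysicalBlock:
        if not self.free_blocks:
            raise ValueError("Out of memory: no free blocks are available.")
        block = self.free_blocks.pop()
        block.ref_count = 1
        return block

    def free(self, block: PhysicalBlock) -> None:
        block.ref_count -= 1
        if block.ref_count == 0:
            self.free_blocks.append(block)   # fully released, back into the pool

    def get_num_free_blocks(self) -> int:
        return len(self.free_blocks)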
2. can_allocate() & allocate()
Scheduler._schedule_prefills() calls both self.can_allocate() and self.allocate(). self.can_allocate() decides whether there are enough free gpu_blocks to give the current seq_group for its prefill. In practice the gpu_blocks are never used up completely; a few are kept in reserve, which is what self.watermark_blocks is for.
class BlockSpaceManagerV1(BlockSpaceManager):
    # ...
    def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
        """
        Decide whether there are currently enough free gpu_blocks to allocate.
        """
        # FIXME(woosuk): Here we assume that all sequences in the group share
        # the same prompt. This may not be true for preempted sequences.

        # Some restrictions for encoder-decoder models; skip
        check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)

        # Compute the number of blocks required
        self_num_required_blocks = self._get_seq_num_required_blocks(
            seq_group.get_seqs(status=SequenceStatus.WAITING)[0])
        cross_num_required_blocks = self._get_seq_num_required_blocks(
            seq_group.get_encoder_seq())
        num_required_blocks = self_num_required_blocks + \
                              cross_num_required_blocks

        if self.block_sliding_window is not None:
            num_required_blocks = min(num_required_blocks,
                                      self.block_sliding_window)
        # Get the number of currently free blocks
        num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()

        # Use the watermark to avoid frequent cache eviction, keeping a bit in reserve
        if (self.num_total_gpu_blocks - num_required_blocks <
                self.watermark_blocks):
            # The request needs more blocks than the GPU could ever provide; give up on it
            return AllocStatus.NEVER
        if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
            # Plenty of blocks, can allocate right away
            return AllocStatus.OK
        else:
            # Not enough free blocks; wait until some are released
            return AllocStatus.LATER
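With some hypothetical numbers, the watermark check behaves like this:

# Illustrative numbers for the can_allocate() watermark logic.
num_total_gpu_blocks = 1000
watermark_blocks = int(0.01 * num_total_gpu_blocks)   # 10 blocks held back

def alloc_status(num_free_gpu_blocks: int, num_required_blocks: int) -> str:
    if num_total_gpu_blocks - num_required_blocks < watermark_blocks:
        return "NEVER"   # the prompt could never fit, even on an empty GPU
    if num_free_gpu_blocks - num_required_blocks >= watermark_blocks:
        return "OK"      # enough free blocks while keeping the reserve
    return "LATER"       # wait until other requests release their blocks

print(alloc_status(num_free_gpu_blocks=25, num_required_blocks=8))     # OK    (25 - 8 = 17 >= 10)
print(alloc_status(num_free_gpu_blocks=15, num_required_blocks=8))     # LATER (15 - 8 = 7  < 10)
print(alloc_status(num_free_gpu_blocks=900, num_required_blocks=995))  # NEVER (1000 - 995 < 10)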
When self.can_allocate() returns AllocStatus.OK, i.e. there are enough free_gpu_blocks, self.allocate() is used to allocate gpu blocks to the current seq_group. The branch we take is very simple (see the code and comments): self.gpu_allocator hands out a free gpu block, and its ref_count is set accordingly (the number of seqs sharing that block).
class BlockSpaceManagerV1(BlockSpaceManager):
    # ...
    def allocate(self, seq_group: SequenceGroup) -> None:
        """
        Allocate gpu blocks for this seq_group.
        """
        # Some restrictions for encoder-decoder models; skip
        is_encoder_decoder = seq_group.is_encoder_decoder()
        check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)

        # Allocate decoder sequences
        seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
        block_table: BlockTable = \
            self._allocate_sequence(seq,
                                    seq_group.num_seqs(),
                                    is_encoder_decoder)

        # Assign the self-attention block tables for each sequence.
        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            self.block_tables[seq.seq_id] = block_table.copy()

        # Allocate encoder sequence
        if is_encoder_decoder:
            # A SequenceGroup has only a single encoder sequence (at most),
            # thus allocate with a ref count of 1
            block_table = self._allocate_sequence(seq_group.get_encoder_seq(),
                                                  1, is_encoder_decoder)
            # Assign the cross-attention block table for the SequenceGroup.
            self.cross_block_tables[seq_group.request_id] = block_table

    def _allocate_sequence(self, \
                           seq: Sequence, \
                           ref_count: int, \
                           is_encoder_decoder: bool = True) -> BlockTable:
        """
        Request new physical blocks for the prompt tokens.
        """
        # Allocate/update physical blocks according to the logical blocks
        num_prompt_blocks = len(seq.logical_token_blocks)
        block_table: BlockTable = []  # BlockTable = List[PhysicalTokenBlock]
        for logical_idx in range(num_prompt_blocks):
            # The first two cases are not the usual path; ignore
            if (self.block_sliding_window is not None
                    and logical_idx >= self.block_sliding_window):
                block = block_table[logical_idx % self.block_sliding_window]
                # Set the reference counts of the token blocks.
                block.ref_count = ref_count
            elif not is_encoder_decoder and self.enable_caching:
                # This one corresponds to the CachedBlockAllocator
                block = self.gpu_allocator.allocate(
                    seq.hash_of_block(logical_idx),
                    seq.num_hashed_tokens_of_block(logical_idx))
            else:
                # [This is the branch we take] a very simple allocation; the allocated
                # block's ref_count is set, i.e. the block is shared by ref_count seqs
                block = self.gpu_allocator.allocate()
                block.ref_count = ref_count
            block_table.append(block)

        return block_table
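As a quick illustration of the ref_count bookkeeping: with parallel sampling (n output sequences per prompt), seq_group.num_seqs() is n, so every prompt block starts out shared by all n sequences (toy numbers below):

# ref_count for a parallel-sampling seq_group (illustrative values).
import math

block_size = 16
prompt_len = 40
n = 3                                                      # samples per prompt

num_prompt_blocks = math.ceil(prompt_len / block_size)     # 3 logical blocks
# allocate() requests num_prompt_blocks physical blocks once, each with
# ref_count = n, and every sequence gets a copy of the same block table.
print(num_prompt_blocks, n)                                # 3 physical blocks, each shared by 3 seqs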
3. can_append_slots() & append_slots()
These two methods are used in Scheduler._schedule_running() and Scheduler._schedule_swapped(), i.e. during the decode stage: each step generates one token, so each seq only needs to claim a single slot, rather than several blocks as in the prefill stage.
A few words about self.append_slots(). A seq's logical blocks (logical_token_blocks) are updated before its physical blocks (represented by block_table in the code below): by the time this method is called, the logical blocks already account for the new token while the physical blocks are still pending. That leaves two cases:
1) The last block in the current block_table still has a free slot, which can be used directly;
2) All block_size slots of the last block are already used up, so self._allocate_last_physical_block(seq) has to open a new physical block; in the code this is the branch len(block_table) < len(logical_blocks), i.e. there is exactly one fewer physical block than logical blocks.
Finally, self.append_slots() returns blocks_to_copy, i.e. the copy-on-write information.
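A quick sanity check on when a new physical block is needed, using the 33-token example from the comment inside the code below (block_size = 16):

# When does append_slots() need a new physical block? (illustrative values)
import math

block_size = 16
num_tokens = 33                               # prompt + generated tokens so far

num_logical_blocks = math.ceil(num_tokens / block_size)   # 3: the 33rd token opened a 3rd logical block
num_physical_blocks = 2                                    # the 2nd physical block was exactly full
if num_physical_blocks < num_logical_blocks:
    num_physical_blocks += 1                  # _allocate_last_physical_block() adds block 3
print(num_logical_blocks, num_physical_blocks)             # 3 3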
class BlockSpaceManagerV1(BlockSpaceManager):
    # ...
    def can_append_slots(self,
                         seq_group: SequenceGroup,
                         num_lookahead_slots: int = 0) -> bool:
        """
        Is there enough space (slots) for this seq_group to keep generating?
        """
        assert (num_lookahead_slots == 0
                ), "lookahead allocation not supported in BlockSpaceManagerV1"

        # Simple heuristic: If there is at least one free block
        # for each sequence, we can append.
        # i.e. appending is possible when every seq can be given at least one free block
        num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
        num_seqs = seq_group.num_seqs(status=SequenceStatus.RUNNING)
        return num_seqs <= num_free_gpu_blocks

    def append_slots(
        self,
        seq: Sequence,
        num_lookahead_slots: int = 0,
    ) -> List[Tuple[int, int]]:
        """
        Allocate a slot for the new token.
        """
        # Logical & physical blocks occupied by this seq
        logical_blocks = seq.logical_token_blocks
        block_table = self.block_tables[seq.seq_id]
        # More logical blocks than physical blocks (e.g. the current token count is 33
        # with block_size=16: the logical blocks just started their 3rd block, while the
        # 2nd physical block is exactly full, so a 3rd one must be requested)
        if len(block_table) < len(logical_blocks):
            assert len(block_table) == len(logical_blocks) - 1
            # Ignore this branch
            if (self.block_sliding_window
                    and len(block_table) >= self.block_sliding_window):
                # reuse a block
                block_table.append(block_table[len(block_table) %
                                               self.block_sliding_window])
            else:
                # Allocate a new physical block
                new_block = self._allocate_last_physical_block(seq)
                block_table.append(new_block)
                # What is returned here is blocks_to_copy
                return []

        # Give the new token a slot in the last physical block
        last_block = block_table[-1]
        assert last_block.device == Device.GPU
        if last_block.ref_count == 1:
            # This block is not shared with other seqs, so it can be used directly
            # Ignore
            if self.enable_caching:
                # If the last block is now complete, we may reuse an old block to save memory.
                maybe_new_block = self._maybe_promote_last_block(
                    seq, last_block)
                block_table[-1] = maybe_new_block
            return []
        else:
            # The block is shared with other seqs, so copy-on-write is needed:
            # go separate ways and open a dedicated physical block as this seq's new last block
            new_block = self._allocate_last_physical_block(seq)
            block_table[-1] = new_block
            # free() here only decrements ref_count by 1; it does not fully
            # release the block back to the free pool
            self.gpu_allocator.free(last_block)
            # Return the (src, dst) block numbers for the copy
            return [(last_block.block_number, new_block.block_number)]
4. Other methods
BlockSpaceManagerV1 has a number of other methods, such as self.swap_out(), self.swap_in() and self.free(), which are not expanded on here; if you are interested, read the source yourself.
III. CacheEngine
It is worth pointing out that the BlockSpaceManager only assigns blocks; it is a bookkeeping arrangement. Actually writing the KV caches of the designated tokens into those blocks happens during model execution and is carried out by the CacheEngine, which is not covered here.
Summary
This post walked through how the Scheduler and the BlockSpaceManager work. With that, you should have a fairly complete and clear picture of the whole vLLM pipeline. Of course, vLLM contains many more tricks that this series has not touched, such as Prefix Caching and Speculative Decoding; those may get their own series later and will be linked as supplements in the next post.