Series Articles
vLLM (1) - Qwen2 Inference & Deployment
vLLM (2) - Architecture Overview
vLLM (3) - Sequence & SequenceGroup
vLLM (4) - LLMEngine (Part 1)
vLLM (5) - LLMEngine (Part 2)
Table of Contents
- Series Articles
- Preface
- I. Class Diagrams
- II. LLM._validate_and_add_requests()
  - 1. LLM
  - 2. LLMEngine
  - 3. Scheduler
- III. LLM._run_engine()
  - 1. LLM
  - 2. LLMEngine
  - 3. Scheduler
- Summary
Preface
The previous post covered the initialization phase of LLM and LLMEngine, including device initialization, model loading, and cache initialization. This post walks through the generation phase of LLMEngine, which covers request scheduling and model inference, and takes a first look at how the scheduler works, i.e., the Scheduler part of the diagrams below.
I. Class Diagrams
As usual, let's start with class diagrams. This time there are two, corresponding to two different actions: the first is adding requests to the LLMEngine; the second is running the LLMEngine, i.e., scheduling so that all requests get their output responses as quickly as possible, until everything is finished.
# Class diagram 1: adding requests
+-------------------------+
| LLM                     |
+-------------------------+
| + llm_engine: LLMEngine |
| + generate(): List[RequestOutput] |    # generate completions
| - _validate_and_add_requests() |       # called by self.generate(); adds requests to llm_engine
| - _run_engine() |                      # schedule and decode
+-------------------------+
            |
            v
+-------------------------+
| LLMEngine               |
+-------------------------+
| + scheduler: Scheduler |               # the scheduler
| + add_request() |                      # adds one request; called by LLM._validate_and_add_requests()
| + process_model_inputs(): LLMInputs |  # converts the raw input into the required form; called by self.add_request()
| - _add_processed_request() |           # adds the request once its input has been processed by self.process_model_inputs()
+-------------------------+
            |
            v
+-------------------------+
| Scheduler               |
+-------------------------+
| + add_seq_group() |                    # the processed input is used to build a SequenceGroup, which is added to the Scheduler
+-------------------------+
# Class diagram 2: scheduling and model inference
+-------------------------+
| LLM                     |
+-------------------------+
| + llm_engine: LLMEngine |
| + generate(): List[RequestOutput] |
| - _validate_and_add_requests() |
| - _run_engine() |
+-------------------------+
            |
            v
+-------------------------+
| LLMEngine               |
+-------------------------+
| + scheduler: Scheduler |
| + output_processor |                   # output processor
| + model_executor: GPUExecutor |        # executor
| + has_unfinished_requests(): bool |    # checks whether any request has not finished decoding
| + step(): List[Union[RequestOutput, EmbeddingRequestOutput]] |    # performs one decoding iteration and returns the new outputs
| - _process_model_outputs(): List[Union[RequestOutput, EmbeddingRequestOutput]] |    # post-processes the model outputs
+-------------------------+
            |
            v
+-------------------------+
| Scheduler               |
+-------------------------+
| + waiting: Deque[SequenceGroup] |      # waiting queue: requests waiting to be scheduled
| + running: Deque[SequenceGroup] |      # running queue: requests that can currently be scheduled for decoding
| + swapped: Deque[SequenceGroup] |      # requests temporarily swapped out to CPU memory due to GPU memory pressure
| + block_manager: Union[BlockSpaceManagerV1, BlockSpaceManagerV2, EmbeddingModelBlockSpaceManager] |    # block manager: maps logical blocks to physical blocks
| + add_seq_group() |                    # described above
| + schedule(): Tuple[List[SequenceGroupMetadata], SchedulerOutputs] |    # schedules waiting, running and swapped; called by LLMEngine.step()
| - _schedule(): SchedulerOutputs |      # called by self.schedule()
| - _schedule_default(): SchedulerOutputs |    # called by self._schedule(); the default scheduling policy
+-------------------------+
II. LLM._validate_and_add_requests()
1. LLM
The LLM.generate() method takes the user's input, generates text, and returns it to the user, as shown in the code below. Apart from the input/output handling steps, it delegates the key work to self._validate_and_add_requests() and self._run_engine().
class LLM:
    def __init__(self, ...):
        # ...
        self.llm_engine = LLMEngine.from_engine_args(
            engine_args, usage_context=UsageContext.LLM_CLASS)
        self.request_counter = Counter()  # request counter

    def generate(
        self,
        prompts: Union[Union[PromptStrictInputs, Sequence[PromptStrictInputs]],
                       Optional[Union[str, List[str]]]] = None,
        sampling_params: Optional[Union[SamplingParams,
                                        Sequence[SamplingParams]]] = None,
        prompt_token_ids: Optional[Union[List[int], List[List[int]]]] = None,
        use_tqdm: bool = True,
        lora_request: Optional[Union[List[LoRARequest], LoRARequest]] = None,
    ) -> List[RequestOutput]:
        """Generates the completions for the input prompts.

        This class automatically batches the given prompts, considering
        the memory constraint. For the best performance, put all of your prompts
        into a single list and pass it to this method.

        Args:
            inputs: A list of inputs to generate completions for.
            sampling_params: The sampling parameters for text generation. If
                None, we use the default sampling parameters.
                When it is a single value, it is applied to every prompt.
                When it is a list, the list must have the same length as the
                prompts and it is paired one by one with the prompt.
            use_tqdm: Whether to use tqdm to display the progress bar.
            lora_request: LoRA request to use for generation, if any.

        Returns:
            A list of `RequestOutput` objects containing the
            generated completions in the same order as the input prompts.

        Note:
            Using ``prompts`` and ``prompt_token_ids`` as keyword parameters is
            considered legacy and may be deprecated in the future. You should
            instead pass them via the ``inputs`` parameter.
        """
        # embedding_mode is not covered in this post; ignore it
        if self.llm_engine.model_config.embedding_mode:
            raise ValueError(
                "LLM.generate() is only supported for generation models "
                "(XForCausalLM).")

        # Convert the data depending on which form the input was given in
        if prompt_token_ids is not None:
            inputs = self._convert_v1_inputs(
                prompts=cast(Optional[Union[str, List[str]]], prompts),
                prompt_token_ids=prompt_token_ids,
            )
        else:
            inputs = cast(
                Union[PromptStrictInputs, Sequence[PromptStrictInputs]],
                prompts)

        # Sampling parameters: temperature, top_k, etc.
        if sampling_params is None:
            # Use default sampling params.
            sampling_params = SamplingParams()

        # LLMEngine processes the inputs of these requests, creates a sequence
        # group for each of them, and hands them to the scheduler
        self._validate_and_add_requests(
            inputs=inputs,
            params=sampling_params,
            lora_request=lora_request,
        )

        # Let LLMEngine schedule everything and produce the responses
        outputs = self._run_engine(use_tqdm=use_tqdm)
        return LLMEngine.validate_outputs(outputs, RequestOutput)

    # ------------- self._validate_and_add_requests() -------------
    def _validate_and_add_requests(
        self,
        inputs: Union[PromptStrictInputs, Sequence[PromptStrictInputs]],
        params: Union[SamplingParams, Sequence[SamplingParams], PoolingParams,
                      Sequence[PoolingParams]],
        lora_request: Optional[Union[Sequence[LoRARequest], LoRARequest]],
    ) -> None:
        if isinstance(inputs, (str, dict)):
            # Convert a single prompt to a list.
            inputs = [inputs]
        num_requests = len(inputs)

        # The number of sampling params must match the number of prompts
        if isinstance(params, list) and len(params) != num_requests:
            raise ValueError("The lengths of prompts and params "
                             "must be the same.")
        # LoRA is not covered in this post; ignore it
        if isinstance(lora_request,
                      list) and len(lora_request) != num_requests:
            raise ValueError("The lengths of prompts and lora_request "
                             "must be the same.")

        # Add the requests to llm_engine
        for i, request_inputs in enumerate(inputs):
            self._add_request(
                request_inputs,
                params[i] if isinstance(params, Sequence) else params,
                lora_request=lora_request[i] if isinstance(
                    lora_request, Sequence) else lora_request,
            )

    def _add_request(
        self,
        inputs: PromptInputs,
        params: Union[SamplingParams, PoolingParams],
        lora_request: Optional[Union[List[LoRARequest], LoRARequest]] = None,
    ) -> None:
        # id of the current request
        request_id = str(next(self.request_counter))
        self.llm_engine.add_request(request_id,
                                    inputs,
                                    params,
                                    lora_request=lora_request)
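For context, here is a minimal usage sketch of this entry point. The model path and prompts are placeholders chosen for illustration (the series uses Qwen2, so a Qwen2 checkpoint is assumed here), not values taken from the original post:

from vllm import LLM, SamplingParams

# Hypothetical model path and prompts, for illustration only
llm = LLM(model="Qwen/Qwen2-7B-Instruct")
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=64)

prompts = ["Hello, my name is", "The capital of France is"]
outputs = llm.generate(prompts, sampling_params)
for out in outputs:
    # Each RequestOutput carries the prompt and its generated completions
    print(out.prompt, "->", out.outputs[0].text)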
2. LLMEngine
LLM._validate_and_add_requests() (validates and) adds requests. Concretely, when a request arrives, LLMEngine.process_model_inputs() processes the raw input, a SequenceGroup is created on top of it so that the scheduler can schedule it, and all of this is done by LLMEngine, as shown in the code below:
class LLMEngine:
    # ...
    def add_request(
        self,
        request_id: str,  # request id
        inputs: PromptInputs,  # raw input
        params: Union[SamplingParams, PoolingParams],  # sampling params
        arrival_time: Optional[float] = None,  # arrival time of the request
        lora_request: Optional[LoRARequest] = None,
    ) -> None:
        """Adds the request to this engine's request pool.
        When engine.step() is called, the scheduler decides how to
        process this request.
        """
        # LoRA-related, ignore
        if lora_request is not None and not self.lora_config:
            raise ValueError(f"Got lora_request {lora_request} but LoRA is "
                             "not enabled!")
        if arrival_time is None:
            arrival_time = time.time()

        # Process the input, converting it into an LLMInputs instance
        processed_inputs = self.process_model_inputs(request_id=request_id,
                                                     inputs=inputs,
                                                     lora_request=lora_request)

        # Add the request based on the processed input,
        # i.e. hand a sequence group over to the scheduler
        self._add_processed_request(
            request_id=request_id,
            processed_inputs=processed_inputs,
            params=params,
            arrival_time=arrival_time,
            lora_request=lora_request,
        )

    def _add_processed_request(
        self,
        request_id: str,
        processed_inputs: LLMInputs,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: float,
        lora_request: Optional[LoRARequest],
    ) -> None:
        """For each request, add a sequence group to the scheduler."""
        # Create the Sequence from the processed input
        block_size = self.cache_config.block_size  # 16 by default
        seq_id = next(self.seq_counter)
        eos_token_id = self._get_eos_token_id(lora_request)
        seq = Sequence(seq_id, processed_inputs, block_size, eos_token_id,
                       lora_request)

        # Create the SequenceGroup
        if isinstance(params, SamplingParams):  # normally we sample, so this branch is taken
            seq_group = self._create_sequence_group_with_sampling(
                request_id,
                seq,
                params,
                arrival_time=arrival_time,
                lora_request=lora_request,
            )
        elif isinstance(params, PoolingParams):
            seq_group = self._create_sequence_group_with_pooling(
                request_id,
                seq,
                params,
                arrival_time=arrival_time,
                lora_request=lora_request,
            )
        else:
            raise ValueError(
                "Either SamplingParams or PoolingParams must be provided.")

        # Add the sequence group to the scheduler; as we will see below,
        # adding it is trivial
        self.scheduler.add_seq_group(seq_group)

    def process_model_inputs(
        self,
        request_id: str,
        inputs: PromptInputs,
        lora_request: Optional[LoRARequest] = None,
    ) -> LLMInputs:
        # Convert PromptInputs into LLMInputs
        if isinstance(inputs, str):
            inputs = {"prompt": inputs}

        # Get the prompt token ids
        if "prompt_token_ids" not in inputs:
            tokenizer = self.get_tokenizer_group("prompts must be None if "
                                                 "skip_tokenizer_init is True")
            prompt_token_ids = tokenizer.encode(request_id=request_id,
                                                prompt=inputs["prompt"],
                                                lora_request=lora_request)
        else:
            prompt_token_ids = inputs["prompt_token_ids"]

        return LLMInputs(prompt_token_ids=prompt_token_ids,  # only these 3 fields
                         prompt=inputs.get("prompt"),
                         multi_modal_data=inputs.get("multi_modal_data"))
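To make the conversion concrete, here is a small sketch of the two input forms that process_model_inputs() accepts, based only on the branches shown above. The prompt text and token ids are made-up values for illustration:

# Two equivalent ways a request's input can arrive (illustrative values only):

# 1) a plain string: it is wrapped into {"prompt": ...} and then tokenized
inputs_a = "Tell me a joke."

# 2) a dict that already carries token ids: tokenization is skipped
inputs_b = {
    "prompt": "Tell me a joke.",
    "prompt_token_ids": [40451, 757, 264, 22380, 13],  # hypothetical ids
}

# In both cases the result is an LLMInputs with exactly three fields:
#   prompt_token_ids, prompt, multi_modal_data (None here)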
3. Scheduler
The code below shows the Scheduler.add_seq_group() method. It is very simple: it just appends the seq_group to self.waiting, where it waits to be scheduled.
class Scheduler:
    # ...
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        # Add sequence groups to the waiting queue.
        self.waiting.append(seq_group)
At this point, all the requests have been added.
III. LLM._run_engine()
1. LLM
The code of LLM._run_engine() is shown below. Its job is to call self.llm_engine.step() repeatedly, scheduling the queued requests and generating new tokens until every request has been fully answered. A step is one iteration, i.e. generating the next token; after many steps the shorter sequences finish sampling first, so self.llm_engine accumulates more and more finished_requests, and once all requests are finished the outputs can be returned.
class LLM:
    # ...
    def _run_engine(
            self, *, use_tqdm: bool
    ) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
        """Run the engine, scheduling and decoding requests according to
        the scheduling policy (request priorities).
        """
        # Irrelevant code (e.g. the tqdm progress bar) is omitted
        outputs: List[Union[RequestOutput, EmbeddingRequestOutput]] = []
        while self.llm_engine.has_unfinished_requests():
            # As long as some requests have not finished decoding, run one
            # more decoding iteration to generate the next token
            step_outputs = self.llm_engine.step()
            for output in step_outputs:
                if output.finished:
                    outputs.append(output)
        # Sort by request_id
        return sorted(outputs, key=lambda x: int(x.request_id))
2. LLMEngine
There is one issue we have not considered yet: scheduling. The number of requests that can be processed at the same time is limited, constrained by the maximum number of concurrently handled requests, the available GPU memory, and other factors. Take the simplest example: I send 300 requests to vllm, but vllm can only process 256 requests at once, so the remaining 44 have to wait outside. After self.llm_engine.step() has run for several iterations, suppose 10 requests have finished. What should we do then? Clearly, we should clear out those finished_requests, pick the first 10 of the 44 waiting requests, and process them together with the unfinished_requests. Therefore LLMEngine.step() (shown below) should first call self.scheduler.schedule() to do the scheduling for this iteration, and then run the model forward pass via self.model_executor.execute_model().
class LLMEngine:
    # ...
    def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
        """Performs one decoding iteration and returns the newly generated results."""
        # Schedule the sequences to be executed in the next iteration,
        # as well as the token blocks to be swapped in/out/copied
        seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()

        # After scheduling, run the model on these requests via the model_executor
        if not scheduler_outputs.is_empty():
            execute_model_req = ExecuteModelRequest(
                seq_group_metadata_list=seq_group_metadata_list,
                blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
                blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
                blocks_to_copy=scheduler_outputs.blocks_to_copy,
                num_lookahead_slots=scheduler_outputs.num_lookahead_slots,
                running_queue_size=scheduler_outputs.running_queue_size,
            )
            output = self.model_executor.execute_model(
                execute_model_req=execute_model_req)
        else:
            output = []

        # Post-process the model outputs
        request_outputs = self._process_model_outputs(
            output, scheduler_outputs.scheduled_seq_groups,
            scheduler_outputs.ignored_seq_groups, seq_group_metadata_list)

        if not request_outputs:
            # Stop the execute model loop in parallel workers until there are
            # more requests to process. This avoids waiting indefinitely in
            # torch.distributed ops which may otherwise timeout, and unblocks
            # the RPC thread in the workers so that they can process any other
            # queued control plane messages, such as add/remove lora adapters.
            self.model_executor.stop_remote_worker_execution_loop()

        return request_outputs
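Putting add_request() and step() together, driving the engine directly looks roughly like the sketch below. This is a simplified version of what LLM._run_engine() does, written against the API version shown in this post; the engine arguments and prompt are placeholder values:

from vllm import EngineArgs, LLMEngine, SamplingParams

# Hypothetical configuration, for illustration only
engine = LLMEngine.from_engine_args(EngineArgs(model="Qwen/Qwen2-7B-Instruct"))
engine.add_request(request_id="0",
                   inputs="Write a haiku about GPUs.",
                   params=SamplingParams(max_tokens=32))

finished = []
while engine.has_unfinished_requests():
    # One scheduling round plus one forward pass per call
    for out in engine.step():
        if out.finished:
            finished.append(out)
print(finished[0].outputs[0].text)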
3. Scheduler
We finally arrive at the scheduling part; the code is shown below. To keep the focus on the most important pieces, the remaining code inside Scheduler.schedule() is omitted for now. The call path we follow is self.schedule() -> self._schedule() -> self._schedule_default(). The code of self._schedule_default() is shown below as well; let's analyze this method carefully.
- Initialize the scheduling budget: SchedulingBudget() creates the budget, which is limited by token_budget and max_num_seqs (a toy sketch of this budget-and-queues idea follows this list);
- Update the scheduling budget: self.running is the queue of sequence_groups still in the RUNNING state after the previous LLMEngine.step(); they naturally consume part of the budget, and budget.add_num_seqs() records/updates the number of sequences already counted against the budget;
- Initialize the results of this scheduling round: for example, after this round some sequence_groups in self.waiting will still be waiting; we call them remaining_waiting and initialize them to self.waiting. The other part of self.waiting gets scheduled successfully, i.e. can be fed to the model for generation, and that scheduling result is initialized as prefills in the code. Likewise, running_scheduled holds the groups scheduled to run in the next step, while remaining_running holds those that must keep queuing for a later round;
- Schedule self.waiting: this is done by self._schedule_prefills(), which uses the budget and other information to determine how many sequence_groups in self.waiting can be scheduled, and flips the state of their underlying sequences from WAITING to RUNNING (the details are not expanded here). Note that self.swapped has higher priority than self.waiting, so whenever self.swapped is non-empty, self.waiting is not scheduled;
- Schedule self.running and self.swapped: the condition is len(prefills.seq_groups) == 0. vllm takes both the scheduling budget and the memory blocks into account (these are two different constraints; a later post will cover this) and schedules as many requests from the waiting queue as possible; when no more requests can be scheduled, for instance because the configured limit of 256 concurrent requests has been reached, it starts scheduling self.running and self.swapped. Both self.running and self.swapped are in the decode phase; the difference is that self.running lives on the gpu while self.swapped waits on the cpu to come back, and self.running has higher priority than self.swapped;
- Update self.waiting, self.running and self.swapped after scheduling. Each of them is assembled from several parts. For example, self.waiting comes not only from remaining_waiting but also from running_scheduled.preempted, the lower-priority sequence_groups in running_scheduled that were pushed out (preempted) under memory pressure; they have to be recomputed from scratch in a later step, so they are added back to self.waiting. Moreover, since they were just preempted during this round's running scheduling, they still rank above remaining_waiting, which is why they are added with self.waiting.extendleft(). The remaining updates can be analyzed in the same way;
- Return a SchedulerOutputs object.
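Before reading the real code, here is a deliberately simplified toy model of the "budget plus queues" idea described above. It is not vLLM's implementation: the names loosely mirror the real ones, but the logic is reduced to counting one sequence per group against max_num_seqs and promoting waiting groups first come, first served.

from collections import deque
from dataclasses import dataclass
from typing import Deque


@dataclass
class ToyBudget:
    # Simplified stand-in for SchedulingBudget: only tracks the sequence count
    max_num_seqs: int
    num_curr_seqs: int = 0

    def can_schedule(self, num_new_seqs: int) -> bool:
        return self.num_curr_seqs + num_new_seqs <= self.max_num_seqs

    def add_num_seqs(self, num_new_seqs: int) -> None:
        self.num_curr_seqs += num_new_seqs


def toy_schedule(waiting: Deque, running: Deque, max_num_seqs: int):
    """One scheduling round: running groups keep their slots first, then as
    many waiting groups as the budget allows are promoted (FCFS)."""
    budget = ToyBudget(max_num_seqs=max_num_seqs)
    for _ in running:                 # already-running groups consume budget first
        budget.add_num_seqs(1)
    scheduled = []
    while waiting and budget.can_schedule(1):
        scheduled.append(waiting.popleft())   # first come, first served
        budget.add_num_seqs(1)
    running.extend(scheduled)
    return scheduled


# Example: 300 requests, capacity 256 -> 256 scheduled, 44 left waiting
waiting = deque(f"req-{i}" for i in range(300))
running: Deque = deque()
print(len(toy_schedule(waiting, running, max_num_seqs=256)), len(waiting))  # 256 44

The real scheduler additionally enforces a token budget and the availability of KV-cache blocks, which is where preemption and swapping come from.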
class Scheduler:
    def __init__(self, ...):
        # ...
        # seq_groups in the WAITING state: new prefill requests or preempted requests
        self.waiting: Deque[SequenceGroup] = deque()
        # seq_groups in the RUNNING state: decode requests
        self.running: Deque[SequenceGroup] = deque()
        # seq_groups in the SWAPPED state: decode requests that have been swapped out
        self.swapped: Deque[SequenceGroup] = deque()

    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        # Schedule the sequence groups
        scheduler_outputs = self._schedule()
        # Build the input data (for the model)
        seq_group_metadata_list: List[SequenceGroupMetadata] = []
        # The rest of the code is long and omitted for now ...
        return seq_group_metadata_list, scheduler_outputs

    def _schedule(self) -> SchedulerOutputs:
        """Schedule the queued requests."""
        if self.scheduler_config.chunked_prefill_enabled:
            return self._schedule_chunked_prefill()
        else:
            # The default path
            return self._schedule_default()

    def _schedule_default(self) -> SchedulerOutputs:
        """Batch as many prefill requests as possible and schedule them for
        decoding; if GPU memory is insufficient, some decode requests can be
        moved to the swapped or preempted state.
        """
        # 1) Determine the scheduling budget from the maximum number of sequences
        #    and the maximum total number of tokens.
        #    As scheduling progresses, SchedulingBudget tracks seq and token usage.
        budget = SchedulingBudget(
            token_budget=self.scheduler_config.max_num_batched_tokens,
            max_num_seqs=self.scheduler_config.max_num_seqs,
        )

        # 2) Account for the sequences that are already running
        #    (without exceeding the maximum number of sequences)
        for seq_group in self.running:
            budget.add_num_seqs(seq_group.request_id,
                                seq_group.get_max_num_running_seqs())
        # LoRA-related, ignore for now
        curr_loras = set(
            seq_group.lora_int_id for seq_group in self.running
            if seq_group.lora_int_id > 0) if self.lora_enabled else None

        # 3) The sequence_groups in their different states, plus the (initially
        #    empty) scheduling output for each state (essentially just "these n
        #    seqs were scheduled", nothing more).
        #    "remaining_xxx" is the part that stays in state "xxx" after this round.
        remaining_waiting, prefills = (self.waiting,
                                       SchedulerPrefillOutputs.create_empty())
        remaining_running, running_scheduled = (
            self.running, SchedulerRunningOutputs.create_empty())
        remaining_swapped, swapped_in = (
            self.swapped, SchedulerSwappedInOutputs.create_empty())

        # If any requests are swapped, prioritized swapped requests.
        if not self.swapped:
            # 4) When there are no swapped requests, schedule the seq_groups
            #    that are in the prefill phase
            remaining_waiting, prefills = self._schedule_prefills(
                self.waiting, budget, curr_loras, enable_chunking=False)

        # Scheduling policy: first come, first served
        fcfs_policy = PolicyFactory.get_policy(policy_name="fcfs")

        # Don't schedule decodes if prefills are scheduled.
        # NOTE: If `_schedule_prefills` doesn't enable chunking, self.running
        # only contains decode requests, not chunked prefills.
        # 5) Schedule running and swapped
        if len(prefills.seq_groups) == 0:
            # No prefills were scheduled, so schedule running (decode)
            remaining_running, running_scheduled = self._schedule_running(
                self.running,
                budget,
                curr_loras,
                fcfs_policy,
                enable_chunking=False)

            # If any sequence group is preempted, do not swap in any sequence
            # group. because it means there's no slot for new running requests.
            if len(running_scheduled.preempted) + len(
                    running_scheduled.swapped_out) == 0:
                # Only when nothing was preempted or swapped out can the
                # swapped requests possibly be scheduled (swapped in)
                remaining_swapped, swapped_in = self._schedule_swapped(
                    self.swapped, budget, curr_loras, fcfs_policy)

        # Make sure the budget has not been exceeded
        assert (budget.num_batched_tokens <=
                self.scheduler_config.max_num_batched_tokens)
        assert budget.num_curr_seqs <= self.scheduler_config.max_num_seqs

        # Update the waiting requests
        self.waiting = remaining_waiting
        # Preempted running requests go to the front of waiting: higher priority
        self.waiting.extendleft(running_scheduled.preempted)

        # Update the running requests
        self.running = remaining_running
        # The scheduled prefills are appended to running
        self.running.extend([s.seq_group for s in prefills.seq_groups])
        # Requests from running that are in the decode phase
        self.running.extend(
            [s.seq_group for s in running_scheduled.decode_seq_groups])
        # Requests about to be swapped in, also in the decode phase
        self.running.extend(
            [s.seq_group for s in swapped_in.decode_seq_groups])

        # Update the swapped requests
        self.swapped = remaining_swapped
        # Requests from running that were swapped out
        self.swapped.extend(running_scheduled.swapped_out)

        # Lower-priority requests that, under memory pressure, were temporarily
        # preempted by higher-priority requests (which need to generate new tokens)
        preempted = (len(running_scheduled.preempted) +
                     len(running_scheduled.swapped_out))

        # There should be no prefill from running queue because this policy
        # doesn't allow chunked prefills.
        assert len(running_scheduled.prefill_seq_groups) == 0
        assert len(swapped_in.prefill_seq_groups) == 0

        return SchedulerOutputs(
            scheduled_seq_groups=(prefills.seq_groups +
                                  running_scheduled.decode_seq_groups +
                                  swapped_in.decode_seq_groups),
            num_prefill_groups=len(prefills.seq_groups),
            num_batched_tokens=budget.num_batched_tokens,
            blocks_to_swap_in=swapped_in.blocks_to_swap_in,
            blocks_to_swap_out=running_scheduled.blocks_to_swap_out,
            blocks_to_copy=running_scheduled.blocks_to_copy +
            swapped_in.blocks_to_copy,
            ignored_seq_groups=prefills.ignored_seq_groups +
            swapped_in.infeasible_seq_groups,
            num_lookahead_slots=running_scheduled.num_lookahead_slots,
            running_queue_size=len(self.running),
            preempted=preempted,
        )
At this point we have a first understanding of the generation phase of LLMEngine, and in particular of how the Scheduler schedules user requests. Some details are still unclear, though: we have not walked through methods such as self._schedule_prefills(), nor touched on what block_manager does during scheduling. These will be answered in detail in the next post.
Summary
This post covered the two methods LLM._validate_and_add_requests() and LLM._run_engine(). In the latter especially, the scheduler (Scheduler) plays a crucial role: under the scheduling budget and GPU memory constraints, it schedules user requests according to the configured policy and drives inference. Due to length constraints, a more detailed analysis of the Scheduler will follow in the next post.