vllm源码解析(一):整体架构与推理代码

news2024/11/15 6:46:47

vlllm官方代码更新频发,每个版本都有极大变动, 很难说哪个版本好用.
第一次阅读vllm源码是0.4.0版本,对这版圈复杂度极高的调度代码印象深刻
0.4.1对调度逻辑进行重构,完全大变样, 读代码速度快赶不上迭代的速度了。
现在已经更新到0.5.4, 经过长时间观察,发现主要的调度逻辑基本也稳定了下来, 应该可以作为一个固话的版本去阅读。

本文解读依据vllm 0.5.4版本. 没有修改任何代码,大家不必担心夹带私货!
打算以五篇文章的篇幅剖析vllm,希望能对大家有所帮助。

注解代码链接:
https://github.com/yblir/vllm-learn

参考文献:
https://zhuanlan.zhihu.com/p/691038809
https://zhuanlan.zhihu.com/p/681716326

一 大模型推理流程

在解析vllm源码前,我们先来回顾下llm推理流程。一个典型的推理过程如下:

在这里插入图片描述

prefill:预填充阶段,把整段prompt喂给大模型做推理,获得kv-cache并保存。
decode:大模型本质是个自回归模型,因此生成阶段,首先根据prompt中最后一个token的kv(input token 4)计算获得第一个推理结果(北),并保存对应的kv-cache(output token 1), 这个过程算一次推理;之后将 北 字作为输入(首次推理的输入是prompt,以后模型输入都是上次的生成token, 当然过程中要用到之前保存的kv-cache),做同样的推理生成 京 字,直到推理结束。

由于Decode阶段是逐一生成token,因此不能像prefill阶段那样能做大段prompt的并行计算,所以在LLM推理过程中,Decode阶段的耗时一般是更大的,单步生成token的耗时约占总推理时长的90%。

上述推理过程使用到了kv-cache技术,这里有些问题需要解决:
· 随着生成token的增多,kv-cache长度也变大,对gpu显存造成压力
· 生成的token长度无法预知,因此不能提前预知kv-cache所需的存储空间,给推理工作造成很大不确定性

vllm就是为解决上述问题而生,vllm的核心就是如何优化kv-cache,节省显存提高推理吞吐量。
调用方法也很简单,以下是qwen2 vllm推理代码:

# -*- coding: utf-8 -*-
# @Time    : 2024/8/18 20:14
# @Author  : yblir
# @File    : qwen2_vllm_inference.py
# explain  : 
# =======================================================
import os
import sys

sys.path.append('/mnt/e/PyCharm/insteresting/vllm-0.5.4/')

from vllm_module import LLM, SamplingParams
# from vllm import LLM, SamplingParams
from transformers import AutoTokenizer

os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
model_path = '/mnt/e/PyCharm/PreTrainModel/qwen2_15b_instruct'
# model_path = '/media/xk/D6B8A862B8A8433B/data/qwen2-15b-instruct'

params = {"repetition_penalty": 1.1,
          "temperature"       : 0.7,
          'n'                 : 4,
          "top_p"             : 0.8,
          "top_k"             : 20, }
sample_params = SamplingParams(**params)
llm = LLM(model=model_path,
          dtype='half'
            # dtype='float16'
          # 把模型层均分到n个gpu上, 而不是运行n个完整模型
          # tensor_parallel_size=1
          # gpu利用率最大70%
          # gpu_memory_utilization=0.7,
          )
tokenizer = AutoTokenizer.from_pretrained(model_path, )

# 构造模板
prompt = '介绍下京杭大运河'
messages = [
    {'role': 'system', 'content': '你是一个诗人'},
    {'role': 'user', 'content': prompt}
]

text = tokenizer.apply_chat_template(conversation=messages, tokenize=False, add_generation_prompt=True)

messages2 = [
    {'role': 'system', 'content': '你是一个诗人'},
    {'role': 'user', 'content': 'how far you go'}
]

text2 = tokenizer.apply_chat_template(conversation=messages2, tokenize=False, add_generation_prompt=True)

messages3 = [
    {'role': 'system', 'content': '你是一个诗人'},
    {'role': 'user', 'content': '中国首都城市什么名字'}
]

text3 = tokenizer.apply_chat_template(conversation=messages3, tokenize=False, add_generation_prompt=True)
# print(text)
outputs = llm.generate(
        # 当tokenizer.apply_chat_templat中 tokenize为 False 时激活prompts
        prompts=[text,text2,text3],

        # 当tokenizer.apply_chat_templat中 tokenize为 True 时激活prompt_token_ids,与prompts二选一
        # prompt_token_ids=[text,text2,text3],

        sampling_params=sample_params
)

for output in outputs:
    # prompt = output.prompt
    # print(prompt)
    # print(output)
    # print('------------------------------------------')
    for i,item in enumerate(range(4)):
        print(output.outputs[i].text)
        print(output.outputs[i].token_ids)
    print('------------------------------------------\n')


看起来很简单吧,似乎只要2步:只要把模型初始化,再调用generate方法就搞定了。实际上这两步的后面是耦合了调度与模型改造的复杂工程,本文将深度剖析潜藏在背后的源码。

二 vllm 原理分析

vllm管理kv-cache的技术称为PagedAttention,原理类似于虚拟内存分页管理技术。
正常推理流程中,生成的token长度无法预知,因此会最大化分配一块连续显存作为kv-cache的存储空间,可能到推理结束时这些空间大部分都用不到,而且这是为当前prompt分配的,其他prompt不能使用,造成极大浪费。
换个思路,如果把显存切分成多个连续小段是否可以呢!动态分配显存小段与生成的kv-cache 之间的存储关系,这样可以最大限度地使用显存,达到提升限度推理吞吐量的目的。
这种方法称为PagedAttention,主要有3个模块组成:logical kv blocks, block table, physical kv blocks. 原理如下图所示,下面我们来逐一解析。
在这里插入图片描述

  1. logical kv blocks:逻辑表, 不实际存储kv-cache,可以理解为C++语言中的指针,prefill和decode生成的kv-cache的"地址指针"存储在logical kv blocks, 逻辑表对"指针"的存储是连续的。不过在新版vllm中,logical kv这个东西已经删除了,当然逻辑块只是形式上消失了,实际上它依然隐藏在Sequence类的各个属性中,解释起来比较复杂,我们在以后的代码分析中再详解。
  2. physical kv blocks:可理解为实际存储token的物理显存,vllm中一个块默认为16(可以装16个token的k/v值),图中展示每个block大小为4。每个block内部是连续的,但block之间是不连续的,那么如何才能与logical保持对齐呢?这就需要block table了
  3. block table:存储logical与physical关系的映射表。如logical block0 -> physical block7. block table除了记录映射关系,还记录当前block槽位填充情况。如physical block7已经填满,因此filled==4; physical block1 槽位填充了3个,再填入一个father单词的token,filled会变为4。

多batch并行推理时,会有logical blocks映射到同一个physical blocks上,大家看图就能理解:
在这里插入图片描述

我们考虑另外一个问题,llm推理有时会有多个输出,这种情况PagedAttention该如何操作呢?
多输出有两种情况:
Parallel Sampling: 如果指定了n个输出,就把prompt复制n份,拼成一个batch喂给模型做推理。这时会产生prompt 的kv-cache重复存储,对这个重复的优化是另外问题,这里不展开了。
Beam Search:集束搜索,每个decode阶段,产生top k个token(k也被称为束宽),对应着当前时刻的top k个序列。它们的前置token也会有大量的kv-cache重复。
在这里插入图片描述

前面提到Parallel Sampling模式会把prompt复制 n份,如Figure 8所示,对应sample A1和sample A2,它们各自维护一套自己的logical blocks,由于内容完全相同,它们共享一套physical blocks,其中每个physical block对应引用计数ref count都为2. 进入推理阶段后,A1和A2各自独立做推理,如果生成了相同的token,会把新token kv-cache加入共享的physical block中。如果生成了不同的token(如图中的mothers和fathers),会触发copy-on-write机制,即在gpu上开辟一个新的block,如physical block1复制内容到block2,之后各自再装入生成的不同token, 同时,block1 计数-1,block2计数+1。这种操作符合vllm核心思想:节省KV cache显存,对于相同数据对应的KV cache,能复用则尽量复用;无法复用时,再考虑开辟新的物理空间。

vllm也有对Beam Search有优化,但这不是本文重点,暂时忽略。

目前为止,我们仅回顾与源码解析相关知识,PagedAttention还有许多东西没有讲到,说太多技术点反而会让人迷失在各种细节中,有兴趣可以自行去查资料了解。

三 vllm中一些基本概念

在解析源码前, 我们还需要清楚理解vllm一些概念的意思。

3.1 vllm 数据结构

如第一章中图片上展示的,一个prompt的典型例子如下:

<|im_start|>system
你是一个诗人<|im_end|>
<|im_start|>user
中国首都是<|im_end|>
<|im_start|>assistant

正常推理过程中,1个请求(batchsize)可能包含多个prompts,在vllm中,一个prompt才被看做一个请求;一个prompt可能输出多个outputs,这时每对prompt -> output序列称为一个seq序列,每条seq都维护着独立的status,可理解为当前时刻所处的推理状态,推理是否完结等:

  • WAITING:正在waiting队列中。waiting队列中的序列都没有做过prefill。
  • RUNNING:正在running队列中,即已经开始做推理。
  • SWAPPED:正在swapped队列中,表示此时gpu资源不足,相关的seq_group被抢占。

当然还有finished状态,主要记录因何种原因导致finished:

  • FINISHED_STOPPED:正常执行完毕,例如碰到符号,该seq的推理正常结束了
  • FINISHED_LENGTH_CAPPED:因为seq的长度达到最大长度限制,而结束推理
  • FINISHED_ABORTED:因不正常状态,而被终止的推理。例如客户端断开连接,则服务器会终止相关seq的推理
  • FINISHED_IGNORED:因prompt过长而被终止执行的推理。本质上也是受到长度限制

从上面可以看出单独管理seq有点复杂,所以我们需要统一管理一个prompt和它对应的所有outputs,称为一个seq_group。seq_group是vllm推理管理数据的基本单元。vllm 中设定一个seq_group中所有seq共享共一个prompt。这些变量的包含关系如下:
在这里插入图片描述
这张图是从某位大佬的文章中取来的,他用的版本应该是0.4.0,在0.5.4版本中,Sequence类已删除了logical_token_blocks属性和_append_tokens_to_blocks方法,对应的功能转移到其他代码中了。不过从图中,仍能清晰看到二者的从属关系。

3.2 调度原则

前面提到vllm的核心是对kv-cache的优化,而这种优化是通过调度系统Scheduler来完成的。
Scheduler维护着三个双端队列, 在3.1也有提到:
waiting,running,swapped。
每完成一次推理,都要对这三个队列进行动态调整,在下一次推理时实现最大限度提升吞吐量的目的.
这三个队列的作用如下:

  • waiting: 所有输入的模型的prompt都会被加入waiting队列中,这是输入数据的入口。这时的seq只有一条,就是prompt,连prefill都还没做,不管在外部设置了多输出还是Beam Search,此时只有这一条数据。waiting队列另一个数据来源是running队列。
  • running: 存储着上一次被送去做推理seq_groups,在下一次做推理前, 要对running队列中的seq_groups做检查,看系统是否有足够资源让它们留在队列中继续做下一次推理。如果当前系统资源不满足做一次推理,就把seq_group一条条pop()出来,转移到waiting或swapped队列中,直到满足下一次推理的资源需求。当然,running队列也会从waiting和swapped队列拿数据过来做推理,至于转移,怎么拿,接下来我们会详细分析。
  • swapped: 可以理解为失败者集散地,都是不满足条件(gpu blocks资源不足或某些推理参数超过阈值),被从running队列中踢出去的,等条件满足时,还会被从新加入running队列做推理。

上面提到了三个队列间的交互,它们的交互依据就是vllm的调度原则。
vllm调度的原则可以总结如下,了解了下面的处理逻辑,就能理解三个队列的用途及它们之间相互转移数据的规则:

  • 先来的请求先被服务(First-Come-First-Serve, FCFS)
  • 如有抢占的需要,后来的请求先被抢占(preemption)

FCFS大家都很好理解,我们来看下对preemption的处理:

抢占发生在推理阶段,vllm核心是最大限度优化吞吐量,推理过程中gpu显存不足或推理的tokens和seqs数量超过设定阈值,都会发生抢占,即暂时中断一些任务的执行,释放gpu上与它们相关的kv-cache,等资源充足,再恢复它们的执行。针对preemption有两种处理方式:

如果parallel sampling=1,直接释放所有physical blocks,将任务重新放回wait队列(放到队列头部,下一次最先取它),重新从prefill阶段开始做推理。
如果parallel sampling>1, 如果将它们直接丢掉,那未免过于浪费, 先把处理的好的blocks交换到CPU上,等gpu显存充足,再把这些blocks从CPU加载回来。

上面提到超过推理参数超过阈值也会导致抢占,这里的阈值指每次允许推理的最大seqs数量和最大tokens数量。在vllm 0.5.4版本中,由一个类budget的对象管理,每次推理前都要重新构建一个budget类对象,统计seqs和tokens数量是否越界。

至此,我们可以得出结论,判断一个seq_group是否被抢占的因素有三个:gpu blocks数量是否充足,当前调度能处理的seqs和tokens是否超过数量阈值。

3.3 vllm推理流程

有了上面的知识储备,我们就能理解一个seq从开始到结束,整个生命周期内的历程:
在这里插入图片描述

  • 每条prompt处理成Sequence对象,然后Sequence包装成seq_group,这条seq_group会存入waiting队列。此时只有一条seq,就是prompt,连预填充prefill都没做。status为waiting
  • ② 调度器选中这条seq_group做推理,图中我们展示两种情况,4输出和1输出,因此会产生4条seq和1条seq, 其中4 seq共享prompt,status为running。
  • ③ 推理一段时间后,gpu blocks资源不足或tokens或seqs数量超出阈值,发生抢占现象。多输出的seq_group相关kv blocks会被swap out到CPU上;而单输出的seq_group则会把相关的(prefill和decode)kv blocks释放,将seq_group重新放回waiting队列,就像什么都没发生过,幸运的是会被放到waiting队列最前面。
  • ④ 系统资源充足了,被swapped的seq_group会从CPU上swap_in gpu继续推理;单输出seq_group则从prefill开始重新奋斗
  • ⑤⑥ 多输出必定会出现某条seq先推理结束,此时还活跃的seq数减1, 变为3个,当某条seq推理完结,会被标记为finished, 以后不再调用资源处理它,只有等seq_group中所有seq都推理结束,该seq_group才算推理完成。

注意:并不是每个seq_group都会经历抢占,如果系统资源充足,会跳过抢占的,此时的执行次序为:①②⑤⑥ 或 ①②⑤

四 推理代码解析

经过漫长的知识铺垫,我们终于能看到vllm的核心代码了
我们先略过初始化阶段,直接从generate开始,遇到重要知识点再现场分析

outputs = llm.generate(
        # 当tokenizer.apply_chat_templat中 tokenize为 False 时激活prompts
        prompts=[text,text2,text3],

        # 当tokenizer.apply_chat_templat中 tokenize为 True 时激活prompt_token_ids,与prompts二选一
        # prompt_token_ids=[text,text2,text3],

        sampling_params=sample_params
)

vllm推理代码真的很简洁,只要一行代码就行,背后逻辑就极其复杂了,而且每个版本都有很大改动。

  • vllm/entrypoints/llm.py
    def generate(
            self,
            prompts: Union[Union[PromptInputs, Sequence[PromptInputs]], Optional[Union[str, List[str]]]] = None,
            sampling_params: Optional[Union[SamplingParams, Sequence[SamplingParams]]] = None,
            prompt_token_ids: Optional[Union[List[int], List[List[int]]]] = None,
            use_tqdm: bool = True,
            lora_request: Optional[Union[List[LoRARequest], LoRARequest]] = None,
            prompt_adapter_request: Optional[PromptAdapterRequest] = None,
            guided_options_request: Optional[Union[LLMGuidedOptions, GuidedDecodingRequest]] = None
    ) -> List[RequestOutput]:

        if self.llm_engine.model_config.embedding_mode:
            raise ValueError(
                    "LLM.generate() is only supported for generation models (XForCausalLM)."
            )

        # cast 表面看是类型转换,但实践上没做任何事,直接原样返回, 那么这个调用的意义是什么 ?
        if prompt_token_ids is not None:
            inputs = self._convert_v1_inputs(
                    prompts=cast(Optional[Union[str, List[str]]], prompts),
                    prompt_token_ids=prompt_token_ids,
            )
        else:
            inputs = cast(Union[PromptInputs, Sequence[PromptInputs]], prompts)

        if isinstance(guided_options_request, dict):
            if len(guided_options_request) > 1:
                raise ValueError(
                        "You can only use one guided decoding but multiple is "
                        f"specified: {guided_options_request}")
            guided_options_request = GuidedDecodingRequest(**guided_options_request)

        if sampling_params is None:
            # Use default sampling params.
            sampling_params = SamplingParams()

        # 校验入参,并将一个batchsize中的每条prompt处理成Sequence对象,然后Sequence包装成SequenceGroup组,
        # 1. prompt->seq->seq_group, 2. 将seq_group加入合适gpu维护的scheduler的waiting队列,等待处理
        self._validate_and_add_requests(
                inputs=inputs,
                params=sampling_params,
                lora_request=lora_request,
                prompt_adapter_request=prompt_adapter_request,
                guided_options=guided_options_request)

        # 首先从scheduler的waiting队列取数据,加入到running队列,再从running队列
        # 中取数据推理,若物理blocks不够用,从running转入swap队列
        outputs = self._run_engine(use_tqdm=use_tqdm)

        return LLMEngine.validate_outputs(outputs, RequestOutput)

generate 代码中最重要的模块有两个,_validate_and_add_requests(数据预处理),_run_engine(实际推理)

4.1 数据预处理

  • vllm/entrypoints/llm.py
    def _validate_and_add_requests(
            self,
            inputs: Union[PromptInputs, Sequence[PromptInputs]],
            params: Union[SamplingParams, Sequence[SamplingParams], PoolingParams, Sequence[PoolingParams]],
            lora_request: Optional[Union[Sequence[LoRARequest], LoRARequest]],
            prompt_adapter_request: Optional[PromptAdapterRequest],
            guided_options: Optional[GuidedDecodingRequest] = None,
    ) -> None:
        # 如果输入是一条prompt,而不是list,会在此处自动转换为list
        if isinstance(inputs, (str, dict)):
            # Convert a single prompt to a list.
            inputs = [inputs]
           
		...
		
        # Add requests to the engine.
        # 遍历每一条prompt,1个prompt算1个request,需要有1个全局唯一的request_id
        for i, request_inputs in enumerate(inputs):
            self._add_request(
                    request_inputs,
                    params[i] if isinstance(params, Sequence) else params,
                    lora_request=lora_request[i] if isinstance(lora_request, Sequence) else lora_request,
                    prompt_adapter_request=prompt_adapter_request)
                    
    # ==========================================================================================================
    def _add_request(
            self,
            inputs: PromptInputs,
            params: Union[SamplingParams, PoolingParams],
            lora_request: Optional[Union[List[LoRARequest], LoRARequest]] = None,
            prompt_adapter_request: Optional[PromptAdapterRequest] = None
    ) -> None:
        # 每个prompt赋1个全局唯一的request_id
        request_id = str(next(self.request_counter))
        self.llm_engine.add_request(
                request_id,
                inputs,
                params,
                lora_request=lora_request,
                prompt_adapter_request=prompt_adapter_request)
  • vllm/engine/llm_engine.py
  # ==========================================================================================================
    def add_request(
            self,
            request_id: str,  # 每个请求的唯一id,在vLLM内部,1条prompt算1个请求,会附给1个请求id
            inputs: PromptInputs,  # prompt
            params: Union[SamplingParams, PoolingParams],  # 用于采样的参数(温度、topk等)
            arrival_time: Optional[float] = None,  # 请求到达的时间。如果是None,则用当前系统时间
            lora_request: Optional[LoRARequest] = None,  # 如果是用lora模型做推理,相关的lora请求
            trace_headers: Optional[Mapping[str, str]] = None,
            prompt_adapter_request: Optional[PromptAdapterRequest] = None,

    ) -> None:
        if lora_request is not None and not self.lora_config:
            raise ValueError(f"Got lora_request {lora_request} but LoRA is not enabled!")
        # 设置该请求的到达时间
        if arrival_time is None:
            arrival_time = time.time()

        # processed_inputs:dict,= {'prompts':xxx,'prompts_token_ids':xxx,'multi_modal_data':None}
        processed_inputs = self.process_model_inputs(
                request_id=request_id,
                inputs=inputs,
                lora_request=lora_request,
                prompt_adapter_request=prompt_adapter_request)
        # 1. prompt->seq->seq_group, 2. 将seq_group加入合适gpu维护的scheduler的waiting队列,等待处理
        self._add_processed_request(
                request_id=request_id,
                processed_inputs=processed_inputs,
                params=params,
                arrival_time=arrival_time,
                lora_request=lora_request,
                prompt_adapter_request=prompt_adapter_request,
                trace_headers=trace_headers,
        )

我们看到,经过疯狂套娃后,最终实际干活的是self.process_model_inputs和self._add_processed_request这两个方法,其他全都是中间商~

  • vllm/engine/llm_engine.py
    def process_model_inputs(
            self,
            request_id: str,
            inputs: PromptInputs,
            lora_request: Optional[LoRARequest] = None,
            prompt_adapter_request: Optional[PromptAdapterRequest] = None,
    ) -> LLMInputs:
        if isinstance(inputs, str):
            inputs = {"prompt": inputs}

        if "prompt_token_ids" not in inputs:
            # 这个函数就是为了拿到self.tokenizer
            tokenizer = self.get_tokenizer_group("prompts must be None if skip_tokenizer_init is True")
            # 文字prompt编码成token_id
            prompt_token_ids = tokenizer.encode(request_id=request_id,
                                                prompt=inputs["prompt"],
                                                lora_request=lora_request)
        # 如果入参前已经做好token_ids,直接取出来用
        else:
            prompt_token_ids = inputs["prompt_token_ids"]

        # 使用未合并的lora才会走进入这个判断分支
        if prompt_adapter_request:
            prompt_token_ids = [0] * prompt_adapter_request.prompt_adapter_num_virtual_tokens + prompt_token_ids

        # LLMInputs继承自TypedDict,将入参转换为字典
        # llm_inputs = {'prompts':xxx,'prompts_token_ids':xxx,'multi_modal_data':None}
        llm_inputs = LLMInputs(prompt_token_ids=prompt_token_ids,
                               prompt=inputs.get("prompt"),
                               multi_modal_data=inputs.get("multi_modal_data"))

        # todo 使用functools.partial高阶用法,返回的是一个固定llm_inputs参数的函数,真的好用吗?
        # 目前觉得这个函数会拖慢速度,因为每个prompt都要经过这里获得模型架构的操作
        # 目前这个函数,经过多层调用后, 最后原样返回,没对llm_inputs做任何操作
        return self.input_processor(llm_inputs) 

process_model_inputs功能是把输入的prompt转换为token_id, 结果以字典形式输出:
llm_inputs = {‘prompts’:xxx,‘prompts_token_ids’:xxx,‘multi_modal_data’:None}

   # ==========================================================================================================
    
    def _add_processed_request(
            self,
            request_id: str,
            processed_inputs: LLMInputs,
            params: Union[SamplingParams, PoolingParams],
            arrival_time: float,
            lora_request: Optional[LoRARequest],
            prompt_adapter_request: Optional[PromptAdapterRequest],
            trace_headers: Optional[Mapping[str, str]] = None,
    ) -> None:
        # Create the sequences.
        block_size = self.cache_config.block_size
        # self.seq_counter是在类中初始化,所以可以为每条seq生成不重复的id,
        # seq_id与request_id是两个独立的变量
        seq_id = next(self.seq_counter)
        eos_token_id = self._get_eos_token_id(lora_request)
        # seq 包含当前prompt的各种信息:token_id,status(waiting,...), 占用blocks数量(逻辑,物理数量相同)
        seq = Sequence(seq_id, processed_inputs, block_size, eos_token_id,
                       lora_request, prompt_adapter_request)
        # 将seq和采样参数合并为seq_group
        # --------------------------------------------------------------------------------------------------------------
        if isinstance(params, SamplingParams):
            seq_group = self._create_sequence_group_with_sampling(
                    request_id,
                    seq,
                    params,
                    arrival_time=arrival_time,
                    lora_request=lora_request,
                    trace_headers=trace_headers,
                    prompt_adapter_request=prompt_adapter_request)
        elif isinstance(params, PoolingParams):
            seq_group = self._create_sequence_group_with_pooling(
                    request_id,
                    seq,
                    params,
                    arrival_time=arrival_time,
                    lora_request=lora_request,
                    prompt_adapter_request=prompt_adapter_request)
        else:
            raise ValueError("Either SamplingParams or PoolingParams must be provided.")
        # --------------------------------------------------------------------------------------------------------------

        # Add the sequence group to the scheduler with least unfinished seqs.
        # 获得当前每个gpu上还没推理结束的seq_group数量: len(self.waiting) + len(self.running) + len(self.swapped)
        costs = [scheduler.get_num_unfinished_seq_groups() for scheduler in self.scheduler]
        # 找出工作量最少的调度器
        min_cost_scheduler = self.scheduler[costs.index(min(costs))]
        # 将当前seq_group加入这个调度器中(根据self.scheduler初始过程可知,每个gpu维护一个调度器,
        # 这条代码的意思就是当前seq_group由工作量最少的gpu负责推理)
        min_cost_scheduler.add_seq_group(seq_group)

用户输入的prompt经过_validate_and_add_requests处理后,会封装为seq_group,然后将seq_group加入合适gpu维护的scheduler的waiting队列, 等待处理。
seq和seq_group是vllm推理的基本单元,是两个非常重要的概念,我们来直观感受下它们的数据格式:

seq:
在这里插入图片描述

seq_group:
初始阶段seqs_dict都只有一个元素,经过prefill后,会扩展为n个元素(n是输出outputs数量)
在这里插入图片描述

4.2 推理流程

    def _run_engine(
            self, *, use_tqdm: bool
    ) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
        # Initialize tqdm.
        if use_tqdm:
            num_requests = self.llm_engine.get_num_unfinished_requests()
            pbar = tqdm(
                    total=num_requests,
                    desc="Processed prompts",
                    dynamic_ncols=True,
                    postfix=f"est. speed input: {0:.2f} toks/s, output: {0:.2f} toks/s",
            )
        # Run the engine.
        outputs: List[Union[RequestOutput, EmbeddingRequestOutput]] = []
        total_in_toks = 0
        total_out_toks = 0

        # 如果当前调度器中还有没完成推理的请求(调度器中waiting/running/swapped任一队列非空)
        while self.llm_engine.has_unfinished_requests():
            # 执行1次推理调度(step),决定哪些请求的数据可以参与到这次推理中,step输出本次推理结果
            step_outputs = self.llm_engine.step()
            # 一次step推理后,如果有请求已经完成了推理,将推理结果装进outputs中,
            for output in step_outputs:
                if output.finished:
                    outputs.append(output)
                    if use_tqdm:
                        if isinstance(output, RequestOutput):
                            # Calculate tokens only for RequestOutput
                            total_in_toks += len(output.prompt_token_ids)
                            in_spd = total_in_toks / pbar.format_dict["elapsed"]
                            total_out_toks += sum(len(stp.token_ids) for stp in output.outputs)
                            out_spd = total_out_toks / pbar.format_dict["elapsed"]
                            pbar.postfix = (
                                f"est. speed input: {in_spd:.2f} toks/s, output: {out_spd:.2f} toks/s"
                            )
                        pbar.update(1)
        if use_tqdm:
            pbar.close()
        # Sort the outputs by request ID.
        # This is necessary because some requests may be finished earlier than
        # its previous requests.
        return sorted(outputs, key=lambda x: int(x.request_id))

整个推理engine中,最重要的是self.llm_engine.step(),封装了所有的调度,推理和后处理代码。

    def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
        # 多GPU并行推理时走AsyncLLMEngine分支。如果进入当前LLMEngine,性能会下降,这里会抛出异常。
        if self.parallel_config.pipeline_parallel_size > 1:
            raise NotImplementedError(
                    "Pipeline parallelism is only supported through AsyncLLMEngine "
                    "as performance will be severely degraded otherwise.")

        # 上述if判断表明,只有一个GPU可用。因此self.scheduler也只有一个元素,是当前GPU的调度
        # 该函数调用改变调度的内部状态(self.running、self.swapped 和 self.waiting)
        seq_group_metadata_list, scheduler_outputs = self.scheduler[0].schedule()
        
		...
		
        return request_outputs

step中使用的调度代码如下:

    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        # Schedule sequence groups.
        # This function call changes the internal states of the scheduler
        # such as self.running, self.swapped, and self.waiting.
        # 该函数调用改变调度的内部状态(self.running、self.swapped 和 self.waiting)
        scheduler_outputs = self._schedule()
        ...

        return seq_group_metadata_list, scheduler_outputs

调度系统是vllm代码的核心,接下来,我们花单独一篇文章,详细解读self._schedule()的内部逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2102990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

认知杂谈34

今天分享 有人说的一段争议性的话 I 环境的影响 I 首先得说说&#xff0c;环境这东西对人的影响真不是盖的。你要是老待在一个死气沉沉的地方&#xff0c;那你的激情和梦想&#xff0c;可能慢慢就会被磨得平平无奇。 I 激情的消逝 I 本来你可能是满怀激情&#xff0c;想要大干一…

HMI触屏网关-VISION如何与Node-red通信

上文&#xff1a;HMI触屏网关-VISION如何与Modbus TCP从机通信-CSDN博客 1. Node-red启用HTTP监听 HTTP监听&#xff0c;用于模拟WebAPI服务端&#xff0c;接收WebAPI客户端GET请求。 启用HTTP监听服务&#xff0c;选择请求方式GET&#xff1b;URL自定义&#xff0c;本示例设…

人活着的意义是什么

大家好&#xff0c;我是凡人。 最近很多行业已经受到了 AI 带来的冲击&#xff0c;造成现在网络上一些消极情绪滋生。 这篇文章来源于我回答了一个知乎小哥的问题后&#xff0c;有了点思考&#xff0c;他的问题是这样的“可能我们普通人一辈子都无法为人类社会做出大的贡献&a…

自动化测试概念(1)

常⻅⾯试题 1.⾃动化测试能够取代⼈⼯测试吗&#xff1f; ⾃动化测试不⼀定⽐⼈⼯测试更能保障系统的可靠性&#xff0c;⾃动化测试是测试⼈员⼿⼯编写&#xff0c; 后续如果有功能的变更⾃动化也需要进⾏不定期的维护和更新。 2.⾃动化测试可以⼤幅度降低⼯作量&#xff1…

腾讯40岁老哥毕业了

我的朋友岳京杭发了篇文章&#xff0c;谈到了一位腾讯站up主「老龚40了啥也不是」 近期离职腾讯的故事。 2019年年底&#xff0c;36岁的老龚跳槽去了深圳腾讯&#xff0c;北漂变深漂。老龚合租在腾讯公司附近&#xff0c;月租2000&#xff0c;老婆孩子依然在长沙&#xff0c;老…

远程桌面 Rust Desk 自建服务器

因为某些原因(诈骗)&#xff0c;Rush Desk 服务已暂停国内访问&#xff0c;今天我们介绍如何利用自己的服务器搭建 Rust Desk 远程桌面&#xff0c;低延迟电脑远程手机&#xff0c;手机远程电脑等 一、准备工作 准备一台服务器&#xff0c;我用的腾讯云服务器&#xff0c;一年…

全国地市未来产业水平数据集(2008-2023年)

未来产业&#xff0c;作为驱动经济社会高质量发展的核心引擎&#xff0c;是指依托科技创新和模式创新&#xff0c;引领全球新一轮科技革命和产业变革&#xff0c;具有前瞻性、先导性、战略性的新兴产业领域。也是实现生产力大解放&#xff0c;推动生产力质的跃迁并形成新质生产…

2024华为OD机试真题-反射计数Python-C卷D卷-200分

2024华为OD机试最新E卷题库-(C卷+D卷+E卷)-(JAVA、Python、C++) 目录 题目描述 输入描述 输出描述 用例1 题目解析 代码 题目描述 给定一个包含 0 和 1 的二维矩阵。 给定一个初始位置和速度,一个物体从给定的初始位置出发,在给定的速度下进行移动,遇到矩阵的边缘则…

机器学习实战篇——肿瘤良性/恶性分类器(二元逻辑回归)

机器学习之实战篇——肿瘤良性/恶性分类器&#xff08;二元逻辑回归&#xff09; 前言数据集和实验文件下载相关文章推荐实验过程导入相关模块数据预处理手写二元逻辑回归模型&#xff08;小批量梯度下降&#xff09;sklearn逻辑回归器 前言 实验中难免有许多缺陷和错误&#…

20240903软考架构-------软考111-115答案解析

每日打卡题111-115答案 111、【2016年真题】 难度&#xff1a;一般 实时操作系统&#xff08;RTOS&#xff09;内核与应用程序之间的接口称为&#xff08; &#xff09;。 A&#xff0e;I&#xff0f;O接口 B&#xff0e;PCI C&#xff0e;API D&#xff0e;GUI 答案&#xff…

HTML音乐圣诞树

目录 写在前面 完整代码 下载代码 代码分析 系列文章 写在最后 写在前面 圣诞节(Christmas)亦称耶稣圣诞节、主降生节,天主教亦称耶稣圣诞瞻礼。译名为“基督弥撒”,它源自古罗马人迎接新年的农神节,与基督教本无关系。在基督教盛行罗马帝国后,教廷随波逐流地将这…

【Java那些事】关于Git的使用

目录 下拉代码仓库篇 上传代码篇 下拉代码仓库篇 第一步&#xff0c;下拉代码&#xff0c;复制链接。 &#xff08;从开源网站上复制链接&#xff09; &#xff08;建立本地仓库&#xff09; 这里的URL一般都会自动填充刚刚复制的链接【瞅瞅&#xff0c;确保是想要的那个项…

【pycharm-乱码】简单记录一下都有哪些涉及编码

控制台 路径&#xff1a;setting-》general-》console setting-》editor-》file encodings 路径&#xff1a;setting-》editor->file and code templates #!/user/bin/env python3 # -*- coding: utf-8 -*-setting->tools->ssh terminal

QT入门-安装

文章目录 起步安装QT在线安装安装过程配置环境变量更新或删除Qt 平台需求 界面简介绍创建第一个项目 起步 安装QT 您可以使用在线或离线安装程序安装Qt框架和工具&#xff0c;或者自己构建源包。 安装程序允许您下载并安装以下组件: Qt库&#xff0c;为特定的开发平台(操作…

登录-异步请求用户数据无法保存-bug

bug情况&#xff1a; 在进行登录时需要发送两次一次是登录请求&#xff0c;一次是登录后获取用户信息&#xff0c;但是浏览器本地存储没有成功保存user信息 原登录方法&#xff1a; // 账号密码登录 function login() {formRef.value.validate((valid) > {if (valid) {//发…

KRTS网络模块:UDP通信

KRTS网络模块:UDP通信 目录 KRTS网络模块:UDP通信UDP简介KRST UDP简介核心特性界面设计 核心代码运行实例稳定性测试 UDP简介 UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;是一种无连接的传输层协议&#xff0c;它位于OSI七层模型中的传输层…

「对比评测」标准WPF DataGrid与DevExpress WPF GridControl有何不同?(二)

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

NanoPC-T6安装redriod笔记

这里主要用于自己对安装过程的记录&#xff0c;中间可能记录比较粗糙。 重新编译内核 参考链接&#xff1a;【环境搭建】基于linux的NanoPC-T6_LTS系统固件编译环境搭建 基于docker构建编译环境 docker run -it \ --privilegedtrue --cap-addALL \ --name nanopc_t6_lts_en…

协同开发工具Git

网上对于Git的使用方法介绍的很多&#xff0c;在日常工作中&#xff0c;Git是团队开发必不可少的工具之一&#xff0c;我想为一些刚使用Git的小伙伴们介绍一下常遇到的小问题。 1&#xff1a;拼写错误。这应该是每个初学者都会犯得错误&#xff0c;当出现这种错误还是比较好排…

STM32的CRC校验(基于HAL库)

一&#xff1a;CRC概念 1&#xff1a;什么是CRC crc是一种纠错技术&#xff0c;代表循环冗余校验&#xff0c;是数据通信领域中最常用的一种差错校验码&#xff0c;其信息字段和校验长度可以任意指定&#xff0c;但要求通信双方定义的CRC标准一致。主要用来检测或校验数据传输或…