基于 LLamafactory 的异步API高效调用实现与速度对比

news2025/1/23 12:57:28

文章目录

    • 背景
    • 摘要
    • 简介
    • 代码实现
    • 运行结果
    • 速度对比
      • 异步调用速度
      • 同步调用速度

背景

原先经常调用各家的闭源大模型的API,如果使用同步的方式调用,速度会很慢。为了加快 API 的调用速度,决定使用异步调用 API 的方式。

摘要

通过异步方式调用大语言模型 API的方法,相较于传统同步调用方式,异步调用速度提升了约 9.41 倍。利用 LLamafactory 原生数据集加载和自定义异步工具类 AsyncAPICall 实现批量数据推理,支持调用限速和断点恢复。

简介

本文编写的代码,支持原生的 llamafactory 的数据集导入方式。
推理速度远远快于同步的 API 调用方式。基于 langchain_openai.ChatOpenAI 的 invoke 方法实现异步调用。
下述代码的主要工作介绍如下:

  • 使用 LLamafactory 的原生方法加载 数据集;
  • 封装了异步调用工具类 AsyncAPICall,限制API的调用速度,逐块推理,避免程序崩溃导致所有数据丢失;

代码实现

async_call_api.py

# pip install langchain langchain_openai

import os
import sys
import json
import asyncio


import fire
from tqdm import tqdm
from dataclasses import dataclass
from aiolimiter import AsyncLimiter
from typing import List
import pandas as pd
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv

from llamafactory.hparams import get_train_args
from llamafactory.extras.constants import IGNORE_INDEX
from llamafactory.data.loader import _get_merged_dataset

load_dotenv()


class AsyncLLM:
    def __init__(
        self,
        model: str = "gpt-3.5-turbo",
        base_url: str = "http://localhost:{}/v1/".format(
            os.environ.get("API_PORT", 8000)
        ),
        api_key: str = "{}".format(os.environ.get("API_KEY", "0")),
        num_per_second: int = 6,
        **kwargs,
    ):
        self.model = model
        self.base_url = base_url
        self.api_key = api_key
        self.num_per_second = num_per_second

        self.limiter = AsyncLimiter(self.num_per_second, 1)

        self.llm = ChatOpenAI(
            model=self.model, base_url=self.base_url, api_key=self.api_key, **kwargs
        )

    async def __call__(self, text):
        # 限速
        async with self.limiter:
            return await self.llm.ainvoke([text])


llm = AsyncLLM(
    base_url="http://localhost:{}/v1/".format(os.environ.get("API_PORT", 8000)),
    api_key="{}".format(os.environ.get("API_KEY", "0")),
    num_per_second=10,
)
llms = [llm]


@dataclass
class AsyncAPICall:
    uid: str = "0"

    @staticmethod
    async def _run_task_with_progress(task, pbar):
        result = await task
        pbar.update(1)
        return result

    @staticmethod
    def async_run(
        llms: List[AsyncLLM],
        data: List[str],
        keyword: str = "",
        output_dir: str = "output",
        chunk_size=500,
    ) -> List[str]:

        async def infer_chunk(llms: List[AsyncLLM], data: List):
            results = [llms[i % len(llms)](text) for i, text in enumerate(data)]
            with tqdm(total=len(results)) as pbar:
                results = await asyncio.gather(
                    *[
                        AsyncAPICall._run_task_with_progress(task, pbar)
                        for task in results
                    ]
                )
            return results

        idx = 0
        all_df = []
        file_exist_skip = False
        user_confirm = False

        while idx < len(data):
            file_path = os.path.join(output_dir, "tmp", f"{idx}.csv.temp")

            if os.path.exists(file_path):
                if not user_confirm:
                    while True:
                        user_response = input(
                            f"Find {file_path} file already exists. Do you want to skip them forever?\ny or Y to skip, n or N to rerun to overwrite: "
                        )
                        if user_response.lower() == "y":
                            user_confirm = True
                            file_exist_skip = True
                            break
                        elif user_response.lower() == "n":
                            user_confirm = True
                            file_exist_skip = False
                            break

                if file_exist_skip:
                    tmp_df = pd.read_csv(file_path)
                    all_df.append(tmp_df)
                    idx += chunk_size
                    continue

            tmp_data = data[idx : idx + chunk_size]
            loop = asyncio.get_event_loop()
            tmp_result = loop.run_until_complete(infer_chunk(llms=llms, data=tmp_data))
            tmp_result = [item.content for item in tmp_result]

            tmp_df = pd.DataFrame({"infer": tmp_result})

            if not os.path.exists(p := os.path.dirname(file_path)):
                os.makedirs(p, exist_ok=True)

            tmp_df.to_csv(file_path, index=False)
            all_df.append(tmp_df)
            idx += chunk_size

        all_df = pd.concat(all_df)
        return all_df["infer"]


def async_api_infer(
    model_name_or_path: str = "",
    eval_dataset: str = "",
    template: str = "",
    dataset_dir: str = "data",
    do_predict: bool = True,
    predict_with_generate: bool = True,
    max_samples: int = None,
    output_dir: str = "output",
    chunk_size=50,
):

    if len(sys.argv) == 1:
        model_args, data_args, training_args, finetuning_args, generating_args = (
            get_train_args(
                dict(
                    model_name_or_path=model_name_or_path,
                    dataset_dir=dataset_dir,
                    eval_dataset=eval_dataset,
                    template=template,
                    output_dir=output_dir,
                    do_predict=True,
                    predict_with_generate=True,
                    max_samples=max_samples,
                )
            )
        )
    else:
        model_args, data_args, training_args, finetuning_args, generating_args = (
            get_train_args()
        )

    dataset = _get_merged_dataset(
        data_args.eval_dataset, model_args, data_args, training_args, "sft"
    )

    labels = [item[0]["content"] for item in dataset["_response"]]
    prompts = [item[0]["content"] for item in dataset["_prompt"]]

    infers = AsyncAPICall.async_run(
        llms,
        prompts,
        chunk_size=chunk_size,
        output_dir=training_args.output_dir,
    )

    if not os.path.exists(training_args.output_dir):
        os.makedirs(training_args.output_dir, exist_ok=True)

    output_prediction_file = os.path.join(
        training_args.output_dir, "generated_predictions.jsonl"
    )

    with open(output_prediction_file, "w", encoding="utf-8") as writer:
        res: List[str] = []
        for text, pred, label in zip(prompts, infers, labels):
            res.append(
                json.dumps(
                    {"prompt": text, "predict": pred, "label": label},
                    ensure_ascii=False,
                )
            )
        writer.write("\n".join(res))


if __name__ == "__main__":
    fire.Fire(async_api_infer)

上述代码支持 yaml 文件:

async.yaml

## model
model_name_or_path: qwen/Qwen2.5-7B-Instruct

### method
do_predict: true

### dataset
dataset_dir: ../data/
eval_dataset: alpaca_zh_demo
template: qwen

### output
output_dir: output
# overwrite_output_dir: true


### eval
predict_with_generate: true
max_samples: 100

运行结果

使用下述代码运行:

python async_call_api.py async.yaml

异步调用的API推理结果:

{"prompt": "识别并解释给定列表中的两个科学理论:细胞理论和日心说。", "predict": "细胞理论和日心说是两个重要的科学理论,分别属于生物学和天文学领域。\n\n1. 细胞理论:细胞理论是生物学中的一个基本理论,它描述了所有生命体的基本结构和功能单位——细胞。细胞理论包含以下三个主要观点:\n\n   a. 所有生物体都是由一个或多个细胞组成的。\n   b. 细胞是生命活动的基本单位,负责维持生物体的生命活动。\n   c. 新细胞只能由已存在的细胞分裂产生。\n\n   细胞理论的提出和发展对于生物学研究具有重要意义,它揭示了生物体的基本结构和功能单位,为生物学研究提供了理论基础。\n\n2. 日心说:日心说是一种关于太阳系结构的理论,它描述了太阳在太阳系中的中心位置,而地球和其他行星则围绕太阳旋转。日心说最初由古希腊天文学家阿里斯塔克提出,但直到16世纪,哥白尼重新提出这一理论,并得到了进一步的发展和完善。日心说的提出颠覆了传统的地心说观念,为现代天文学的发展奠定了基础。\n\n这两个科学理论分别在生物学和天文学领域中具有重要的意义,它们揭示了生命体和宇宙的基本结构和运行规律。", "label": "细胞理论是生物科学的一个理论,它认为所有生命体都是由微小的基本单元——细胞所构成。这是生物学的一个基础理论,认为细胞是所有生物的基本结构和功能单位,所有的生物都是由一个或多个细胞组成,细胞只能通过细胞分裂产生新的细胞。这一理论由薛定谔、施瓦内和雪莱于1839年首次提出。\n\n日心说是指太阳是太阳系的中心,也就是说,行星围绕太阳旋转的理论。这个理论打破了传统的地心说观点,认为地球并不是宇宙的中心。日心说的提出者是尼古拉·哥白尼,他在16世纪初发表了他的著作《天体运行论》,阐述了太阳系行星围绕太阳运行的模型,为天文学的发展做出了巨大贡献。"}
{"prompt": "输入三支篮球队的名称并生成一个适当的口号。\n输入:俄克拉荷马城雷霆队,芝加哥公牛队,布鲁克林网队。", "predict": "\"雷霆激战,公牛荣耀,网破天荒,三队齐心创辉煌!\"", "label": "输出: \"雷霆,公牛和网队:各显神通,角逐群雄!\""}
{"prompt": "比较以下两位著名人物,使用特征列表并以句子形式提供输出。\n人物1:阿尔伯特·爱因斯坦\n人物2:斯蒂芬·霍金\n特征:智力、教育、对世界的影响", "predict": "人物1:阿尔伯特·爱因斯坦\n- 智力:极高,以其相对论理论对物理学产生了深远影响。\n- 教育:获得苏黎世联邦理工学院的学位,并在普林斯顿高等研究院担任教授。\n- 对世界的影响:他的相对论理论改变了我们对时间、空间和重力的理解,对现代科技和宇宙学产生了巨大影响。\n\n人物2:斯蒂芬·霍金\n- 智力:极高,尽管患有肌萎缩性脊髓侧索硬化症,但他在宇宙学和黑洞研究方面做出了重要贡献。\n- 教育:获得牛津大学和剑桥大学的学位,并在剑桥大学担任卢卡斯数学教授。\n- 对世界的影响:他使宇宙学和黑洞研究更加普及,通过《时间简史》等书籍向大众解释复杂的科学概念,激励了无数人对科学的兴趣。", "label": "阿尔伯特·爱因斯坦和斯蒂芬·霍金都是拥有极其出色智力的人物。两人都取得过非常高的教育成就,他们推进了科学发展并在世界范围内产生了深远的影响。爱因斯坦以其相对论和质能关系公式而闻名,而霍金以其关于黑洞和宇宙的发现而著称。两位科学家都以其深厚的学识和非凡的贡献影响了世界。"}

在输出结果中, predict 是大模型的推理结果。方便大家对比 predict 和 label,并评估大模型推理的精度。

请添加图片描述

为了避免大模型中途程序崩溃,把原始数据分块进行推理。这样即使程序中途崩溃,也能基于之前保存的分快数据继续推理,而不用重新开始推理。

速度对比

异步调用速度

下面是两个异步调用的进度条:

100%|██████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:22<00:00, 2.27it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:22<00:00, 2.22it/s]

上述异步实验总共数据为100条数据,分块大小为50,故有2个进度条。100条速度44秒全部处理完成,平均处理速度 每秒处理2.2条数据。

同步调用速度

同步调用 LLM api 的代码很简单,如下所示:

infers = []
for prompt in tqdm(prompts):
    infers.append(llm.llm.invoke(prompt))

下面是同步调用的进度条:

100%|████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [06:54<00:00, 4.15s/it]

如果使用同步调用,100条数据,总共耗时 6分54秒,平均每条耗时4.15秒。

方法推理100条数据时间
同步6分54秒
异步44秒

对比之下,异步调用比同步调用快了大约 9.41 倍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253645.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux的用户和权限【Linux操作系统】

文章目录 Linux的用户切换用户普通用户暂时以root用户的权限执行指令如何把一个普通用户加入白名单? 新建用户 Linux权限权限的组成更改权限文件/目录权限的表示方法&#xff1a; umask粘滞位添加粘滞位的方法 Linux的用户 Linux下有两种⽤⼾&#xff1a;超级用户&#xff08…

如何使用apache部署若依前后端分离项目

本章教程介绍,如何在apache上部署若依前后端分离项目 一、教程说明 本章教程,不介绍如何启动后端以及安装数据库等步骤,着重介绍apache的反向代理如何配置。 参考此教程,默认你已经完成了若依后端服务的启动步骤。 前端打包命令使用以下命令进行打包之后会生成一个dist目录…

优先算法 —— 滑动窗口系列 - 无重复字符的最长子串

目录 前言 1. 无重复字符的最长子串 2. 题目解析 3. 算法原理 解法1&#xff1a;暴力枚举 哈希表&#xff08;判断字符是否有重复出现&#xff09; 解法2&#xff1a;滑动窗口 4. 代码 前言 当我们发现暴力解法两个指针都不回退&#xff0c;都是向同一个方向移动的时候我…

2024年认证杯SPSSPRO杯数学建模B题(第一阶段)神经外科手术的定位与导航解题全过程文档及程序

2024年认证杯SPSSPRO杯数学建模 B题 神经外科手术的定位与导航 原题再现&#xff1a; 人的大脑结构非常复杂&#xff0c;内部交织密布着神经和血管&#xff0c;所以在大脑内做手术具有非常高的精细和复杂程度。例如神经外科的肿瘤切除手术或血肿清除手术&#xff0c;通常需要…

Jest timers

引入 我们自己先写一个定时器,在这里,这个测试是一定会通过的,因为他一旦传入callback,就算是完成了,而不是在意你的运行结果了,而且你的定时器还有几秒呢 export const timer (fn) > {setTimeout(() > {fn()}, 3000) }//test import {timer} from "./timer"…

数据链路层(四)---PPP协议的工作状态

1 PPP链路的初始化 通过前面几章的学习&#xff0c;我们学了了PPP协议帧的格式以及组成&#xff0c;那么对于使用PPP协议的链路是怎么初始化的呢&#xff1f; 当用户拨号上网接入到ISP后&#xff0c;就建立起了一条个人用户到ISP的物理链路。这时&#xff0c;用户向ISP发送一…

UE5 C++ 不规则按钮识别,复选框不规则识别 UPIrregularWidgets

插件名称&#xff1a;UPIrregularWidgets 插件包含以下功能 你可以点击任何图片&#xff0c;而不仅限于矩形图片。 UPButton、UPCheckbox 基于原始的 Button、Checkbox 扩展。 复选框增加了不规则图像识别功能&#xff0c;复选框增加了悬停事件。 欢迎来到我的博客 记录学习过…

Latex转word(docx)或者说PDF转word 一个相对靠谱的方式

0. 前言 投文章过程中总会有各种各样的要求&#xff0c;其中提供word格式的手稿往往是令我头疼的一件事。尤其在多公式的文章中&#xff0c;其中公式转换是一个头疼的地方&#xff0c;还有很多图表&#xff0c;格式等等&#xff0c;想想就让人头疼欲裂。实践中摸索出一条相对靠…

Leetcode打卡:棋盘上有效移动组合的数目

执行结果&#xff1a;通过 题目&#xff1a;2056 棋盘上有效移动组合的数目 有一个 8 x 8 的棋盘&#xff0c;它包含 n 个棋子&#xff08;棋子包括车&#xff0c;后和象三种&#xff09;。给你一个长度为 n 的字符串数组 pieces &#xff0c;其中 pieces[i] 表示第 i 个棋子的…

Day5:生信新手笔记 — R语言基本语法

一、数据类型 &#xff08;重点只有两个&#xff0c;剩下的不看&#xff09; 1.1 向量&#xff08;vector&#xff09; 矩阵&#xff08;Matrix&#xff09; 数组&#xff08;Array&#xff09; 1.2 数据框&#xff08;Data frame&#xff09; x<- c(1,2,3) #常用的向…

【Win11的Bug】无法在文件夹中创建txt文件

问题 右键只能新建文件夹 , 无法新建txt文本文档 解决办法 将注册表中的一个参数从1改为0即可. 具体内容: WinR输入regeditHKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Policies\System 将1改为0(下面这张图我已改过) 4.然后重新启动电脑即可 小技…

word如何快速创建目录?

文章目录 1&#xff0c;先自己写出目录的各级标题。2、选中目标标题&#xff0c;然后给它们编号3、给标题按照个人需求开始分级4、插入域构建目录。4.1、利用快捷键插入域构建目录4.2、手动插入域构建目录 听懂掌声&#xff01;学会了吗&#xff1f; 前提声明&#xff1a;我在此…

Java程序调kubernetes(k8s1.30.7)core API简单示例,并解决403权限验证问题,即何进行进行权限授权以及验证

简单记录问题 一、问题描述 希望通过Java程序使用Kubernetes提供的工具包实现对Kubernetes集群core API的调用&#xff0c;但是在高版本上遇见权限验证问题4xx。 <dependency><groupId>io.kubernetes</groupId><artifactId>client-java</artifact…

合合信息扫描全能王线下体验活动:科技与人文的完美交融

文章目录 前言签到欢迎仪式产品体验智能高清滤镜去除透字效果照片高清修复 破冰行动会议感受 前言 作为合合信息旗下扫描全能王的忠实粉丝&#xff0c;上周&#xff0c;我很荣幸参与了扫描全能王“扫出你的能量buff”快闪活动及技术交流会。这次活动的不仅让我对这款强大的文档…

【工具变量】上市公司企业所在地城市等级直辖市、副省级城市、省会城市 计划单列市(2005-2022年)

一、包含指标&#xff1a; 股票代码 股票代码 股票简称 年份 所属城市 直辖市&#xff1a;企业所在地是否属于直辖市。1是&#xff0c;0否。 副省级城市&#xff1a;企业所在地是否属于副省级城市。1是&#xff0c;0否。 省会城市&a…

Svn如何切换删除账号

记录Svn清除切换账号 1.首先打开小乌龟的设置如下图 打开设置后单击已保存数据&#xff0c;然后选择清除 接上图选择清除后&#xff0c;就可以打勾选择清除已保存的账号&#xff0c;我们再次检出的就可以切换账号了 &#x1f449;总结 本次记录Svn清除切换账号 如能帮助到你…

ASP.NET Core SignalR 入门

一、简介 &#x1f4e2; SignalR的主要功能是提供服务器和客户端之间的实时通信。当连接的客户端变得可用时&#xff0c;服务器可以立即向其推送内容&#xff0c;而不是等待客户端发起请求。这种功能特别适合需要实时更新数据的应用场景&#xff0c;如聊天应用、实时数据分析、…

分布式光伏电站如何实现监控及集中运维管理?

安科瑞戴婷 Acrel-Fanny 前言 今年以来&#xff0c;在政策利好推动下光伏、风力发电、电化学储能及抽水蓄能等新能源行业发展迅速&#xff0c;装机容量均大幅度增长&#xff0c;新能源发电已经成为新型电力系统重要的组成部分&#xff0c;同时这也导致新型电力系统比传统的电…

2022-12-4----Android11(H713m)---- WiFi驱动添加写入mac号补丁

一、问题 用全志的写号工具&#xff0c;写入wifi_mac&#xff0c;设置下边不生效 二、分析 因为我们的WiFi不是用全志平台的&#xff0c;也不是用全志集成好的&#xff0c;而是用希微这家第三方的WiFi/BT&#xff0c;所以该驱动还没完善。 三、修改前的准备 用写号工具写号…

Linux笔记---进程:进程替换

1. 进程替换的概念 进程替换是指在一个正在运行的进程中&#xff0c;用一个新的程序替换当前进程的代码和数据&#xff0c;使得进程开始执行新的程序&#xff0c;而不是原来的程序。 这种技术通常用于在不创建新进程的情况下&#xff0c;改变进程的行为。 我们之前谈到过for…