基于 Lambda 实现 Claude3 的流式响应

news2024/11/26 13:56:43

在如今的大语言模型推理输出场景中,流式响应基本已成为必备的功能之一。一方面符合大语言模型生成方式的本质,另一方面当模型推理效率不是很高时,流式响应比起全部 generate 后再输出、能大幅缩短从开始请求到输出第一个 Token 的时间,极大地提高用户体验。

从端到端的视角,可以将大语言模型的流式响应分为两个部分:模型本身的流式推理、后端程序将模型的流式响应输出到客户端,如下图所示。本文将以 Claude3 为例,阐述如何端到端的实现大语言模型的流式响应。


一、流式推理

当今的主流大语言模型,大都已支撑流式推理。我们以 Python 代码来实现 Claude3 的流式推理为例,给出如下示例。Claude3 采用了 Message API,支持多模态的数据输入。推理返回结果的内容结构,相比于此前 Claude2 的 Completions API 也发生了一些变化。

import json
import sys
import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

# 使用文本提示调用Claude 3
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
prompt = "你好"

body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 1024,
    "messages": [
        {
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }
    ],
})

response = client.invoke_model_with_response_stream(
    body=body,
    modelId=model_id,
    accept="application/json",
    contentType="application/json",
)

stream = response["body"]
if stream:
    for event in stream:
        chunk = event.get("chunk")
        if chunk:
            chunk_obj = json.loads(chunk.get("bytes").decode())
            if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                if chunk_obj['type'] == 'content_block_delta':
                    chunk_obj = chunk_obj['delta']
                    sys.stdout.write(chunk_obj["text"])
                    sys.stdout.flush()  # 正式使用时,这里可以用yield返回

输出效果如下所示:

二、流式响应的技术方案介绍

目前流式响应输出到客户端的技术,主要有长轮询、 WebSocket 与 SSE 。

长轮询

浏览器发出 XMLHttpRequest 请求,服务器端接收到请求后,会阻塞请求直到有数据或者超时才返回,浏览器 JS 在处理请求返回信息(超时或有效数据)后再次发出请求,重新建立连接。 这种方式用于流式响应,一方面增加了不必要的请求-响应的往返时间,还给客户端与服务端带来了额外的负载压力与资源浪费。

WebSocket

WebSocket 是 HTML5 定义的新协议,实现了服务器与客户端之间的全双工通信。WebSocket 连接一旦建立,客户端和服务器端处于平等地位,可以相互发送数据,不存在请求和响应的区别,其原理如下图所示。

WebSocket 通常应用于实时聊天、多人在线游戏等场景。在 AWS 上可以使用 API Gateway 与 Lambda 来实现 Websocket 的服务端。但在 Claude3 的流式响应场景中,采用 Websocket 方案显得有些大材小用,我们希望能有更轻量级的实现方式。

HTTP SSE

HTTP SSE 的全称是 HTTP Server-Sent Events,它提供了一种从服务器实时发送更新事件到客户端的技术。SSE 主要解决了客户端与服务器之间的单向实时通信需求(例如 Claude3 回答的流式响应),相较于 WebSocket(双向实时),它更加轻量级且易于实现。SSE 是基于 HTTP 协议实现的,所以更适用于服务器持续的向客户端发送文本。其原理示意如下图所示。

本文将主要使用 HTTP-SSE 技术来展开介绍,如何将 Claude3 的回答流式推向客户端。

三、使用 Python Flask 搭建 SSE Demo

一提到 SSE,我们首先考虑了使用 Python Flask 框架来实现一个原型(Demo)。Flask 是一个轻量级的 Web 应用框架,它提供了简单易用的工具和技术来构建 Web 服务器。特别是,我们利用了 Flask 的 stream_with_context 功能来实现服务器发送事件(SSE),这使得服务器能够以流的形式实时推送数据到客户端。

将 Claude3 的流式推理结果,通过 SSE 技术推送到客户端的代码如下:

#pip install Flask boto3 CORS
from flask import Flask, Response, stream_with_context, request
from flask_cors import CORS  # 导入CORS模块
import time
import json
import sys
import boto3


app = Flask(__name__)
CORS(app)  # 为app添加跨域支持

def generate_stream(prompt):
    client = boto3.client("bedrock-runtime", region_name="us-east-1")

    # 调用 Claude 3 并提供文本提示
    model_id = "anthropic.claude-3-sonnet-20240229-v1:0"

    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}],
        }],
    })

    response = client.invoke_model_with_response_stream(
        body=body,
        modelId=model_id,
        accept="application/json",
        contentType="application/json",
    )

    stream = response["body"]
    if stream:
        for event in stream:
            chunk = event.get("chunk")
            if chunk:
                chunk_obj = json.loads(chunk.get("bytes").decode())
                if 'delta' in chunk_obj.keys() and 'type' in chunk_obj.keys():
                    if chunk_obj['type'] == 'content_block_delta':
                        chunk_obj = chunk_obj['delta']
                        yield f"data: {json.dumps(chunk_obj['text'], ensure_ascii=False)}\n\n"  # 修改为适合HTTP SSE的格式


@app.route('/stream')
def stream():
    prompt = request.args.get('prompt', '你好')  # 从HTTP请求中提取prompt变量的值,默认值为"你好"
    response = Response(stream_with_context(generate_stream(prompt)), content_type='text/event-stream')
    response.headers['Access-Control-Allow-Origin'] = '*'  # 允许所有域名跨域访问
    return response

if __name__ == '__main__':
    app.run(debug=True, port=9000, host='0.0.0.0')

web 端代码如下:

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下所示:

尽管使用 Flask API 来实现这个 Demo 相对简单直接,但在生产环境中,直接使用 Flask API 作为 API 服务器存在一些劣势,特别是当考虑到可扩展性、管理和成本效率时。例如,Flask 应用通常需要部署在一台或多台服务器上,这意味着你需要管理这些服务器的维护、监控和扩展。此外,对于流量波动较大的应用,服务器可能会在低峰时期闲置,造成资源浪费,或在高峰时期过载,影响服务质量。

为了克服这些劣势,我们转向了 AWS Lambda 来实现 streaming response。

四、基于 AWS  Lambda  来实现  SSE Demo

AWS Lambda 是一个无服务器计算服务,它允许你运行代码而无需预置或管理服务器。Lambda 只在代码执行时才收费,这使得它在成本效率上对于不定时或间歇性的工作负载非常有吸引力。通过 Lambda,我们可以实现一个更加弹性的架构,自动扩展以应对请求量的变化,同时减少了管理服务器的负担。

在客户端你可以基于 SSE 协议,调用 Lambda 函数 URL 来实时获取响应结果。

创建 Lambda 函数

AWS Lambda 默认支持使用 Node.js Runtime 支持流式响应。对于其他语言,你可以使用使用带有自定义 Runtime 方式来实现,或者使用 Lambda Web 适配器。

然后将如下代码粘贴到 Lambda 函数中:

import util from 'util';
import stream from 'stream';
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";

import querystring from 'querystring';

const finished = util.promisify(stream.finished); 

// Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });
const modelId = "anthropic.claude-3-sonnet-20240229-v1:0";
let prompt = "您好";


export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  
  console.log(event);

  responseStream.setContentType("text/event-stream");
  
   // 假设event.rawQueryString存在并包含查询字符串
  const rawQueryString = event.rawQueryString;
    
  // 使用querystring模块解析查询字符串
  const params = querystring.parse(rawQueryString);
  
  prompt=params['prompt'];
    
  
  // Prepare the payload for the model.
  const payload = {
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: 1000,
    messages: [
      {
        role: "user",
        content: [{ type: "text", text: prompt }],
      },
    ],
  };
  try {
    // 使用payload调用Claude并等待API响应
    const command = new InvokeModelWithResponseStreamCommand({
      contentType: "application/json",
      body: JSON.stringify(payload),
      modelId,
    });
    const apiResponse = await client.send(command);

    // 解码并处理响应流
    for await (const item of apiResponse.body) {
      const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
      const chunk_type = chunk.type;

      if (chunk_type === "content_block_delta") {
        const text = chunk.delta.text;
        responseStream.write(`data: ${JSON.stringify(text)}\n\n`);
      }
    }
  } catch (error) {
    console.error("处理API响应时发生错误:", error);
    responseStream.write(`data: ${JSON.stringify({ error: "处理请求时发生错误" })}\n\n`);
  }
  
  responseStream.end();
  
  
});
配置权限

在创建完 Lambda 后,会生成一个具备基础权限的默认角色。需要在该角色中增加 Bedrock 的调用权限。点击下图中的角色名称,打开 IAM 服务中的角色管理页面。

在该 IAM 角色中添加一个内联权限,权限内容如下所示:

配置 Lambda 响应超时时间

在流式响应场景中,Lambda 的整体响应时间范围可能从几秒中到几分钟,具体取决于输入/输出 Token 的大小。所以需要提前修改 Lambda 默认的超时时间。

设置 Lambda 函数 URL

在函数 URL 配置页面,建议 Auth Type 选择 NONE。因为如果选择 AWS_IAM 方式,则需要在客户端存储 AWS 身份认证信息(如 AK/SK),有较大的安全隐患。所以,在面向最终用户的场景中,建议 Auth Type 设置为 NONE,然后在 Lambda 的 Header 中自行实现一些认证方式(如 API_KEY)。

此前,Invoke mode 需要选择 RESPONSE_STREAM,并配置 CORS,允许跨域访问。

Web 端示例代码:
<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="UTF-8">
    <title>Stream Demo</title>
</head>
<body>
    <h2>输入提示词:</h2>
    <input type="text" id="promptInput" placeholder="请输入提示词">
    <button onclick="startStream()">开始流式响应(Lambda)</button>
    <div id="output" style="margin-top: 20px;"></div>

    <script>
        function startStream() {
            const prompt = document.getElementById('promptInput').value;
            const outputDiv = document.getElementById('output');
            outputDiv.innerHTML = ''; // 清空之前的输出

            // 创建一个新的EventSource实例,连接到服务器端的/stream端点
            const eventSource = new EventSource(`http://xx.xx.xx.xx:9000/stream?prompt=${encodeURIComponent(prompt)}`);

            eventSource.onmessage = function(event) {
                // 当接收到新的数据时,将其添加到页面上

                console.log('Received data:', event.data);
                
                // 使用 decodeURIComponent 和 escape 函数转换 Unicode 编码的字符串
                var decodedMessage = decodeURIComponent(event.data).replace(/^"|"$/g, '');
                
                outputDiv.innerHTML += decodedMessage; 
            };

            eventSource.onerror = function() {
                // 如果发生错误,关闭连接
                eventSource.close();
                outputDiv.innerHTML += '<p>流已结束</p>';
            };
        }
    </script>
</body>
</html>

效果如下:

Lambda 流式响应的限制
  • 单次调用的流式响应有默认为 20MB 的大小限制,但可以向后台申请提升限制。
  • 单次调用的流式响应的前 6MB 拥有不受限制的带宽。之后将以最大 2MBps 的速率进行流式响应。这在大语言推理场景,应该是完全足够了。
  • 通过 API Gateway 的 LAMBDA_PROXY 集成不支持流式响应。你可以在 API Gateway 和 Lambda 函数 URL 之间使用 HTTP_PROXY 集成,但你将受到 API Gateway 的 10MB 响应负载限制。此外,API Gateway 不支持分块传输编码,因此将无法实现流式响应的预期效果。

五、总结&综述

本文从端到端的视角,介绍了 Claude3 的流式推理,以及服务端流式响应的技术选型。通过比较分析,建议基于 Http-SSE 这种轻量级方式,来实现流式响应。最后,以 Claude3 为例,基于 Http-SSE 技术,分别介绍了使用 Python Flask、 AWS 云原生服务 Lambda 实现流式响应的实践。

本篇作者

张盼富

AWS 解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1578508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年最新可用免费云服务器整理汇总

随着云计算技术的不断发展&#xff0c;越来越多的个人和企业开始使用云服务器来满足其数据存储、网站搭建、应用开发等需求。其中&#xff0c;免费云服务器更是受到广大用户的青睐。本文将为大家整理汇总最新的可用免费云服务器资源&#xff0c;助力大家轻松享受云上之旅&#…

transform 模型常见问题

目录 transform 模型常见问题 transform 模型常见问题 1.Transformer为何使用多头注意力机制?(为什么不使用一个头) 答:多头可以使参数矩阵形成多个子空间,矩阵整体的size不变,只是改变了每个head对应的维度大小,这样做使矩阵对多方面信息进行学习,但是计算量和单个h…

Vue 引入config.js后别的js访问不到window对象下的属性

Vue项目里,我们项目配置的请求服务器地址都是在public里config.js里,如下例: 然后在index.html里引入config.js,如下图: 这里要注意的是,script的src要写上<%= BASE_URL %>,代码如下: <!DOCTYPE html> <html><head><meta charset="…

Linux虚拟网络设备深度解析:使用场景、分类与开发者指南

Linux虚拟网络设备支撑着各种复杂的网络需求和配置&#xff0c;从基础的网络桥接到高级的网络隔离和加密&#x1f510;。以下是对主要Linux虚拟网络设备的介绍、它们的作用以及适用场景的概览&#xff0c;同时提出了一种合理的分类&#xff0c;并指出应用开发人员应该着重掌握的…

qt调试日志文件生成

系列文章目录 第一章 qt日志文件生成功能 文章目录 系列文章目录前言一、qt日志文件生成功能二、使用步骤1.代码示例2.运行截图 前言 qt有固定的调试日志接口&#xff0c;可以通过终端去打印&#xff0c;但是仅适用在本地去调试&#xff0c;例如想长期放到测试台去检测&#…

Unity性能优化篇(十三) 物理优化

1.尽量使用简单的碰撞器进行碰撞检测&#xff0c;如球体碰撞器、盒子碰撞器、胶囊体碰撞器&#xff0c;少用网格碰撞器等复杂的碰撞器(如下图)。即使用多个简单的碰撞器组合在一起&#xff0c;也往往比使用网格碰撞器的性能要好。 2.如果要把多个碰撞器组合成一个碰撞器&…

网络通信三要素:IP、端口和协议

IP&#xff1a;设备在网络中的地址&#xff0c;是唯一的标识 IP&#xff1a;全程”互联网协议地址“&#xff0c;是分配给上网设备的唯一标志 IP地址有两种形式&#xff1a; IPv4&#xff1a;32位 IPv6&#xff1a;共128位。分成8段表示&#xff0c;每取四位编码成一个16进制…

matlab学习001-简单的矩阵输入及绘制信号曲线

目录 1&#xff0c;熟悉简单的矩阵输入 1.1&#xff0c;创建矩阵 1.2&#xff0c;在命令行调用文件中的变量 1.3&#xff0c;ones函数 1.4&#xff0c;who和whos的使用 2&#xff0c;绘制信号曲线 2.1&#xff0c;实指数信号 2.2&#xff0c;频率为50Hz的周期方波信号…

win11系统和ubuntu双系统首次连接网线上网流程

硬件准备 首先需要将网线连接到电脑&#xff0c;另一头可以连接交换机或者路由器 上网前需要拨号上网&#xff0c;如果是连的路由器&#xff0c;那么一台路由器上拨号一次就行了。 如果是连的交换机需要拨号上网 这里踩的第一个坑是刚开始电脑连的是交换机1又连的交换机2&…

Cali Linux上的PoshC2安装和使用

一、安装PoshC2 curl -sSL https://raw.githubusercontent.com/nettitude/PoshC2/master/Install-for-Docker.sh | sudo bash二、创建工程 posh-project -n test三、修改配置文件 posh-config将图中的baidu.com改为自己要攻击的域名或者IP地址 四、执行 posh-server 显示没…

苍穹外卖10(Spring Task定时任务,WebSocket双向通信,订单状态定时处理,来电提醒,客户催单)

目录 一、Spring Task 1. 介绍 2. 入门 1 使用步骤 2 使用示例 3. 详解 1 Scheduled注解 2 cron表达式 1 cron表达式6个域 2 各个域的取值说明 4. 小结 二、订单状态定时处理 1. 需求分析 1 问题分析 2 功能需求 2. 代码开发 1 修改引导类加EnableScheduling …

stm32GPO的相关操作

GPIO的使用 1.GPIO八种工作模式1.1 上拉输入1.2 下拉输入1.3 浮空输入1.4 模拟输入1.5 推挽输出1.6 开漏输出1.7 复用推挽输出1.8 复用开漏输出 2.相关寄存器2.1 寄存器配置IO 3.相关库函数 1.GPIO八种工作模式 保护二极管的作用&#xff1a;用来保护IO&#xff0c;一般情况IO的…

图书馆自助借书机怎么借书

图书馆自助借书机借书流程如下&#xff1a; 1. 找到图书馆自助借书机&#xff0c;在机器上选择借书功能。 2. 输入自己的借书卡号或者身份证号码&#xff0c;如果是第一次借书&#xff0c;可能需要进行注册。 3. 输入图书的条形码号码&#xff0c;可以通过扫描条形码或者手动输…

Verilog语法——按位取反“~“和位宽扩展的优先级

前言 先说结论&#xff0c;如下图所示&#xff0c;在Verilog中“~ ”按位取反的优先级是最高的&#xff0c;但是在等式计算时&#xff0c;有时候会遇到位宽扩展&#xff0c;此时需要注意的是位宽扩展的优先级高于“~”。 验证 仿真代码&#xff0c;下面代码验证的是“~”按位取…

2024年上半年WSK-PETS5报名及考试时间公布

4月1日&#xff0c;中国教育考试网发布了2024年上半年全国外语水平考试WSK&#xff08;PETS5&#xff09;的报名及考试通知&#xff0c;为方便关注者&#xff0c;知识人网小编特做全文转载。 国家公派留学人员全国外语水平考试&#xff08;WSK-PETS5&#xff09;成绩作为国家留…

蓝桥杯刷题 深度优先搜索-[NewOJ P1158]N皇后(C++)

题目描述 n皇后问题&#xff1a;n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 上面布局用序列2 4 6 1 3 5表示&#xff0c;第i个数字表示第i行皇后放的列号。 按照这种格式输出前3个解&#xff0c;并统计总解数。 输入格式 输入一个正整数n&a…

memcached集群

一、介绍 memcache本身没有像redis所具备的数据持久化功能&#xff0c;但是可以通过做集群同步的方式&#xff0c;让各memcache服务器的数据进行同步&#xff0c;从而实现数据的一致性&#xff0c;即保证各memcache的数据是一样的&#xff0c;即使有任何一台memcache发生故障&…

CTK插件框架学习-服务工厂(06)

CTK插件框架学习-信号槽(05)https://mp.csdn.net/mp_blog/creation/editor/137240105 一、服务工厂定义 注册插件时使用服务工厂注册&#xff0c;使用getService根据调用者插件资源文件内容获取在服务工厂内的对应实现在服务工厂中可以知道是哪个插件正在调用服务工厂懒汉模式…

Java数组详解

​TOC 第一章、数组的概念介绍 1.1&#xff09;数组的概念 ①数组就是用来储存数据的容器,可以存储同一种类型的数据&#xff0c;是同一种数据类型的集合。实现对这些数据的统一管理。如果数组中存储的是基本类型数据&#xff0c;我就不能往里面存引用类型数据。数组中存储的…

Java-Tomcat

一、web补充技术 ①&#xff1a;B/S架构 主流的方式&#xff0c;只要有浏览器即可。编程方式直接基于socket即可 ②&#xff1a;javascript 简称js&#xff0c;早期只是实现在客户端的浏览器的动态效果&#xff0c;但服务端不会解释运行&#xff0c;所以本质上是静态资源。 …