Project overview:
小李哥 will continue to introduce, one per day, a cutting-edge global AI solution built on the Amazon Web Services (AWS) cloud platform, helping you quickly get to know AWS AI best practices on one of the world's most popular cloud platforms and apply them to your own day-to-day work.
This installment shows how to use CodePipeline on AWS to automate the fine-tuning and deployment of a machine learning model. The automated workflow first creates a Step Functions state machine, then uses that state machine to fine-tune a large language model on the managed machine learning service SageMaker, and finally provides users with an externally exposed URL endpoint for inference. The design is fully cloud-native and serverless, delivering a scalable and secure AI solution. The architecture diagram for this solution is shown below:
Background knowledge for this solution
What is Amazon SageMaker?
Amazon SageMaker is AWS's one-stop machine learning service, designed to help developers and data scientists easily build, train, and deploy machine learning models. SageMaker provides tools covering the entire workflow, from data preparation through model training to model deployment, so users can run machine learning projects efficiently in the cloud.
What is AWS Step Functions?
AWS Step Functions is a fully managed workflow orchestration service that lets users visually chain multiple AWS services together into automated processes. With Step Functions, developers can easily define and manage complex workflows, including branching decisions, parallel processing, error handling, and retry logic.
Benefits of using a Step Functions state machine to automate model creation, fine-tuning, and deployment on SageMaker
With an AWS Step Functions state machine, developers can automate the creation, fine-tuning, and deployment of large models on Amazon SageMaker. Step Functions chains these steps into a visual workflow, greatly simplifying the management of a complex machine learning pipeline. The benefits of automation include:
Higher efficiency:
Automating repetitive tasks reduces manual intervention and speeds up model development and deployment.
Lower error risk:
A predefined workflow ensures each step runs in order, reducing the chance of human error.
Better scalability:
Machine learning tasks of any size, from small experiments to large-scale production deployments, can be handled with consistent workflow management.
Simpler operations:
An automated process simplifies model monitoring and management, making it easy to adjust and optimize the machine learning pipeline at any time.
Automating SageMaker operations with Step Functions not only improves development efficiency but also keeps the entire process stable and repeatable.
What this solution covers
1. Define the AWS Step Functions state machine configuration in SDK code
2. Configure an AWS CodePipeline CI/CD pipeline that automatically creates the Step Functions workflow
3. Start the Step Functions workflow to automate the creation, fine-tuning, and deployment of a large language AI model
Step-by-step build instructions:
1. First, open the AWS console and go to the CodeCommit repository service. Click “Clone URL” on each of the two repositories to copy their URLs, which we will use to clone the code locally.
2. Next, open Cloud9, the AWS cloud IDE. Create a new Cloud9 environment and click “Open”.
3. Run the following commands in the IDE terminal to download the model files in “genai-repo” to the local environment:
git clone <genai-repo URL>
cd genai-repo
4. In the repository folder, create the following two files, “buildspec.yml” and “state_machine_manager.py”. They are, respectively, the CI/CD build configuration and the Step Functions state machine definition. Their contents are as follows:
“buildspec.yml”: the configuration file for the CI/CD build stage; its main job is to run the command “python state_machine_manager.py”.
version: 0.2

phases:
  install:
    commands:
      - python --version
      - pip install --upgrade pip
      - pip install boto3
      - pip install --upgrade sagemaker
      - pip install --upgrade stepfunctions
  pre_build:
    commands:
      - cd $CODEBUILD_SRC_DIR
  build:
    commands:
      - echo Build started on `date`
      - cd $CODEBUILD_SRC_DIR
      - echo Current directory `ls -la`
      - echo Building the AWS Step-Function...
      - echo Path `pwd`
      - python state_machine_manager.py
  post_build:
    commands:
      - echo Build completed on `date`
“state_machine_manager.py”: this script creates a Step Functions state machine whose workflow automatically creates, fine-tunes, and deploys the model on SageMaker. The workflow consists of multiple states, defined in the workflow_definition variable.
import boto3
import datetime
import random
import uuid
import logging
import stepfunctions
import sagemaker
import io
import json
import sys
from sagemaker import djl_inference
from sagemaker import image_uris
from sagemaker import Model
from stepfunctions import steps
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

iam = boto3.client('iam')
s3 = boto3.client('s3')
stepfunctions.set_stream_logger(level=logging.INFO)
### SET UP STEP FUNCTIONS ###
# Hour-minute-second suffix so each build creates uniquely named resources
unique_timestamp = f"{datetime.datetime.now():%H-%M-%S}"
state_machine_name = f'FineTuningLLM-{unique_timestamp}'
notebook_name = f'fine-tuning-llm-{unique_timestamp}'
succeed_state = Succeed("HelloWorldSuccessful")
fail_state = Fail("HelloWorldFailed")
new_model_name = f"trained-dolly-{unique_timestamp}"
try:
    # Find the working bucket: the one whose name starts with 'automate'
    bucket_list = s3.list_buckets()
    bucket_names = [bucket['Name'] for bucket in bucket_list['Buckets'] if bucket['Name'].startswith('automate')]
    mybucket = bucket_names[0]
except Exception as e:
    print(f"Error: {e}")
# Get the Step Functions workflow role
try:
    role = iam.get_role(RoleName='stepfunction_workflow_role')
    workflow_role = role['Role']['Arn']
except iam.exceptions.NoSuchEntityException:
    print("The role 'stepfunction_workflow_role' does not exist.")
# Get the SageMaker execution role
try:
    role2 = iam.get_role(RoleName='sagemaker_exec_role')
    sagemaker_exec_role = role2['Role']['Arn']
except iam.exceptions.NoSuchEntityException:
    print("The role 'sagemaker_exec_role' does not exist.")
# Create a SageMaker model object
model_data="s3://{}/output/lora_model.tar.gz".format(mybucket)
image_uri = image_uris.retrieve(framework="djl-deepspeed",
version="0.22.1",
region="us-east-1")
trained_dolly_model = Model(image_uri=image_uri,
model_data=model_data,
predictor_cls=djl_inference.DJLPredictor,
role=sagemaker_exec_role)
# Create a retry configuration for SageMaker throttling exceptions. This is attached to
# the SageMaker steps to ensure they are retried until they run.
SageMaker_throttling_retry = stepfunctions.steps.states.Retry(
error_equals=['ThrottlingException', 'SageMaker.AmazonSageMakerException'],
interval_seconds=5,
max_attempts=60,
backoff_rate=1.25
)
# Create a state machine step to create the model
model_step = steps.ModelStep(
'Create model',
model=trained_dolly_model,
model_name=new_model_name
)
# Add a retry configuration to the model_step
model_step.add_retry(SageMaker_throttling_retry)
# Create notebook for running SageMaker training job.
create_sagemaker_notebook = LambdaStep(
state_id="Create training job",
parameters={
"FunctionName": "create_notebook_function",
"Payload": {"notebook_name": notebook_name},
},
)
# Get notebook status
get_notebook_status = LambdaStep(
state_id="Get training job status",
parameters={
"FunctionName": "get_notebook_status_function",
"Payload": {"notebook_name": notebook_name},
},
)
# Choice state: route on the training status returned by the Lambda
response_notebook_status = Choice(state_id="Response to training job status")
wait_for_training_job = Wait(
state_id="Wait for training job",
seconds=150)
wait_for_training_job.next(get_notebook_status)
#retry checking notebook status
response_notebook_status.add_choice(
rule=ChoiceRule.StringEquals(
variable="$.Payload.trainningstatus", value="Failed"
),
next_step=fail_state,
)
response_notebook_status.add_choice(
rule=ChoiceRule.StringEquals(
variable="$.Payload.trainningstatus", value="Stopped"
),
next_step=fail_state,
)
response_notebook_status.add_choice(
ChoiceRule.StringEquals(
variable="$.Payload.trainningstatus", value="NotAvailable"
),
next_step=fail_state,
)
inservice_rule=ChoiceRule.StringEquals(
variable="$.Payload.trainningstatus", value="InService"
)
response_notebook_status.add_choice(
ChoiceRule.Not(inservice_rule),
next_step=wait_for_training_job,
)
# Create a step to generate an Amazon SageMaker endpoint configuration
endpoint_config_step = steps.EndpointConfigStep(
"Create endpoint configuration",
endpoint_config_name=new_model_name,
model_name=new_model_name,
initial_instance_count=1,
instance_type='ml.g4dn.2xlarge'
)
# Add a retry configuration to the endpoint_config_step
endpoint_config_step.add_retry(SageMaker_throttling_retry)
# Create a step to generate an Amazon SageMaker endpoint
endpoint_step = steps.EndpointStep(
"Create endpoint",
endpoint_name=f"endpoint-{new_model_name}",
endpoint_config_name=new_model_name
)
# Add a retry configuration to the endpoint_step
endpoint_step.add_retry(SageMaker_throttling_retry)
# Chain the steps together to generate a full AWS Step Function
workflow_definition = steps.Chain([
create_sagemaker_notebook,
wait_for_training_job,
get_notebook_status,
response_notebook_status,
model_step,
endpoint_config_step,
endpoint_step
])
# Create an AWS Step Functions workflow based on inputs
basic_workflow = Workflow(
name=state_machine_name,
definition=workflow_definition,
role=workflow_role,
)
jsonDef = basic_workflow.definition.to_json(pretty=True)
print('---------')
print(jsonDef)
print('---------')
basic_workflow.create()
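Note that state_machine_manager.py ends by calling basic_workflow.create(), so every successful build registers a brand-new state machine; the timestamp embedded in state_machine_name keeps the names unique, which also means re-running the pipeline leaves earlier state machines in place rather than updating them.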
5. Next, push all the new files in the folder back to our repository:
git add *
git commit -m "initial commit"
git push
6. Next, go to the code build service, CodeBuild, and create a new project.
7. Name the project “genai-build” and add a source repository to the build: set the repository to genai-repo and the branch to master.
8. Grant the build a service role, configure the Buildspec build file, and click create.
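If you prefer scripting this step rather than clicking through the console, the same project can also be created with boto3. Below is a minimal sketch; the repository clone URL, build image, and service role ARN are placeholder assumptions you would replace with your own values:

import boto3

codebuild = boto3.client('codebuild', region_name='us-east-1')

# Placeholder values: substitute your own repo URL and service role ARN
codebuild.create_project(
    name='genai-build',
    source={
        'type': 'CODECOMMIT',
        'location': 'https://git-codecommit.us-east-1.amazonaws.com/v1/repos/genai-repo',
        'buildspec': 'buildspec.yml',
    },
    artifacts={'type': 'NO_ARTIFACTS'},
    environment={
        'type': 'LINUX_CONTAINER',
        'image': 'aws/codebuild/standard:7.0',
        'computeType': 'BUILD_GENERAL1_SMALL',
    },
    serviceRole='arn:aws:iam::<account-id>:role/<codebuild-service-role>',
)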
9. Next, go to CodePipeline and create a new CI/CD pipeline.
10. Name the pipeline “genai-pipeline” and assign it a service role.
11. First choose the pipeline's source: select CodeCommit as the source type, set the repository to “genai-repo”, and the branch to master.
12. In the build stage, select the CodeBuild project “genai-build” we just created. Skip the deploy stage and click create.
13. Wait for the build stage to complete successfully, then go to the Step Functions service page.
14. On the Step Functions page we can see the new state machine created by the CodeBuild run: “FineTuningLLM-19-08-44”
15. Clicking into the state machine shows the workflow definition we configured earlier:
{
"StartAt": "Create training job",
"States": {
"Create training job": {
"Parameters": {
"FunctionName": "create_notebook_function",
"Payload": {
"notebook_name": "fine-tuning-llm-19-08-44"
}
},
"Resource": "arn:aws:states:::lambda:invoke",
"Type": "Task",
"Next": "Wait for training job"
},
"Wait for training job": {
"Seconds": 150,
"Type": "Wait",
"Next": "Get training job status"
},
"Get training job status": {
"Parameters": {
"FunctionName": "get_notebook_status_function",
"Payload": {
"notebook_name": "fine-tuning-llm-19-08-44"
}
},
"Resource": "arn:aws:states:::lambda:invoke",
"Type": "Task",
"Next": "Response to training job status"
},
"Response to training job status": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Payload.trainningstatus",
"StringEquals": "Failed",
"Next": "HelloWorldFailed"
},
{
"Variable": "$.Payload.trainningstatus",
"StringEquals": "Stopped",
"Next": "HelloWorldFailed"
},
{
"Variable": "$.Payload.trainningstatus",
"StringEquals": "NotAvailable",
"Next": "HelloWorldFailed"
},
{
"Not": {
"Variable": "$.Payload.trainningstatus",
"StringEquals": "InService"
},
"Next": "Wait for training job"
}
],
"Default": "Create model"
},
"Create model": {
"Parameters": {
"ExecutionRoleArn": "arn:aws:iam::903982278766:role/sagemaker_exec_role",
"ModelName": "trained-dolly-19-08-44",
"PrimaryContainer": {
"Environment": {},
"Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.22.1-deepspeed0.9.2-cu118",
"ModelDataUrl": "s3://automate-fine-tuning-e91ee010/output/lora_model.tar.gz"
}
},
"Resource": "arn:aws:states:::sagemaker:createModel",
"Type": "Task",
"Next": "Create endpoint configuration",
"Retry": [
{
"ErrorEquals": [
"ThrottlingException",
"SageMaker.AmazonSageMakerException"
],
"IntervalSeconds": 5,
"MaxAttempts": 60,
"BackoffRate": 1.25
}
]
},
"Create endpoint configuration": {
"Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
"Parameters": {
"EndpointConfigName": "trained-dolly-19-08-44",
"ProductionVariants": [
{
"InitialInstanceCount": 1,
"InstanceType": "ml.g4dn.2xlarge",
"ModelName": "trained-dolly-19-08-44",
"VariantName": "AllTraffic"
}
]
},
"Type": "Task",
"Next": "Create endpoint",
"Retry": [
{
"ErrorEquals": [
"ThrottlingException",
"SageMaker.AmazonSageMakerException"
],
"IntervalSeconds": 5,
"MaxAttempts": 60,
"BackoffRate": 1.25
}
]
},
"Create endpoint": {
"Resource": "arn:aws:states:::sagemaker:createEndpoint",
"Parameters": {
"EndpointConfigName": "trained-dolly-19-08-44",
"EndpointName": "endpoint-trained-dolly-19-08-44"
},
"Type": "Task",
"End": true,
"Retry": [
{
"ErrorEquals": [
"ThrottlingException",
"SageMaker.AmazonSageMakerException"
],
"IntervalSeconds": 5,
"MaxAttempts": 60,
"BackoffRate": 1.25
}
]
},
"HelloWorldFailed": {
"Type": "Fail"
}
}
}
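The pipeline only creates the state machine; an execution still has to be started before any of these states run. You can start one from the console, or with a minimal boto3 sketch like the one below (the state machine ARN is a placeholder; copy the real one from the Step Functions console):

import boto3

sfn = boto3.client('stepfunctions', region_name='us-east-1')

# Placeholder ARN: copy the actual value from the Step Functions console
response = sfn.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:<account-id>:stateMachine:FineTuningLLM-19-08-44'
)
print(response['executionArn'])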
16. In the Step Functions execution view we can see that all steps have completed. The workflow keeps looping through the Wait state until the status check returns “InService”, and fails fast on “Failed”, “Stopped”, or “NotAvailable”. The two states “Create training job” and “Get training job status” each invoke a different Lambda function written in Python.
The Python code for “Create training job” is as follows:
import boto3
import base64
import os

def lambda_handler(event, context):
    aws_region = 'us-east-1'
    notebook_name = event["notebook_name"]
    notebook_file = 'lab-notebook.ipynb'
    iam = boto3.client('iam')
    # Create SageMaker and S3 clients
    sagemaker = boto3.client('sagemaker', region_name=aws_region)
    s3 = boto3.resource('s3', region_name=aws_region)
    s3_client = boto3.client('s3')
    s3_bucket = os.environ['s3_bucket']
    s3_prefix = "notebook_lifecycle"
    # Lifecycle script: downloads the training scripts from S3 and launches
    # the fine-tuning job in the background when the notebook instance starts
    lifecycle_config_script = f"""#!/bin/bash
set -e
cd /home/ec2-user/SageMaker/
aws s3 cp s3://{s3_bucket}/{s3_prefix}/training_scripts.zip .
unzip training_scripts.zip
echo "Running training job..."
source /home/ec2-user/anaconda3/bin/activate pytorch_p310
chmod +x /home/ec2-user/SageMaker/converter.sh
chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &
"""
    lifecycle_config_name = f'LCF-{notebook_name}'
    print(lifecycle_config_script)

    # Register the lifecycle configuration with SageMaker
    def manage_lifecycle_config(lifecycle_config_script):
        content = base64.b64encode(lifecycle_config_script.encode('utf-8')).decode('utf-8')
        try:
            sagemaker.create_notebook_instance_lifecycle_config(
                NotebookInstanceLifecycleConfigName=lifecycle_config_name,
                OnCreate=[{'Content': content}]
            )
        except sagemaker.exceptions.ClientError as e:
            print(e)

    # Get the SageMaker execution role by name
    try:
        role = iam.get_role(RoleName='sagemaker_exec_role')
        sagemaker_exec_role = role['Role']['Arn']
    except iam.exceptions.NoSuchEntityException:
        print("The role 'sagemaker_exec_role' does not exist.")

    # Describe the notebook instance; if it does not exist yet, create it
    try:
        response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
    except sagemaker.exceptions.ClientError as e:
        print(e)
        if 'RecordNotFound' in str(e):
            manage_lifecycle_config(lifecycle_config_script)
            # Create a new SageMaker notebook instance with the lifecycle config attached
            # Updated to 4xl by DWhite due to 12xl not being available. 7/18/2024
            sagemaker.create_notebook_instance(
                NotebookInstanceName=notebook_name,
                InstanceType='ml.g5.4xlarge',
                RoleArn=sagemaker_exec_role,
                LifecycleConfigName=lifecycle_config_name,
                VolumeSizeInGB=30
            )
        else:
            raise
    return {
        'statusCode': 200,
        'body': 'Notebook instance setup and lifecycle configuration applied.'
    }
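To smoke-test this function outside the state machine, you can invoke it directly with the same payload the workflow sends. A minimal sketch, assuming the function is deployed under the name create_notebook_function used above:

import boto3
import json

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Send the same payload shape the Step Functions LambdaStep passes in
response = lambda_client.invoke(
    FunctionName='create_notebook_function',
    Payload=json.dumps({"notebook_name": "fine-tuning-llm-test"})
)
print(json.loads(response['Payload'].read()))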
"get training job status"的代码如下:
import boto3
import json
import os

s3 = boto3.client('s3')
sagemaker = boto3.client('sagemaker')
s3_bucket = os.environ['s3_bucket']

def lambda_handler(event, context):
    print(event)
    notebook_name = event["notebook_name"]
    notebook_status = "NotAvailable"
    training_job_status = 'NotAvailable'
    check_status = 'NotAvailable'
    # Describe the notebook instance to determine its status
    try:
        response = sagemaker.describe_notebook_instance(NotebookInstanceName=notebook_name)
        notebook_status = response['NotebookInstanceStatus']
        if notebook_status == 'InService':
            # The notebook is up; training counts as done once the model artifact lands in S3
            find_artifact = s3.list_objects_v2(
                Bucket=s3_bucket,
                Prefix='output/lora_model.tar.gz'
            )
            artifact_location = find_artifact.get('Contents', [])
            if not artifact_location:
                training_job_status = 'Creating'
                check_status = 'Creating'
            else:
                if 'output/lora_model.tar.gz' in str(artifact_location):
                    training_job_status = 'Completed'
                    check_status = 'InService'
        elif notebook_status == 'Failed':
            check_status = 'Failed'
        elif notebook_status == 'NotAvailable':
            check_status = 'NotAvailable'
        else:
            check_status = 'Pending'
        print(f"Notebook Status: {notebook_status}")
        print(f"Model on s3: {training_job_status}")
        print(f"Check status: {check_status}")
    except sagemaker.exceptions.ClientError as e:
        print(e)
    return {
        'statusCode': 200,
        'input': notebook_name,
        # Key spelling intentionally matches "$.Payload.trainningstatus"
        # referenced by the state machine's Choice state
        'trainningstatus': check_status
    }
17. After all tasks in the Step Functions workflow have finished, go to the SageMaker service, create a Jupyter Notebook instance, and open it.
18. Create a new Jupyter notebook file and copy in the fine-tuning code. We have excerpted part of it below; it uses PEFT and LoRA to fine-tune the Dolly large language model.
EPOCHS = 10
LEARNING_RATE = 1e-4
MODEL_SAVE_FOLDER_NAME = "dolly-3b-lora"
training_args = TrainingArguments(
output_dir=MODEL_SAVE_FOLDER_NAME,
fp16=True,
per_device_train_batch_size=1,
per_device_eval_batch_size=1,
learning_rate=LEARNING_RATE,
num_train_epochs=EPOCHS,
logging_strategy="steps",
logging_steps=100,
evaluation_strategy="steps",
eval_steps=100,
save_strategy="steps",
save_steps=20000,
save_total_limit=10,
)
trainer = Trainer(
model=model,
tokenizer=tokenizer,
args=training_args,
train_dataset=split_dataset['train'],
eval_dataset=split_dataset["test"],
data_collator=data_collator,
)
model.config.use_cache = False # silence the warnings. Please re-enable for inference!
trainer.train()
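The excerpt above assumes that model, tokenizer, split_dataset, and data_collator were prepared earlier in the notebook. For context, wrapping a base model with a LoRA adapter via the peft library typically looks like the sketch below; the model ID, rank, and target modules here are illustrative assumptions rather than the exact values used in this lab:

from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, TaskType

# Load the base Dolly model and tokenizer (illustrative model ID)
model = AutoModelForCausalLM.from_pretrained("databricks/dolly-v2-3b")
tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-3b")

# LoRA: freeze the base weights and train only small adapter matrices
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=16,                                # adapter rank (illustrative)
    lora_alpha=32,                       # scaling factor (illustrative)
    lora_dropout=0.05,
    target_modules=["query_key_value"],  # attention projection in GPT-NeoX-based Dolly
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()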
19. We also need to create a SageMaker lifecycle configuration script that triggers the commands to start fine-tuning as part of the automated Step Functions workflow. The startup script is as follows.
#!/bin/bash
set -e
cd /home/ec2-user/SageMaker/
aws s3 cp s3://automate-fine-tuning-e91ee010/notebook_lifecycle/training_scripts.zip .
unzip training_scripts.zip
echo "Running training job..."
source /home/ec2-user/anaconda3/bin/activate pytorch_p310
chmod +x /home/ec2-user/SageMaker/converter.sh
chown ec2-user:ec2-user /home/ec2-user/SageMaker/converter.sh
nohup /home/ec2-user/SageMaker/converter.sh >> /home/ec2-user/SageMaker/nohup.out 2>&1 &
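Note that this is the same script the create_notebook_function Lambda embeds as the notebook instance's lifecycle configuration; the only difference is that the bucket name is hard-coded here instead of read from the Lambda's s3_bucket environment variable.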
20. Finally, go to the Endpoints section of SageMaker, where we can see the API endpoint URL of the successfully deployed large AI model.
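Once the endpoint shows as InService, you can send it inference requests. A hedged sketch, assuming the endpoint name from this walkthrough and a JSON-in/JSON-out DJL serving container (the exact request format depends on your model server):

import boto3
import json

runtime = boto3.client('sagemaker-runtime', region_name='us-east-1')

# Endpoint name taken from this walkthrough; adjust to your deployment
response = runtime.invoke_endpoint(
    EndpointName='endpoint-trained-dolly-19-08-44',
    ContentType='application/json',
    Body=json.dumps({"inputs": "What is Amazon SageMaker?"})
)
print(response['Body'].read().decode('utf-8'))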
That wraps up the complete walkthrough of using the AWS CI/CD service CodePipeline together with Step Functions workflows to automate the creation, fine-tuning, and deployment of a large language AI model on AWS. I hope you'll join me again for more cutting-edge generative AI solutions from around the world.