利用亚马逊云科技Bedrock和LangChain开发AI驱动数据分析平台

news2025/1/9 14:15:45

项目简介:

小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。

本次介绍的是如何在亚马逊云科技上SageMaker上利用LangChain框架开发代码调用Amazon Titan大语言模型,基于业务数据库Schema、用户问题以及提示词模板生成LLM链,调用大模型托管服务Amazon Bedrock上的AI大语言模型,生成业务所需的SQL查询语句并利用SQL链查询数据并生成用户问题的答案。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI解决方案。本方案的解决方案架构图如下:

方案所需基础知识  

什么是 Amazon Bedrock?

Amazon Bedrock 是亚马逊云科技推出的一项生成式 AI 服务,旨在帮助开发者轻松访问和部署各种强大的基础模型(Foundation Models),如文本生成、对话生成、图像生成等。通过 Amazon Bedrock,开发者可以快速构建、定制和扩展 AI 应用程序,而无需从零开始训练模型。这种服务使得利用大规模预训练模型变得更加简便和高效,适用于各种行业的 AI 应用场景。

什么是 Amazon SageMaker?

Amazon SageMaker 是亚马逊云科技提供的一站式机器学习服务,帮助开发者和数据科学家轻松构建、训练和部署机器学习模型。SageMaker 提供了全面的工具,从数据准备、模型训练到部署和监控,覆盖了机器学习项目的全生命周期。通过 SageMaker,用户可以加速机器学习模型的开发和上线,并确保模型在生产环境中的稳定性和性能。

什么是 Amazon Glue?

Amazon Glue 是亚马逊云科技的一项完全托管的数据集成服务,用于发现、准备和组合数据,以实现更快的数据分析。Glue 提供了数据提取、转换和加载(ETL)的功能,支持自动数据爬网、元数据管理和数据清理。它使得企业能够轻松将数据从多个源系统整合到数据湖或数据仓库中,为后续的分析和报告提供干净、统一的数据集。

利用大模型生成 SQL 语句的应用场景

自助式数据查询

利用大模型生成 SQL 语句,使非技术用户能够通过自然语言输入查询需求,自动生成相应的 SQL 语句进行数据库查询。这大大降低了数据访问的技术门槛,帮助用户快速获取所需的数据。

复杂查询自动化

在需要构建复杂查询时,开发者可以通过描述性语言让大模型生成 SQL 语句,避免手动编写长而复杂的查询代码。这不仅提高了开发效率,还减少了语法错误的可能性。

数据分析支持

分析师可以利用大模型生成 SQL 语句,根据业务问题自动生成分析查询,快速获取洞察,支持数据驱动的决策过程。

动态报告生成

在报表生成系统中,用户可以通过自然语言描述需要的报告内容,大模型将生成对应的 SQL 语句并自动提取数据,生成实时报告。这种应用使得报告生成更加灵活和高效

本方案包括的内容

1.申请Amazon Bedrock上的Amazon Titan大模型访问权限。

2.运行Amazon Glue数据提取服务,将数据库的Schema导入到Amazon Glue Data Catalog

3.根据Data Catalog利用LangChain生成一个动态提示词模板

4. 通过LangChain API调用Amazon Titan大模型

5. 测试调用大模型生成业务数据库SQL查询语句

项目搭建具体步骤:

1. 登录亚马逊云科技控制台,在Amazon Bedrock中启用Amazon Titan Text G1 - Lite模型,该模型的ID为:”amazon.titan-text-lite-v1“。

2. 下面进入亚马逊云科技SageMkaer服务Stuidio界面,点击Open打开Studio。

3. 创建一个新的Jupyter Notebook文件,命名为mda_text_to_sql_langchain_bedrock.ipynb。首先安装必要依赖,并导入。

%%capture

!pip install sqlalchemy==2.0.29
!pip install langchain==0.1.19
!pip install langchain-experimental==0.0.58
!pip install PyAthena[SQLAlchemy]==3.8.2
!pip install -U langchain-aws==0.1.3

import os
import json
import boto3

import sqlalchemy
from sqlalchemy import create_engine

from langchain.docstore.document import Document
from langchain import PromptTemplate,SagemakerEndpoint,SQLDatabase,LLMChain
from langchain_experimental.sql import SQLDatabaseChain, SQLDatabaseSequentialChain
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts.prompt import PromptTemplate

from langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChain

from typing import Dict
import time
from langchain_aws import BedrockLLM

4. 接下来我们把json中的业务数据导入到DataFrame中

import pandas as pd
library_df = pd.read_json('s3_library_data.json',lines=True)
library_df

5. 在开始该实验之前,我们通过CloudFormation基础设施即代码服务,创建了Amazon Glue和Catalog云资源。这时我们通过CloudFormation Stack的数据获取这些云资源的ID名字。

## Update with the correct Cloudformation Stack Name
cloudformation_stack_name = '<LabStack>'
cfn_client = boto3.client('cloudformation')

def get_cfn_outputs(cloudformation_stack_name):
    outputs = {}
    for output in cfn_client.describe_stacks(StackName=cloudformation_stack_name)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

outputs = get_cfn_outputs(cloudformation_stack_name)
json_formatted_str = json.dumps(outputs, indent=2)
print(json_formatted_str)

运行该代码得到的回复如下:

6. 我们继续在代码中指定我们的数据文件、Glue Crawler和Glue Database名字

# The following variables define the AWS Glue Data Catalog database and crawler to use.

## DIY Note ##  You will be required to update some of the below variables as part of the DIY
data_file = 's3_library_data.json'
lab_files_folder = 'library-data'
glue_db_name = outputs['LibraryDatabaseName']
glue_crawler_name = outputs['LibraryCrawlerName']

# The below variable does not need to be changed.
lab_files_bucket = outputs['LabfilesBucketName']

print(lab_files_bucket)

7. 下一步我们通过亚马逊云科技Python SDK Boto3,开始运行一个Glue Crawler,在Glue中构建数据库的结构和元数据,供之后利用大模型查询使用。

client = boto3.client('glue')

print("About to start running the crawler: ", glue_crawler_name)

try:
    response = client.start_crawler(Name=glue_crawler_name )
    print("Successfully started crawler. The crawler may take 2-5 mins to detect the schema.")
    while True:
        # Get the crawler status.
        response = client.get_crawler(Name=glue_crawler_name)
         # Extract the crawler state.
        status = response['Crawler']['State']
        # Print the crawler status.
        print(f"Crawler '{glue_crawler_name}' status: {status}")
        if status == 'STOPPING':  # Replace 'READY' with the desired completed state.
            break  # Exit the loop if the desired state is reached.

        time.sleep(10)  # Sleep for 10 seconds before checking the status again.
    
except:
    print("error in starting crawler. Check the logs for the error details.")

8. 接下来我们利用SQLAlchemy Python库与数据库建立连接,通过Python代码调用数据库进行数据操作。

# Define the Region.
region = boto3.session.Session().region_name

## Athena variables
connathena=f"athena.{region}.amazonaws.com" 
portathena='443'                                         # Update, if port is different
schemaathena=glue_db_name                                # from user defined params
s3stagingathena=f's3://{lab_files_bucket}/athenaresults/' # from cfn params
wkgrpathena='primary'                                    # Update, if workgroup is different

## Create the Athena connection string.
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}/&work_group={wkgrpathena}"

## Create the Athena SQLAlchemy engine.
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)

gdc = [schemaathena]
print("Connection to Athena database succeeded: ", gdc[0])

9. 处理Glue Catalog的信息,将数据源数据库的库名、数据库每个表命、每个列命存入字符串变量”columns_str“中。

# Generate dynamic prompts to populate the AWS Glue Data Catalog.
# Harvest AWS Glue crawler metadata.

def parse_catalog():
    # Connect to the Data Catalog.
    columns_str = 'database|table|column_name'
    
    # Define the AWS Glue cient.
    glue_client = boto3.client('glue')
    
    for db in gdc:
        response = glue_client.get_tables(DatabaseName =db)
        for tables in response['TableList']:
            # Classification in the response for S3 and other databases is different. Set classification based on the response location.
            if tables['StorageDescriptor']['Location'].startswith('s3'):  classification='s3' 
            else:  classification = tables['Parameters']['classification']
            for columns in tables['StorageDescriptor']['Columns']:
                    dbname,tblname,colname=tables['DatabaseName'],tables['Name'],columns['Name']
                    columns_str = columns_str+f'\n{dbname}|{tblname}|{colname}'
    return columns_str

glue_catalog = parse_catalog()

print(glue_catalog)

基于我们测试的数据,输出的”columns_str“变量如下。

10. 我们定义函数”identify_channel“,使用LangChain框架的LLMChain方法,通过定义提示词模板调用Amazon Bedrock大模型,获取基于用户问题的生成的SQL数据查询语句,并返回语句以及最佳数据查询方法(利用Athena查询S3中的数据,或利用API查询数据)。

# Define the Bedrock LLM model to use.

bedrock_model = 'amazon.titan-text-lite-v1'

BEDROCK_CLIENT = boto3.client("bedrock-runtime", 'us-east-1')

# Amazon Titan does not require any inference modifiers.
if bedrock_model == 'amazon.titan-text-lite-v1':
    inference_modifier = {}
else:
    inference_modifier = {"temperature":0.0, "max_tokens":50}

llm = BedrockLLM(model_id=bedrock_model, client=BEDROCK_CLIENT, model_kwargs = inference_modifier)

def identify_channel(query):
    prompt_template_titan = """You are a SQL expert. Convert the below natural language question into a valid SQL statement. The schema has the structure below:\n
     """+glue_catalog+""" 
     \n
     Here is the question to be answered:\n
     {query}
     \n
     Provide the SQL query that would retrieve the data based on the natural language request.\n
     
     """
    
    prompt_template = prompt_template_titan
    
    # print(prompt_template)
    
    ## Define prompt 1.
    PROMPT_channel = PromptTemplate(template=prompt_template, input_variables=["query"])
    
    # Define the LLM chain.
    llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)
    # Run the query and save to generated texts.
    generated_texts = llm_chain.run(query)
    
    # Set the channel from where the query can be answered.
    if 's3' in generated_texts: 
            channel='db'
            db=dbathena
            print("SET database to athena")
    elif 'api' in generated_texts: 
            channel='api'
            print("SET database to weather api")        
    else: raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")
    # print("Step complete. Channel is: ", channel)
    
    return channel, db


11. 下面我们定义函数”run_query“,通过最优的数据查询方法,利用调用LangChain框架"SQLDatabaseChain.from_llm"方法,访问数据库执行我们刚生成的SQL语句。

def run_query(query):

    channel, db = identify_channel(query) # Call the identify channel function first.

    _DEFAULT_TEMPLATE = """
    Here is a schema of a table:
    <schema>
    {table_info}
    </schema>       
    Run a SQL query to answer the question. Follow this format:
    
    SQLQuery: the correct SQL query. For example: select count ( * )  from s3_library_data where genre = 'Novel'
    SQLResult: the result of the SQL query.
    Answer: convert the SQLResult to a grammatically correct sentence.
    
    Here is question: {input}"""
    
    PROMPT_sql = PromptTemplate(
        input_variables=["table_info","input"], template=_DEFAULT_TEMPLATE
    )

    
    if channel=='db':
        db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT_sql, verbose=True, return_intermediate_steps=False)
        response=db_chain.run(query)
    else: raise Exception("Unlisted channel. Check your unified catalog")
    return response

12. 我们通过测试问题,调用”run_query“函数进行测试生成回复。

# query = """How many books with a genre of Fantasy are in the library?""" 
# query = """Find 3 books in the library with Tarzan in the title?""" 
# query = """How many books by author Stephen King are in the library?""" 
query = """How many total books are there in the library?""" 

response =  run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response}')

我们得到大模型通过LangChain调用数据库后生成的SQL查询语句回复。 

以上就是在亚马逊云科技上利用亚马逊云科技上利用Amazon Bedrock上的大模型和LangChain开发AI驱动的数据平台的全部步骤。欢迎大家未来与我一起,未来获取更多国际前沿的生成式AI开发方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2050131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Qt】QWidget的styleSheet属性

QWidget的styleSheet属性 通过CSS设置widget的样式。 样式具体描述了界面具体是什么样子的。 CSS (Cascading Style Sheets 层叠样式表) 在进行网页开发的时候&#xff0c;设置网页样式的方式。本⾝属于⽹⻚前端技术. 主要就是⽤来描述界⾯的样式。CSS已经发展多年&#xff0c…

10 个 C# 关键字和功能

在 Stack Overflow 调查中&#xff0c;C# 语言是排名第 5 位的编程语言。它广泛用于创建各种应用程序&#xff0c;范围从桌面到移动设备再到云原生。由于有如此多的语言关键字和功能&#xff0c;对于开发人员来说&#xff0c;要跟上新功能发布的最新信息将是一项艰巨的任务。本…

PYQT实现上传图片,保存图片

代码如下 from PyQt5.QtWidgets import * from PyQt5.QtGui import * from PyQt5.QtCore import * import sysclass MyWindow(QMainWindow):def __init__(self):super(MyWindow, self).__init__()self.setWindowTitle("图片处理")self.setGeometry(200, 200, 500, …

Unity的物理系统

目录 3D 物理系统 主要组件 2D 物理系统 主要组件 物理引擎的选择与应用 物理模拟的控制与优化 Unity中Nvidia PhysX引擎与Box2D引擎在性能和功能上的具体比较是什么&#xff1f; 如何在Unity项目中实现Havok物理引擎&#xff0c;并与PhysX或Box2D引擎结合使用&#xf…

C/C++跳动的爱心代码

系列文章 序号直达链接1C/C爱心代码2C/C跳动的爱心3C/C李峋同款跳动的爱心代码4C/C满屏飘字表白代码5C/C大雪纷飞代码6C/C烟花代码7C/C黑客帝国同款字母雨8C/C樱花树代码9C/C奥特曼代码10C/C圣诞树代码 写在前面 C语言实现简单版的跳动的红色爱心。 语言需求&#xff1a;C语…

pdf翻译成中文的工具有哪些?pdf翻译商务学习两不误

在全球化日益加深的今天&#xff0c;pdf文档的国际交流愈发频繁&#xff0c;但语言障碍却常常让人头疼。 别担心&#xff0c;寻找一款可靠的pdf翻译成中文在线软件&#xff0c;已成为解决这一难题的关键。 今天&#xff0c;就让我们一起探索五款强大的pdf翻译器&#xff0c;它…

二叉搜索树的遍历方法

前序遍历 访问顺序 根子树->左子树->右子树 首先访问根节点&#xff0c;然后递归的前序遍历左子树&#xff0c;最后递归的前序遍历右子树 void preOrder(TreeNode* node) { if (node nullptr) return; cout << node->val << ; // 访问根节点 pre…

系统编程-文件操作和时间编程

3 文件操作和时间编程 目录 3 文件操作和时间编程 一、时间编程 1、有关时间的shell命令 2、时间API &#xff08;1&#xff09;获取日历时间 &#xff08;2&#xff09;将日历时间转化为本地时间 &#xff08;3&#xff09;将日历时间转化为格林威治时间 &#xff08…

【通过禁用任务管理器实现进程保活】(bat)

效果展示 上代码&#xff0c;球球给我点个关注吧 该程序的操作实际是开启后挂在后台循环100万次 kill掉taskmgr.exe的命令的bat脚本 echo off if "%1""hide" goto CmdBegin start mshta vbscript:createobject("wscript.shell").run(""…

git 学习--GitHub Gitee码云 GitLab

1 集中式和分布式的区别 1.1 集中式 集中式VCS必须有一台电脑作为服务器&#xff0c;每台电脑都把代码提交到服务器上&#xff0c;再从服务器下载代码。如果网络出现问题或服务器宕机&#xff0c;系统就不能使用了。 1.2 分布式 分布式VCS没有中央服务器&#xff0c;每台电脑…

基于Java+SpringBoot+Vue的体育馆管理系统的设计与实现

基于JavaSpringBootVue的体育馆管理系统的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&…

JavaScript初级——基础知识

一、JS的HelloWord 1、JS的代码需要编写到script标签中 2、JS的执行是根据语句从上到下一次执行的。 二、JS的编写位置 1、可以将js代码编写到标签的onclick属性中&#xff0c;当我们点击按钮时&#xff0c;js代码才会执行。 2、可以将js代码写在超链接的href属性中&#xff0…

fastzdp_sqlmodel新增get_first和is_exitsts方法

说明 经过fastzdp_login的整合&#xff0c;我们发现&#xff0c;fastzdp_sqlmodel还可以继续封装两个便捷的方法。 get_first&#xff1a;获取查询结果集中的第一条数据is_exitsts&#xff1a;判断数据是否已存在 封装get_first方法 def get_first(engine, model, query_di…

优化算法|牛顿-拉夫逊优化算法(NRBO)详解

算法介绍 本篇推文将介绍一种新的智能优化算法&#xff0c;牛顿-拉夫逊优化算法(Newton-Raphson-based optimizer, NBRO)&#xff0c;该成果由学者Sowmya等人于2024年2月发表于期刊《Engineering Applications of Artificial Intelligence》。文末提供了原文作者的完整代码git…

【python爬虫】邮政包裹物流查询2瑞数6加密

大家好呀&#xff0c;我是你们的好兄弟【星云牛马】&#xff0c;今天给大家带来的是瑞数6的补环境的总结&#xff0c;补环境肯定是需要一些基础补环境知识的&#xff0c;所以建议没有基础的小伙伴可以加入学习群进行学习&#xff0c;有基础的伙伴加入交流起来。 QQ群&#xff…

《向量数据库指南》——AI应用部署落地与权限安全差别

部署落地与权限安全差别 RAG部署有许多系统化优化点 RAG分化程度非常高,因为RAG是许多东西的组成,类似大数据生态,里边有非常多的环节,从数据抓取、数据清洗、数据挖掘,然后预处理,再经过模型分析,比如说embedding模型生成向量,然后再做数据的持久化,serving stack,就…

打卡学习Python爬虫第二天|数据解析Re 正则表达式

在前面的学习中&#xff0c;我们已经基本掌握了抓取整个网页的基本技能&#xff0c;但是在实际的需求当中&#xff0c;我们不需要整个网页的内容&#xff0c;只需要一小部分。这就涉及到数据提取的问题。 三种数据解析的方式&#xff1a;可混合使用 1、re解析 2、bs4解析 3…

无人机低成本集群技术实现详解

在现代科技的迅猛发展中&#xff0c;无人机技术已广泛应用于军事侦察、环境监测、农业植保、物流配送等多个领域。其中&#xff0c;无人机集群技术作为提高任务效率、降低成本的重要手段&#xff0c;正受到越来越多的关注。本项目旨在研发一套低成本无人机集群系统&#xff0c;…

<C语言>指针的深度学习

目录 一、字符指针 二、指针数组 三、数组指针 1.数组指针的定义 2.&数组名与数组名 3.数组指针的使用 四、数组参数、指针参数 1.一维数组传参 2.二维数组传参 3.一级指针传参 4.二级指针传参 五、函数指针 六、函数指针数组 七、指向函数指针数组的指针 八、回调函数 1…

微信云开发云存储全部下载

一、安装 首先按照这个按照好依赖 安装 | 云开发 CloudBase - 一站式后端云服务 npm i -g cloudbase/cli 二、登录 tcb login 下载 首先在你要下载到的本地文件内创建一个名为&#xff1a;cloudbaserc.json 的json文件。 填入你的id {"envId":"你的云开发环…