在亚马逊云科技上通过LangChain ReAct Agent开发金融多模态数据AI分析中台

news2025/1/19 14:17:40

项目简介:

小李哥将继续每天介绍一个基于亚马逊云科技AWS云计算平台的全球前沿AI技术解决方案,帮助大家快速了解国际上最热门的云计算平台亚马逊云科技AWS AI最佳实践,并应用到自己的日常工作里。

本次介绍的是如何在亚马逊云科技机器学习托管服务Amazon SageMaker上搭建一个多模态LangChain Agent,通过ReAct逻辑让Agent通过Amazon Bedrock AI模型托管服务上的大模型对用户问题进行推理决定如何执行操作,通过调用一系列亚马逊云科技AI服务,如Amazon Textract、Amazon Transcribe、Amazon Comprehend对多模态数据进行分析处理,最后利用Streamlit框架开发与用户交互的UI界面。本架构设计全部采用了云原生Serverless架构,提供可扩展和安全的AI解决方案。本方案的解决方案架构图如下:

方案所需基础知识 

什么是 Amazon SageMaker?

Amazon SageMaker 是亚马逊云科技提供的一站式机器学习服务,帮助开发者和数据科学家轻松构建、训练和部署机器学习模型。SageMaker 提供了全面的工具,从数据准备、模型训练到部署和监控,覆盖了机器学习项目的全生命周期。通过 SageMaker,用户可以加速机器学习模型的开发和上线,并确保模型在生产环境中的稳定性和性能。

什么是 LangChain Agent?

LangChain Agent 是 LangChain 框架中的一个强大组件,它允许开发者创建具备自主决策能力的 AI 应用程序。通过 LangChain Agent,AI 可以根据预定义的策略和逻辑自动选择合适的工具或模型来处理任务,并在任务执行过程中进行推理和调整。这使得开发者能够构建更复杂、更智能的系统,可以处理多步骤任务并在不同场景中作出动态响应。

什么是 ReAct 逻辑?

ReAct 逻辑(Reasoning and Acting)是一种结合了推理(Reasoning)和行动(Acting)的决策机制,用于增强 AI 系统的智能化行为。在 ReAct 逻辑下,AI 系统不仅会执行预定的任务,还会在任务执行过程中进行推理,分析当前的情境,并动态调整接下来的步骤。通过这种逻辑,AI 能够更好地应对复杂的环境和多变的任务需求。

为什么利用 LangChain Agent 分析多模态数据?

处理复杂数据类型

多模态数据通常包含文本、图像、音频、视频等多种类型的信息。LangChain Agent 可以根据 ReAct 逻辑自主选择不同的模型和工具来处理不同模态的数据,实现对多模态数据的全面分析和理解。

提高分析准确性

通过结合多个数据源的内容,LangChain Agent 能够更准确地推断和生成答案。比如,在分析一段视频时,Agent 可以同时处理转录的文本、图像帧和音频信息,从而提供更全面的分析结果。

灵活的决策能力

利用 ReAct 逻辑,LangChain Agent 可以在多模态数据分析过程中根据实时反馈调整策略,确保分析过程中的每一步都能最大程度地利用数据。这种灵活性特别适合处理数据量大、信息复杂的任务。

自动化复杂任务

LangChain Agent 可以自主进行任务分解和工具选择,自动化执行多步骤分析任务。例如,Agent 可以先从视频中提取音频,转录为文本,然后结合图像分析,最终生成全面的报告或回答复杂的问题。

本方案包括的内容

1. 利用亚马逊云科技AI系列服务处理、分析云端金融原始数据

2. 开发一个LangChain Agent,根据用户需求使用ReAct逻辑判断、选择合适的工具完成数据分析任务

3. 利用Streamlit框架开发金融分析数据中台网页应用

项目搭建具体步骤

1. 进入亚马逊云科技控制台,进入Amazon Bedrock服务主页,确认"Mistral Large"大模型是开启状态。并进入DynamoDB,创建一个表,命名为”chat_history_3c3c6530“,该表用于存储数据中台分析行为的记录。

2. 接下来进入SageMaker服务,进入Studio页面,点击Open打开Jupyter Notebook开发环境。

3. 进入到Jupyter Notebook,创建一个新的ipynb文件,安装和导入必要的依赖。

%%writefile notebook-requirements.txt
boto3
langchain==0.1.20
langchain_experimental
PyAthena[SQLAlchemy] ==3.8.3
sqlalchemy==2.0.27
pandas<2.0.0
numpy==1.24.0
nest-asyncio==1.5.5
PyPortfolioOpt
langchain-aws

!pip install -q -r notebook-requirements.txt

import json
import boto3
import datetime
import pandas as pd

region = 'us-east-1'

其中requirements.txt文件内容如下:

langchain==0.1.20
langchain_experimental
PyAthena[SQLAlchemy] ==3.8.3
sqlalchemy==2.0.27
PyPortfolioOpt
streamlit
bs4
boto3
langchainhub

4. 接下来运行以下代码,通过Python SDK boto3获取云端向量库Kendra的索引、数据源信息。以及存放数据源的S3存储桶名。

# Retrieve the Amazon Kendra index ID and data source ID.
kendra_client = boto3.client('kendra')

kendra_indexes = kendra_client.list_indices()
index = next(item for item in kendra_indexes['IndexConfigurationSummaryItems'] if item["Name"] == "kendra-index")
kendra_index_id = index['Id']
print (f"kendra_index_id is {kendra_index_id}.")


kendra_data_sources = kendra_client.list_data_sources(
    IndexId=kendra_index_id
)
data_source = next(item for item in kendra_data_sources['SummaryItems'] if item["Name"] == "kendra-data-source")
kendra_data_source_id = data_source['Id']

print (f"kendra_data_source_id is {kendra_data_source_id}.")


# Retrieve the S3 bucket names.

s3_client = boto3.client('s3')

data_source_bucket = ''
athena_result_bucket = ''
multimodal_output_bucket = ''

buckets = s3_client.list_buckets()['Buckets']
for bucket in buckets:
    bucket_name = bucket['Name']
    if 'data-source' in bucket_name:
        data_source_bucket = bucket_name
    if 'athena-query' in bucket_name:
        athena_result_bucket = bucket_name
    if 'multi-modal' in bucket_name:
        multimodal_output_bucket = bucket_name

print(f"data_source_bucket is {data_source_bucket}.")
print(f"athena_result_bucket is {athena_result_bucket}.")
print(f"multimodal_output_bucket is {multimodal_output_bucket}.")

 5. 接下来通过Python Boto3 SDK调用亚马逊云科技云端SQL数据分析服务Athena,运行表删除和新表”stock-prices-db“创建命令,并定义新表结构。

glue_db_name = 'stock-prices-db'

athena_client = boto3.client('athena')

def query_athena(query):
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': glue_db_name
        },
        ResultConfiguration={
            'OutputLocation': f"s3://{athena_result_bucket}/",
        },
        WorkGroup='primary'
    )
    print(response)

drop_table_query='DROP TABLE `stock_prices`;'
query_athena(drop_table_query)


create_table_query=f"""
CREATE EXTERNAL TABLE IF NOT EXISTS `stock-prices-db`.stock_prices ( 
    date string, 
    XLYT double, 
    ZQRS double, 
    VBMP double, 
    KLXE double, 
    JHGN double, 
    QPZT double, 
    MNBV double, 
    LKDI double, 
    PYXC double, 
    FGHT double, 
    RSTU double, 
    YXCV double, 
    QWER double, 
    PLKJ double, 
    HGFD double, 
    SEDC double, 
    FVGB double, 
    HNJM double, 
    YTRE double
)

ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ('separatorChar' = ',', 'quoteChar' = '\\\"', 'escapeChar' = '\\\\')
LOCATION 's3://{data_source_bucket}/stock_prices'
TBLPROPERTIES ('skip.header.line.count'='1')
"""
query_athena(create_table_query)

6. 下面我们初始化Amazon Bedrock客户端,并指明Mistral Large大模型id。

# Create an Amazon Bedrock runtime to invoke the LLM.
from langchain_community.llms import Bedrock

bedrock_runtime = boto3.client(service_name='bedrock-runtime')

model_id = "mistral.mistral-large-2402-v1:0"

7. 接下来我们利用SQLAlchemy库创建一个用于调用Athena数据库的连接。接下来创建一个tool工具,定义函数”stock_query“,用于根据用户的问题访问Athena调用数据库执行SQL查询语句,并获取数据回复用户的问题。

import sqlalchemy
from sqlalchemy import create_engine
from langchain import PromptTemplate,SQLDatabase, LLMChain
from langchain_experimental.sql.base import SQLDatabaseChain

# Specify the parameters for the database connection to Athena.
table = 'stock_prices' # You created this table in the prior code cell.

connathena=f"athena.{region}.amazonaws.com" 
portathena='443' 
schemaathena=glue_db_name 
s3stagingathena=f's3://{athena_result_bucket}/athenaresults/'
wkgrpathena='primary'

##  Create the Athena connection string.
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}&work_group={wkgrpathena}"

##  Create the Athena  SQLAlchemy engine.
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)

from langchain.tools import tool
from langchain_aws import BedrockLLM

# Using the @tool decorator is the simplest way to define a custom tool. 
# By default, the decorator uses the function name as the tool name, but this can be overridden by passing a string as the first argument. 

@tool
def stock_query(query):
    # The decorator will use the function's docstring as the tool's description, so a docstring MUST be provided.

    """Use for answering questions about stocks. It only has information about stocks in table 'stock_prices'.
    This tool accepts only questions as input. Example:  What is average price of JJJ in 2014?"""
    
    sql_template = """
    <s>[INST]
    In one single line, run a SQL query to answer the question. If the answer includes stock prices, format it correctly.
    If a question ask for "closing prices", it should be the last available date in the given time period. 
    For example if the time period in the question is 2020, then the closing price is the price for the last available date in 2020.    
    
    Here is a schema of a table:
    <schema>
    {table_info}
    </schema>    
    Here is question: {input}
    [/INST]"""
    
    model_kwargs = {
        "max_tokens": 250,
        "stop": ["\n"],        
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
    llm = BedrockLLM(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )
    
    PROMPT_sql = PromptTemplate(
        input_variables=["input", "table_info"], template=sql_template
    )
    
    db_chain = SQLDatabaseChain.from_llm(llm, dbathena, prompt=PROMPT_sql)
    
    response=db_chain.invoke(query)
    return response


# Test the run_query tool
stock_query('What are the maximum prices of stocks  RSTU, YXCV, QWER in year 2017?')

8. 运行以下代码,定义一个用于生成投资建议的LangChain tool工具”PortfolioOptimizerTool“。

from functools import reduce
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from typing import List, Optional
# Import things that are needed generically.
from langchain.tools import BaseTool


# You can also explicitly define a custom tool by subclassing the BaseTool class. 

class PortfolioOptimizerTool(BaseTool): 
    
    name = "portfolio_optimizer"
    description = """
        use this tool when you need to build optimal portfolio. 
        The output results tell you the allocation of your money on each stock.
        No need to pass stock prices to this tool.
        This tool only accept stock in this format "stock_ls":["list of stocks"]. Example "stock_ls":["xyz","abc"].
        """

    def _run(self, stocks: dict):
        """Use the tool."""

        import boto3
        import pandas as pd
        from pyathena import connect
        
        # Establish a connection to Athena.
        session = boto3.Session(region_name=region)
        athena_client = session.client('athena')
        
        # Parse the stocks input.
        json_stocks= stocks.split("\n")[0]
        if type(json_stocks) == str:
            json_stocks = json.loads(json_stocks)
            
        stock_ls = json_stocks.get("stock_ls")
        if not stock_ls:
            raise ValueError("Please provide stock list as stock_ls")
        # Execute the query.
        query = f'SELECT * from "stock-prices-db"."stock_prices"'  
        cursor = connect(s3_staging_dir=f's3://{athena_result_bucket}/athenaresults/', region_name=region).cursor()
        cursor.execute(query)

        # Fetch results.
        rows = cursor.fetchall()

        # Convert to pandas DataFrame.
        df = pd.DataFrame(rows, columns=[column[0] for column in cursor.description])

        # Filter data to use the designated list of stocks.
        stock_ls = [x.lower() for x in stock_ls]
        stock_ls.append('date')        
        df = df[stock_ls]

        # Set "Date" as the index and parse it as a datetime object.
        df.set_index("date", inplace=True)
        df.index = pd.to_datetime(df.index, format = '%Y-%m-%d')
        
        mu = expected_returns.mean_historical_return(df)
        S = risk_models.sample_cov(df)

        # Optimize for the maximal Sharpe ratio.
        ef = EfficientFrontier(mu, S)
        weights = ef.max_sharpe()
        ef.portfolio_performance(verbose=True)

        cleaned_weights = ef.clean_weights()
        ef.portfolio_performance(verbose=True)
        # Lastly, convert the weights into actual allocation values; that is, how many of each stock to buy. For your allocation, consider an investment amount of $100,000.

        from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices
        latest_prices = get_latest_prices(df)
        da = DiscreteAllocation(weights, latest_prices, total_portfolio_value=10000)
        allocation, leftover = da.greedy_portfolio()       
        results=str(dict(cleaned_weights)).replace('{',"").replace('}',"")
        return f"These are the optimized portfolio {results}"
        

    async def _arun(self, stock_ls: int):
        """Use the tool asynchronously."""
        raise NotImplementedError("This tool does not support async")

# Initialize an optimizer.
optimizer = PortfolioOptimizerTool()        

9. 接下来定义一个自定义tool工具text_extract,用于利用Amazon Textract提取文档中的信息。

# Here, you use the @tool decorator to define a custom tool that will extract text from a .pdf file by using Amazon Textract.

@tool
def text_extract(inputString):
    """Useful for when you need to trigger conversion of pdf version of quaterly reports to text files using amazon textextract"""
    print(inputString)
    lambda_client = boto3.client('lambda', region_name=region)
    lambda_payload = {"inputString:"+inputString}
    response=lambda_client.invoke(FunctionName='textract-pdf-files', # This Lambda function is invoked to do text extraction.
                        InvocationType='RequestResponse',
                     Payload=json.dumps(inputString))
    print(response['Payload'].read())
    return response

10.接下来定义一个自定义tool工具transcribe_audio,用于利用Amazon Transcribe提取语音文件中的信息。

# Define a custom tool that will transcribe audio to text.

@tool
def transcribe_audio(inputString):
    """Useful for when you need to convert audio recordings of earnings calls from audio to text format using Amazon Transcribe"""
    
    print(inputString)
    lambda_client = boto3.client('lambda', region_name=region)
    lambda_payload = {"inputString:"+inputString}
    response=lambda_client.invoke(FunctionName='transcribe-audio', # This Lambda function is invoked to generate transcripts.
                        InvocationType='RequestResponse',
                     Payload=json.dumps(inputString))
    print(response['Payload'].read())
    return response

11. 接下来我们将要处理的文件”Amazon-10K-2023-EarningsReport.pdf“上传到用于数据处理的S3桶中,在对其进行文字提取和音频转录。

file_name='Amazon-10K-2023-EarningsReport.pdf'
copy_source = {
    'Bucket': data_source_bucket,
    'Key': file_name
}
s3_client.copy_object(CopySource=copy_source, Bucket=multimodal_output_bucket, Key=file_name)

text_extract('process')
transcribe_audio('process')

12. 接下来利用亚马逊云科技向量库Kendra对处理后的文档数据向量化并创建索引

import time

kendra_client = boto3.client("kendra")

sync_response = kendra_client.start_data_source_sync_job(
    Id = kendra_data_source_id,
    IndexId = kendra_index_id
)

print("Wait for the data source to sync with the index.")

time.sleep(30)

while True:
    jobs = kendra_client.list_data_source_sync_jobs(
        Id = kendra_data_source_id,
        IndexId = kendra_index_id
    )

    status = jobs["History"][0]["Status"]
    print(" Syncing data source. Status: "+status)
    
    if status in ['FAILED','SUCCEEDED','INCOMPLETE','ABORTED']:
        break
    time.sleep(30)

13. 定义新的工具”lookup_info“,用于在向量库中基于用户问题搜索答案。

from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain.retrievers import AmazonKendraRetriever

@tool
def lookup_info(question):
    """Useful for when you need to look up information from a knowledge base."""
    model_kwargs = {
        "max_tokens": 2048,
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
    llm_retrieve = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )
      
    retriever = AmazonKendraRetriever(index_id=kendra_index_id,region_name=region, top_k=3)

    prompt_template = """
    Here are a few documents in <documents> tags:
      <documents>
      {context}
      </documents>
      Based on the above documents, provide a detailed answer for, {input} 
      Answer "don't know" if not present in the document. 
    """
    prompt = ChatPromptTemplate.from_template(prompt_template)

    docs_chain = create_stuff_documents_chain(
        llm_retrieve, prompt
    )
    retrieval_chain = create_retrieval_chain(retriever, docs_chain)

    result = retrieval_chain.invoke({"input": question})

 
    return result['answer']

14. 接下来我们定义一个列表,包含我们之前创建的全部tool工具。

# Define the list of tools.

tools = [stock_query, lookup_info, optimizer, text_extract, transcribe_audio]

15. 我们利用LangChain提示词模板定义我们的提示词

from langchain_core.prompts import ChatPromptTemplate

template = '''Respond to the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer.
Thought: you should always think about what to do.
Action: the action to take, should be one of [{tool_names}].
Action Input: the input to the action.
Observation: the result of the action.
... (this Thought/Action/Action Input/Observation can repeat N times).

Thought: I now know the final answer.
Final Answer: the final answer to the original input question.

Begin!

Question: {input}
Thought:{agent_scratchpad}

'''

prompt = ChatPromptTemplate.from_template(template)

16. 为Agent添加对话历史记忆功能,让Agent可以通过对话历史更精确地生成回复。对话历史通过LangChain保存在DynamoDB中。

import uuid
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
from langchain.memory import ConversationBufferMemory

chat_history_table = 'TO BE PROVIDED' # Provide the DynamoDB table name for storing conversations (prompts and answers).
  
chat_session_id = '0'
  
if chat_session_id == '0' :
    chat_session_id = str(uuid.uuid4())

print (chat_session_id)

chat_history_memory = DynamoDBChatMessageHistory(table_name=chat_history_table, session_id=chat_session_id)

17. 下面我们定义分析平台与用户交互时使用的Bedrock上的大语言模型以及模型参数,并创建一个ReAct Agent根据提示词生成回复。该Agent的特点就是同时会生成推理过程并根据推理过程,引导Agent持续更新调用外部数据源的行为步骤,获得的回复更准确。

from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.runnables.history import RunnableWithMessageHistory


model_kwargs = {
        "max_tokens": 2048,
        "stop": ["\n"],        
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
llm = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs,
        streaming=False
    )

# Initialize a ReAct agent.
agent = create_react_agent(llm, tools, prompt)

# Create an agent executor by passing in the agent and tools.
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # This is needed because a session ID is required in most real-world scenarios.
    # It isn't really used here because we are using the basic, in-memory ChatMessageHistory.
    lambda session_id: message_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

response = agent_executor.invoke({"input":"What are the closing prices of stocks MNBV, PYXC, LKDI in year 2014?"})
print(response['output'])

18.最后我们通过调用ReAct Agent对示例问题生成回复,示例问题为”查询HFGD、SEDC、FVGB“三家公司的股价。

agent_executor.invoke({"input": "What are the closing prices of stocks HGFD, SEDC, FVGB in year 2018? Can you build an optimized portfolio using these three stocks? Please provide answers to both questions."})

得到的回复如下:

19. 新建一个Python文件”streamlit_app.py“,复制以下代码。该代码利用streamlit框架开发了一个金融数据分析中台网页交互UI,在代码中创建了ReAct Agent代理并利用亚马逊云科技系列AI服务自定义tool工具,基于用户问题查询不同类型的多模态原始数据,最终利用大模型为用户生成精准回复。

import boto3
from botocore.exceptions import ClientError
import json
import langchain
from importlib import reload
import time
import sys
import os
import streamlit as st
import uuid
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from langchain_community.llms import Bedrock
from langchain.agents import initialize_agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain.retrievers import AmazonKendraRetriever
from langchain_core.tools import Tool, StructuredTool
from langchain.agents import initialize_agent
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
from streamlit.web.server.websocket_headers import _get_websocket_headers
from langchain import hub
from langchain_core.runnables.history import RunnableWithMessageHistory

st.set_page_config(layout="wide")

#Session states to hold sateful variables
if 'generated' not in st.session_state:
    st.session_state['generated'] = []
if 'past' not in st.session_state:
    st.session_state['past'] = []
if 'messages' not in st.session_state:
    st.session_state['messages'] = []
if 'ant_key' not in st.session_state:
    st.session_state['ant_key'] = ''
if 'chat_id' not in st.session_state:
    st.session_state['chat_id'] = 1
if 'client_id' not in st.session_state:
    st.session_state['client_id'] = ''
if 'prompt' not in st.session_state:
    st.session_state['prompt'] = ''
if 'memory' not in st.session_state:
    st.session_state['memory'] = ""

######## Global Variables ########
REGION='us-east-1'

bedrock_runtime = boto3.client(service_name='bedrock-runtime')

model_id = "mistral.mistral-large-2402-v1:0"

with open ('param.json','r') as f:
    params=json.load(f)

table = 'stock_prices'
chat_history_table = params['ddb_table']

######## Setting session Id ########

if len(st.session_state['messages'])<1:

    ######## browser client info ########
    headers = _get_websocket_headers()
    st.session_state['client_id'] = str(headers.get("Sec-Websocket-Key"))
    st.session_state['chat_id']= st.session_state['chat_id']+1


session_id=st.session_state['client_id']
chat_id= st.session_state['chat_id']

#persist dynamodb table id for chat history for each session and browser client
@st.cache_data
def db_table_id(session_id, chat_id):
    chat_sess_id=str(uuid.uuid4())
    return chat_sess_id

chat_session_id=db_table_id(session_id, chat_id)


######## Create tools and Agent ########

# Import tools from utility folder.
from utility import stock_query_mm, kendra_tool_mm, portfolio_tool
from utility.aws_tools import text_extract, transcribe_audio,sentiment_analysis #To complete the DIY, add sentiment_analysis to this import list

# Create tools
tools = [stock_query_mm.run_query, kendra_tool_mm.lookup_info, portfolio_tool.OptimizePortfolio(), text_extract, transcribe_audio,sentiment_analysis] #To complete the DIY, include sentiment_analysis tool in this list

# Define Bedrock LLM
model_kwargs = {
        "max_tokens": 250,
        "stop": ["\n"],
        "temperature": 0.0,
        "top_k": 50,
        "top_p": 0.9
        }
llm = Bedrock(
        client=bedrock_runtime,
        model_id=model_id,
        model_kwargs=model_kwargs
    )

# Saving conversation hisory in DynamoDB table
chat_history_memory = DynamoDBChatMessageHistory(table_name=chat_history_table, session_id=chat_session_id)

# Initialize a LangChain ReAct Agent
react_agent = create_react_agent(llm, tools, hub.pull("hwchase17/react"))

if st.session_state['memory']:
    agent_executor = RunnableWithMessageHistory(
        AgentExecutor(agent=react_agent, tools=tools),
        # This is needed because in most real world scenarios, a session id is needed
        # It isn't really used here because we are using a simple in memory ChatMessageHistory
        lambda session_id: chat_history_memory,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
else:
    agent_executor = AgentExecutor(agent=react_agent, tools=tools)

# A function to invoke the agent and parse the output
def query(prompt, agent_executor):

    try:
        if st.session_state['memory']:
            output=agent_executor.invoke({"input":prompt},config={"configurable": {"session_id": chat_session_id}})
        else:
            output=agent_executor.invoke({"input":prompt})
            
        chat_history_memory.add_ai_message(str(output['output']))
        response = output.get('output').replace('\n','').replace('```','')
        response = response.replace("%", " percent")
    except Exception as e:
        print(e)
        response = "I couldn't find the answer to your inquiry. Could you try again?"

    return response

def action_doc(agent_executor):
    st.title('Multimodal Agent to assist Financial Analyst')

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        if "role" in message.keys():
            with st.chat_message(message["role"]):
                st.markdown(message['content'].replace("$","USD ").replace("%", " percent"))

        else:
            with st.expander(label="**Intermediate Steps**"):
                st.write(message["steps"])

    if prompt := st.chat_input("Hello?"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)


        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            output_answer=query(prompt, agent_executor)
            message_placeholder.markdown(output_answer.replace("$","USD ").replace("%", " percent"))
        st.session_state.messages.append({"role": "assistant", "content": output_answer})

######## Streamlit app UX display ########

def app_sidebar():
    with st.sidebar:
        st.write('## How to use:')
        description = """This app lets you query multimodal documents and get relevant answers from documents inculde CSV files, audio files and pdf files. To refresh the current session, click the `Clear Session` button."""
        st.markdown(description)
        st.write('---')
        st.write('## Sample Questions')
        st.markdown("""
                    - What are the closing prices of stocks MNBV, PYXC, LKDI in year 2014? Can you build an optimized portfolio using these three stocks? Please provide answers to both questions.
                    - What is the net sales for Amazon in 2022 and 2023? What is the percent difference?
                    - What is Amazon doing with Responsible Supply Chain?   
                    """)
        st.write('## DIY Sample Question')
        st.markdown("""
                    - What is the sentiment of shareholders towards Amazon's profit in 2023?
                    """)
        st.markdown("""
                    **Datasets**
                    
                    - [Q1 2023 Quaterly Earnings recordings](https://s2.q4cdn.com/299287126/files/doc_financials/2023/q1/Amazon-Quarterly-Earnings-Report-Q1-2023-Full-Call-v1.mp3)
                    - [2023 10-K Reports](https://d18rn0p25nwr6d.cloudfront.net/CIK-0001018724/d2fde7ee-05f7-419d-9ce8-186de4c96e25.pdf)
                    - [Sustainability Executive Summary](https://sustainability.aboutamazon.com/reporting)                    
                    -  Fictional stock tickers and stock prices are generated by Amazon Bedrock.
                    """)
        st.write('---')

        use_memory=''
        mem = st.checkbox('Conversation Memory')
        if mem:
            use_memory=True
        st.session_state['memory']=use_memory

        if st.button('Clear Session'):
            '''
                The Clear context helps to refresh the UI and also create a new session for the chat. This creates a new Dynamo DB table to                   hold the chat history.
            '''
            # Delete all the items in Session state
            for key in st.session_state.keys():
                del st.session_state[key]
            # create new session state items
            if 'generated' not in st.session_state:
                st.session_state['generated'] = []
            if 'past' not in st.session_state:
                st.session_state['past'] = []
            if 'messages' not in st.session_state:
                st.session_state['messages'] = []
            if 'ant_key' not in st.session_state:
                st.session_state['ant_key'] = ''
            if 'chat_id' not in st.session_state:
                st.session_state['chat_id'] = 1
            if 'client_id' not in st.session_state:
                st.session_state['client_id'] = ''
            if 'prompt' not in st.session_state:
                st.session_state['prompt'] = ""
            if 'memory' not in st.session_state:
                st.session_state['memory'] = False

def main(agent_executor):
    params=app_sidebar()
    action_doc(agent_executor)


if __name__ == '__main__':
    main(agent_executor)

20. 安装运行应用的的必要依赖

sudo yum install blas-devel lapack-devel
pip install -r requirements.txt -q

21. 运行命令启动streamlit服务器

streamlit run streamlit_app.py

22. 打开启动服务器后返回的URL,在浏览器中打开,根据左侧的提示问题向Agent提问就可以得到基于原始数据查询的的文字回复了。

以上就是在亚马逊云科技上利用亚马逊云科技Amazon Sagemaker上搭建LangChain Agent,开发智能化多模态数据金融分析的中台全部步骤。欢迎大家未来与我一起,未来获取更多国际前沿的生成式AI开发方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2067296.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简易版营业厅宽带系统

TOC ssm018简易版营业厅宽带系统jsp 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范化管…

音频Transformer架构

第3单元:音频Transformer架构 本课程中,我们主要关注Transformer模型以及它们如何应用于音频任务。虽然您不需要了解这些模型的内部细节,但了解使它们工作的主要概念很有用,因此我们在本小节中回顾一下关于Transformer的知识。有关transformer的深入了解,请查看我们的NLP…

互联网的发展是否加剧了数字鸿沟?

有人问&#xff1a;互联网的发展是否加剧了数字鸿沟。 互联网的发展确实在某种程度上加剧了数字鸿沟。虽然互联网的普及为全球范围内的人们提供了前所未有的访问信息、教育资源和经济机会的机会&#xff0c;但其发展也凸显并放大了不同群体之间的差距&#xff0c;比如以下几个…

dokcer 安装 redis(单机版)

准备工作 拉取redis镜像 docker pull redis 通过docker-compose 安装redis 很方便、很简单 先安装docker&#xff0c;参考我这个安装示例进行安装 https://blog.csdn.net/qq_33192671/article/details/13714973 然后安装docker-compose&#xff0c;要是拉取docker-compose无…

【在Linux世界中追寻伟大的One Piece】IO基础

目录 1 -> 回顾 1.1 -> 回顾C文件接口 1.2 -> 总结 2 -> 系统文件I/O 3 -> 接口介绍 3.1 -> open 3.2 -> open函数返回值 3.3 -> 文件描述符fd 4 -> 0 & 1 & 2 5 -> 文件描述符的分配规则 6 -> 重定向 7 -> 使用dup2系…

跨链互通:Web3如何实现多链互操作性

随着区块链技术的发展&#xff0c;各类区块链网络不断涌现&#xff0c;然而&#xff0c;不同链之间的互操作性问题成为了一个重要挑战。跨链互通&#xff08;Cross-chain Interoperability&#xff09;技术正是为了解决这一问题&#xff0c;旨在打破各区块链网络间的壁垒&#…

恒创科技:如何管理和减少Windows服务器 CPU 负载?

CPU 负载是衡量网络服务器或计算机中央处理器 (CPU) 在任意给定时间内处理工作量的指标。它通常表示 CPU 正在执行或排队等待处理的进程数。 如何读取和管理CPU负载&#xff1a; 对于 Windows 系统 Windows 本身不支持“top”和“ps”命令&#xff0c;而类 Unix 系统则支持。不…

Xinstall助力App运营,邀请码自动识别,效率翻倍!

在App推广和运营的道路上&#xff0c;邀请码一直是一个让人又爱又恨的存在。它能够帮助我们追踪用户来源&#xff0c;衡量推广效果&#xff0c;但同时&#xff0c;繁琐的填写步骤也让许多潜在用户望而却步。然而&#xff0c;随着Xinstall的出现&#xff0c;这一切都将迎来颠覆性…

Promise学习之同步与异步

目录 前言 一、同步与异步 (一) 同步 (二) 异步 二、总结 (一) 同步 (二) 异步 前言 Java有多线程&#xff0c;前端有同步与异步&#xff0c;异步操作可以优化用户体验、提高性能与响应、处理并发与并行任务等等&#xff0c;异步操作有发送Ajax请求、读文件等&#xff0…

简明的Arthas故障排查实践

写在文章开头 Arthas是一款强大的开源Java诊断程序,它可以非常方便的启动并以界面式的方式和Java程序进行交互,支持监控程序的内存使用情况、线程信息、gc情况、甚至可以反编译并修改现上代码等。所以它成为笔者进行线上问题排查的重要手段,而本文将从实际使用的角度介绍一下…

我带着我的未来回来了!

&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d;&#x1f51d; &#x1f947;博主昵称&#xff1a;小菜元 &#x1f35f;博客主页…

第九周:机器学习笔记

第九周机器学习周报 摘要Abstract机器学习——Spatial Transformer1.1 How to transform an image/feature map?&#xff08;怎么做&#xff09;1.2 Interpolation&#xff08;插值&#xff09;1.3 spatial Transformer的应用 Pytorch学习1. 线性层2. 其他层的介绍3. 搭建小实…

Leetcode 237.19.83.82 删除链表重复结点 C++实现

Leetcode 237. 删除链表中的节点 问题&#xff1a;有一个单链表的head&#xff0c;我们想删除它其中的一个节点node。给你一个需要删除的节点 node 。你将 无法访问 第一个节点head。链表的所有值都是唯一的&#xff0c;并且保证给定的节点 node不是链表中的最后一个节点。删除…

buuctf [ACTF新生赛2020]usualCrypt

前言&#xff1a;学习笔记。 常规&#xff1a; 下载 解压 查壳。 32位IDA pro打开 先查找字符串 在进入main() 分析&#xff1a; 关键函数&#xff1a; 第一部分&#xff1a; 大写转小写 小写转大写。 已知&#xff1a; 密文&#xff0c;以及加密过程由三部分组成。 那么逆向…

一款电容型、非接触式感知的智能水浸模组-WS11

水侵模组 - WS11&#xff08;Water Sensor-MC11S&#xff09;是一款电容型、非接触式感知的智能水浸模组&#xff0c;集成了高集成度差分式数字电容芯片MC11S。模组内嵌MCU&#xff0c;通过UART输出电容和检测状态信息&#xff0c;进行算法分析&#xff0c;有效滤除振动、凝露等…

Android自定义一个带背景的圆环形进度条(Kotlin)

前言 在Android开发过程中&#xff0c;难免遇到一些复杂的UI组件需要我们自定义 当然使用系统原生组件拼凑也能完成&#xff0c;但是UI复杂度增加了不说&#xff0c;在更新UI状态的时候还不好管理&#xff0c;最重要的是复用的价值不大&#xff0c;上述的操作很容易引增加码冗…

温湿度传感器---DHT11

温湿度传感器---DHT11 一、DHT11介绍二、DHT11原理图三、DHT11工作原理和时序3.1 DHT11工作时序 四、DHT11代码 一、DHT11介绍 DHT11数字温湿度传感器是一款含有已校准数字信号输出的温湿度复合传感器&#xff0c;传感器内部包括一个8位单片机控制一个电阻式感湿元件和一个NTC…

如何用Java SpringBoot+Vue打造“花开富贵”花园管理系统【实战教程】

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

学习【正点原子】新建寄存器版本MDK工程过程中错误总结

参考视频&#xff1a; 第21讲 基础篇-新建寄存器版本MDK工程2_哔哩哔哩_bilibili 参考文档&#xff1a;F:\STM32学习\【正点原子】探索者STM32F407开发板V3 资料盘(A盘)STM32F407 探索者开发指南V1.2 准备 使用的是正点原子探索者【STM32F407ZG】&#xff0c;编译软件版本是…

读论文《Behavior Pattern Mining-based Multi-Behavior Recommendation》

论文地址&#xff1a;arxiv.org/pdf/2408.12152v1 项目地址&#xff1a;GitHub - rookitkitlee/BPMR 基于行为模式挖掘的多行为推荐&#xff1a;论文提出了一种新颖的多行为推荐算法&#xff08;BPMR&#xff09;&#xff0c;旨在通过分析用户和项目之间的复杂交互模式来提高…