fastapi 调用ollama之下的sqlcoder模式进行对话操作数据库

news2025/1/24 5:46:02
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import ollama
import mysql.connector
from mysql.connector.cursor import MySQLCursor
import json

app = FastAPI()

# 数据库连接配置
DB_CONFIG = {
    "database": "web",        # 您的数据库名,用于存储业务数据
    "user": "root",          # 数据库用户名,需要有读写权限
    "password": "XXXXXX",    # 数据库密码,建议使用强密码
    "host": "127.0.0.1",    # 数据库主机地址,本地开发环境使用localhost
    "port": "3306"          # MySQL 默认端口,可根据实际配置修改
}

# 数据库连接函数
def get_db_connection():
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        return conn
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}")

class SQLRequest(BaseModel):
    question: str

def get_table_relationships():
    """动态获取表之间的关联关系"""
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # 获取当前数据库名
        cur.execute("SELECT DATABASE()")
        db_name = cur.fetchone()[0]
        
        # 获取外键关系
        cur.execute("""
            SELECT 
                TABLE_NAME,
                COLUMN_NAME,
                REFERENCED_TABLE_NAME,
                REFERENCED_COLUMN_NAME
            FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
            WHERE TABLE_SCHEMA = %s
                AND REFERENCED_TABLE_NAME IS NOT NULL
            ORDER BY TABLE_NAME, COLUMN_NAME
        """, (db_name,))
        
        relationships = []
        for row in rows:
            table_name, column_name, ref_table, ref_column = row
            relationships.append(
                f"-- {table_name}.{column_name} can be joined with {ref_table}.{ref_column}"
            )
        
        return "\n".join(relationships) if relationships else "-- No foreign key relationships found"
        
    finally:
        cur.close()
        conn.close()

def get_database_schema():
    """获取MySQL数据库表结构,以CREATE TABLE格式返回"""
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # 获取当前数据库名
        cur.execute("SELECT DATABASE()")
        db_name = cur.fetchone()[0]
        
        # 获取所有表的结构信息
        cur.execute("""
            SELECT 
                t.TABLE_NAME,
                c.COLUMN_NAME,
                c.COLUMN_TYPE,
                c.IS_NULLABLE,
                c.COLUMN_KEY,
                c.COLUMN_COMMENT
            FROM INFORMATION_SCHEMA.TABLES t
            JOIN INFORMATION_SCHEMA.COLUMNS c 
                ON t.TABLE_NAME = c.TABLE_NAME
            WHERE t.TABLE_SCHEMA = %s
                AND t.TABLE_TYPE = 'BASE TABLE'
            ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION
        """, (db_name,))
        
        rows = cur.fetchall()
        
        schema = []
        current_table = None
        table_columns = []
        
        for row in rows:
            table_name, column_name, column_type, nullable, key, comment = row
            
            if current_table != table_name:
                if current_table is not None:
                    schema.append(f"CREATE TABLE {current_table} (\n" + 
                                ",\n".join(table_columns) + 
                                "\n);\n")
                current_table = table_name
                table_columns = []
            
            # 构建列定义
            column_def = f"  {column_name} {column_type.upper()}"
            if key == "PRI":
                column_def += " PRIMARY KEY"
            elif nullable == "NO":
                column_def += " NOT NULL"
                
            if comment:
                column_def += f" -- {comment}"
                
            table_columns.append(column_def)
        
        # 添加最后一个表
        if current_table is not None:
            schema.append(f"CREATE TABLE {current_table} (\n" + 
                        ",\n".join(table_columns) + 
                        "\n);\n")
            
        return "\n".join(schema)
    finally:
        cur.close()
        conn.close()

def get_chinese_table_mapping():
    """动态生成表名的中文映射"""
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        # 获取所有表的注释信息
        cur.execute("""
            SELECT 
                t.TABLE_NAME,
                t.TABLE_COMMENT
            FROM information_schema.TABLES t
            WHERE t.TABLE_SCHEMA = DATABASE()
            ORDER BY t.TABLE_NAME
        """)
        
        mappings = []
        for table_name, table_comment in cur.fetchall():
            # 生成表的中文名称
            chinese_name = table_name
            if table_name.startswith('web_'):
                chinese_name = table_name.replace('web_', '').replace('_', '')
            if table_comment:
                chinese_name = table_comment.split('--')[0].strip()
                # 如果中文名称以"表"结尾,则去掉"表"if chinese_name.endswith('表'):
                    chinese_name = chinese_name[:-1]
            
            mappings.append(f'           - "{chinese_name}" -> {table_name} table')
        
        return "\n".join(mappings)
    finally:
        cur.close()
        conn.close()

@app.post("/query")
async def query_database(request: Request):
    try:
        # 获取请求体数据并确保正确处理中文
        body = await request.body()
        try:
            request_data = json.loads(body.decode('utf-8'))
        except UnicodeDecodeError:
            request_data = json.loads(body.decode('gbk'))
        
        question = request_data.get('question')
        print(f"收到问题: {question}")  # 调试日志
        
        if not question:
            raise HTTPException(status_code=400, detail="缺少 question 参数")
            
        # 获取数据库结构
        db_schema = get_database_schema()
        #print(f"数据库结构: {db_schema}")  # 调试日志
        
        # 获取中文映射并打印
        chinese_mapping = get_chinese_table_mapping()
        #print(f"表映射关系:\n{chinese_mapping}")  # 添加这行来打印映射
        
        # 修改 prompt 使用更严格的指导
        prompt = f"""
        ### Instructions:
        Convert Chinese question to MySQL query. Follow these rules strictly:
        1. ONLY return a valid SELECT SQL query
        2. Use EXACT table names from the mapping below
        3. DO NOT use any table that's not in the mapping
        4. For Chinese terms, use these exact mappings:
{chinese_mapping}

        ### Examples:
        Question: 所有装修记录
        SQL: SELECT * FROM web_decoration ORDER BY id;

        Question: 查询装修
        SQL: SELECT * FROM web_decoration ORDER BY id;

        ### Database Schema:
        {db_schema}

        ### Question:
        {question}

        ### SQL Query:
        """
        
        # 获取 SQL 查询
        response = ollama.chat(model='sqlcoder:latest', 
                             messages=[{'role': 'user', 'content': prompt}])
        sql_query = response['message']['content'].strip()
        
        print(f"生成的SQL: {sql_query}")  # 调试日志
        
        # 验证 SQL 查询
        if "装修" in question and "web_decoration" not in sql_query:
            sql_query = "SELECT * FROM web_decoration ORDER BY id"
            
        if not sql_query.upper().startswith('SELECT'):
            raise HTTPException(status_code=400, detail="无效的SQL查询格式")
            
        conn = get_db_connection()
        cur = conn.cursor(dictionary=True)
        
        try:
            cur.execute(sql_query)
            results = cur.fetchall()
            return {
                "sql_query": sql_query,
                "results": results
            }
        except mysql.connector.Error as e:
            raise HTTPException(status_code=400, detail=f"SQL 执行错误: {str(e)}")
        finally:
            cur.close()
            conn.close()
            
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"JSON 解析错误: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"处理请求时生错误: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    port = 8666
    print(f"Starting server on port {port}")
    uvicorn.run(
        "main:app",
        host="0.0.0.0", 
        port=port,
        reload=True
    )

我来为您解释这段代码的主要功能和结构:

主要功能

这是一个使用 FastAPI 框架开发的 API 服务,主要用于将中文问题转换为 SQL 查询语句并执行。

核心组件说明

  1. 数据库配置与连接
DB_CONFIG = {
    "database": "web",
    "user": "root",
    "password": "XXXXXX",
    "host": "127.0.0.1",
    "port": "3306"
}

提供了 MySQL 数据库的基本连接配置。

  1. 主要工具函数
  • get_table_relationships(): 获取数据库表之间的外键关系
  • get_database_schema(): 获取数据库表结构
  • get_chinese_table_mapping(): 生成表名的中文映射关系
  1. 核心 API 端点
@app.post("/query")

这个端点接收中文问题,主要处理流程:

  • 接收并解析用户的中文问题
  • 获取数据库结构和表映射
  • 使用 ollama 模型将中文转换为 SQL 查询
  • 执行 SQL 查询并返回结果
  1. 智能转换功能
    使用 ollamasqlcoder 模型将中文问题转换为 SQL 查询,包含:
  • 严格的表名映射
  • SQL 查询验证
  • 错误处理机制

特点

  1. 支持中文输入处理
  2. 自动获取数据库结构
  3. 动态生成中文表名映射
  4. 完善的错误处理机制
  5. 支持热重载的开发模式

使用示例

可以通过 POST 请求访问 /query 端点:

{
    "question": "查询所有装修记录"
}

服务会返回:

{
    "sql_query": "SELECT * FROM web_decoration ORDER BY id",
    "results": [...]
}

安全特性

  1. 数据库连接错误处理
  2. SQL 注入防护
  3. 请求体编码自适应(支持 UTF-8 和 GBK)
  4. 查询结果的安全封装

查看效果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242283.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何监控Kafka消费者的性能指标?

要监控 Kafka 消费者性能指标,可以遵循以下最佳实践和策略: 关键性能指标监控: 消息吞吐量:监控消费者和生产者的吞吐量,以评估数据处理和消费的效率。延迟:监控端到端的延迟,例如通过比较消息产…

【LINUX相关】

一、Linux怎么进行查看日志? 首先得问问开发项目日志存放在哪里,可以使用多种命令来查看日志。常用的命令包括tail、cat、less和grep等。例如:1、使用tail命令可以实时查看日志文件的最新内容:tail -f log_file, 2、使用cat命令可…

IT运维的365天--019 用php做一个简单的文件上传工具

前情提要:朋友的工作室,有几个网站分布在不同的服务器上,要经常进行更新,之前是手动复制压缩包到各个服务器去更新(有写了自动更新的Shell脚本)。但还是觉得太麻烦,每次还要手动传输压缩包到各个…

计算机网络 (4)计算机网络体系结构

前言 计算机网络体系结构是指计算机网络层次结构模型,它是各层的协议以及层次之间的端口的集合。这一体系结构为计算机网络及其部件应完成的功能提供了精确定义,并规定了这些功能应由何种硬件或软件来实现。 一、主流模型 计算机网络体系结构存在多种模型…

C++- 基于多设计模式下的同步异步日志系统

第一个项目:13万字,带源代码和详细步骤 目录 第一个项目:13万字,带源代码和详细步骤 1. 项目介绍 2. 核心技术 3. 日志系统介绍 3.1 为什么需要⽇志系统 3.2 ⽇志系统技术实现 3.2.1 同步写⽇志 3.2.2 异步写⽇志 4.知识点和单词补充 4.1单词补充 4.2知识点补充…

小程序租赁系统打造便捷租赁体验助力共享经济发展

内容概要 小程序租赁系统是一个极具创新性的解决方案,它通过简化租赁过程,让物品的共享变得便捷流畅。对于那些有闲置物品的用户来说,他们可以轻松发布自己的物品,让其他需要的人快速找到并租借。而对于找东西的人来说&#xff0…

EXCEL 或 WPS 列下划线转驼峰

使用场景: 需要将下划线转驼峰,直接在excel或wps中第一行使用公式,然后快速刷整个列格式即可。全列工下划线转为格式,使用效果如下: 操作步骤: 第一步:在需要显示驼峰的一列,复制以…

【SpringBoot】公共字段自动填充

问题引入 JavaEE开发的时候,新增字段,修改字段大都会涉及到创建时间(createTime),更改时间(updateTime),创建人(craeteUser),更改人(updateUser),如果每次都要自己去setter(),会比较麻烦&#…

华为云租户网络-用的是隧道技术

1.验证租户网络是vxlan 2.验证用OVS 2.1控制节点VXLAN 本端ip(local ip)192.168.31.8 2.2计算节点VXLAN 本端ip(local ip)192.168.31.11 计算节点用的是bond0做隧道网络 2.3查看bond文件是否主备模式

网络编程-002-UDP通信

1.UDP通信的简单介绍 1.1不需要通信握手,无需维持连接,网络带宽需求较小,而实时性要求高 1.2 包大小有限制,不发大于路径MTU的数据包 1.3容易丢包 1.4 可以实现一对多,多对多 2.客户端与服务端=发送端与接收端 代码框架 收数据方一般都是客户端/接收端 3.头文件 #i…

从PE结构到LoadLibrary

从PE结构到LoadLibrary PE是Windows平台主流可执行文件格式,.exe , .dll, .sys, .com文件都是PE格式 32位的PE文件称为PE32,64位的称为PE32,PE文件格式在winnt.h头中有着详细的定义,PE文件头包含了一个程序在运行时需要的所有信息&#xff…

AntFlow:一款高效灵活的开源工作流引擎

AntFlow 是一款功能强大、设计优雅的开源工作流引擎,其灵感来源于钉钉的工作流设计理念,旨在为企业和开发者提供灵活、高效的工作流解决方案。AntFlow 支持复杂的业务流程管理,具有高度可定制性,且拥有现代化的前端设计&#xff0…

智慧安防丨以科技之力,筑起防范人贩的铜墙铁壁

近日,贵州省贵阳市中级人民法院对余华英拐卖儿童案做出了一审宣判,判处其死刑,剥夺政治权利终身,并处没收个人全部财产。这一判决不仅彰显了法律的威严,也再次唤起了社会对拐卖儿童犯罪的深切关注。 余华英自1993年至2…

python机器人Agent编程——多Agent框架的底层逻辑(上)

目录 一、前言二、两个核心概念2.1 Routines(1)清晰的Prompt(2)工具调用json schema自动生成(3)解析模型的toolcall指令(4)单Agent的循环决策与输出 PS.扩展阅读ps1.六自由度机器人相…

【大语言模型】ACL2024论文-14 任务:不可能的语言模型

【大语言模型】ACL2024论文-14 任务:不可能的语言模型 目录 文章目录 【大语言模型】ACL2024论文-14 任务:不可能的语言模型目录摘要研究背景问题与挑战如何解决创新点算法模型实验效果重要数据与结论推荐阅读指数和推荐理由 后记 任务:不可能…

redis linux 安装

下载解压 https://download.redis.io/releases/ tar -zvxf ----redis-7.4.1编译 进入目录下 # redis 依赖c yum install gcc-cmake可能会有问题,所以记得换源# 安装到 /usr/local/redis make PREFIX/usr/local/redis installcd src ./redis-serverredis.confi…

计算机毕业设计Hadoop+大模型空气质量预测 空气质量可视化 空气质量分析 空气质量爬虫 Spark 机器学习 深度学习 Django 大模型

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

云原生之运维监控实践-使用Telegraf、Prometheus与Grafana实现对InfluxDB服务的监测

背景 如果你要为应用程序构建规范或用户故事,那么务必先把应用程序每个组件的监控指标考虑进来,千万不要等到项目结束或部署之前再做这件事情。——《Prometheus监控实战》 去年写了一篇在Docker环境下部署若依微服务ruoyi-cloud项目的文章,当…

HTML+CSS+JavaScript

一、HTML 1、什么是HTML HTML(Hyper Text Markup Language)也叫超文本标记语言,什么意思呢? 超文本:普通文本语言没有什么特殊功能,而超文本,是表示一种比文本语言功能更强大的语言&#xff0c…

Dropout 和 BatchNorm 在训练和验证中的差异

文章目录 1. Dropout1.1 作用1.2 训练和验证的差异1.3 示例 2. Batch Normalization (BatchNorm)2.1 作用2.2 训练和验证时的差异2.3 示例 3. 总结4. 实际使用建议 在神经网络中,Dropout 和 Batch Normalization (BatchNorm) 是常见的层,其行为在 训练阶…