python对mysql数据库的操作

news2025/4/18 17:42:45

现在遇到一个问题如何将数据批量的插入mysql数据库中

基础操作


import asyncio
from config import config
from mysql_pool import MysqlPool


class MysqlLoop(object):
    def __init__(self):
        self.logger = config.logger
        self.pool = MysqlPool()

    def loop_query(self, queries):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        results = loop.run_until_complete(
            self.some_query(loop, self.mysql_query, queries))
        return results

    def loop_many_query(self, queries):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        results = loop.run_until_complete(
            self.some_query(loop, self.mysql_many_query, queries))
        return results

    async def some_query(self, loop, func, args):
        tasks = []
        for item in args:
            tasks.append(self.make_future(loop, func, *item))
        results = await asyncio.gather(*tasks)
        return results

    async def make_future(self, loop, func, *args):
        future = loop.run_in_executor(None, func, *args)
        result = await future
        return result

    def mysql_query(self, sql, args=None):
        return self.pool.select_one(sql, args)

    def mysql_many_query(self, sql, args=None):
        return self.pool.select_all(sql, args)

import pymysql
import logging
import os
from dbutils.pooled_db import PooledDB


class MysqlPool(object):
    def __init__(self, db = 'spiders_binance'):
        self.db = db
        self.logger = logging
        self.pool = self.mysql_connection()

    def mysql_connection(self):
        host = 'rm-wz97166ln9cin6304zo.mysql.rds.aliyuncs.com' 
        pool = PooledDB(pymysql,
                        maxconnections=4,
                        maxcached=10,
                        host=host,
                        user='biteagle',
                        port=3306,
                        passwd="rfpMh@F36KsyQ2M",
                        db=self.db,
                        charset='utf8',
                        use_unicode=True)
        return pool

    def create_conn(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor

    def close_conn(self, conn, cursor):
        conn.close()
        cursor.close()

    def select_one(self, sql, args=None):
        conn, cur = self.create_conn()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close_conn(conn, cur)
        return result

    def select_all(self, sql, args=None):
        conn, cur = self.create_conn()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close_conn(conn, cur)
        return result

    def insert_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        pk_id = cur.lastrowid
        conn.commit()
        self.close_conn(conn, cur)
        return pk_id

    def delete_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        conn.commit()
        self.close_conn(conn, cur)
        return result

    def update_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        conn.commit()
        self.close_conn(conn, cur)
        return result

    def update_many(self, table_name, col_list, data_list, pri_name='id'):
        # sql语句

        cols = ", ".join('`{}`=%s'.format(k) for k in col_list)
        update_many_article_into_news_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = %s;
        """

        conn, cur = self.create_conn()
        # 批量插入
        try:
            res = cur.executemany(update_many_article_into_news_sql, data_list)
            # print(res)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def update(self, table_name, pk_id, update_data, pri_name='id'):
        """
        update data into mysql while pk = id
        """
        cols = ', '.join('`{}`=%s'.format(k) for k in update_data)
        update_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = {pk_id};
        """

        self.update_one(update_sql, list(update_data.values()))

    def save_many(self, table_name, col_list, data_list):
        # sql语句

        cols = ", ".join('`{}`'.format(k) for k in col_list)
        val_cols = ', '.join('%s' for k in col_list)
        save_many_article_into_news_sql = f'INSERT IGNORE INTO {table_name}({cols}) VALUES ({val_cols})'

        conn, cur = self.create_conn()
        # 批量插入
        try:
            res = cur.executemany(save_many_article_into_news_sql, data_list)
            # print(res)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def save(self, table_name, save_data):
        """
        save data into mysql
        """

        cols = ", ".join('`{}`'.format(k) for k in save_data.keys())
        val_cols = ', '.join('%({})s'.format(k) for k in save_data.keys())
        save_article_into_news_sql = f"""
        INSERT IGNORE INTO 
        {table_name}
        (%s) 
        VALUES
        (%s)
        """

        # self.logger.info(f'save_data: {save_data}')

        news_id = self.insert_one(
            save_article_into_news_sql % (cols, val_cols), save_data)
        # self.logger.info('save succeed.')

        return news_id

    def delete(self, table_name, pk_id, pri_name='id'):
        delete_user_sql = f"""
        DELETE FROM
        {table_name}
        WHERE
        {pri_name} = %s;
        """

        self.delete_one(delete_user_sql, pk_id)


if __name__ == "__main__":
    from langchain.document_loaders import DirectoryLoader, TextLoader
    import pymysql
    import re
    data_directory = r"C:/Users/Asus/Desktop/back_bitspider2/chroma/data"
    pool = MysqlPool()
    cols = ['id', 'title', 'content']
     #按目录加载文档
    loader = DirectoryLoader(data_directory, glob='**/*.txt')
    docs = loader.load()
    from time import sleep
    # cursor.execute(sql,(id,title,content))
    # db.commit()
    # i = 0
    data_list = [
    ]
    for  i in range(len(docs)):
        if i < 99:
            continue
        str = docs[i].metadata["source"]
        match = re.search(r"chroma\\data\\(.*).txt", str)
        if match:
            title = match.group(1)
        # title = ""
        content = docs[i].page_content
        # data.append(id,title,content)
        data_list.append([i,title,content])
        
        
        
  
    pool.save_many('binance', cols, data_list)
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2332188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入解析栈回溯技术:如何通过异常处理精准定位程序崩溃点

一、栈回溯 1.1 栈回溯的原理 调试程序时&#xff0c;经常发生这类错误&#xff1a; 1.读写某个地址&#xff0c;导致程序崩溃 2.调用某个空函数&#xff0c;导致程序崩溃在异常处理函数中&#xff0c;可以打印出”发生错误瞬间”的所有寄存器。 我们调试时&#xff0c;可以…

重构居家养老安全网:从 “被动响应” 到 “主动守护”

随着全球老龄化加剧&#xff0c;居家养老安全成为社会关注的核心议题。 传统养老模式依赖人工巡检或单一传感器&#xff0c;存在响应滞后、隐私泄露、场景覆盖不足等问题。 由此智绅科技应运而生&#xff0c;七彩喜智慧养老系统构筑居家养老安全网。 而物联网&#xff08;Io…

Unity6下架中国区,团结引擎接棒:这是分裂,还是本地化的开始?

就在近日&#xff0c;一则消息在国内游戏开发圈内迅速传播开来&#xff1a;Unity 6 及其后续版本已在中国大陆及港澳地区下架。这意味着&#xff0c;未来中国用户将无法直接使用 Unity 最新的主线版本。而取而代之的&#xff0c;是由 Unity 中国主导推出的本地化产品 —— 团结…

ESP8266水位监测以及温湿度数据采集

上面就是ESP8266的引脚图&#xff0c;水温检测使用的是水位监测传感器&#xff0c;温湿度测量使用的是DHT11&#xff0c;DHT11的反应时间是2秒&#xff0c;这里要注意。开发采用Arduino程序 1. 传感器初始化 功能&#xff1a;初始化DHT11温湿度传感器和串口通信。 代码实现&…

国产信创数据库:PolarDB 分布式版 V2.0,支持集中分布式一体化

阿里云PolarDB数据库管理软件&#xff08;分布式版&#xff09;V2.0 &#xff0c;安全可靠的集中分布式一体化数据库管理软件。点此查看详情https://www.aliyun.com/activity/database/polardbx-v2?spma2c6h.13046898.publish-article.8.44146ffaE0lEWT 立即咨询专家&#xf…

Axure PR 9 中继器 09 删除行

大家好&#xff0c;我是大明同学。 接着上期的内容&#xff0c;这期内容&#xff0c;我们来了解一下Axure中继器数据表删除行交互设计。 预览地址&#xff1a;https://vvlmqu.axshare.com 删除行 1.打开上期RP 文件&#xff0c;设计一个删除弹窗元件&#xff0c; 创建为动态面…

HDCP(五)

HDCP 2.2 测试用例设计详解 基于HDCP 2.2 CTS v1.1规范及协议核心机制&#xff0c;以下从正常流程与异常场景两大方向拆解测试用例设计要点&#xff0c;覆盖认证、密钥管理、拓扑验证等关键环节&#xff1a; 1. 正常流程测试 1.1 单设备认证 • 测试目标&#xff1a;验证源设…

商城APP打包教程

下载 HBuilderX 工具 HBuilderX支持插件拓展功能。App开发版已集成相关插件、开箱即用 根据自身电脑系统选择对应软件下载&#xff0c;建议选择APP开发版 2. 下载好软件安装后打开 建议直接在uniapp插件页面一键导入&#xff0c;正常情况下uniapp插件都是最新的&#xff0c;大家…

Spring 框架的核心基础:IoC 和 AOP

一、IoC&#xff08;Inversion of Control&#xff0c;控制反转&#xff09; 定义&#xff1a; IoC&#xff08;Inversion of Control&#xff0c;控制反转&#xff09;&#xff0c;就是把对象创建和依赖关系的管理交给 Spring 容器&#xff0c;而不是由程序员手动去创建对象…

SpringBoot 基础知识,HTTP 概述

1. 概述 1.1 Spring Spring 提供若干个子项目&#xff0c;每个项目用于完成特定功能 Spring 的若干个子项目都基于一个基础的框架&#xff1a;Spring Framework 框架类似于 房屋的地基 但 Spring Framework 配置繁琐&#xff0c;入门难度大 1.2 Spring Boot 于是&#xf…

《网络管理》实践环节04:SNMP监控数据采集流程及SNMP协议详细分析

兰生幽谷&#xff0c;不为莫服而不芳&#xff1b; 君子行义&#xff0c;不为莫知而止休。 1 实验目标 1. 理解SNMP网络管理原理 2. 掌握SNMP服务器采集SNMP Agent数据的方法 3. 掌握SNMP报文发送和应答流程 4. 掌握典型GetResponsePDU数据结构分析的方法 4. 具备SNMP通信…

《Uniapp-Vue 3-TS 实战开发》构建HTTP请求拦截器

引言 在 UniApp 结合 TypeScript 和 Vue3 的项目开发中&#xff0c;请求拦截器起着至关重要的作用。它能够在请求发送前和响应接收后对数据进行统一处理&#xff0c;极大地提高了代码的可维护性和功能性。本文将详细解析上述代码中请求拦截器的实现及其在 UniApp-Ts-Vue3 项目中…

从PDF中提取表格:以GB/T2260—2007为例

文章目录 先说结论前因后果思路1、PDF2CSV2、PDF2MD → MD2CSV3、针对不同表格的两种思路1&#xff09; 竖形三线表2&#xff09;五元素为一组 还没结束批量处理1、分割markdown文档2、跳过另一种格式的文档 总结一下 先说结论 结论就是&#xff0c;博主用了一天的时间去研究如…

初识MySQL · 复合查询(内外连接)

目录 前言&#xff1a; 基本查询回顾 笛卡尔积和子查询 笛卡尔积 内外连接 子查询 单行子查询 多行子查询 多列子查询 from中使用子查询 合并查询 前言&#xff1a; 在前文我们学习了MySQL的基本查询&#xff0c;就是简单的套用了select语句&#xff0c;最多不过是…

辛格迪客户案例 | 北京舒曼德医药实施电子合约系统(eSign)

01 北京舒曼德医药科技开发有限公司&#xff1a;医药科技的数字化先锋 北京舒曼德医药科技开发有限公司&#xff08;以下简称“舒曼德医药”&#xff09;作为国内医药科技领域的领军企业&#xff0c;致力于创新药物的研发、临床试验和市场推广。公司以“科技兴药、质量为先、服…

Python面向对象-开闭原则(OCP)

1. 什么是开闭原则&#xff1f; 开闭原则(Open-Closed Principle, OCP) 是面向对象设计的五大SOLID原则之一&#xff0c;由Bertrand Meyer提出。其核心定义是&#xff1a; “软件实体(类、模块、函数等)应该对扩展开放&#xff0c;对修改关闭。” 对扩展开放&#xff1a;当需求…

Class 文件和类加载机制

一、Class 文件 与 类加载机制 概述 什么是 Class 文件&#xff1f; Java 源码&#xff08;.java&#xff09;经过 javac 编译器 编译生成的字节码文件&#xff08;.class&#xff09;&#xff1b;由 JVM 识别执行&#xff0c;包含类的完整结构信息&#xff08;如字段、方法、…

Vue3+Vite+TypeScript+Element Plus开发-07.Mockjs引用与Axios封装

系列文档目录 Vue3ViteTypeScript安装 Element Plus安装与配置 主页设计与router配置 静态菜单设计 Pinia引入 Header响应式菜单缩展 Mockjs引用与Axios封装 登录设计 登录成功跳转主页 多用户动态加载菜单 Pinia持久化 动态路由-配置 文章目录 目录 系列文档目…

【Redis】背景知识

一、Redis的特性 Redis是一种基于键值对&#xff08;key-value&#xff09;的NoSQL数据库&#xff0c;与很多键值对数据库不同的是&#xff0c;Redis中的值可以是由string&#xff08;字符串&#xff09;&#xff0c;hash&#xff08;哈希&#xff09;&#xff0c;list&#xf…

航电系统的任务载荷集成技术要点概述!

一、任务载荷集成技术难点 1. 接口标准化与兼容性 异构设备协议冲突&#xff1a;不同厂商的载荷设备&#xff08;如光学相机、雷达、电子战模块&#xff09;采用不同的通信协议&#xff08;如1553B、RS422、以太网&#xff09;&#xff0c;需设计统一的总线接口标准以支持即…