现在遇到一个问题：如何将数据批量地插入 MySQL 数据库中？
基础操作
import asyncio
from config import config
from mysql_pool import MysqlPool
class MysqlLoop(object):
    """Run several blocking MySQL queries concurrently.

    Each query is pushed onto the default thread-pool executor and awaited
    through a short-lived asyncio event loop, so N queries overlap their
    network waits instead of running back to back.
    """

    def __init__(self):
        self.logger = config.logger
        self.pool = MysqlPool()

    def loop_query(self, queries):
        """Run many single-row SELECTs concurrently.

        queries: iterable of argument tuples for mysql_query,
            e.g. [(sql,), (sql, args), ...].
        Returns the results in the same order as *queries*.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(
                self.some_query(loop, self.mysql_query, queries))
        finally:
            # Close the per-call loop; the original leaked one event loop
            # (and its selector fd) on every invocation.
            asyncio.set_event_loop(None)
            loop.close()

    def loop_many_query(self, queries):
        """Run many multi-row SELECTs concurrently (see loop_query)."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(
                self.some_query(loop, self.mysql_many_query, queries))
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    async def some_query(self, loop, func, args):
        """Fan *func* out over each argument tuple; gather results in order."""
        tasks = [self.make_future(loop, func, *item) for item in args]
        return await asyncio.gather(*tasks)

    async def make_future(self, loop, func, *args):
        """Run blocking func(*args) in the default executor and await it."""
        return await loop.run_in_executor(None, func, *args)

    def mysql_query(self, sql, args=None):
        """Blocking single-row SELECT (executed in a worker thread)."""
        return self.pool.select_one(sql, args)

    def mysql_many_query(self, sql, args=None):
        """Blocking multi-row SELECT (executed in a worker thread)."""
        return self.pool.select_all(sql, args)
import pymysql
import logging
import os
from dbutils.pooled_db import PooledDB
class MysqlPool(object):
    """Thin CRUD wrapper around a DBUtils PooledDB pool of pymysql connections.

    Every helper checks a connection out of the pool, runs one statement,
    and always returns the connection to the pool (try/finally), so an
    exception inside a query can no longer leak pooled connections.
    """

    def __init__(self, db='spiders_binance'):
        # Target schema; default kept for backward compatibility.
        self.db = db
        # Named logger instead of the root `logging` module object.
        self.logger = logging.getLogger(__name__)
        self.pool = self.mysql_connection()

    def mysql_connection(self):
        """Build the shared connection pool.

        Credentials may be overridden via environment variables so they do
        not have to live in source control; the hard-coded fallbacks keep
        existing deployments working unchanged.
        NOTE(review): real credentials are committed here — rotate them.
        """
        host = os.environ.get(
            'MYSQL_HOST', 'rm-wz97166ln9cin6304zo.mysql.rds.aliyuncs.com')
        pool = PooledDB(pymysql,
                        maxconnections=4,
                        maxcached=10,
                        host=host,
                        user=os.environ.get('MYSQL_USER', 'biteagle'),
                        port=int(os.environ.get('MYSQL_PORT', '3306')),
                        passwd=os.environ.get('MYSQL_PASSWORD', "rfpMh@F36KsyQ2M"),
                        db=self.db,
                        charset='utf8',
                        use_unicode=True)
        return pool

    def create_conn(self):
        """Check a connection out of the pool; return (conn, dict cursor)."""
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor

    def close_conn(self, conn, cursor):
        """Release in reverse order of acquisition: cursor first, then conn."""
        cursor.close()
        conn.close()

    def select_one(self, sql, args=None):
        """Run a parameterized SELECT; return the first row dict (or None)."""
        conn, cur = self.create_conn()
        try:
            cur.execute(sql, args)
            return cur.fetchone()
        finally:
            self.close_conn(conn, cur)

    def select_all(self, sql, args=None):
        """Run a parameterized SELECT; return all rows as a tuple of dicts."""
        conn, cur = self.create_conn()
        try:
            cur.execute(sql, args)
            return cur.fetchall()
        finally:
            self.close_conn(conn, cur)

    def insert_one(self, sql, args=None):
        """Run one INSERT, commit, and return the new row's lastrowid."""
        conn, cur = self.create_conn()
        try:
            cur.execute(sql, args)
            pk_id = cur.lastrowid
            conn.commit()
            return pk_id
        finally:
            self.close_conn(conn, cur)

    def delete_one(self, sql, args=None):
        """Run one DELETE, commit, and return the affected-row count."""
        conn, cur = self.create_conn()
        try:
            result = cur.execute(sql, args)
            conn.commit()
            return result
        finally:
            self.close_conn(conn, cur)

    def update_one(self, sql, args=None):
        """Run one UPDATE, commit, and return the affected-row count."""
        conn, cur = self.create_conn()
        try:
            result = cur.execute(sql, args)
            conn.commit()
            return result
        finally:
            self.close_conn(conn, cur)

    def update_many(self, table_name, col_list, data_list, pri_name='id'):
        """Bulk UPDATE via executemany.

        Each item of data_list supplies the values for col_list followed by
        the primary-key value. Rolls back (and logs) on any error.
        """
        cols = ", ".join('`{}`=%s'.format(k) for k in col_list)
        update_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = %s;
        """
        conn, cur = self.create_conn()
        try:
            cur.executemany(update_sql, data_list)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def update(self, table_name, pk_id, update_data, pri_name='id'):
        """Update the row whose primary key equals pk_id.

        pk_id is bound as a query parameter instead of being interpolated
        into the SQL text (SQL-injection fix vs. the original).
        """
        cols = ', '.join('`{}`=%s'.format(k) for k in update_data)
        update_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = %s;
        """
        self.update_one(update_sql, list(update_data.values()) + [pk_id])

    def save_many(self, table_name, col_list, data_list):
        """Bulk INSERT IGNORE via executemany; rolls back (and logs) on error."""
        cols = ", ".join('`{}`'.format(k) for k in col_list)
        val_cols = ', '.join('%s' for k in col_list)
        insert_sql = f'INSERT IGNORE INTO {table_name}({cols}) VALUES ({val_cols})'
        conn, cur = self.create_conn()
        try:
            cur.executemany(insert_sql, data_list)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def save(self, table_name, save_data):
        """INSERT IGNORE one dict of column->value; return the new row id."""
        cols = ", ".join('`{}`'.format(k) for k in save_data.keys())
        val_cols = ', '.join('%({})s'.format(k) for k in save_data.keys())
        # Build the statement directly; the original round-tripped through a
        # second "%"-interpolation pass, which breaks if names contain "%".
        insert_sql = f"""
        INSERT IGNORE INTO
        {table_name}
        ({cols})
        VALUES
        ({val_cols})
        """
        return self.insert_one(insert_sql, save_data)

    def delete(self, table_name, pk_id, pri_name='id'):
        """Delete the row whose primary key equals pk_id (parameterized)."""
        delete_sql = f"""
        DELETE FROM
        {table_name}
        WHERE
        {pri_name} = %s;
        """
        return self.delete_one(delete_sql, pk_id)
if __name__ == "__main__":
    # One-off import script: load crawled *.txt articles and bulk-insert
    # them into the `binance` table.
    from langchain.document_loaders import DirectoryLoader
    import re

    data_directory = r"C:/Users/Asus/Desktop/back_bitspider2/chroma/data"
    pool = MysqlPool()
    cols = ['id', 'title', 'content']

    # Recursively load every .txt document under the data directory.
    loader = DirectoryLoader(data_directory, glob='**/*.txt')
    docs = loader.load()

    # The file name (without extension) doubles as the article title.
    # Compiled once, outside the loop; "\." so the dot is literal.
    title_pattern = re.compile(r"chroma\\data\\(.*)\.txt")

    data_list = []
    for idx, doc in enumerate(docs):
        # Skip the first 99 documents.
        # NOTE(review): resuming at a hard-coded offset is fragile — confirm
        # these rows were really imported in an earlier run.
        if idx < 99:
            continue
        match = title_pattern.search(doc.metadata["source"])
        if match:
            # Only rows with an extractable title are inserted; the original
            # could reuse a stale title from a previous iteration here.
            data_list.append([idx, match.group(1), doc.page_content])

    # Single bulk INSERT IGNORE for all collected rows.
    pool.save_many('binance', cols, data_list)