目录
系列文章
1. 引言
2. 常规写法mysql
3. 封装设计接口mysql
3.1dbname.py文件
3.1.1. 导入和基类定义
3.1.2. 具体表定义类
3.1.3. 表定义整合函数
3.1.4. 全局字典和测试代码
3.2mysql_dao文件
3.2.1. 模块导入与配置
3.2.2. 数据库连接池初始化
3.2.3. CommonSQL 类功能
3.3db文件使用
4总结
系列文章
虫洞数观系列总览 | 技术全景:豆瓣电影TOP250数据采集→分析→可视化完整指南
虫洞数观系列一 | 豆瓣电影TOP250数据采集与MySQL存储实战
虫洞数观系列三 | 数据分析全链路实践:Pandas清洗统计 + Navicat可视化呈现
1. 引言
在上一篇文章中,我们完成了豆瓣TOP250电影数据的爬取,存储字段包括:
-
基础信息(中英文片名、详情页链接)
-
制作信息(导演、主演、年份、国家、类型)
-
评分数据(分数、评分人数、经典评语)
这些数据已存入MySQL数据库douban
的top250movie
表中。
本文核心目标:
-
用Python封装MySQL的CRUD(增删改查)操作类
-
建立高效的数据存取管道
-
为后续的Pandas透视分析(如:
-
按年份/国家的评分分布
-
类型与评分的关联性
-
导演/演员的作品统计等)奠定基础
-
通过标准化数据库操作接口,后续数据分析时只需关注业务逻辑,无需重复编写SQL语句。
2. 常规写法mysql
可以参考之前的文章
知识周汇 | MySQL增删改查与Python连接
对以下的数据表格实现增删改查,
# coding=utf-8
import mysql.connector.pooling
import pandas as pd
# 本地数据库
__config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "faw-vw.1901",
"database": "douban"
}
try:
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10
)
except Exception as e:
print(e)
class DoubanDao():
# 增
def add_infro_from_douban(self):
sql = "REPLACE INTO top250movie (feature) VALUES (2);"
print(sql)
try:
con = pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
con.commit()
except Exception as e:
if "con" in dir():
con.rollback()
finally:
if "con" in dir():
con.close()
# 删
def del_infro_from_douban(self):
sql = "DELETE FROM top250movie WHERE feature ='TOP0001';"
print(sql)
try:
con = pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
con.commit()
except Exception as e:
if "con" in dir():
con.rollback()
finally:
if "con" in dir():
con.close()
# 改
def update_infro_from_douban(self):
sql = "UPDATE top250movie SET movie_ch = '" + str("你好") + "' WHERE feature = '" + "TOP0002" + "';"
# print(sql)
try:
con = pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
print(sql)
con.commit()
except Exception as e:
print(e)
if "con" in dir():
con.rollback()
finally:
if "con" in dir():
con.close()
# 查
def select_infro_from_douban(self):
sql = "SELECT update_date ,movie_ch,movie_en,movie_url FROM top250movie;"
print(sql)
try:
con = pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
result = cursor.fetchall()
return result
except Exception as e:
if "con" in dir():
con.rollback()
finally:
if "con" in dir():
con.close()
这种实现方式存在代码冗余问题,当数据库表数量增加时,需要为每个表单独编写定制化逻辑,显著增加了开发维护成本。
3. 封装接口设计mysql
这边设计3个py文件,dbname.py - 表定义模块,mysql_dao.py - 数据访问对象(DAO),main.py - 主程序。
架构设计图
+-------------------+ +-------------------+ +-------------------+
| dbname.py | | mysql_dao.py | | main.py |
| 表结构定义模块 |<--->| 数据访问层 |<--->| 业务逻辑层 |
+-------------------+ +-------------------+ +-------------------+
|
v
+-------------------+
| MySQL 数据库 |
+-------------------+
3.1dbname.py文件
该文件主要是想表达数据表的列名和中文名字对应关系
from typing import Dict
class TableDefinition:
"""表定义基类"""
@staticmethod
def _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:
"""创建表字典结构"""
return {table_name: columns}
class DouBan(TableDefinition):
"""分析表定义"""
@staticmethod
def top250movie() -> Dict[str, Dict[str, str]]:
"""top250电影"""
columns = {
'update_date': '更新日期',
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
return DouBan._create_table_dict('top250movie', columns)
# 其他表定义类...
def get_dbname_dict() -> Dict[str, Dict[str, str]]:
"""获取所有表定义的字典"""
db_dict = {}
# 合并所有表定义
db_dict.update(DouBan.top250movie())
# 添加其他表...
return db_dict
# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:
print('>>>>>>>>>>>>>>>>>')
print(dbname)
print(dbname_dic[dbname])
打印的结果:
3.1.1. 导入和基类定义
from typing import Dict
class TableDefinition:
"""表定义基类"""
@staticmethod
def _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:
"""创建表字典结构"""
return {table_name: columns}
-
从
typing
模块导入Dict
,用于类型注解。 -
定义了一个基类
TableDefinition
,包含一个静态方法_create_table_dict
,用于创建表结构的字典表示。该方法接收表名和列定义字典,返回一个嵌套字典,外层键是表名,内层是列名到列描述的映射。
3.1.2. 具体表定义类
class DouBan(TableDefinition):
"""分析表定义"""
@staticmethod
def top250movie() -> Dict[str, Dict[str, str]]:
"""top250电影"""
columns = {
'update_date': '更新日期',
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
return DouBan._create_table_dict('top250movie', columns)
-
DouBan
继承自TableDefinition
,表示豆瓣相关的表定义。 -
定义了一个静态方法
top250movie
,返回豆瓣Top250电影的表结构:-
包含13个字段,如更新日期、电影中英文名、导演、评分等。
-
每个字段都有英文名和中文描述。
-
使用基类的
_create_table_dict
方法生成最终的字典结构。
-
3.1.3. 表定义整合函数
def get_dbname_dict() -> Dict[str, Dict[str, str]]:
"""获取所有表定义的字典"""
db_dict = {}
# 合并所有表定义
db_dict.update(DouBan.top250movie())
# 添加其他表...
return db_dict
-
该函数整合所有表定义,返回一个统一的字典。
-
目前只添加了
DouBan.top250movie()
,但注释表明可以添加其他表定义。
3.1.4. 全局字典和测试代码
# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:
print('>>>>>>>>>>>>>>>>>')
print(dbname)
print(dbname_dic[dbname])
-
生成全局表定义字典
dbname_dic
。 -
遍历并打印每个表名及其列定义,用于测试和验证。
3.2mysql_dao文件
# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic
# Database configuration
__config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "faw-vw.1901",
"database": "douban"
}
# Initialize connection pool
try:
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10
)
except Exception as e:
print(f"Error initializing connection pool: {e}")
class CommonSQL:
def __init__(self):
self.pool = pool
def execute_sql_no_return(self, sql):
"""Execute SQL without return value."""
try:
con = self.pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
con.commit()
self._print_success(sql)
except Exception as e:
if "con" in locals():
con.rollback()
self._print_failure(sql)
print(f"Error: {e}")
finally:
if "con" in locals():
con.close()
def executemany_sql_no_return(self, sql, value_list):
"""Execute many SQL statements without return value."""
try:
con = self.pool.get_connection()
cursor = con.cursor()
cursor.executemany(sql, value_list)
con.commit()
self._print_success(sql)
except Exception as e:
if "con" in locals():
con.rollback()
self._print_failure(sql)
print(f"Error: {e}")
finally:
if "con" in locals():
con.close()
def execute_sql_return_value(self, dbname):
"""Execute SQL and return values as a DataFrame."""
try:
con = self.pool.get_connection()
cursor = con.cursor()
sql = f"SELECT * FROM {dbname};"
cursor.execute(sql)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(rows, columns=columns)
print(df)
# 将df的英文列名更换为中文列名
print(dbname)
print(dbname_dic)
print(dbname_dic[dbname])
if dbname in dbname_dic:
dbname_columns_dic = dbname_dic[dbname]
print(dbname_columns_dic)
for each_column in list(df.columns):
if each_column in dbname_columns_dic:
df.rename(columns={each_column: dbname_columns_dic[each_column]}, inplace=True)
return df
except Exception as e:
if "con" in locals():
con.rollback()
print(f"Error: {e}")
finally:
if "con" in locals():
con.close()
def bulk_update_infor_in_db(self, df, PRIMARY_KEY, update_cols, dbname):
"""Bulk update database with DataFrame."""
sql = self._create_update_sql(dbname, update_cols, PRIMARY_KEY)
self._bulk_operation(df, sql, update_cols, PRIMARY_KEY, dbname, "update")
def bulk_insert_infor_in_db(self, df, insert_cols, dbname):
"""Bulk insert into database with DataFrame."""
sql = self._create_insert_sql(dbname, insert_cols)
self._bulk_operation(df, sql, insert_cols, None, dbname, "insert")
def bulk_replace_infor_in_db(self, df, insert_cols, dbname):
"""Bulk replace into database with DataFrame."""
sql = self._create_replace_sql(dbname, insert_cols)
self._bulk_operation(df, sql, insert_cols, None, dbname, "replace")
def clear_db_table(self, dbname):
"""Clear database table."""
sql = f"TRUNCATE TABLE {dbname}"
self.execute_sql_no_return(sql)
def _create_update_sql(self, dbname, update_cols, PRIMARY_KEY):
set_parts = ", ".join([f"{col} = %s" for col in update_cols])
sql = f"UPDATE {dbname} SET {set_parts} WHERE {PRIMARY_KEY[0]} = %s;"
return sql
def _create_insert_sql(self, dbname, insert_cols):
columns = ", ".join(insert_cols)
placeholders = ", ".join(["%s"] * len(insert_cols))
sql = f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"
return sql
def _create_replace_sql(self, dbname, insert_cols):
columns = ", ".join(insert_cols)
placeholders = ", ".join(["%s"] * len(insert_cols))
sql = f"REPLACE INTO {dbname} ({columns}) VALUES ({placeholders});"
return sql
def _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):
"""Helper method to perform bulk operations."""
df_copy = df.copy()
i_max0 = df_copy.shape[0]
num = i_max0 // 5000
for j in range(num + 1):
value_list = []
start = j * 5000
end = min((j + 1) * 5000, i_max0)
for i in tqdm(range(start, end), desc=f"Batch {operation}"):
row = df_copy.iloc[i]
values = [str(row[cols[col]]) for col in cols]
if PRIMARY_KEY:
values.append(str(row[PRIMARY_KEY[1]]))
value_list.append(tuple(values))
self.executemany_sql_no_return(sql, value_list)
print(f"Database {dbname} {operation}d {end - start} rows!!!")
def _print_success(self, sql):
"""Print success message."""
operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"
print(f"Successfully {operation} {sql}")
def _print_failure(self, sql):
"""Print failure message."""
operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"
print(f"Failed {operation} {sql}")
3.2.1. 模块导入与配置
# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic
-
pandas
:用于将查询结果转换为 DataFrame。 -
tqdm
:显示批量操作的进度条。 -
mysql.connector.pooling
:MySQL 连接池,提高数据库连接效率。 -
dbname_dic
:从自定义模块导入表名和字段名的映射字典(如{'update_date': '更新日期'}
)。
3.2.2. 数据库连接池初始化
__config = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "faw-vw.1901",
"database": "douban"
}
pool = mysql.connector.pooling.MySQLConnectionPool(
**__config,
pool_size=10
)
-
使用连接池管理数据库连接,默认大小为 10,避免频繁创建/销毁连接。
3.2.3. CommonSQL
类功能
初始化方法
def __init__(self):
self.pool = pool
-
直接使用全局连接池
pool
。
基础 SQL 操作方法
execute_sql_no_return
def execute_sql_no_return(self, sql):
"""执行无返回值的 SQL(如 INSERT/UPDATE/DELETE)"""
try:
con = self.pool.get_connection()
cursor = con.cursor()
cursor.execute(sql)
con.commit()
self._print_success(sql) # 打印成功日志
except Exception as e:
con.rollback() # 回滚事务
self._print_failure(sql) # 打印失败日志
finally:
con.close() # 释放连接
-
用于执行不需要返回结果的 SQL(如 DML 语句)。
executemany_sql_no_return
def executemany_sql_no_return(self, sql, value_list):
"""批量执行无返回值的 SQL"""
try:
con = self.pool.get_connection()
cursor = con.cursor()
cursor.executemany(sql, value_list) # 批量执行
con.commit()
except Exception as e:
con.rollback()
finally:
con.close()
-
高效批量插入/更新数据(如
INSERT INTO ... VALUES (%s, %s)
)。
execute_sql_return_value
def execute_sql_return_value(self, dbname):
"""执行查询并返回 DataFrame(自动转换列名为中文)"""
sql = f"SELECT * FROM {dbname};"
cursor.execute(sql)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description] # 获取列名
df = pd.DataFrame(rows, columns=columns)
# 将英文列名替换为中文(通过 dbname_dic 映射)
if dbname in dbname_dic:
df.rename(columns=dbname_dic[dbname], inplace=True)
return df
-
查询结果转换为 DataFrame,并自动替换列名为中文(如
movie_ch → 电影中文名
)。
批量操作方法
bulk_insert_infor_in_db
/ bulk_update_infor_in_db
/ bulk_replace_infor_in_db
def bulk_insert_infor_in_db(self, df, insert_cols, dbname):
sql = self._create_insert_sql(dbname, insert_cols) # 生成 INSERT SQL
self._bulk_operation(df, sql, insert_cols, None, dbname, "insert")
def _create_insert_sql(self, dbname, insert_cols):
"""生成 INSERT 语句模板,如: INSERT INTO table (col1, col2) VALUES (%s, %s)"""
columns = ", ".join(insert_cols)
placeholders = ", ".join(["%s"] * len(insert_cols))
return f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"
-
将 DataFrame 数据分批次(每批 5000 行)插入数据库,通过
tqdm
显示进度。
_bulk_operation
(核心辅助方法)
def _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):
"""批量操作(插入/更新/替换)的通用逻辑"""
for j in range(num_batches):
value_list = []
for i in tqdm(range(start, end), desc=f"Batch {operation}"):
row = df.iloc[i]
values = [str(row[col]) for col in cols] # 提取数据
if PRIMARY_KEY: # 如果是更新操作,追加主键值
values.append(str(row[PRIMARY_KEY[1]]))
value_list.append(tuple(values))
self.executemany_sql_no_return(sql, value_list) # 批量执行
-
支持分批次处理大数据量,避免内存溢出。
其他工具方法
-
clear_db_table
:清空表(TRUNCATE TABLE
)。 -
_print_success
/_print_failure
:格式化打印操作日志。
3.3db文件使用
以下完整演示了"查询→备份→清空→重新插入→更新"的数据处理流程
# 导入必要的库
import pandas as pd # 用于数据处理和分析
from db.mysql_dao import CommonSQL # 自定义的MySQL数据库操作类
def main():
"""
主函数,执行数据库CRUD操作流程
"""
# ==================== 数据查询模块 ====================
# 从'top250movie'表查询数据并返回DataFrame
df = CommonSQL().execute_sql_return_value('top250movie')
print(df) # 打印原始数据
print(df.columns) # 打印列名
df.to_excel('原始数据.xlsx') # 导出到Excel备份
# ==================== 数据清理模块 ====================
# 清空'top250movie'表中的所有数据
CommonSQL().clear_db_table('top250movie')
# ==================== 数据插入模块 ====================
# 从Excel重新加载数据
df = pd.read_excel('原始数据.xlsx')
# 定义数据库字段与DataFrame列的映射关系
insert_cols = {
'update_date': '更新日期', # 数据库字段: DataFrame列名
'feature': '特征值',
'movie_ch': '电影中文名',
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
# 执行批量插入操作(两种方式)
CommonSQL().bulk_insert_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')
CommonSQL().bulk_replace_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')
# ==================== 数据更新模块 ====================
# 定义主键和需要更新的字段映射
PRIMARY_KEY = ['feature', '特征值'] # 主键字段
update_cols = {
'movie_ch': '电影中文名', # 数据库字段: DataFrame列名
'movie_en': '电影英文名',
'movie_url': '电影详情页链接',
'director': '导演',
'star': '主演',
'start_year': '上映年份',
'country': '国籍',
'type': '类型',
'rating': '评分',
'num_ratings': '评分人数',
'comment': '评语',
}
# 执行批量更新操作
CommonSQL().bulk_update_infor_in_db(df, PRIMARY_KEY, update_cols, 'top250movie')
# 程序入口
if __name__ == '__main__':
main()
4总结
作为一名长期从事数据处理与分析的专业人士,我在实际工作中总结出了一套成熟的MySQL-DataFrame交互方案。该方案有效解决了数据分析过程中常见的"数据搬运"效率瓶颈问题,显著提升了工作效能。
✅ 智能双向无缝转换
• 实现DataFrame与数据库表的自动化映射
• 免除繁琐的SQL查询编写及结果解析过程
• 全面适配各类数据分析场景的特殊需求
⚡ 高性能批处理机制
• 采用智能分块处理技术(5000行/批)
• 基于executemany预编译实现高效数据操作
• 显著降低I/O开销,提升数据处理效率
应用价值:
• 节省90%以上的数据转换时间
• 专注于核心数据分析逻辑开发
• 充分利用DataFrame的强大分析功能
让数据真正流动起来,释放分析潜能!