本篇我们介绍如何利用 Python DB API 连接和操作 MySQL 数据库,包括数据的增删改查操作、存储过程调用以及事务处理等。
Python 是一种高级、通用的解释型编程语言,以其优雅、准确、 简单的语言特性,在云计算、Web 开发、自动化运维、数据科学以及机器学习等人工智能领域获得了广泛应用。
Python 定义了连接和操作数据库的标准接口 Python DB API。不同的数据库在此基础上实现了特定的驱动,这些驱动都实现了标准接口。
MySQL Connector/Python 是官方提供的 Python 访问 MySQL 驱动程序,接下来我们介绍如何通过该驱动连接和操作 MySQL 数据库。本文使用 Python 3。
36.1 连接数据库
MySQL Connector/Python
MySQL Connector/Python 支持 MySQL 提供的所有功能,包括 Python 和 MySQL 数据类型转换,MySQL 扩展语法等。另外,它还支持数据流的压缩,TCP/IP 连接以及 SSL 安全连接。
MySQL Connector/Python 是纯 Python 实现的 API,不需要安装 MySQL客户端代码库或者任何非标准的 Python 模块。
下表列出了兼容的 Connect/Python、MySQL 以及 Python 版本:
Connector/Python | MySQL Server | Python |
---|---|---|
8.0 | 8.0, 5.7, 5.6, 5.5 | 3.6, 3.5, 3.4, 2.7 |
2.2 | 5.7, 5.6, 5.5 | 3.5, 3.4, 2.7 |
2.1 | 5.7, 5.6, 5.5 | 3.5, 3.4, 2.7, 2.6 |
2.0 | 5.7, 5.6, 5.5 | 3.5, 3.4, 2.7, 2.6 |
1.2 | 5.7, 5.6, 5.5 (5.1, 5.0, 4.1) | 3.4, 3.3, 3.2, 3.1, 2.7, 2.6 |
安装 MySQL Connector/Python 之前需要满足以下条件:
- 使用管理员权限执行安装;
- 已经安装 Python,同时将安装目录添加到系统环境变量 PATH。
MySQL Connector/Python 可以从 pypi.org 网站下载,或者我们可以直接使用 pip 工具进行安装。
pip install mysql-connector-python
或者也可以指定安装具体的驱动版本:
pip install mysql-connector-python==8.0.28
以下命令可以用于卸载 MySQL Connector/Python:
pip uninstall mysql-connector-python
执行后系统会提示确认信息:
Proceed (y/n)? y
输入 y 确认卸载。
安装完成后,打开 Python 命令行,输入以下代码验证驱动安装并成功连接 MySQL 数据库:
>>> import mysql.connector
>>> mysql.connector.connect(host='localhost',database='mysql',user='root',password='your pass')
如果屏幕显示类似以下信息,说明已经成功安装 MySQL Connector/Python 驱动:
<mysql.connector.connection.MySQLConnection object at 0x0187AE50>
使用 connect() 函数连接 MySQL
新建一个 Python 文件 connect.py,输入以下代码:
import mysql.connector
from mysql.connector import Error
def connect():
""" Connect to MySQL database """
conn = None
try:
conn = mysql.connector.connect(host='192.168.56.104',
database='hrdb',
user='tony',
password='tony')
if conn.is_connected():
print('Connected to MySQL database')
except Error as e:
print(e)
finally:
if conn is not None and conn.is_connected():
conn.close()
if __name__ == '__main__':
connect()
以上代码包含的步骤如下:
- 首先,从 MySQL Connector/Python 包中导入 mysql.connector 和 Error 对象;
- 其次,使用 connect() 函数连接 MySQL 服务器。connect() 函数包含 4 个参数:host、database、user 以及 password。示例中的 connect() 函数建立了一个 python_mysql 数据连接,并且返回了一个 MySQLConnection 对象。
- 再次,使用 is_connected() 方法检查数据库连接是否成功。如果产生任何异常,例如 MySQL 服务不可用、数据库不存在或者无效的用户名/密码,Python 将会抛出这个异常。try except 代码库处理并显示异常信息。
- 最后,使用 close() 方法关闭数据库连接。
输入以下命令测试 connect.py 模块:
>python connect.py
Connected to MySQL database
如果用户名或者密码无效,将会返回以下错误:
1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)
如果 MySQL 服务不可用,将会返回以下错误:
2003: Can't connect to MySQL server on 'localhost:3306' (10061 No connection could be made because the target machine actively refused it)
在以上示例中,我们在代码中硬编码了数据库的配置信息,例如 localhost、python_mysql、root 等,这不是一种好的方法,我们应该使用数据库配置文件存储这些信息。
使用 MySQLConnection 对象连接 MySQL
首先,创建一个数据库配置文件 config.ini,定义数据库相关的参数:
[mysql]
host = 192.168.56.104
port = 3305
database = hrdb
user = tony
password = tony
其次,创建一个新的模块 python_mysql_dbconfig.py,用于读取 config.ini 文件中的配置信息并返回一个字典对象:
from configparser import ConfigParser
def read_db_config(filename='config.ini', section='mysql'):
""" Read database configuration file and return a dictionary object
:param filename: name of the configuration file
:param section: section of database configuration
:return: a dictionary of database parameters
"""
# create parser and read ini configuration file
parser = ConfigParser()
parser.read(filename)
# get section, default to mysql
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('{0} not found in the {1} file'.format(section, filename))
return db
我们使用了 ConfigureParser 包读取配置文件。在 Python 解释器中测试 python_mysql_dbconfig 模块:
>>> from python_mysql_dbconfig import read_db_config
>>> read_db_config()
{'host': '192.168.56.104', 'port': '3306', 'database': 'hrdb', 'user': 'tony', 'password': 'tony'}
然后创建一个新的模块 connect2.py,使用 MySQLConnection 对象连接 python_mysql 数据库:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def connect():
""" Connect to MySQL database """
db_config = read_db_config()
conn = None
try:
print('Connecting to MySQL database...')
conn = MySQLConnection(**db_config)
if conn.is_connected():
print('Connection established.')
else:
print('Connection failed.')
except Error as error:
print(error)
finally:
if conn is not None and conn.is_connected():
conn.close()
print('Connection closed.')
if __name__ == '__main__':
connect()
以上代码包含的步骤如下:
- 首先,从 MySQL Connector/Python 包中导入 MySQLConnection 以及 Error 对象,从 python_mysql_dbconfig 模块中导入 read_db_config。
- 然后,读取数据库配置信息,创建一个新的 MySQLConnection 实例对象。其他内容和上文中的示例类似。
测试一下 connect2 模块:
>python connect2.py
Connecting to MySQL database...
connection established.
Connection closed.
36.2 查询数据
在 Python 代码中查询 MySQL 数据的步骤如下:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 调用游标的 execute() 方法执行查询;
- 使用 fetchone()、fetchmany() 或者 fetchall() 方法从结果集中获取数据;
- 调用相关对象的 close() 方法关闭游标和数据库连接。
fetchone() 方法
fetchone() 方法用于从结果集中返回下一条记录,如果没有记录时返回 None。例如:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def query_with_fetchone():
try:
dbconfig = read_db_config()
conn = MySQLConnection(**dbconfig)
cursor = conn.cursor()
cursor.execute("SELECT * FROM department")
row = cursor.fetchone()
while row is not None:
print(row)
row = cursor.fetchone()
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
query_with_fetchone()
代码中的具体步骤如下:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 执行查询语句,查找数据表 department 中的全部数据行;
- 调用 fetchone() 方法从结果集中获取第一行数据。然后在 while 循环中显示数据行的内容并获取下一行数据,直到返回所有数据行;
- 调用相关对象的 close() 方法关闭游标和数据库连接。
示例中的 read_db_config() 函数来自上文中的 python_mysql_dbconfig.py 模块。
fetchall() 方法
如果查询返回的数据行较少,可以使用 fetchall() 方法一次性从数据库返回所有数据。例如:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def query_with_fetchall():
try:
dbconfig = read_db_config()
conn = MySQLConnection(**dbconfig)
cursor = conn.cursor()
cursor.execute("SELECT * FROM department")
rows = cursor.fetchall()
print('Total Row(s):', cursor.rowcount)
for row in rows:
print(row)
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
query_with_fetchall()
代码逻辑和上一个示例类似,除了使用 fetchall() 方法替代 fetchone() 方法。因为我们返回了数据表 department 中的全部记录,可以通过游标对象的 rowcount 数据获取返回的总行数。
fetchmany() 方法
对于相对大一些的查询结果,返回整个查询结果集需要更多时间;而且 fetchall() 需要足够多的内存存储完整结果,可能导致效率问题。
为此,MySQL Connector/Python 接口提供了 fetchmany() 方法,可以获取查询结果中的下一行或者下一批数据行,从而达到返回时间和内存占用的平衡。
首先,定义一个生成器,将数据库调用拆分成一系列的 fetchmany() 方法调用:
def iter_row(cursor, size=10):
while True:
rows = cursor.fetchmany(size)
if not rows:
break
for row in rows:
yield row
其次,调用 iter_row() 生成器每次获取 10 行数据:
def query_with_fetchmany():
try:
dbconfig = read_db_config()
conn = MySQLConnection(**dbconfig)
cursor = conn.cursor()
cursor.execute("SELECT * FROM employee")
for row in iter_row(cursor, 10):
print(row)
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
36.3 插入数据
使用 MySQL Connector/Python 接口插入数据主要包含以下步骤:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 执行 INSERT 语句插入数据并提交事务;
- 关闭数据库连接。
插入单行数据
我们首先在数据库 hrdb 中创建一个新的数据表 books:
-- 通过 SQL 脚本创建表
CREATE TABLE books(
id INTEGER NOT NULL AUTO_INCREMENT,
title VARCHAR(50) NOT NULL,
isbn VARCHAR(13) NOT NULL,
photo BLOB,
PRIMARY KEY (id)
);
以下示例为数据表 books 插入了一行新数据:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def insert_book(title, isbn):
query = "INSERT INTO books(title,isbn) " \
"VALUES(%s,%s)"
args = (title, isbn)
try:
db_config = read_db_config()
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
cursor.execute(query, args)
if cursor.lastrowid:
print('last insert id', cursor.lastrowid)
else:
print('last insert id not found')
conn.commit()
except Error as error:
print(error)
finally:
cursor.close()
conn.close()
def main():
insert_book('SQL编程思想','9787121421402')
if __name__ == '__main__':
main()
主要代码如下:
- 首先,从 MySQL Connector/Python 包中导入 MySQLConnection 以及 Error 对象,从 python_mysql_dbconfig 模块中导入 read_db_config() 函数;
- 其中,定义一个新的函数 insert_book(),包含两个参数:title 和 isbn。在该函数内部,构造一个 INSERT 语句(query)和数据(args);
- 然后,在 try except 代码块中创建一个新连接,执行语句,提交事务。注意,我们需要显示调用 commit() 方法提交事务。如果成功插入一行数据,我们可以使用游标对象的 lastrowid 属性获取自增字段(AUTO_INCREMENT )最后插入的 id。
- 接着,在 insert_book() 函数的最后关闭游标和数据库连接。
- 最后,在 main() 函数中调用 insert_book() 函数插入一行数据。
插入多行数据
MySQL INSERT 语句支持一次插入多行数据,例如:
INSERT INTO books(title,isbn)
VALUES ('高性能MySQL(第4版)', '9787121442575'),
('Linux是怎样工作的', '9787115581617'),
('机器学习', '9787302423287');
对于 Python,我们可以使用 MySQLCursor 对象的 executemany() 方法插入多行数据。例如:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def insert_books(books):
query = "INSERT INTO books(title,isbn) " \
"VALUES(%s,%s)"
try:
db_config = read_db_config()
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
cursor.executemany(query, books)
conn.commit()
except Error as e:
print('Error:', e)
finally:
cursor.close()
conn.close()
def main():
books = [('高性能MySQL(第4版)', '9787121442575'),
('Linux是怎样工作的', '9787115581617'),
('深度学习', '9787302423287')]
insert_books(books)
if __name__ == '__main__':
main()
示例中的代码逻辑和上一个示例类似,只是将 execute() 方法替换成了 executemany() 方法。在 main() 函数中,我们传递了一个元组列表,每个元素包含了相应的 title 和 isbn。
36.4 更新数据
使用 MySQL Connector/Python 接口更新数据的操作步骤如下:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 执行 UPDATE 语句更新数据并提交事务;
- 关闭数据库连接。
以下示例通过 id 更新图书的的 title 信息:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def update_book(book_id, title):
# read database configuration
db_config = read_db_config()
# prepare query and data
query = """ UPDATE books
SET title = %s
WHERE id = %s """
data = (title, book_id)
try:
conn = MySQLConnection(**db_config)
# update book title
cursor = conn.cursor()
cursor.execute(query, data)
# accept the changes
conn.commit()
except Error as error:
print(error)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
update_book(4, '机器学习')
其中,UPDATE 语句使用了两个占位符(%),分别用于 title 和 id。然后将 UPDATE 语句(query)和 (title,id) 元组传递给 execute() 方法,MySQL Connector/Python 会将查询转换为以下语句:
UPDATE books
SET title = '机器学习'
WHERE id = 4
注意,如果 SQL 语句需要接收用户的输入,一定要使用占位符(%s),这种方法可以阻止 SQL 注入问题。
接下来我们运行一个测试。首先查看 id 等于 4 的数据:
SELECT id, title, isbn
FROM books
WHERE id = 4;
id|title |isbn |
--+--------+-------------+
4|深度学习 |9787302423287|
然后执行 update.py 模块:
python update.py
最后再次查看 id 等于 4 的记录:
SELECT id, title, isbn
FROM books
WHERE id = 4;
id|title |isbn |
--+--------+-------------+
4|机器学习 |9787302423287|
36.5 删除数据
使用 MySQL Connector/Python 接口插入数据主要包含以下步骤:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 执行 UPDATE 语句更新数据并提交事务;
- 关闭数据库连接。
以下示例通过 id 实现图书数据的删除:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def delete_book(book_id):
db_config = read_db_config()
query = "DELETE FROM books WHERE id = %s"
try:
# connect to the database server
conn = MySQLConnection(**db_config)
# execute the query
cursor = conn.cursor()
cursor.execute(query, (book_id,))
# accept the change
conn.commit()
except Error as error:
print(error)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
delete_book(4)
我们同样在 DELETE 语句中使用了占位符(%)。调用 execute() 方法时传递了 DELETE 语句和 (book_id,) 元组,MySQL Connector/Python 会将删除转换为以下语句:
DELETE FROM books
WHERE id = 4;
然后运行以上示例。最后再次查询 id 等于 4 的图书,不会返回任何数据,意味着成功删除了一条记录。
36.6 调用存储过程
MySQL Connector/Python 接口支持调用 MySQL 存储过程。
准备工作
第 31 篇介绍了 MySQL 存储过程相关的知识。
我们首先创建两个作为演示的存储过程。以下存储过程返回了全部图书:
USE hrdb;
DELIMITER $$
CREATE PROCEDURE find_all()
BEGIN
SELECT
title,
isbn
FROM books
ORDER BY title;
END$$
DELIMITER ;
直接调用 find_all() 存储过程:
CALL find_all();
title |isbn |
-------------------+-------------+
Linux是怎样工作的 |9787115581617|
SQL编程思想 |9787121421402|
高性能MySQL(第4版)|9787121442575|
第二个存储过程 find_by_isbn() 用于通过 ISBN 查找图书:
DELIMITER $$
CREATE PROCEDURE find_by_isbn(
IN p_isbn VARCHAR(13),
OUT p_title VARCHAR(50)
)
BEGIN
SELECT title
INTO p_title
FROM books
WHERE isbn = p_isbn;
END$$
DELIMITER ;
find_by_isbn() 包含两个参数:p_isbn(入参)以及 p_title(出参)。通过传入 ISBN,该存储过程返回对应的图表名称,例如:
CALL find_by_isbn('9787121421402',@title);
SELECT @title;
@title |
----------+
SQL编程思想|
Python 调用存储过程
在 Python 代码中调用存储过程的步骤如下:
- 连接 MySQL 数据库,返回一个 MySQLConnection 对象;
- 使用 MySQLConnection 对象初始化一个 MySQLCursor 游标对象;
- 调用游标对象的 callproc() 方法,第一个参数为存储过程名称。如果存储过程需要参数,以列表的形式作为第二个参数传递给callproc() 方法。如果存储过程返回了结果集,可以调用游标的 stored_results() 获取一个列表迭代器对象,然后通过 fetchall() 方法遍历结果。
- 关闭数据库连接。
以下示例调用了 find_all() 存储过程并打印返回结果:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def call_find_all_sp():
try:
db_config = read_db_config()
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
cursor.callproc('find_all')
# print out the result
for result in cursor.stored_results():
print(result.fetchall())
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
call_find_all_sp()
下面的示例调用了 find_by_isbn() 存储过程:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def call_find_by_isbn():
try:
db_config = read_db_config()
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
args = ['9787121421402', 0]
result_args = cursor.callproc('find_by_isbn', args)
print(result_args[1])
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
call_find_by_isbn()
列表 args 包含两个元素,第一个元素是 isbn,第二个元素是 0。第二个元素仅仅是 p_title 参数的占位符。
callproc() 方法返回了一个列表(result_args)。该列表包含两个元素,第二个元素(result_args[1])存储了 p_title 参数的值。
36.7 读写 BLOB 对象
更新 BLOB 字段
我们首先创建一个函数 read_file(),用于读取文件内容:
def read_file(filename):
with open(filename, 'rb') as f:
photo = f.read()
return photo
然后,创建一个函数 update_blob(),用于更新指定 id 对应的图书照片:
from mysql.connector import MySQLConnection, Error
from python_mysql_dbconfig import read_db_config
def update_blob(book_id, filename):
# read file
data = read_file(filename)
# prepare update query and data
query = "UPDATE books " \
"SET photo = %s " \
"WHERE id = %s"
args = (data, book_id)
db_config = read_db_config()
try:
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
cursor.execute(query, args)
conn.commit()
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
代码主要步骤如下:
- 调用 read_file() 函数读取文件中的内容;
- 构造一个 UPDATE 语句,用于更新指定 book_id 对应的图书 photo 字段;
- 通过 try except 代码块连接数据库、初始化游标、执行查询语句并提交事务;
- 在 finally 代码块中关闭游标和数据库连接。
测试一下 update_blob() 函数:
def main():
update_blob(1, "pictures\thinking_in_sql.png")
if __name__ == '__main__':
main()
我们可以将下面的图片放入 pictures 目录作为测试。
读取 BLOB 字段
接下来我们从 books 表中的 photo 字段读取数据并写入文件。
首先,创建一个 write_file() 函数,用于写入二进制文件:
def write_file(data, filename):
with open(filename, 'wb') as f:
f.write(data)
然后,创建一个函数 read_blob(),用于读取 BLOB 字段:
def read_blob(book_id, filename):
query = "SELECT photo FROM books WHERE id = %s"
# read database configuration
db_config = read_db_config()
try:
# query blob data form the books table
conn = MySQLConnection(**db_config)
cursor = conn.cursor()
cursor.execute(query, (book_id,))
photo = cursor.fetchone()[0]
# write blob data into a file
write_file(photo, filename)
except Error as e:
print(e)
finally:
cursor.close()
conn.close()
read_blob() 从数据表 books 中读取 BLOB 数据并将其写入 filename 参数指定的文件。
测试一下 read_blob() 函数:
def main():
read_blob(144,"output\thinking_in-sql.jpg")
if __name__ == '__main__':
main()
打开 output 文件夹,如果能够看到一张和导入之前的图片相同的文件,意味着已经成功从数据库中读取了 BLOB 数据。