目录
基础功能
简单封装
源码等资料获取方法
基础功能
import pymysql
from pymysql.cursors import DictCursor # 导入字典类型的游标对象
# 连接数据库
db = pymysql.connect(host='192.168.3.109', # 数据库IP地址
port=3306, # 数据库端口号
user='root', # 数据库用户名
password='123456', # 数据库用户密码
db='test') # 连接的数据库名称
# cursor() 方法可以创建一个游标对象
# 如果不指定游标返回的数据,则返回的数据默认为元组类型
# cursor = db.cursor()
# 指定为dict类型游标,则返回的数据为字典类型
cursor = db.cursor(DictCursor)
# execute() 方法可以执行所有的原生SQL,成功执行则结果返回数字
# 比如:创建表格,则返回0
ret = cursor.execute("CREATE TABLE testsheet (name VARCHAR(20), gender CHAR(1),age int(2))") # 创建表格
print(F"创建表格:{ret}")
# 比如:插入数据,则返回插入记录数
ret = cursor.execute("INSERT INTO testsheet (name,gender,age) VALUES('张三', '男', 22),('三二一', '女', 22)")
print(F"插入数据:{ret}")
# 比如:查询数据,则返回记录数
ret = cursor.execute("select * from testsheet")
print(F"记录数:{ret}")
# 比如:删除表格,则返回0
ret = cursor.execute("DROP TABLE testsheet")
print(F"删除表格:{ret}")
# 使用fetchall()和fetchone()可以提取查询的数据,提取后数据删除
# fetchall() 提取所有数据,如果查询的数据为空,则返空,比如(),[];一次查询,多次提取,则返空
cursor.execute("select * from info")
ret = cursor.fetchall()
print(F"fetchall提取数据:{ret}")
ret = cursor.fetchall()
print(F"fetchall再次提取数据:{ret}")
# fetchone() 提取第一条数据,如果查询的数据为空,则返回None;一次查询,多次提取,则返回提取后的第一条数据,直到提取完,则返None。
cursor.execute("select * from info")
ret = cursor.fetchone()
print(ret)
ret = cursor.fetchone()
print(ret)
# 强调:使用execute()执行有参数的sql语句时,如果sql语句直接使用字符串初始化的方法存在SQL注入风险;需要使用execute(sql, 参数)的方式执行,防止SQL注入
# 比如查询ID为1的数据
id = 1
sql = "select * from info where id='%s'" % (id,)
cursor.execute(sql)
ret = cursor.fetchall()
print(F"查询ID为1的数据结果:{ret}")
# 如果传入的值被恶意修改成下面的值,那么数据就存在脱库的安全问题
id = "1' or '1=1"
sql = "select * from info where id='%s'" % (id,)
cursor.execute(sql)
ret = cursor.fetchall()
print(F"SQL注入的查询结果:{ret}")
# 防SQL注入的执行方式,如果使用上面的id参数,则会报SQL语句错误
sql = "select * from info where id='%s'"
try:
cursor.execute(sql, (id,))
ret = cursor.fetchall()
except:
ret = "执行报错"
print(F"防SQL注入的查询结果:{ret}")
# rollback() 回滚数据。只要没有commit,可以使用 rollback 回滚所有对数据库进行了修改的操作(增、删、改)
# db.rollback()
# 提交游标的操作到数据库。
db.commit()
# 关闭游标
cursor.close()
# 关闭数据库连接
db.close()
简单封装
import pymysql
from pymysql.cursors import DictCursor # 导入游标类型对象
class Mysql(object):
def __init__(self, host, user, password, port, db):
# 连接数据库
self.database = pymysql.connect(host=host, user=user, password=password, port=port, db=db)
print("连接数据库")
# 获取游标对象
self.cursor = self.database.cursor() # 使用该游标返回的数据为元组类型
print(self.cursor)
self.cursor = self.database.cursor(DictCursor) # 使用该游标返回的数据为字典类型
print(self.cursor)
def __del__(self):
"""对象销毁进行资源回收"""
# 关闭游标
self.cursor.close()
# 关闭数据库连接
self.database.close()
print("__del__被执行")
def execute(self, query, args=None, force=None):
"""
执行SQL
:param query: sql语句
:param args: sql语句中的参数
:param force: 是否跳过对sql语句"where"的检查
:return:返回游标对象
"""
# 对query传入的sql语句校验是否包含"where",若包含,args又没传参数则抛异常。校验的目的是防SQL注入
if force is None:
query = query.lower()
if "where" in query and args is None:
raise("检查到SQL中包含条件语句,但args参数为空,存在SQL注入的风险,若仍要执行,请将参数force的值修改为True")
self.cursor.execute(query, args)
# 返回游标对象就可以采用 execute().fetchone()的方式获取值
return self.cursor
def commit(self):
"""提交数据,提交失败则回滚"""
try:
self.database.commit()
return 0
except Exception as e:
self.database.rollback()
return -1
if __name__ == '__main__':
db = Mysql(host="192.168.0.58", user='root', password='123456', port=3306, db="test")
_id = "1 or 1=1"
code = "000036"
# SQL注入执行方式
sql = "select * from info WHERE id=%s or code=%s" % (_id, code)
# ret = db.execute(sql).fetchall()
ret = db.execute(sql, force=True).fetchall()
print(ret)
print(F"SQL注入获取的数据数:{len(ret)}")
# 防SQL注入的执行方式
sql = "select * from info where id=%s or code=%s"
ret = db.execute(sql, (_id, code)).fetchall()
print(ret)
print(F"防SQL注入获取的数据数:{len(ret)}")
print("="*50)
# 更新数据
_id = 1
code = "000011"
sql = "UPDATE `d`.`info` SET `code` = %s WHERE `id` = %s"
db.execute(sql, (code, _id))
ret = db.commit()
if ret == 0:
print("数据更新成功")
else:
print("数据更新失败")
# 获取指定字段值
_id = 1
sql = "select * from info where id=%s"
ret = db.execute(sql, (_id,)).fetchone()
print(f"获取id为{_id}的code字段值:{ret.get('code')}")
执行结果
源码等资料获取方法
各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!
三连之后我会在评论区挨个私信发给你们~