【 一 】数据库连接池
【 1 】flask操作mysql
from flask import Flask, jsonify
import pymysql
app = Flask(__name__)
app.debug = True
@app.route('/')
def index():
conn = pymysql.connect(
user='root',
password="123123",
host='127.0.0.1',
database='course',
port=3306,
autocommit=True # 通常查询不需要autocommit,但设置为True也没问题
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'SELECT * FROM course WHERE meney >= 200' # 确保字段名正确
cursor.execute(sql) # 移除多余的参数
res = cursor.fetchall()
print(res)
cursor.close() # 关闭游标
conn.close() # 关闭数据库连接
return jsonify(res)
@app.route('/home')
def home():
conn = pymysql.connect(
user='root',
password="123123",
host='127.0.0.1',
database='course',
port=3306,
autocommit=True # 通常查询不需要autocommit,但设置为True也没问题
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'SELECT * FROM course WHERE name ="java"' # 确保字段名正确
cursor.execute(sql) # 移除多余的参数
res = cursor.fetchall()
print(res)
cursor.close() # 关闭游标
conn.close() # 关闭数据库连接
return jsonify(res)
if __name__ == '__main__':
app.run()
【 2 】数据库连接池
# pip install dbutils 模块,实现数据库连接池
# 先创建出一批链接,每个请求从池中取链接操作
-每个请求用自己的链接对象
-又有池的存在
-数据并发安全,并且链接数不会过高
- 连接池的使用创建一个文件Pool.py文件(名字无所谓)
from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123123',
database='course',
charset='utf8'
)
- app.py
from flask import Flask, jsonify
import pymysql
from Pool import POOL
app = Flask(__name__)
app.debug = True
@app.route('/')
def index():
conn = POOL.connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
# id小于等于3的
sql = 'select * from course where id <=%s'
cursor.execute(sql, 3)
res = cursor.fetchall()
print(res)
return jsonify(res)
if __name__ == '__main__':
app.run()
【 二 】flask定制命令
【 1 】新版本定制命令
准备数据
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY, -- 设置id为自增主键
username VARCHAR(255) NOT NULL,
password VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL
);
定制命令.py
from flask import Flask
import click
app = Flask(__name__)
@app.cli.command("create-user")
@click.argument("name")
def create_user(name):
from Pool import POOL
conn=POOL.connection()
cursor=conn.cursor()
cursor.execute('INSERT INTO users (username, password,name) VALUES (%s, %s,%s)', (name, '123456','suer')), conn.commit()
print(name)
cursor.close()
conn.close()
@app.route('/')
def index():
return 'index'
if __name__ == '__main__':
app.run()
# 运行项目的命令是:flask --app py文件名字:app run
# 命令行中执行
# flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
# flask create-user lqz
Pool.py
from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123123',
database='course',
charset='utf8'
)
- 然后在终端使用
flask --app 定制命令.py:app create-user jing
将excel表中的数据传入数据库表
from flask import Flask
from flask.cli import AppGroup
from click import argument
import pymysql
from openpyxl import load_workbook
app = Flask(__name__)
cli = AppGroup('import', help='Excel import commands')
# 这里定义你的数据库连接池,或者直接创建连接
def get_db_connection():
return pymysql.connect(host='localhost', user='root', password='123123', database='school', charset='utf8mb4')
@app.cli.command("import_excel")
@argument("excel_path")
def import_excel(excel_path):
# 加载 Excel 文件
wb = load_workbook(excel_path)
ws = wb.active # 或者使用具体的 sheet 名称
# 建立数据库连接
conn = get_db_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 遍历 Excel 行并插入数据库
for row in ws.iter_rows(min_row=2, values_only=True): # 假设第一行是标题行,跳过
id,name, money = row # Excel 数据应该与这三个字段对应
# 由于 info 表中的 id 是自增字段,我们不需要指定它
# 这里我们只插入 name 和 money 字段
sql = "INSERT INTO info (id,name, money) VALUES (%s, %s,%s)"
cursor.execute(sql, (id,name, money))
# 提交事务
conn.commit()
# 关闭连接
cursor.close()
conn.close()
print(f"Data imported from {excel_path} to the database.")
# 将命令添加到 Flask CLI
app.cli.add_command(cli)
if __name__ == '__main__':
app.run()
终端使用命令 flask import_excel 1.xlsx
# flask import_excel 文件名
【 2 】django定制命令
# app应用程序下新建文件夹
management/commands/
# 在该文件夹下新建py文件,随便命名(命令名)
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = '命令提示'
def add_arguments(self, parser):
parser.add_argument('path', nargs='*', type=str,)
def handle(self, *args, **kwargs):
print('开始导入')
print(args)
print(kwargs)
小示例
# app应用程序下新建文件夹
management/commands/
# 在该文件夹下新建py文件,随便命名(命令名)
from django.core.management.base import BaseCommand
from back.models import Role
class Command(BaseCommand):
help = '这个是角色表!!!'
def add_arguments(self, parser):
parser.add_argument('name', nargs='*', type=str,)
def handle(self, *args, **options):
print('开始导入')
print(args)
name = options['name']
Role.objects.create(name=name,status=2)
self.stdout.write(self.style.SUCCESS(f'成功将 {name} 导入Role表中!!!'))
# python manage.py 文件名 传入参数
# python manage.py django定制命令 员工
这里有个小坑这里是直接将['员工']
传入到Role表中
-
nargs='*'
来定义name
参数,这意味着该参数可以接受任意数量的位置参数,并将它们作为一个列表(list)传递给options['name']
。
from django.core.management.base import BaseCommand
from back.models import Role
class Command(BaseCommand):
help = '这个是角色表!!!'
def add_arguments(self, parser):
parser.add_argument('name', type=str,)
def handle(self, *args, **options):
print('开始导入')
name = options['name']
Role.objects.create(name=name,status=2)
self.stdout.write(self.style.SUCCESS(f'成功将 {name} 导入Role表中!!!'))
'''
PS F:\project\Permissions> python manage.py django定制命令 管理员
开始导入
成功将 管理员 导入Role表中!!!
'''