文章目录
- 入门小案例及认识路由
- 小总结
- 配置文件
- 路由系统
- 路由支持正则
- cbv (用的比较少)
- 模板
- 渲染变量及循环
- 请求响应
- pipreqs(找当前项目依赖的包)
- 闪现(flash)
- 请求扩展(类似中间件)
- 猴子补丁
- 蓝图(blueprint)
- threading.local
- g对象的使用
- flask-session
- 数据库连接池
- wtforms(forms组件)
- 信号
- flask-script(制定命令)
- SQLAlchemy
- 一对多关系与多对多关系
- 基于scoped_session实现线程安全
- orm增删改查
- flask-migrate
flask中文网: https://flask.net.cn/
入门小案例及认识路由
from flask import Flask
app = Flask(__name__)
@app.route("/")
def index():
return "hello world"
if __name__ == '__main__':
app.run()
from flask import Flask,request,render_template,redirect
app = Flask(__name__)
@app.route("/hello")
def index():
# 当前请求地址,当前请求携带过来的数据
print(request.path)
return "hello world"
@app.route("/")
def index():
# html模板要建在templates文件夹下
return render_template("index.html")
if __name__ == '__main__':
app.run()
from flask import Flask,request,render_template,redirect,session,url_for
app = Flask(__name__)
app.debug = True
# 如果使用session,必须配置secret_key
app.secret_key ='sdfsdfsdfsdf'
USERS = {
1:{'name':'张三','age':18,'gender' :'男','text':'道路千万条'},
2:{'name':'李四','age':28,'gender':'男','text' :'安全第一条'},
3:{'name':'王五','age':18,'gender':'女','text':'行车不规范'},
@app.route('/detail/<int:nid>',methods=['GET'])
def detail(nid):
user = session.get('user_info')
if not user:
return redirect('/login')
info = USERS.get(nid)
return render_template('detail.html',info=info)
@app.route('/index',methods=['GET'])
def index():
user = session.get('user_info')
if not user:
# return redirect( /login )
url = url_for('11')
return redirect(url)
return render_template('index.html',user_dict=USERS)
@app.route('/login',methods=['GET','POST'],endpoint='11')
def login():
if request.method == "GET":
return render_template('login.html')
else:
# request.query_string
user = request.form.get('user')
pwd = request.form.get('pwd')
if user=='cxw' and pwd=='123':
session['user_info'] = user
return redirect('http://www.baidu.com')
return render_template('login. html',error="用户名或密码错误")
if __name__ == '__main__':
app.run()
//login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户登录</h1>
<form method='post'>
<input type="text" name='user'>
<input type="text" name='pwd'>
<input type="submit" value='登录'> {{error}}
</form>
</body>
</html>
//detail.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>详细信息 {{info.name}}</h1>
<div>
{{info.text}}
</div>
</body>
</html>
//index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<table>
{% for k,v in user_dict.items() %}
<tr>
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['name']}}</td>
<td>{{v.get('name')}}</td>
<td><a href='/detail/{{k}}'}*>查看详细</a></td>
</tr>
</table>
</body>
</html>
小总结
@app.route('/login',methods=['GET','POST'],endpoint='11')
-第一个参数是路径
-methods=['GET','POST'],请求方式
-endpoint='11',别名,如果不写,使用函数名作为别名
路由的本质: self.add_url_rule
# 另一种路由注册方式
def xxx():
return 'xxx'
app.add_url_rule('/xxx',view_func=xxx)
配置文件
from flask import Flask
app = Flask(__name__)
#第一种方式
# app的配置文件全在config字典中,但是有一些常用的,比如debug, 会直接提到app这一层
#app.debug=True
#app.config['DEBUG'] = False
#第二种方式
#app.config.from_pyfile("seting.py")
#第三种方式
app.config.from_object("mysettings.TestingConfig")
@app.route("/")
def index():
return "hello world"
if __name__ == '__main__':
app.run()
# SECRET_KEY:如果使用session,必须配置
# SESSION_COOKIE NAME: cookie名字
# 数据库地址,端口号,也要放到配置文件中,但是不是的置的参数
# flask内 置session如何实现的?
-通过SECRET KEY加密以后,当做cookie返回给浏览 器
-下次发送请求,携带cookie过来,反解,再放到session中
```python
# seting.py
DEBUG=True
# mysettings.py
class Config(object) :
DEBUG = False
TESTING = False
DATABASE LURI ='sqlite://:memory:'
class TestingConfig(Config): # 测试阶段
DEBUG = True
TESTING = False
DATABASE URI ='sqlite://:memory:'
class DevelopmentConfig(Config): # 开发阶段
DEBUG = False
TESTING = False
DATABASE URI ='sqlite://:memory:'
路由系统
基本使用请见上第一小节
# 基本使用
@app.route('/detail/<int:nid>',methods=['GET'], endpoint='detail')
转换器
DEFAULT_CONVERTERS = {
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': Anyconverter,
'path': Pathconverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}
@app.route和app.add_url_ rule 参数:
rule,URL规则
view_func,视图函数名称
defaults = None, 默认值,当URL中无参数, 函数需要参数时,使用defaults = {'k': 'v'} 为函数提供参数
endpoint = None, 名称, 用于反向生成URL,即: url_for('名称')
methods = None,允许的请求方式,如: ["GET", "POST"]
#对URL最后的/符号是否严格要求,默认严格,False, 就是不严格
strict_slashes = None
#重定向到指定地址
redirect_to = None ,
@app. route(' /index/<int:nid>',redirect_ to= ' /home/ <nid> ')
#子域名访问
subdomain = None
路由支持正则
#1写类,继承BaseConverter
#2注册: app.url_map.converters['regex'].RegexConverter
#3使用: @app.route('/index/<regex("\d+"):nid>")正则表达式会当作第二个参数传递到类中
from flask import Flask,vlews,url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)
class RegexConverter(BaseConverter):
#自定义URL匹配正则表达式
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex=regex
def to_python(self, value):
#路由四配时,匹配成功后传递给视图正数中参数的值
return int(value)
def to_url(self, value):
#使用ur1_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
val=super(RegexConverter, self).to_ url(value)
return val
#添加到flask中
app.url_map.converters['regex']=RegexConverter
@app.route('/index/<regex("\d+"):nid>')
def index(nid):
print(url_for("index", nid='888'))
return 'Index'
if __name__ == '__main__':
app.run()
cbv (用的比较少)
from flask import Flask
from flask import views
app = Flask(__name__)
#class IndexView(views.View):
# methods = ['GET']
# # decorators = [auth,]
# def dispatch_request(self):
# print('Index')
# return 'Index!'
def auth(func):
def inner(*args,**kwargs):
print('before')
result = func(*args,**kwargs)
print('after')
return result
return inner
class IndexView(views.MethodView):
methods = ['GET'] #指定运行的请求方法
#登录认证装饰器加在哪?
decorators = [auth, ] #加多 个就是从上往下的效果
def get(self):
return "我是get请求"
def post(self):
return "我是post请求"
#路由如何注册?
# IndexView.as_view('index'),必须传name
app.add_url_rule('/index',view_func=IndexView.as_view('index'))
if __name__ == '__main__':
app.run()
#用的比较少
#继承views.MethodView, 只需要写get, post, delete方法
#如果加装饰器decorators = [auth, ]
#允许的请求方法methods = ['GET']
模板
渲染变量及循环
<!IDOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户列表</h1>
<table>
{% for k,v in user_dict.items() %}
<tr>
<td>{{k}}</td>
<td>{{v.name}}</td>
<td>{{v['name']}}</td>
<td>{{v.get("nane")}}</td>
<td><a href="/detail/{{k}}">查看详细</a></td>
</tr>
{% endfor %}
</table>
</body>
</html>
请求响应
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response,jsonify
app=Flask(__name__)
@app.route('/login.html', mnethods=['GET','POST'])
def login():
#请求相关信息
# request.method 提交的方法
# request.args get请求提及的数据
# request.form post请求提交的数据
# request.values post和get提交的数据总和
# request.cookies 客户端所带的cookie
# request.headers 请求头
# request.path 不带域名,请求路径
# request.full_path 不带域名 ,带参数的请求路径
# request.script_root
# request.url 带域名带参数的请求路径
# request.base_url 带域名请求路径
# request.url_root 域名
# request.host_url 域名
# request.host 127.0.0.1:500
# request.fles
# obj = request.flles["the_flle. name"]
# 响应对象
# 响应相关信息
#return '字符串'
# return render_template( html模板路径,**{})
# return redirect('/index. html')
#对着django, JsonResponse
# return jsonify({'k1':' v1'})
aa='hello world'
res=make_response(aa)
res.set_cookie('xxx','1qz')
res.headers['X-Something'] ='A value'
return res
# response = make_response(render_template( 'index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key','value')
# response.headers['X-Something'] ='A value'
# return response
# return 'hello'
pipreqs(找当前项目依赖的包)
pip install pipreqs
在项目根目录下执行命令:(生成requirements.txt文件)
pipreqs ./ --encoding=utf-8
执行以下命令安装所有组件:
pip install -r requirements.txt
闪现(flash)
-设置:flash(‘aaa’ )
-取值: get_flashed_message()
-假设在a页面操作出错,跳转到页面,在b页面显示a页面的错误信息
from flask import Flask, request, jsonify, make_response, session, flash
app=Flask(__name__)
app.debug=True
app.secret_key='dcdsjhcksjsdhshd'
@app.route('/',methods=['GET', 'POST'])
def index():
flash('lqz-nb') #往某个位置放值
return 'hello'
@app.route('/order',methods=['GET','POST'])
def order():
return get_flashed_messages() #从某个位置取出来
if __name__ ='__main__':
app.run()
请求扩展(类似中间件)
from flask import Flask, request, jsonify, make_response, session, flash,redirect
app=Flask(__name__)
app.debug=True
app.secret_key='dcdsjhcksjsdhshd'
#请求来了就会触发,类似于django的process_ request,如果有多个,顺序是从上往下
@app.before_request
def before(*args,**kwargs):
print('请求来了执行我')
if request.path=='/login':
return None
else:
name=session.=get('user')
if not name:
return redirect('/login')
else:
return None
@app.route('/login',methods=['GET', 'POST'])
def index():
session['user']='lqz'
return 'login'
@app.route('/order',methods=['GET', 'POST'])
def index():
return 'order'
#请求走了就会触发,类似fdjango的process. response,如果有多个,顺序是从下往上执行
@app.after_request
def after(response):
print('我走了')
return response
# 项目启动起来第一次会走, 以后都不会走了
@app.before_first_request
def first():
print('可以做初始化操作')
# 每一个请求后都会走,即使发生错误
@app.teardown_request
def ter(e):
print(e)
print('可以记录错误日志')
return '我错了'
@app.errorhandler(500)
def error_500(arg):
return render_template('error. html',message='500错误')
@app.errorhandler(404)
def error_404(arg):
return render_template('error.htm1',message='404错误')
# 全局标签 html页面引用 {{sb(3,4)}}
@app.template_globa1()
def sb(a1, a2):
return a1 + a2
# 全局过滤器 {{1|db(2,3)}}
@app.template_filter()
def db(a1, a2, a3):
reture a1+a2+a3
if __name__ ='__main__':
app.run()
猴子补丁
一个概念,面向对象理念
#有什么用?
#这里有一个比较实用的例子,
#很多用到import json,
#后来发现yjison性能更高,
#如果觉得把每个文件的import json改成import uison as json成本较高,
#或者说想测试一下uison替换是否符合预期, 只需要在入口加上:
#只需要在程序入口
import json
import ujson
def monkey_patch_json():
json.__name__ ='ujson'
json.dumps = ujson.dumps
json.loads = ujson.loads
monkey_patch_json()
aa= json.dumps({'name':'1qz','age':19})
print(aa)
#协程,单线程下实现并发
# from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
#time.sleep(2)
gevent.sleep(2) # 没有用猴子补丁的时候用这个
print(' eat food 2' )
def p1ay():
print(' play 1')
#time.sleep(1)
gevent.sleep(1) # 没有用猴子补丁的时候用这个
print(' play 2' )
g1= gevent.spawn(eat)
g2 gevent.spawn(play)
gevent.joinall([g1, g2])
print('主')
蓝图(blueprint)
1没有蓝图之前前,都是单文件
2有了蓝图可以分文件
蓝图使用:
#第一步在app中注册蓝图,括号里是一个 蓝图对象
app.register_b1ueprint(user.us)
#第二步,在不同文件中注册路由时,直接使用蓝图对象注册,不用使用app了,避免了循环导入的问题
@account.route('/1ogin. htm1',methods=['GET', "POST"])
中小型项目目录划分:
项目名字:
-pro_ flask文件夹
-templates
login.htm1
-statics
code.png
-views
blog.py
account.py
user.py
-__init__.py
-run.py
threading.local
#不用1ocal
from threading import Thread
import time
1qz = -1
def task(arg):
global 1qz
1qz = arg
# time.sleep(2)
print(1qz)
for i in range(10) :
t =Thread(target=task,args=(i,))
t. start()
from threading import Thread
from threading import 1ocal
import time
from threading import get_ident
#特殊的对象
1qz = 1oca1()
def task(arg):
#对象.val = 1/2/3/4/5
1qz.value = arg
time.sleep(2)
print(1qz.value)
for i in range (10):
t = Thread(target=task,args=(i,))
t.start()
from threading import get_ident, Thread
inport time
storage = {}
def set(k,v):
ident = get_ident()
if ident in storage:
storage[ident][k] = v
else:
storage[ident] = {k:v}
def get(k):
ident = get_ident()
return storage[ident][k]
def task(arg):
set('val',arg)
v = get('val')
print (v)
for i in range(10) :
t = Thread(target=task, args=(i,))
t. start()
# 面向对象版
from threading import get_ident,Thread
import time
class Local(object):
storage = {}
def set(self, k, v):
ident = get_i dent()
if ident in Local.storage:
Loca1.storage[ident][k] = v
else:
Local.storage[ident] = {k: v}
def get(self, k):
ident = get_ident()
return Local.storage[ident][k]
obj = Local()
def task(arg):
obj.set('va1', arg)
v = obj.get('val')
print(v)
for i in range(10):
t =Thread(target=task, args=(i,))
t. start()
# 最终版
from threading import get_ ident,Thread
import time
class Local(object):
# storage = {} 类的storage
def __init__(self):
object.__setattr__(self,'storage',{})
def __setattr__(self, k,v):
ident = get_ident()
if ident in Local.storage:
Local.storage[ident][k] = v
else:
Local.storage[ident] = {k: v}
def __getattr__(self, k):
ident = get_ident()
return Local.storage[ident][k]
obj = Local()
obj2 = Local()
def task(arg):
obj.val = arg
print(obj.va1)
for i in range(10):
t = Thread(target=task ,args=(i,))
t. start()
g对象的使用
#1 g是一个全局变量,在当前请求中可以放值,取值
#2 g和session有什么不同? session只要是这个人,没有删除session,每次请求都会有值
# session对象是可以跨request的, 只要session还未失效, 不同的request的请求会获取到同一个session, 但是g对象不是,g对象不需要管过期时间,请求次就g对象就改变了一次,或者重新赋值了一次
flask-session
作用:将默认保存的签名cookie中的值保存到redis/memcached/file/Mongodb/SQLAlchemy
安装: pip3 install flask-session
使用1:
from flask import Flask, session
from flask_session import RedissessionInterface
import redis
app = Flask(__name__)
conn=redis.Redis(host='127.0.0.1',port-6379)
#use_signer是否对key签名
app.sesslon_interface=Redi ssessionIntenface(conn,key_prefix='1qz')
@app.route('/')
def hello_world():
session['name']='1qz'
return "Hello Horld!"
if __name__== '__main__':
app.run()
使用2:(本质跟上面一样)
from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)
问题:设置cookie时, 如何设定关闭浏览器则cookie失效。
response.set_cookie('k','v',exipre=None) #这样设置即可
#在session中设置
app.session_interface=RedisSessionInterface(conn,key_ prefix='1qz',permanent=False)
# 一般不用,我们一般都设置超时时间,多长时间后失效
问题: cookie默认超时时间是多少?如何设置超时时间
#源码expires = self.get_expiration_time(app,session)
'PERMANENT_SESSION_LIFETIME':
timedelta(days=31),这个配置文件控制
数据库连接池
【参考】https://www.cnblogs.com/liuqingzheng/articles/9006055.html
pymsq|链接数据库
import pymysql
conn= pymysql.connect(host='127.0.0.1', port=3306, user='root',passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors=.DictCursor)
# cursor . execute("select id,name from users where name=%s and pwd=%s",['1qz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s" ,{'user':'1qz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()
print(obj)
wtforms(forms组件)
1校验数据
2渲染标签
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__,template_folder='templates')
app.debug = True
class LoginForm(Form):
#字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18,message='用户名长度必须大于% (min)d且小于% (max)d')
],
widget=widgets.TextInput(), #页面上显示的插件
render_kw={'class':'form-control' }
)
#字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators= [
validators.DataRequired (message='密码不能为空'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$ !%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class':'form-control'}
)
@app.route('/login',methods=["GET",'POST'])
def login():
if request.method =='GET':
form = LoginForm()
return render.template('login.html',form=form)
else:
form = LoginFor(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:',form.data)
else:
print(form.errors)
return render_template('login. html',form=form)
if __name__=='__main__':
app.run()
<!DOCTYPE html>
<html lang=" en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post" >
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
<p>{{form.pwd.label1}} {{form.pwd}} {{form.pwd.errors[0]}}</p>
<input type=" submit" value="提交">
</form>
</body>
</html>
信号
Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一一些用户行为
安装: pip3 install blinker
内置信号:
request_started =_signals.signal('request-started')
#请求到来前执行
request_finished = _signals.signal('request-finished')
#请求结束后执行
before_render.template =_signals.signal('before -render-template') # 模板渲染前执行
template_rendered =_signals.signal('template-rendered')
#模板渲染后执行
got_request_exception =_signals.signal('got-request-exception')
#请求执行出现异常时执行
request_tearing.down =_signals.signal('request-tearing-down')
#请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down =_signals.signal('appcontext-tearing-down')#应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed =_signals.signal('appcontext -pushed')
#应用上下文push时执行
appcontext_popped =_signals.signal('appcontext-popped')
#应用上下文pop时执行
message_flashed =_signals.signal('message-flashed')
#调用flask在其中添加数据时,自动触发
from flask import Flask,signals
app=Flask(__name__,static_url_path='/static',static_folder='static',template folder='templates')
#第一步:写一个函数(触发果些动作)
#往信号中注册函数
def func(*args,**kwargs):
print('触发信号',args, kwargs)
#第二步:函数跟内置信号绑定
signals.request_started.connect(func)
@app.route("/")
def index():
return 'xxx'
if __name__='__main__':
app.run()
######内置信号的使用
##第一步写一个函数(触发某些动作)
#往信号中注册函数
def func(*args, ** kwargs):
print(args[0]) # 当前app对象
print('触发信号' ,args,kwargs)
#第二步:函数跟内置信号绑定
signals.request_started.connect(func)
####自定义信号的使用
#自定义信号
##第一步:定义一个信号
# xxxxx=_ signals.signal('xxxxx')
##第二步:定义一个函数
# def func3(*args, **kwargs):
# import time
# time. sleep(1)
# print('触发信号' ,args,kwargs)
# #第三步:信号跟函数绑定
# xxxxx.connect(func3)
#第四步:触发信号
xxxxx. send(1,k='2')
flask-script(制定命令)
1模拟出类似django的启动方式: python manage. py runserver
2 pip install flask-script
自定制命令:
@manager.command
def custom(arg):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg)
@manager.option('-n', '--name', dest= 'name')
#@manager.option('-u', '--url', dest='url')
def cnd(name, url):
"""
自定义命令(-n也可以写成--name)
执行: python manege.py cnd -n 1qz -u http://www.oldboyedu.com
执行: python marlage.py cmd --name 1qz --url http://ww.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url)
#有什么用?
#把excel的数据导入数据库,定制个命令,去执行
# python manage.py insertdb -f xxx.exc1 -t aa
SQLAlchemy
pip install sqlalchemy
1执行原生sql (不常用)
import time
import threading
import sqlalchemy
from sqlalchemy import create.engine
from sqlalchemy.engine.base import Engine
engine=create_engine(
"mysql+pymysq1://root:123456@127.0.0.1:3306/test?charset=utf8" ,
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5, #连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收 (重器)
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from app01_book")
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
2 orm使用
models,py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Text,ForeignKey,DateTime,UniqueConstraint,Index
Base=declarative_base()
class Users(Base):
__tablename__ = 'users' #数据库表名称
id = Column(Integer, primary key=True) # id主键
name = Column(String(32),index=True,nullablemFalse) # name列, 索引,不可为空
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_ id_ name'), #联合唯一
# Index('ix_id_name', ”name', 'email'), #索引
)
def init_db():
"""
根据类创建数据库表
: return:
"""
engine = create_engine(
'mysql+pymysql://root:123456@127.0.0.1:3306/ aaa?charset-utf8',
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收 (重置)
)
#通过engine对象创建表
Base.metadata.create_all(engine)
#删除表
def drop_table():
#创建engine对象
engine = create_engine(
"mysql+pymysql://root:123@127. 0. 0. 1:3306/ aaa?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5, #连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 #多久之后对线程池中的线程进行一-次连接的回收(重置)
#通过engine对象删除所有表
Base.metadata.drop_all(engine)
if __name__ = '__main__':
create_table()
# drop_table()
app.py
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create engine
from models import Users
#"mysql+pymysq1://root@127.0.0.1:3306/aaa"
engine = create.engine("mysq1+pymysql://root:123456@127 .0.0.1:3306/aaa", max_overf1ow=0, pool_size=5)
Connection = sessionmaker(bind=engine)
#每次执行数据库操作时,都需要创建一个Connection
con = Connection()
#########执行ORM操作#############
obj1 = Users(name="1qz")
con.add(obj1)
#提交事务
con.commit()
#关闭session,其实是将连接放回连接池
con.close()
一对多关系与多对多关系
#一个Hobby可以有很多人喜欢
#一个人只能由一个Hobby
class Hobby (Base):
__tablename__=' hobby'
id = Column(Integer, primary_key= True)
caption = Column(String(50),default='篮球')
class Person(Base):
__tablename__='person'
nid = Column(Integer, primary_key=True)
name = Column(String (32),index=True,nullable=True)
# hobby指的是tablename而不是类名,uselist=False
#一对多的关系,关联字段写在多的一”方
hobby_id = Column(Integer, ForeignKey('hobby.id'))
#跟数据库无关,不会新增字段,只用于快速链表操作
#类名,backref用于反向查询
#hobby=relationship('Hobby',backref='pers')
# 多对多关系
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True,autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id')
boy_id = Column(Integer, ForeignKey('boy.id'))
class Girl(Base):
__tablename__ = "girl"
id = Column(Integer, primary_key=True)
name = Column(String(64),unique=True,nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True,autoincrement=True)
hostname = Column(String(64), unique=True,nullable=False)
#与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
servers = relationship('Girl', secondary= 'boy2girl', backref="boys")
基于scoped_session实现线程安全
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped.session
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
"""
#线程安全,基于本地线程实现每个线程用同-个session
#特殊的: scoped_ session中有 原来方法的Session中的一下方法:
public_ methods - (
'__contains__' ,'__iter__','add', 'add_ all','begin', 'begin. nested',
'close', 'commit', 'connection', 'delete','execute', 'expire',
'expire_all', 'expunge', ' expunge_ all', 'flush', 'get. bind',
'is_ modified', 'bulk_ save_ objects', 'bulk_ insert. mappings',
'bulk_ update_ mappings ' ,
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
#scoped_session类并没有继承Session,但是却又它的所有方法
session = scoped_session(Session)
#共**.*.**.执行ORM操作共#.**.*.#
obj1 = Users(name="alex1")
session.add(obj1)
# 1制作engine
engine = create_engine('mysql+pymysql://root: 123@127.0.0.1:3306/aaa',max_overflow=0, pool_size=5)
#2制造一个session类(会话)
Session = sessionmaker(bind=engine)
#得到一个类
# 3得到一个session对象(线程安全的session)
#现在的session已经不是session对象了
#为什么线程安全,还是用的local
session = scoped_session(Session)
# session=Session() #得到个session对 象
# conn=Session(
# 4创建一个对象
obj1 = User(name="lqz")
# 5把对象通过add放入
session.add(obj1)
#6提交
session.commit()
orm增删改查
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped.session
from models import User
engine = create_engine('mysql+pymysql://root: 123@127.0.0.1:3306/aaa',max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = scoped_session(Session)
###1批量添加
obj=User(name='xxx')
obj2=User(name='yyyy')
obj3=User(name='zzz')
session.add_all([obj, obj2, obj3])
###2简单删除(查到删除)
# res=session.query(User).filter_by(name='2008').delete()
res=session.query(User).filter(User.id>3).delete()
#影响1行
print(res)
##3修改
res=session.query(User).filter_by(id=1).update({'name': 'egon'})
###4基本查询操作
res=session.query(User).al1()
print(type(res))
res=session.query(User).first()
print(res)
# 执行原生sql
cursor = session.execute('insert into users(name) values (value)',params={"value":'1qz'})
print(cursor.lastrowid)
session.commit()
#并没有真正关闭连接,而足放回池中
session.close()
flask-migrate
离线脚本,创建表
详见代码
flask-migrate
python3 manage.py db init 初始化:只执行一次
python3 manage.py db migrate 等同于makemigartions
python3 manage.py db upgrade 等同于migrate
Flask-SQLAlchemy如何使用
1 from flask_sq1alchemy import SQLAlchemy
2 db = SQLA1chemy()
3 db.init_app(app)
4以后在视图函数中使用
-db.session就是咱们讲的session
flask-migrate的使用(表创建,字段修改)
1 from flask_migrate import Migrate ,MigrateCommand
2 Migrate(app, db)
3 manager.add_command('db' ,MigrateCommand)
【参考】https://www.bilibili.com/video/BV11N411f7Ga
【参考】https://www.cnblogs.com/xiaoyuanqujing/category/1561311.html