前言:
工作需求,写一个接口,用Python来编写,我首先想到用flask小型框架来支撑,配置sqlalchemy来实现,但是在实现的过程中,发生循环导入问题
我想到用蓝图来解决此问题,但是仍然会出死循环的问题,网上找了很多资料,许多都是轻描淡写,可能不是很明白,为了能够帮助更多踩坑的人,分享一下自己的经历!!
了解导入循环的机制
报错代码:
cannot import name '......' from partially initialized module '.....'
Python的 死循环机制:
当模块之间相互导入时,两个或多个模块之间相互依赖,它们通过import
语句相互引用对方,而没有一个明确的导入顺序或者依赖管理来打破这种循环,形成了一个闭环,当尝试从任何一个模块中导入或执行函数时,Python 解释器将陷入无限循环的导入过程中
比如:A导入B,B导入A
模块A:module_a.py
import module_b
def func_a():
print("Function A")
module_b.func_b()
模块B:module_b.py
import module_a
def func_b():
print("Function B")
module_a.func_a()
比如:A导入C,B导入A,C导入B
模块A:test1.py
import test3
def t1():
print('test1')
test3.t3()
t1()
模块B:test2.py
import test1
def t2():
test1.t1()
print("test2")
模块C:test3.py
import test2
def t3():
test2.t2()
print("test3")
解决sqlalchemy集成flask导入死循环的问题
言归正传,要解决问题,要注意db实例化的位置,app导入的位置,蓝图导入的顺序!!!
解决方案:
1. 在cab_api_test/views/__init__.py 文件下,实例化db,函数封装app
3. 导入蓝图时,如果蓝图有引用db,位置需要在db的下方
2. 在models中,main语句下,导入app,如果在全局还是会发生循环问题
可能说的有些不清不楚,还是通过代码体现,就能明白啦,每步骤关键都有注释,注意好导入的顺序,和导入的位置,主要是app和db的位置,和模型创建用到的app导入问题!!
目录结构:
cab_api_test
├── app.py
├── gunicorn_config.py
├── logs
│ ├── cab_api_access.log
│ └── cab_api_error.log
├── requirements.txt
└── views
├── __init__.py
├── __pycache__
│ ├── __init__.py
│ ├── cab_api.py
│ ├── filed_heck.py
│ ├── logger.py
│ ├── models.py
│ └── session.py
cab_api_test/views/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
# 1. 实例化db时 要注意导入蓝图的顺序!!!
# 2. 如果蓝图中有引用db,在这导入的蓝图必须在db的下面,不然还是会报错
# 3. 我是用的蓝图是 cab_api.py
from views.cab_api import cab_api
host = '127.0.0.1'
port = 3306
user = 'root'
password = 'wq123456'
database = 'gf_test'
def create_app():
app = Flask(__name__)
app.config['DEBUG'] = True
# sqlalchemy 连接池配置
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config[
'SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4&autocommit=true'
app.config['SQLALCHEMY_POOL_SIZE'] = 10
app.config['SQLALCHEMY_POOL_TIMEOUT'] = 10
app.config['SQLALCHEMY_MAX_OVERFLOW'] = 5
app.config['SQLALCHEMY_POOL_PRE_PING'] = True
app.config['SQLALCHEMY_ECHO'] = False
# 把app对象初始化到db中
db.init_app(app=app)
app.register_blueprint(cab_api, url_prefix='/api')
return app
cab_api_test/views/models.py
import datetime
from uuid import uuid4
from sqlalchemy import Column, String, Text, DateTime
# 这个db是全新的SQLAlchemy对象
from views import db
class Cab_review(db.Model):
__tablename__ = 'cab_review' # 表名
# 写字段
uuid = Column(String(100), nullable=False, primary_key=True, default=str(uuid4()).replace('-', ''))
cab_number = Column(String(64), nullable=True, comment='工单号')
review_item = Column(String(100), comment='项目名') # String(32) == varchar(32)
review_info = Column(Text, comment='项目信息')
review_link = Column(String(200), nullable=True, comment='项目链接')
caller_id = Column(String(100), nullable=True, comment='调用方标识ID')
caller_ip = Column(String(100), nullable=True, comment='调用方IP')
updated_time = Column(DateTime, default=datetime.datetime.now, nullable=False, comment='更新时间')
test_number = Column(String(64), nullable=True, comment='测试号')
if __name__ == '__main__': # 导入时不执行下面的语句,只有执行当前文件的时候才会执行下面的代码
# 注意要在main下面导入create_app,如果在全局导入,还是会发生循环导入的问题
from views import create_app
app = create_app()
# 先清空再创建
db.drop_all()
db.create_all(app=app)
# 做一个简单的封装 db,使用with 语句
class Session:
def __init__(self):
self.db = None
def __enter__(self):
self.db = db.session
return self.db
def __exit__(self, exc_type, exc_val, exc_tb):
if self.db:
self.db.close()
cab_api_test/views/cab_api.py
from flask import jsonify, request
import json, datetime
from uuid import uuid4
from views.logger import flask_logger
from views.filed_check import field_check
from flask import Blueprint
# Session 是我封装好的 db
from views.models import Cab_review, Session
cab_api = Blueprint('cab_api', __name__)
@cab_api.route('/', methods=['POST', ])
def main():
# 传入的参数
data = request.get_json()
cno_test = data.get('cno_test')
review_pro = data.get('review_pro')
review_result = data.get('review_result')
review_advice = data.get('review_advice')
flask_logger.info(f'请求体:{data}')
data['cno'] = '已废弃' if not data.get('cno') else data.get('cno')
data['caller_ip'] = request.remote_addr
data['url'] = data.get('url') if data.get('url') else ''
data['caller_id'] = data.get('caller_id') if data.get('caller_id') else '调用方未提供caller_id'
# 校验字段
ok, msg = field_check(cno_test, review_pro, review_result, review_advice)
if not ok:
flask_logger.info(
'-------------------------------------------------------------------------------------------------------')
return jsonify(msg)
# 查询cab_review中的数据
with Session() as session:
select_data = session.query(Cab_review).filter_by(test_number=cno_test,
review_item=review_pro).all() # [queryset,queryset],数据为空则 []
session.begin() # 开启事务
if not select_data:
try:
cab_review = Cab_review(uuid=str(uuid4()).replace('-', ''),
cab_number=data.get('cno'),
review_item=data.get('review_pro').strip(),
review_info=json.dumps(
{'review_result': data.get('review_result'),
'review_advice': data.get('review_advice')},
ensure_ascii=False),
review_link=data.get('url'),
caller_id=data.get('caller_id'),
caller_ip=data.get('caller_ip'),
test_number=data.get('cno_test'))
session.add(cab_review)
session.commit()
flask_logger.info('数据插入成功')
msg = '请求成功,数据已添加或更新'
is_success = True
except Exception as e:
session.rollback()
flask_logger.error(f'添加失败:{e}', exc_info=False)
msg = '数据添加或更新失败,请联系管理员'
is_success = False
else:
try:
session.query(Cab_review).filter_by(test_number=data.get('cno_test'),
review_item=data.get('review_pro').strip()).update(
{'review_info': json.dumps(
{'review_result': data.get('review_result'), 'review_advice': data.get('review_advice')},
ensure_ascii=False), 'review_link': data.get('url'),
'caller_id': data.get('caller_id'),
'caller_ip': data.get('caller_ip'), 'updated_time': datetime.datetime.now()})
session.commit()
flask_logger.info('数据更新成功')
msg = '请求成功,数据已添加或更新'
is_success = True
except Exception as e:
session.rollback()
flask_logger.error(f'更新失败:{e}', exc_info=False)
msg = '数据添加或更新失败,请联系管理员'
is_success = False
review_name = ['评审项:' + review_pro, '评审结果:' + review_result, '评审意见:' + review_advice,
'评审链接:' + data.get('url')]
flask_logger.info(
'-------------------------------------------------------------------------------------------------------')
return {'code': 200, 'success': is_success, 'msg': msg, 'data': '\n'.join(review_name)}
@cab_api.route('/', methods=['GET', ])
def health_check():
return 'ok'
这样就不会在发生导入死循环的问题啦!!!