Flask使用线程异步执行耗时任务

news2025/2/28 6:11:42

1 问题说明

1.1 任务简述

在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下:
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db


# 初始化MySQL
init_mysql_db()

init_blueprint()

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORS


def create_app():
    flask_app = Flask(__name__)
    CORS(flask_app, supports_credentials=True)
    return flask_app


app = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow

# 添加pymysql驱动,连接MySQL数据库
import pymysql

from config_app import app

pymysql.install_as_MySQLdb()

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

# 创建MySQL单实例
mysql_db = SQLAlchemy()

# 创建Schema
mysql_schema = Marshmallow()


# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""

class MysqlConf:
    acc = "root"
    pwd = "123456"
    host = "192.168.108.200"
    port = 3306
    db = "async"


mysql_conf = MysqlConf()


# 初始化MySQL数据库
def init_mysql_db():

    # 配置MySQL数据库url
    db_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.db
    app.config["SQLALCHEMY_DATABASE_URI"] = db_url

    # 关闭sqlalchemy自动跟踪数据库
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # 显示底层执行的SQL语句
    app.config['SQLALCHEMY_ECHO'] = True

    # 解决‘No application found. Either work inside a view function or push an application context.’
    app.app_context().push()

    # 初始化app
    mysql_db.init_app(app)

    # 初始化schema
    mysql_schema.init_app(app)


# 初始化table
def init_table():
    # 删除表
    mysql_db.drop_all()
    # 创建表
    mysql_db.create_all()

(4)dao.py

import datetime

from config_db import mysql_db as db


# Create table of bm_record
class User(db.Model):
    # Record table
    __tablename__ = "as_user"

    id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)
    name = db.Column("us_name", db.String(100))
    age = db.Column("us_age", db.Integer)
    create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)


# 插入数据
def insert_record_dao(user: User):
    # Add data
    db.session.add(user)
    # Commit data
    db.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from flask import Blueprint, jsonify

from config_app import app
from config_db import init_table
from dao import insert_record_dao, User

user = Blueprint("user", __name__)


# 注册蓝本
def init_blueprint():
    app.register_blueprint(user, url_prefix='/user')


@user.route("/initdb")
def init_db():
    init_table()
    return jsonify("success")


@user.route("/add")
def add_user():
    user: User = User()
    user.name = "zhangsan"
    user.age = 12

    time.sleep(10)
    insert_record_dao(user)

    return jsonify(user.id)


executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():
    executor.submit(_run_thread_pool)
    return jsonify("success")


# 执行线程
def _run_thread_pool():
    print("Run thread pool")
    time.sleep(2)
    user: User = User()
    user.name = "thread pool"
    user.age = 12

    # 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错
    with app.app_context():
        insert_record_dao(user)

    print("End thread pool")
    pass


@user.route("/thread")
def run_task_thread():
    task = threading.Thread(target=_run_thread, name='thread-01')
    task.start()

    return jsonify("success")


# 执行线程
def _run_thread():
    print("Run thread")
    time.sleep(2)

    user: User = User()
    user.name = "thread"
    user.age = 12

    # 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
    # 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
    """
    This typically means that you attempted to use functionality that needed
    the current application. To solve this, set up an application context
    with app.app_context(). See the documentation for more information.
    """
    with app.app_context():
        insert_record_dao(user)

    print("End thread")
    pass

4 请求截图

(1)初始化数据库

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1278238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言--每日选择题--Day32

如果大家对读研究生和就业不知道如何抉择,我的建议是看大家的经济基础,如果家里不是很需要你们工作,就读研提升自己的学历,反之就就业;毕竟经济基础决定上层建筑; 第一题 1. 下面代码的结果是:…

Paxos 算法

Paxos 算法 介绍 Paxos 算法是第一个被证明完备的分布式系统共识算法。共识算法的作用是让分布式系统中的多个节点之间对某个提案(Proposal)达成一致的看法。提案的含义在分布式系统中十分宽泛,像哪一个节点是 Leader 节点、多个事件发生的…

百马百担c语言编程

以下是一个百马百担问题的C语言编程实现&#xff1a; #include <stdio.h>int main() { int n, m, k; scanf("%d%d%d", &n, &m, &k); int a[n], b[m], c[k]; for (int i 0; i < n; i) { scanf("%d", &a[i]);…

LangChain 18 LangSmith监控评估Agent并创建对应的数据库

LangChain系列文章 LangChain 实现给动物取名字&#xff0c;LangChain 2模块化prompt template并用streamlit生成网站 实现给动物取名字LangChain 3使用Agent访问Wikipedia和llm-math计算狗的平均年龄LangChain 4用向量数据库Faiss存储&#xff0c;读取YouTube的视频文本搜索I…

ELK高级搜索,深度详解ElasticStack技术栈-下篇

前言&#xff1a;ELK高级搜索&#xff0c;深度详解ElasticStack技术栈-上篇 14. search搜索入门 14.1. 搜索语法入门 14.1.1 query string search 无条件搜索所有 GET /book/_search结果&#xff1a; {"took" : 969,"timed_out" : false,"_shar…

大数据湖项目建设方案:文档全文101页,附下载

关键词&#xff1a;大数据解决方案&#xff0c;数据湖解决方案&#xff0c;数据治理解决方案&#xff0c;数据中台解决方案 一、大数据湖建设思路 1、明确目标和定位&#xff1a;明确大数据湖的目标和定位是整个项目的基础&#xff0c;这可以帮助我们确定项目的内容、规模、所…

Mybatis 分页查询的三种实现

Mybatis 分页查询 1. 直接在 sql 中使用 limit2. 使用 RowBounds3. 使用 Mybatis 提供的拦截器机制3.1 创建一个自定义拦截器类实现 Interceptor3.2 创建分页查询函数 与 sql3.3 编写拦截逻辑3.4 注册 PageInterceptor 到 Mybatis 拦截器链中3.5 测试 准备一个分页查询类 Data…

算法工程师面试八股(搜广推方向)

文章目录 机器学习线性和逻辑回归模型逻辑回归二分类和多分类的损失函数二分类为什么用交叉熵损失而不用MSE损失&#xff1f;偏差与方差Layer Normalization 和 Batch NormalizationSVM数据不均衡特征选择排序模型树模型进行特征工程的原因GBDTLR和GBDTRF和GBDTXGBoost二阶泰勒…

报错:执行sudo gedit时 No protocol specifiedUnable to init server: 无法连接: 拒绝连接

1.问题描述 在执行sudo gedit编辑文件时&#xff0c;报错连接不上服务&#xff1a; 2.问题解决 2.1先安装Vncserver sudo apt-get update sudo apt-get install tightvncserver2.2执行 vncserver 按提示输入密码&#xff0c;不宜过短 2.3若出现提示warning 则按提示执行&…

打印元素绘制协议Java实现

我一直提倡的面向接口和约定编程&#xff0c;而打印元素绘制协议一直是我推荐的打印实现方式&#xff0c;我以前只是强调按打印元素绘制协议输出数据就行了&#xff0c;有实现程序按协议控制打印&#xff0c;说是可以用任何语言实现客户端程序而不影响打印业务&#xff0c;那么…

C++——初始化列表

初始化列表&#xff1a;一一个冒号开始&#xff0c;接着是一个以逗号分隔的数据成员列表&#xff0c;每个“成员变量”后面跟一个放在括号中的初始值或表达式。 #include <iostream> using namespace std; class Date { public:Date(int year, int month, int day): _ye…

国标GBT 27930关键点梳理

1、充电总流程 整个充电过程包括六个阶段:物理连接完成、低压辅助上电、充电握手阶段、充电参数配置阶段、充电阶段和充电结束阶段。 在各个阶段,充电机和 BMS 如果在规定的时间内没有收到对方报文或没有收到正确报文,即判定为超时(超时指在规定时间内没有收到对方的完整数据包…

Hdoop学习笔记(HDP)-Part.06 安装OracleJDK

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …

酷开科技 | 酷开系统,让家庭娱乐方式焕然一新!

在这个快节奏的社会&#xff0c;家庭娱乐已成为我们日常生活中不可或缺的一部分&#xff0c;为了给家庭带来更多欢笑与感动&#xff0c;酷开科技发力研发出拥有丰富内容和技术的智能电视操作系统——酷开系统&#xff0c;它集合了电影、电视剧、综艺、游戏、音乐等海量内容&…

大数据Doris(三十二):Doris高级功能

文章目录 Doris高级功能 一、​​​​​​​表结构变更

基于spring boot电子商务系统

一、 系统总体结构设计 (一) 功能结构图 图1-1 后台管理子系统 图1-2 电子商务子系统功能结构图 (二) 项目结构目录截图&#xff08;例如下图&#xff09; 图 1-3 系统目录图 (三) 系统依赖截图 图 1-2 所有依赖截图 (四) 配置文件 1、 全局配置文件 2、 其他配置文…

链表数组插入排序

InsertSort 插入排序算法&#xff0c;比如打扑克牌的算法时&#xff0c;按照从左到右&#xff0c;找到对应的位置插入排序 最重要的是位置移动 找到对应位置值 #include "iostream" #include "bits/stdc.h"using namespace std;void sort(vector<in…

高级前端面试中的三个 “送命题” !!!

原型与原型链 说到原型&#xff0c;就不得不提一下构造函数&#xff0c;首先我们看下面一个简单的例子&#xff1a; function Dog(name,age){this.name name;this.age age; }let dog1 new Dog("哈士奇",3); let dog2 new Dog("泰迪",2);首先创造空的…

Go GORM简介

GORM&#xff08;Go Object-Relational Mapping&#xff09;是一个用于Go语言的ORM库&#xff0c;它提供了一种简单、优雅的方式来操作数据库。GORM支持多种数据库&#xff0c;包括MySQL、PostgreSQL、SQLite和SQL Server。以下是GORM的一些主要特性 全功能ORM&#xff1a;GORM…

leetCode 47. 全排列 II + 回溯算法 + 图解 + 笔记

给定一个可包含重复数字的序列 nums &#xff0c;按任意顺序 返回所有不重复的全排列 示例 1&#xff1a; 输入&#xff1a;nums [1,1,2] 输出&#xff1a; [[1,1,2],[1,2,1],[2,1,1]] 示例 2&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3],[1,3,2…