两种方式实现websocket获取数据库查询进度

news2024/11/24 11:12:41

两种方式实现websocket获取数据库查询进度

本文实现了两种方式用websocket连接实现前端通过API获取数据库查询进度,作为websocket初步学习的参考

内容目录概要:

  1. 使用额外接口获取指定数据库查询进度,查询进度的接口与获取数据的接口分开实现
  2. 查询数据的同时可以同时返回进度
  3. 使aiomysql支持pandas的read_sql_query方法

食用前提:

  • sanic web服务
  • pandas分块读取mysql数据库,示例中伪造了dataframe代替数据库中的数据,pandas异步查询mysql的实现方式放在最后额外的part

第一种方式,查询进度的接口和获取数据的接口分开实现

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import asyncio
import json

import pandas as pd
from sanic import Sanic
from sanic import Websocket, Request

app = Sanic("sanic")

# 这里为了简单,将进度存储设置成了全局变量容器,合理的做法可以放到一个可共用的类、第三方存储位置等,并设置过期自动清理
query_progress = {}


class MysqlDatabaseQuery:
    async def read_query(self):
        data = pd.DataFrame({"id": [3, 5, 8], "name": ["a", "b", "c"]}, index=[1, 2, 3])
        rowcount = len(data)
        for idx, row in data.iterrows():
            await asyncio.sleep(1)
            rownumber = idx
            yield dict(row), f"{(rownumber / rowcount): .2%}"


# 方式一
# 尝试单独用一个接口来查询数据库query的进度
async def read_query_progress(requests: Request, ws: Websocket):
    query_id = requests.args.get("query_id")
    while True:
        progress = query_progress.get(str(query_id)) or "0%"
        if ws.recv_cancel:
            break
        _ = await ws.recv()
        await ws.send(progress)


# 异步处理的任务:储存数据库查询的进度
async def dispatch_query_progress(**context):
    global query_progress
    query_progress = context


# 获取的数据接口
async def read_query(requests: Request, ws: Websocket):
    data_reader = MysqlDatabaseQuery().read_query()
    records = []
    query_id = "1"  # 需要提前定义好查询id
    async for record, progress in data_reader:
        records.append(record)
        # sanic提供的发布信号并在后台执行任务的方法,定义事件名称,传递参数,任务会异步执行:https://sanic.dev/zh/guide/advanced/signals.html#%E6%B7%BB%E5%8A%A0%E4%BF%A1%E5%8F%B7-adding-a-signal
        await requests.app.dispatch("query.read.progress", context={query_id: progress})
        if ws.recv_cancel:
            break
    else:
        await ws.send(json.dumps(records))


app.add_websocket_route(read_query, "/query1")
app.add_websocket_route(read_query_progress, "/read_progress1")
app.add_signal(dispatch_query_progress, "query.read.progress")  # 添加接收信号后的处理方法handler到app中

if __name__ == '__main__':
    app.run(auto_reload=True, dev=True)

接口调用效果:

# 查询接口ws://127.0.0.1:8000/query1的返回数据:
[{"id": 3, "name": "a"}, {"id": 5, "name": "b"}, {"id": 8, "name": "c"}]

在查询接口查询的过程中,调用进度接口
在这里插入图片描述

第二种方式,数据边查询边返回进度

# -*- coding: utf-8 -*-
import asyncio
import json

import pandas as pd
from sanic import Sanic
from sanic import Websocket, Request

app = Sanic("sanic")

# 这里为了简单,将进度存储设置成了全局变量容器,合理的做法可以放到一个可共用的类、第三方存储位置等,并设置过期自动清理
query_progress = {}


class MysqlDatabaseQuery:
    async def read_query(self):
        data = pd.DataFrame({"id": [3, 5, 8], "name": ["a", "b", "c"]}, index=[1, 2, 3])
        rowcount = len(data)
        for idx, row in data.iterrows():
            await asyncio.sleep(1)
            rownumber = idx
            # 这里将row转成了dataframe,模拟了pd.read_sql_query(sql, chunksize=...)传递了chunksize参数时得到生成器(不是异步生成器,但是文章最后会介绍将pd.read_sql_query(sql, chunksize=...)修改成异步生成器的方式)
            yield row.to_frame().T, f"{(rownumber / rowcount): .2%}"


# 方式二:
# 边读取数据边返回进度,不需要额外维护进度的存储,也不需要定义query_id,这种方式感觉更优雅
async def read_query(requests, ws: Websocket):
    data_reader = MysqlDatabaseQuery().read_query()
    df = pd.DataFrame()
    progress = ''
    async for frame_part, progress in data_reader:
        df = pd.concat([df, frame_part], ignore_index=True)
        data = await ws.recv(0.0001)  # 只等待一小会儿,如果没有接收到progress的请求就继续下一轮循环,等了但没完全等(bushi
        if data == "progress":
            await ws.send(progress)
        if ws.recv_cancel:
            break
    else:
        await ws.send(progress)     # 如果必须保证最后100%要送给前端,可以加上这一句,虽然可能会重复,但在前端展示时不会有啥影响
        await ws.send(df.to_json(orient="records"))


app.add_websocket_route(read_query, "/query2")

if __name__ == '__main__':
    app.run(auto_reload=True, dev=True)

接口调用效果:
在这里插入图片描述
有可能会多返回1个100%,不过在前端展示时应该没什么变化

对比两种实现方式,第二种显然更简便优雅一下,不过如果想要在当前查询之外还能随时查询某个数据库查询的进度时,就必须用第一种实现方式了
此外需要注意的是:如果使用第一种实现方式,还需要考虑query_id的定义以及储存的查询进度应该设置一定的过期时间,过期之后就删除,否则可能会导致数据一直增长而占用内存过多的情况

既然是使用异步框架,查询就应该支持异步查询

因此顺便介绍一种实现了可使用aiomysql连接的pandas read_query方式:
aiomysql_pool

# -*- coding:utf-8 -*-
from __future__ import annotations

import asyncio
import logging
from abc import ABC
from asyncio import AbstractEventLoop

import aiomysql
import pandas as pd
from pandas._typing import DtypeArg
from pypika.queries import QueryBuilder

from aio_pandas_database import AIOPandasSQLDatabase

loop_ = asyncio.get_event_loop()


class AIOMysqlPool(aiomysql.Pool, ABC):
    def __init__(self, minsize=1, maxsize=10, echo=False, pool_recycle=-1, loop=None,
                 host="localhost", port=3306, user="root", password="root", db="services",	# 改成自己数据库的信息
                 cursorclass=aiomysql.DictCursor,
                 **kwargs) -> None:
        """
        Parameters
        ----------
        minsize
        maxsize
        echo
        pool_recycle: int
            值应该>=-1,-1表示不会回收
            代表一个连接多久未被使用了之后就回收,如果此时再获取连接会返回一个新的连接
        loop
        host
        port
        user
        password
        database
        kwargs
        """
        super().__init__(minsize, maxsize, echo, pool_recycle, loop,
                         host=host, port=int(port), user=user, password=password, db=db, cursorclass=cursorclass,
                         **kwargs)
        self._loop = loop or asyncio.get_event_loop()

    async def execute(self, sql, query_params=None, is_fetchone=False):
        await asyncio.sleep(1)
        if isinstance(sql, QueryBuilder):
            sql = sql.get_sql()
        query_params = query_params or ()
        async with self.acquire() as conn:
            async with conn.cursor() as cursor:
                try:
                    row = await cursor.execute(sql, query_params)
                    res = await cursor.fetchall()
                    await conn.commit()  # update时需要更新,但其他类型查询例如select时commit也无妨
                    if is_fetchone:
                        assert row <= 1
                        return res[0] if row else None
                    return row, res
                except Exception as e:
                    logging.error(
                        f"SQL execution met an unexpected error, "
                        f"hint:\n {e}\n, "
                        f"the error query sql is: \n{sql}\n"
                    )
                    await conn.rollback()
                    raise e

    def get_loop(self) -> AbstractEventLoop:
        return self._loop

    async def fetchone(self, sql):
        res = await self.execute(sql, is_fetchone=True)
        return res

    async def fetchall(self, sql):
        row, res = await self.execute(sql)
        return row, res

    async def update(self, sql):
        row, res = await self.execute(sql)
        return row, res

    async def delete(self, sql):
        row, res = await self.execute(sql)
        return row, res

    async def async_read_query(
            self,
            sql,
            ws=None,
            index_col=None,
            coerce_float: bool = True,
            params=None,
            parse_dates=None,
            chunksize: int | None = None,
            dtype: DtypeArg | None = None,
    ):
        conn = await self.acquire()
        aiodb = AIOPandasSQLDatabase(conn)
        frame_or_async_generator = await aiodb.async_read_query(
            sql,
            None,
            index_col,
            coerce_float,
            params,
            parse_dates,
            chunksize,
            dtype
        )
        if chunksize and ws is not None:
            frames = [pd.DataFrame()]
            progress = ''
            async for frame_part in frame_or_async_generator:
                progress = conn.progress
                frames.append(frame_part)
                data = await ws.recv(0.0001)  # 只等待一小会儿,如果没有接收到progress的请求就继续下一轮循环,等了但没完全等(bushi
                if data == "progress":
                    await ws.send(progress)
                if ws.recv_cancel:
                    break
            else:
                await ws.send(progress)  # 如果必须保证最后100%要送给前端,可以加上这一句,虽然可能会重复,但在前端展示时不会有啥影响
            df = pd.concat(frames, ignore_index=True)
            return df
        return frame_or_async_generator


    async def async_read_query2(
            self,
            sql,
            app=None,
            index_col=None,
            coerce_float: bool = True,
            params=None,
            parse_dates=None,
            chunksize: int | None = None,
            dtype: DtypeArg | None = None,
    ):
        conn = await self.acquire()
        aiodb = AIOPandasSQLDatabase(conn)
        frame_or_async_generator = await aiodb.async_read_query(
            sql,
            app,
            index_col,
            coerce_float,
            params,
            parse_dates,
            chunksize,
            dtype
        )
        return frame_or_async_generator


p = AIOMysqlPool()


async def run_test():
    # res = await p.fetchone("select 1;")
    # print(res)
    row, res = await p.execute("select 2 as a, 3 as b union all select 1 as a, 2 as b;")
    print(row, res)
    try:
        row, res = await p.execute("select 2 as a, 3 as b union all select 1 as a, 2 as b;", is_fetchone=True)
        print(row, res)
    except AssertionError as e:
        print(f"查询的结果超过1行")


async def run_test2():
    sql = """select * from test_table"""

    tasks = []
    for i in range(3):
        tasks.append(p.execute(sql))
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)


async def run_update():
    u_sql = """
    update test_table set `name` = 'abc' where id = 1
    """
    row, res = await p.execute(u_sql)
    print(row, res)

    s_sql = """
    select * from test_table where `name` = 'abc'
    """
    row, res = await p.execute(s_sql)
    print(row, res)


async def rsq_t():
    sql = """select * from test_table"""

    tasks = []
    for i in range(3):
        tasks.append(p.async_read_query(sql))
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)


async def run_async_stream_query():
    sql = """select * from test_table"""
    df = await p.async_read_query(sql, chunksize=1)
    print(df)


async def run_async_stream_query2():
    sql = """select * from test_table"""
    ag = await p.async_read_query2(sql, chunksize=1)
    while True:
        try:
            r = await ag.__anext__()
            print(r)
        except StopAsyncIteration:
            print("break")
            break


async def run_async_stream_query3():
    sql = """select * from test_table"""
    async for f in await p.async_read_query(sql, chunksize=1):
        print(f)


if __name__ == '__main__':
    loop_.run_until_complete(run_test())
    loop_.run_until_complete(run_test2())
    loop_.run_until_complete(run_update())
    loop_.run_until_complete(rsq_t())
    loop_.run_until_complete(run_async_stream_query())
    loop_.run_until_complete(run_async_stream_query2())
    loop_.run_until_complete(run_async_stream_query3())

async pandas read_query,实现了将pandas的read_sql_query支持aiomysql:

# aio_pandas_database.py
# -*- coding: utf-8 -*-
from __future__ import annotations

import asyncio

from pandas._typing import DtypeArg
from pandas.io.sql import _convert_params, _wrap_result, DatabaseError


class AIOPandasSQLDatabase:
    def __init__(self, con):
        self.con = con
        self.con.progress = None

    async def _fetchall_as_list(self, cur):
        result = await cur.fetchall()
        if not isinstance(result, list):
            result = list(result)
        return result

    async def async_read_query(
            self,
            sql,
            app,
            index_col=None,
            coerce_float: bool = True,
            params=None,
            parse_dates=None,
            chunksize: int | None = None,
            dtype: DtypeArg | None = None,
    ):
        try:
            async with self.con as conn:
                async with conn.cursor() as cursor:
                    args = _convert_params(sql, params)
                    await cursor.execute(*args)
                    columns = [col_desc[0] for col_desc in cursor.description]
                    if chunksize is not None:
                        return self._query_iterator(
                            cursor,
                            app,
                            chunksize,
                            columns,
                            index_col=index_col,
                            coerce_float=coerce_float,
                            parse_dates=parse_dates,
                            dtype=dtype,
                        )
                    else:
                        data = await self._fetchall_as_list(cursor)
                        frame = _wrap_result(
                            data,
                            columns,
                            index_col=index_col,
                            coerce_float=coerce_float,
                            parse_dates=parse_dates,
                            dtype=dtype,
                        )
                        return frame
        except Exception as exc:
            try:
                await conn.rollback()
            except Exception as inner_exc:  # pragma: no cover
                ex = DatabaseError(
                    f"Execution failed on sql: {args[0]}\n{exc}\nunable to rollback"
                )
                raise ex from inner_exc

            ex = DatabaseError(f"Execution failed on sql '{args[0]}': {exc}")
            raise ex from exc

    async_read_sql = async_read_query

    # @staticmethod
    async def _query_iterator(
            self,
            result,
            app,
            chunksize: int,
            columns,
            index_col=None,
            coerce_float=True,
            parse_dates=None,
            dtype: DtypeArg | None = None,
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        query_id = "1"
        while True:
            # 本地读取太快了,sleep来方便调试
            data = await result.fetchmany(chunksize)
            # 这种进度的方式设定存在一个问题:一个connection中多个查询开了事务分布执行语句会导致进度(大概会,没验证过)
            progress = f"{(result.rownumber/result.rowcount): .2%}"
            self.con.progress = progress
            if app is not None:
                await app.dispatch("query.read.progress", context={query_id: progress})
            await asyncio.sleep(1)
            if not data:
                if not has_read_data:
                    yield _wrap_result(
                        [],
                        columns,
                        index_col=index_col,
                        coerce_float=coerce_float,
                        parse_dates=parse_dates,
                    )
                break
            else:
                has_read_data = True
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                    dtype=dtype,
                )

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/20775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP S4 FI后台详细配置教程- PART3 (财务凭证相关配置篇)

本篇主要介绍凭证相关的配置&#xff0c;希望对学习SAP财务的同学有帮助。 1、定义凭证类型 概念功能说明&#xff1a; • 凭证类型是区分不同交易类型的方法并决定能够被过帐的会计形式。 例如&#xff0c; 可将所有的会计凭证按业务类分成&#xff1a; 总帐凭证、收款…

基础选择器

一、任务目标 掌握基础选择器的应用 二、任务背景 CSS&#xff08;层叠样式表&#xff09;选择器是CSS规则的一部分&#xff0c;用来指定需要设置样式的HTML元素。通过选择器可以实现CSS对HTML元素的一对一、一对多、多对一的控制。 三、任务内容 选择器类型 描述 示例 通配选…

概率论基础

一、条件概率的三大公式 条件概率中的条件就代表观测变量&#xff0c;观测变量意思就是这个变量的取值是否已经定下来了 1.乘法公式 2.全概率公式 随机现象&#xff1a;在一定的条件下&#xff0c;并不总出现相同结果的现象称为随机现象。随机现象的各种结果会表现出一定的…

分类效果评价(机器学习)

目录 准确率 精确率(precision) 召回率(recall&#xff0c;也称为查全率) 调回平均 对于一般分类问题&#xff0c;有训练误差、泛化误差、准确率、错误率等指标 对于常见的二分类问题&#xff0c;样本只有两种分类结果&#xff0c;将其定义为正例与反例。 那么在进行分类…

基于java大学生就业信息管理系统

随着高校教育体制的改革大学生人数的不断增加&#xff0c;毕业生就业制度发生了根本的变化。单位和学生走向人才市场&#xff0c;双向选择&#xff0c;择优录用。因此在这样的情况下&#xff0c;在INTERNET上开发并运行信息管理系统就能够极大地提高工作效率&#xff0c;弥补了…

VMware 网络模式

VMware提供了三种网络工作模式&#xff0c;它们分别是&#xff1a; Bridged&#xff08;桥接模式&#xff09; NAT&#xff08;网络地址转换模式&#xff09; Host-Only&#xff08;仅主机模式&#xff09; 1、桥连接模式 【NAT 设置】【DHCP 设置】不可编辑 2、仅主机模式 【自…

MySQL并发事务会引起的问题

MySQL事务并发的问题主要分为以上三种 脏读: 比如 事务A 对用户表进行了 一次查询 和一次修改 他将用户1的 部门 从 部门1 改为了 部门2 但事务A 并没有提交 然后事务B 只做了一步查询 查用户表 此时 如果出现脏读 则 事务B查到的 用户1 的所属部门是 部门2 而 这是 事务A其实…

Python画小仓鼠

肉嘟嘟的小动物很是可爱&#xff0c;本文介绍运用Python中的turtle库控制函数绘制小仓鼠。    文章目录一、效果展示二、代码详解1 导入库2 播放音乐3 定义画小仓鼠头的函数4 定义画左眼和右眼的函数5 定义画嘴的函数一、效果展示 在介绍代码之前&#xff0c;先来看下本文的实…

mPEG-Dendro Azide,mPEG-Dendro N3,甲氧基聚乙二醇树状叠氮化物bisMPA树状大分子供应

1、名称 英文&#xff1a;mPEG-Dendro Azide&#xff0c;mPEG-Dendro N3 中文&#xff1a;甲氧基-聚乙二醇-树状叠氮化物 2、CAS编号&#xff1a;N/A 3、所属分类&#xff1a; Azide PEG Methoxy PEG 4、分子量&#xff1a;可定制&#xff0c;甲氧基-PEG-树状叠氮化物 200…

Linux下文件和目录的基础操作

文章目录一、Linux 下文件和目录的特点二、 计算机中文件大小的表示方式三、 ls 命令四、切换目录五、 相对路径和绝对路径六、创建和删除1、touch 创建文件2、mkdir 创建目录3、rm 删除文件和目录七、 查看、移动和复制文件1、tree 浏览目录结构2. cp 复制文件和目录3. mv 移动…

Linux下C/C++实现以十六进制的形式显示命令(xxd)

如果你需要在linux文本文件的十六进制转储&#xff1f;且正在寻找可以执行此操作的命令行实用程序&#xff0c;xxd的命令可以为你做这件事。xxd命令将文件显示为十六进制值和ASCII表示&#xff0c;并允许对其进行编辑。 xxd - 以十六进制形式表示 xxd程序接受文件或标准输入&…

python--函数

目录函数1.1 自定义函数1.2 调用函数1.3 作用域函数 1.1 自定义函数 无参数、无返回值 def function():表达式无参数、有返回值 def function():表达式return 需返回的值tips&#xff1a;函数中可以有多个return语句&#xff0c;但是只要执行一个return语句&#xff0c;就意…

PPT+Visio复现顶刊三维流程图

复现 论文中的图3&#xff0c;改图是研究流程&#xff0c;主要讲了神经网络的流程。 A future land use simulation model (FLUS) for simulating multiple land use scenarios by coupling human and natural effects https://doi.org/10.1016/j.landurbplan.2017.09.019 1.…

【Touchstone 1.02.0数据格式解析】

Touchstone 1.0&2.0数据格式解析 在进行S参数仿真时&#xff0c;一般存储的S参数模型为SnP格式&#xff0c;如双端口模型为S2P格式&#xff0c;四端口模型为S4P格式。了解SnP格式的具体要求&#xff0c;对于S参数的应用具有重要作用。 本质上&#xff0c;S参数是由S参数矩…

对比Python,PySpark 大数据处理其实更香

对于数据分析师、数据科学家和任何使用数据的人来说&#xff0c;能够熟练而有效地处理大数据是一项非常有优势的技能。 如果你已经熟悉运用 Python 和 pandas 做常规数据处理&#xff0c;并且想学习处理大数据&#xff0c;那么熟悉 PySpark&#xff0c;并将用其做数据处理&…

免费搜题系统搭建

免费搜题系统搭建 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 查题校园题库&#xff1a;查题校园题库后台&#xff08;点击跳…

C语言详细知识点复习(上)

文章目录一、C语言概述1、C语言的主要特点2、算法的概念及特点二、C程序设计的基础语法1、常量和变量2、数据类型3、运算符和表达式4、C 语句5、数据的输入和输出三、选择结构四、循环结构1、循环结构2、break\continue3、循环程序举例一、C语言概述 1、C语言的主要特点 程序…

微信小程序最新用户头像昵称获取规则调整应对措施(2022)

目录一、调整二、应对措施2.1 更新头像2.2 更新昵称三、完整代码一、调整 小程序用户头像昵称获取规则调整公告 以前通过wx.getUserProfile获取用户信息&#xff0c;用户点击同意以后&#xff0c;便可以直接获取相关信息&#xff0c;但是官方最近做出了调整&#xff0c;直接将…

图书管理系统

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 文章目录 前言 一.界面设计 1.管理员菜单 2.用户菜单 3.用户操作&#xff1a; 查找图书借阅图书归还图书显示图书 4.管理员操作 查找图书新增图书删除图…

【Matlab】Matlab导入多个.mat文件并画图的过程详解

Matlab导入多个.mat文件并画图的过程详解0. 实验背景1. 导入.mat文件存储1.1 导入.mat文件及作图最简单的方式&#xff1a;1.2 导入.mat文件及作图的脚本代码2. plot画图总结2.1 画散点图2.1.1 点形状2.1.2 点大小2.1.3 点颜色2.1.4 点填充2.2 画折线图2.2.1 折线形状2.2.2 折线…