💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
- 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老
- 导航
- 檀越剑指大厂系列:全面总结 java 核心技术,jvm,并发编程 redis,kafka,Spring,微服务等
- 常用开发工具系列:常用的开发工具,IDEA,Mac,Alfred,Git,typora 等
- 数据库系列:详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
- 新空间代码工作室:提供各种软件服务,承接各种毕业设计,毕业论文等
- 懒人运维系列:总结好用的命令,解放双手不香吗?能用一个命令完成绝不用两个操作
- 数据结构与算法系列:总结数据结构和算法,不同类型针对性训练,提升编程思维,剑指大厂
非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨
博客目录
- 一.需求说明
- 1.需求
- 2.实现方式
- 3.asyncio 介绍
- 二.实现步骤
- 1.异步终止任务
一.需求说明
1.需求
我有一个很耗时的异步任务,我希望能通过接口的方式终止这个异步任务,在 python 中如何实现呢?
2.实现方式
可以使用 asyncio 相关的 api 进行实现
- 创建并运行异步任务
- 取消异步任务
- api 地址
3.asyncio 介绍
asyncio
是 Python 标准库中的一个模块,用于编写单线程的并发代码。它使用 async
/await
语法来编写异步代码,允许你以非阻塞的方式执行 I/O 操作,如网络请求、文件读写等。
以下是 asyncio
的一些关键特性:
- 事件循环(Event Loop):
asyncio
使用事件循环来管理所有异步操作。事件循环是一个运行在后台的无限循环,它不断地检查是否有新的事件(如 I/O 完成、定时器到期等)需要处理。 - 协程(Coroutines):协程是
asyncio
中的并发单元,它们是使用async def
定义的函数。协程在遇到await
表达式时会暂停执行,直到等待的任务完成,然后继续执行。 - 任务(Tasks):任务是协程的执行单元。你可以使用
asyncio.create_task()
将协程包装成任务,然后将其放入事件循环中执行。 - Futures:
Future
是一个低级别的并发机制,它代表了一个尚未完成的操作。Future
可以被等待,当操作完成时,它会通知等待者。 - 同步原语:
asyncio
提供了同步原语,如锁(Lock)、事件(Event)、条件变量(Condition)和信号量(Semaphore),用于在异步代码中处理共享资源的同步。 - 流(Streams):
asyncio
还提供了流(如StreamReader
和StreamWriter
),它们用于处理网络通信中的二进制数据流。 - 子进程:
asyncio
允许你异步地创建和管理子进程。 - 超时和取消:你可以为异步操作设置超时,或者在必要时取消正在进行的操作。
二.实现步骤
1.异步终止任务
from typing import Any
from manager.graph.manager import GraphManager
from manager.schemas.manager import SchemasManager
from utils import logger
from fastapi import APIRouter
from manager.graph.query_graph import LocalQueryGraph
import time
import asyncio
GraphRouter = APIRouter(prefix="/test", tags=["测试管理"])
global task_map
task_map: dict[Any, Any] = {}
class GraphRouterMap:
# 长时间运行的异步任务
@staticmethod
async def long_running_task():
try:
while True:
# 模拟任务运行
print("长时间运行的任务正在运行.....")
await asyncio.sleep(2)
except asyncio.CancelledError:
print("任务被取消")
@staticmethod
@GraphRouter.get("/start", summary="启动任务", description="启动任务")
async def start_task():
# 启动长时间任务,并返回任务对象
run_id = time.strftime("%Y%m%d-%H%M%S")
task = asyncio.create_task(GraphRouterMap.long_running_task())
task_map[run_id] = task
return run_id
@staticmethod
@GraphRouter.get("/stop", summary="中止任务", description="中止任务")
async def stop_task(run_id: str):
task = task_map.get(run_id)
task.cancel()
return {"message": "任务取消请求已发送"}
觉得有用的话点个赞
👍🏻
呗。
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍
🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙