【Py】使用flask-apscheduler动态调整作业参数(附源码)

news2025/3/1 1:16:20

之前的项目常使用Apscheduler进行定时任务调度,但最近想通过接口对这些任务进行动态调整,比如调整任务启停、调整任务执行时间、间隔时间等等

flask-apscheduler这个基于flask的库能够满足上面的需求,而且由于基于flask,所以我常用的connexion这个库理论上也能够完美支持。

接口描述

先来看一下官方文档
在这里插入图片描述
可以看到能够自动生成接口,接口详情如下:

  • /scheduler [GET]
    获取webapp基本信息
    Response 200:
{
    "current_host": "6a1cc3879c58",
    "allowed_hosts": [
        "*"
    ],
    "running": true
}
  • /scheduler/jobs [POST json job data]
    给调度器添加作业
    Request:
{
    "id": "job1",
    "func": "test:task",
    "args": [1, 2],
    "trigger": "interval",
    "seconds": 10
}

其中,上例func中的test为文件名,task为函数名

Response 200:

{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        1,
        2
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T21:14:13.217787+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2022-11-20T21:14:13.217787+08:00"
}
  • /scheduler/jobs/<job_id> [GET]
    获取作业详情
    Response 200:
{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        1,
        2
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T20:57:27.662972+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2022-11-20T21:02:27.662972+08:00"
}
  • /scheduler/jobs [GET]
    获取所有作业详情
    Response 200:
[
    {
        "id": "job1",
        "name": "job1",
        "func": "test:task",
        "args": [
            1,
            2
        ],
        "kwargs": {},
        "trigger": "interval",
        "start_date": "2022-11-20T20:57:27.662972+08:00",
        "seconds": 10,
        "misfire_grace_time": 1,
        "max_instances": 1,
        "next_run_time": "2022-11-20T21:02:47.662972+08:00"
    }
]
  • /scheduler/jobs/<job_id> [DELETE]
    从调度中删除某作业
    Response 204
  • /scheduler/jobs/<job_id> [PATCH json job data]
    更新某作业
    Request:
{
    "func": "test:task",
    "args": [2, 3],
    "trigger": "interval",
    "seconds": 10
}

Response 200:

{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        2,
        3
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T21:15:31.187552+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2022-11-20T21:15:31.187552+08:00"
}
  • /scheduler/jobs/<job_id>/pause [POST]
    暂停某作业
    Response 200:
{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        2,
        3
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T21:15:31.187552+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": null
}
  • /scheduler/jobs/<job_id>/resume [POST]
    恢复某作业
    Response 200:
{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        2,
        3
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T21:15:31.187552+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2022-11-20T21:17:01.187552+08:00"
}
  • /scheduler/jobs/<job_id>/run [POST]
    立即执行某作业
    Response 200:
{
    "id": "job1",
    "name": "job1",
    "func": "test:task",
    "args": [
        2,
        3
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2022-11-20T21:15:31.187552+08:00",
    "seconds": 10,
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2022-11-20T21:21:31.187552+08:00"
}

示例代码

可以使用示例代码,其中包括使用connexion以及flask两种方式的代码,具体看你的项目需求而定。

进入代码根目录
执行如下代码运行容器

>>> docker-compose up -d

完成后进入容器

>>> docker exec -it falsk-apscheduler-test /bin/bash

停止supervisord

>>> supervisorctl -c Services/falsk-apscheduler-test/supervisord.conf stop all

connexion

>>> python3 swagger_server/app.py

随后可以看到定时输出如下内容:

/app/swagger_server/app.py:47: DeprecationWarning: 'app.json_encoder' is deprecated and will be removed in Flask 2.3. Customize 'app.json_provider_class' or 'app.json' instead.
  app.app.json_encoder = encoder.JSONEncoder
 * Serving Flask app 'app'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8000
 * Running on http://172.22.0.2:8000
Press CTRL+C to quit
2022-11-20 22:21:14.358093 execute task 1+2=3
2022-11-20 22:21:24.358596 execute task 1+2=3
2022-11-20 22:21:34.357602 execute task 1+2=3
2022-11-20 22:21:44.357543 execute task 1+2=3
2022-11-20 22:21:54.357700 execute task 1+2=3
2022-11-20 22:22:04.357793 execute task 1+2=3
2022-11-20 22:22:14.358036 execute task 1+2=3
2022-11-20 22:22:24.358291 execute task 1+2=3
2022-11-20 22:22:34.358224 execute task 1+2=3

flask

>>> python3 test.py

随后可以看到定时输出如下内容:

 * Serving Flask app 'test'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:8000
Press CTRL+C to quit
2022-11-21 16:51:19.376061 execute task 1+2=3
2022-11-21 16:51:29.377086 execute task 1+2=3
2022-11-21 16:51:39.376033 execute task 1+2=3
2022-11-21 16:51:49.376166 execute task 1+2=3
2022-11-21 16:51:59.376259 execute task 1+2=3
2022-11-21 16:52:09.376141 execute task 1+2=3
2022-11-21 16:52:19.376926 execute task 1+2=3
2022-11-21 16:52:29.376742 execute task 1+2=3

接口测试

将代码中的flask-apscheduler-test.postman_collection.json文件导入Postman中
在这里插入图片描述
依次对接口进行测试即可
在这里插入图片描述
注意:

  1. 进行接口测试时,不要关闭容器终端;
  2. 测试代码运行在8000端口,如果有占用,可以手动改为别的端口,再重新运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24406.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

查题校园免费题库接口

查题校园免费题库接口 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 查题校园题库&#xff1a;查题校园题库后台&#xff08;点…

ButterKnife依赖注入框架源码解析

BuffterKnife 采用 注解 APT技术 APT&#xff1a;Annotation Processor tool 注解处理器&#xff0c;是javac的一个工具&#xff0c;每个处理器都是继承于AbstractProcessor 注解处理器是运行在自己的java虚拟机中 APT如何生成字节码文件&#xff1a; Annotation Processing 不…

李立宗《讲给入门者的深度学习》

14天学习训练营导师课程&#xff1a; 李立宗《讲给入门者的深度学习》 一、什么是深度学习&#xff1f; 1、传统方法、机器学习、深度学习的区别&#xff1f; 以取暖为例&#xff0c;来说明三者的不同之处。 传统方法&#xff1a;通过火炉生火&#xff0c;需要生火、添柴、…

公众号免费题库使用

公众号免费题库使用 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 题库&#xff1a;题库后台&#xff08;点击跳转&#xff09;…

Python实现点选验证码识别, 模拟登陆小破站并自动发弹幕

前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 开发环境: Python 3.8 Pycharm 2021.2 谷歌浏览器 谷歌驱动 模块使用: selenium >>> pip install selenium3.141.0 指定版本安装 time 打码平台 如果安装python第三方模块: win R 输入 cmd 点击确定, 输入…

【白话科普】从“熊猫烧香”聊聊计算机病毒

大家还记得2006年在网络上肆虐的“熊猫烧香”病毒吗&#xff1f; 虽然图标是一只小熊猫举着三根香&#xff0c;但是它是一款拥有自动传播、自动感染硬盘能力和强大的破坏能力的病毒&#xff0c;它不但能感染系统中exe&#xff0c;com&#xff0c;pif&#xff0c;src&#xff0c…

STM32实战总结:HAL之I2C

I2C基础知识参考&#xff1a; 嵌入式常见接口协议总结_路溪非溪的博客-CSDN博客 电路图 扩展的I2C接口&#xff0c;可以连接支持I2C的设备。常见于传感器等。 参考手册 目前大部分MCU都带有IIC总线接口&#xff0c;STM32F1也不例外。但是这里我们不使用STM32F1的硬件IIC&#x…

Linux查看磁盘、文件系统、文件夹、文件大小的命令(lsblk、df、du、ll)

记录&#xff1a;325 场景&#xff1a;在CentOS 7.9操作系统上&#xff0c;使用lsblk命令查看磁盘大小和磁盘挂载情况&#xff1b;使用df查看文件系统大小和挂载情况&#xff1b;使用du命令查看文件夹(目录)大小&#xff1b;使用ll和ls查看文件大小。 版本&#xff1a; 操作…

XXL-JOB任务有效期支持实现方案

概述 在做数据产品或平台系统时&#xff0c;经常会遇到类似如下截图中&#xff0c;有截至日期的定时调度任务的需求。即定时任务只在指定的开始日期-截至日期里指定的时间里执行。具体的业务需求场景&#xff0c;如营销活动的看板数据的订阅邮件&#xff0c;推送名单的活动&am…

实验(五):外部中断实验

一、实验目的与任务 实验目的&#xff1a; 1&#xff0e;掌握外部中断的工作原理&#xff1b; 2&#xff0e;学会中断程序设计。 任务&#xff1a; 1&#xff0e;运行Keil开发环境&#xff0c;完成外部中断响应软件编程&#xff1b; 2&#xff0e;外部中断接口分别接按键K1、K2…

Hibernate基本使用

注&#xff1a;本文使用maven创建项目。 目录&#xff1a;Hibernate简介&#xff1a;Hibernate使用&#xff1a;一、手动创建&#xff1a;1.建表&#xff1a;2.pom.xml中导入相关依赖&#xff1a;3.创建Hibernate核心配置文件hibernate.cfg.xml&#xff1a;4.创建实体类UserEnt…

Ubuntu系统、CentOS系统双网卡的配置

虚机双网卡配置前言一、CentOS系统1.配置网卡信息1.1编辑eth0网卡1.2查看eth0网卡信息1.3编辑eth1网卡1.4查看eth1网卡信息2.关闭网卡arp代答和rp_filter校验2.1编辑配置文件2.2查看配置文件3.重启网络服务4.配置路由4.1 配置路由4.2 查看路由二、Ubuntu系统1.配置网卡信息1.1.…

微信小程序运行机制和生命周期

一. 运行机制 首先了解下小程序的运行机制&#xff0c;小程序从启动到最终被销毁&#xff0c;会经历很多不同的状态&#xff0c;小程序在不同状态下会有不同的表现。大致运行机制如下图。 小程序生命周期图 接下来我们是图中概念讲解&#xff0c;项目中也会经常遇到。 1&…

etf动量轮动+大盘择时:年化30%的策略

原创文章第111篇&#xff0c;专注“个人成长与财富自由、世界运作的逻辑&#xff0c; AI量化投资”。 今天重点来探索一下elegantRL。 昨天的文章金融强化学习与finRL开发包里介绍了finRL的源码结构&#xff0c;背后的强化学习框架是elegantRL。 聚宽平台上有一个“动量轮动…

Java#18(面向对象三大特征之一:继承)

目录 一.继承 1.Java中提供了关键字extends,可以让一个类和另一个类建立起继承关系 2.继承的好处 3.什么时候使用继承? 二.继承的特点 java只支持单继承,不支持多继承,但支持多层继承 三.子类到底能继承父类中的哪些内容? 四.继承中成员变量和成员方法的访问特点 1. 继…

Apache Jmeter压力测试与性能监控,监测cpu、内存、磁盘、网络

1.官网下载Jmeter 解压&#xff0c;bin目录下 Windows 运行jmeter.bat 、Linux运行jmeter.sh 2.jmeter-plugins-manager 插件 测试机下载放置Jmeter的apache-jmeter-5.5\lib\ext 目录下,重新jmeter。 3.ServerAgent-2.2.3.zip下载 下载好放服务器端&#xff0c;给可执行文…

FPGA精简版UDP协议实现板间网线传输视频,提供3套工程源码

目录1.FPGA精简版UDP介绍2.网线板间视频传输---精简版UDP再次精简3.网线板间视频传输---实现方案4.网线板间视频传输---发送端方案5.网线板间视频传输---接收端方案6.工程1介绍---Artix7(RTL8211)双网口环回7.工程2介绍---Artix7发送--->Kintex7(B50610)接收8.工程3介绍---K…

RabbitMQ的广播模式(fanout)在(基于xml配置)项目中使用

项目结构 添加相关的jar包&#xff1a; pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&qu…

@敏捷组织从业者,开放敏捷架构O-AA™标准考试及认证项目重磅上线!

数字化转型和敏捷转型需同时进行&#xff0c; O-AA™标准更强调组织和文化的转型。 认证项目 重磅发布 发布物资源 标准中文从业认证考试 标准讲师认证培训课程 ALL IN ∨ 开放敏捷架构O-AA™标准采用了基于结果、以产品为中心的方法&#xff0c;使企业能够以灵活和敏捷的…

[ros2实操]1-ros2的安装(ubuntu1804)与运行

参考链接: Recording and playing back data — ROS 2 Documentation: Galactic documentation 使用docker创建了一个ubuntu1804镜像: docker run -it --gpus all \-p 8860:8860 \-v /tmp/.X11-unix:/tmp/.X11-unix \-v /home/lbw/temp_dir:/temp_dir \-e DISPLAYunix$DISPL…