目录
- while True: + sleep()
- Timeloop库
- threading.Timer
- sched模块
- schedule模块
- APScheduler框架
- Celery框架
- 数据流工具Apache Airflow
- 概述
- Airflow 核心概念
- Airflow 的架构
总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。
while True: + sleep()
利用while True的死循环,加上 sleep()函数让其暂停一段时间,达到每隔一段时间执行特定任务的目的。
比较简单,例子如下:
import datetime
import time
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
def loop_monitor():
while True:
time_printer()
time.sleep(5)
if __name__ == "__main__":
loop_monitor()
主要缺点:
- 只能设定间隔,不能指定具体的时间
- sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
Timeloop库
Timeloop是一个库,可用于运行多周期任务。
Timeloop内部维护了一个任务列表jobs,用来管理所有任务。
可以使用装饰器标记任务,这样就会把任务加到任务列表jobs中,使用start方法启动任务列表重的所有任务。
示例如下:
import time
from timeloop import Timeloop
from datetime import timedelta
tl = Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
print("2s job current time : {}".format(time.ctime()))
if __name__ == "__main__":
tl.start(block=True)
运行后打印如下:
[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0>
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct 2 09:48:43 2023
2s job current time : Mon Oct 2 09:48:45 2023
2s job current time : Mon Oct 2 09:48:47 2023
同时Timeloop还有个stop方法,可以用来暂停所有任务。
threading.Timer
threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
主要有如下方法:
方法 | 说明 |
---|---|
Timer(interval, function, args=None, kwargs=None) | 创建定时器 |
cancel() | 取消定时器 |
start() | 使用线程方式执行 |
join(self, timeout=None) | 主线程等待线程执行结束 |
示例:
import datetime
from threading import Timer
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
# 注意 Timer 只能执行一次,这里需要循环调用,否则只能执行一次
loop_monitor()
def loop_monitor():
t = Timer(5, time_printer)
t.start()
if __name__ == "__main__":
loop_monitor()
sched模块
sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)
这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。
import datetime
import time
import sched
def time_printer():
now = datetime.datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :', ts)
loop_monitor()
def loop_monitor():
s = sched.scheduler(time.time, time.sleep) # 生成调度器
s.enter(5, 1, time_printer, ())
s.run()
if __name__ == "__main__":
loop_monitor()
scheduler对象主要方法:
enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
比threading.Timer更好,不需要循环调用。
schedule模块
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
示例:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
也可以通过 @repeat() 装饰静态方法:
import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
print('working...')
while True:
run_pending()
time.sleep(1)
传递参数:
import schedule
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
schedule.run_pending()
装饰器同样能传递参数:
from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
print('Hello', planet)
while True:
run_pending()
取消任务:
import schedule
i = 0
def some_task():
global i
i += 1
print(i)
if i == 10:
schedule.cancel_job(job)
print('cancel job')
exit(0)
job = schedule.every().second.do(some_task)
while True:
schedule.run_pending()
运行一次任务:
import time
import schedule
def job_that_executes_once():
print('Hello')
return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
根据标签检索任务:
# 检索所有任务:schedule.get_jobs()
import schedule
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
根据标签取消任务:
# 取消所有任务:schedule.clear()
import schedule
def greet(name):
print('Hello {}'.format(name))
if name == 'Cancel':
schedule.clear('second-tasks')
print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
schedule.run_pending()
运行任务到某时间:
import schedule
from datetime import datetime, timedelta, time
def job():
print('working...')
schedule.every().second.until('23:59').do(job) # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止
while True:
schedule.run_pending()
马上运行所有任务(主要用于测试):
import schedule
def job():
print('working...')
def job1():
print('Hello...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3) # 任务间延迟3秒
并行运行:使用 Python 内置队列实现:
import threading
import time
import schedule
def job1():
print("I'm running on thread %s" % threading.current_thread())
def job2():
print("I'm running on thread %s" % threading.current_thread())
def job3():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
schedule.run_pending()
time.sleep(1)
APScheduler框架
APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。
具体使用参考文章:APScheduler框架使用
Celery框架
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。
具体使用参考:
Celery使用:优秀的python异步任务框架
Django(21):使用Celery任务框架
数据流工具Apache Airflow
概述
Apache Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。
Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行。
Airflow提供了各种Operator实现,可以完成各种任务实现:
- BashOperator – 执行 bash 命令或脚本。
- SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块)。
- PythonOperator – 执行 Python 函数。
- EmailOperator – 发送 Email。
- HTTPOperator – 发送一个 HTTP 请求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。
一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:
这种需求可以使用BranchPythonOperator来实现。
Airflow 核心概念
- DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
- Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
- Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
- Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
- Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。
Airflow 的架构
在一个可扩展的生产环境中,Airflow 含有以下组件:
- 元数据库:这个数据库存储有关任务状态的信息。
- 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
- 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
- Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。
Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:
- SequentialExecutor: 单进程顺序执行,一般只用来测试
- LocalExecutor: 本地多进程执行
- CeleryExecutor: 使用Celery进行分布式任务调度
- DaskExecutor:使用Dask进行分布式任务调度
- KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务
生产环境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架构如图:
使用KubernetesExecutor的架构如图:
参考:
https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg
https://zhuanlan.zhihu.com/p/448847300