Python—14、定时任务
文档结构
- 1、需求简介
- 2、类库实现
1、需求简介
在实际的应用开发中,很多场景下需要在固定的时间,按照固定的时间间隔周期性的运行某些任务;
2、类库实现
一般实现任务的定时运行有如下几种方式:
1)while …sleep
2)Threading.Timer
3)schedule模块
4)Apscheduler模块(推荐)
2.1、while…sleep
import time
def taskDetail(loopTime:int):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"loopTime:{loopTime}-->","currTime:",currTime)def taskMonitor():loopFlag = TruestartPoint = 1while loopFlag:if startPoint <=5:taskDetail(startPoint)startPoint = startPoint + 1time.sleep(5)else:print("startPoint达到阈值上限,退出循环")breakif __name__ == "__main__":taskMonitor()loopLevel = 0while (loopLevel <= 10):print(f"loopLevel:-->{loopLevel}")loopLevel = loopLevel + 1time.sleep(1)
结果输出:
D:\SoftWare\Python\python.exe E:/PythonProject/pythonProject/PyQt5/TaskDetail.py
loopTime:1--> currTime: 2022-06-02 10:27:53
loopTime:2--> currTime: 2022-06-02 10:27:58
loopTime:3--> currTime: 2022-06-02 10:28:03
loopTime:4--> currTime: 2022-06-02 10:28:08
loopTime:5--> currTime: 2022-06-02 10:28:13
startPoint达到阈值上限,退出循环
loopLevel:-->0
loopLevel:-->1
loopLevel:-->2
loopLevel:-->3
loopLevel:-->4
loopLevel:-->5
loopLevel:-->6
loopLevel:-->7
loopLevel:-->8
loopLevel:-->9
loopLevel:-->10Process finished with exit code 0
说明:该方式只能实现任务的周期运行,但是无法异步调用;
2.2、threading.Timer
import time
from threading import Timerdef taskDetail(loopTime: int):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"loopTime:{loopTime}-->", "currTime:", currTime)def taskMonitor():taskDetail(2)t = Timer(5, taskMonitor)t.start()if __name__ == "__main__":taskMonitor()loopLevel = 0while (loopLevel <= 10):print(f"loopLevel:-->{loopLevel}")loopLevel = loopLevel + 1time.sleep(1)
输出结果:
D:\SoftWare\Python\python.exe E:/PythonProject/pythonProject/PyQt5/TaskDetail.py
loopTime:2--> currTime: 2022-06-02 10:20:49
loopLevel:-->0
loopLevel:-->1
loopLevel:-->2
loopLevel:-->3
loopLevel:-->4
loopTime:2--> currTime: 2022-06-02 10:20:54
loopLevel:-->5
loopLevel:-->6
loopLevel:-->7
loopLevel:-->8
loopLevel:-->9
loopTime:2--> currTime: 2022-06-02 10:20:59
loopLevel:-->10Process finished with exit code -1
说明:
1)使用 Threading.Timer 可以实现任务的异步调用,但是 该方法实际为一个递归调用,有一定的递归深度限制;
2)即使调整了递归深度限制,运行到达到最大CPU时,python会直接销毁程序;
- 修改递归深度
sys.setrecursionlimit(100000000)
2.3、schedule模块
- 安装 schedule库
pip install schedule
- 代码实现
import time
import scheduledef taskDetail(taskName: str):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"{taskName}-->", "currTime:", currTime)def scheduleMonitor():schedule.clear()# 创建一个按3秒间隔执行任务schedule.every(1).seconds.do(taskDetail("Task-A"))# 创建一个按2秒间隔执行任务schedule.every(3).seconds.do(taskDetail("Task-B"))while True:schedule.run_pending()if __name__ == "__main__":scheduleMonitor()loopLevel = 0while (loopLevel <= 10):print(f"loopLevel:-->{loopLevel}")loopLevel = loopLevel + 1time.sleep(1)
说明:该模块的任务调用模拟没有实现;该模块需要while Ture配合使用,而且占用的CPU也比其他几种方式高。
2.4、APScheduler框架
框架手册:https://apscheduler.readthedocs.io/en/stable/userguide.html
- 安装 APScheduler库
pip install APScheduler
Apschedule框架有4种核心组件:
1)触发器(trigger)
2)作业存储(JobStore)
3)执行器(Executor)
4)调度器(Schedul)
2.4.1、触发器(trigger)
触发器决定了任务的运行时间、运行周期和运行频率;按照不同的运行要求可以分为
date、interval、cron三类;
1)date:在指定的时间,运行一次任务;
2)interval:以固定的时间间隔持续运行作业;
3)cron:在一天中特定的时间周期性运行任务;
4)trigger-list 触发器,即组合触发器,可以将 date、interval、cron 三类触发器组合实现任务的运行;
(一) date 触发
此处 schedule.add_job 的参数
trigger配置为date,run_date设置运行时间;
run_date 的格式可以直接以日期或时间格式,也可以以字符串形式;
-
run_date=date(2022 ,6, 5) -
run_date=datetime(2022 ,6, 5, 18, 00, 00) -
run_date="2022-06-22" -
run_date="2022-06-02 15:00:10" -
代码实现
import time
from apscheduler.schedulers.blocking import BlockingSchedulerdef taskDetail(taskName: str):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"{taskName}-->", "currTime:", currTime)if __name__ == "__main__":apSchedule = BlockingScheduler()apSchedule.add_job(func=taskDetail, trigger='date', run_date="2022-06-02 15:00:10", args=['task-A'])apSchedule.start()
(二) interval 触发
1)此处 schedule.add_job 的参数
trigger配置为interval;
2)当 start_date没有指定时,将以 datetime.now() + interval作为第一次开始运行的时间;
3)运行参数:
weeks (int)days (int)hours (int)minutes (int)seconds (int)start_date (datetime|str)end_date (datetime|str)timezone (datetime.tzinfo|str)jitter (int|None)– 延迟作业执行的最大时间,单位秒(second);
- 代码实现
if __name__ == "__main__":apSchedule = BlockingScheduler()apSchedule.add_job(func=taskDetail, trigger='interval', minutes=1, seconds=5,end_date="2022-06-02 16:05:00", args=['task-A'])apSchedule.start()
结果输出:
D:\SoftWare\Python\python.exe E:/PythonProject/pythonProject/PyQt5/apscheduleDemo.py
mainThread--> currTime: 2022-06-02 16:18:03
D:\SoftWare\Python\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.htmlreturn tzinfo.localize(dt)
task-A--> currTime: 2022-06-02 16:19:08
task-A--> currTime: 2022-06-02 16:20:13Process finished with exit code -1
说明:此处 任务运行的时间间隔为 65s;
- 时间间隔
schedule.add_job 的参数配置中,如下几种方式等同:
trigger='interval', seconds=65trigger='interval', minutes=1, seconds=5同理超出 60s的时间间隔,可以直接以 seconds 参数设置,或者结合 seconds + minutes + hours + …+ weeks 组合设置,即其他粒度的间隔参数只是便于时间间隔的单位换算;
- 运行时间
当
start_date设置的时间小于 当前时间时,则任务实际的运行时间是start_date + n*interval大于当前时间的最近 一次时间;
如当前时间为:2022-06-02 16:31:03,设置的start_date="2022-06-02 16:28:00", seconds=65,则首次运行的实际时间为:2022-06-02 16:31:15;
- 周期+定时运行
当需要在某个固定时间,周期性运行某个任务时,需要配合
start_date和seconds/minutes/...weeks或者seconds + minutes + hours+...+weeks等的组合实现;
缺点:interval 参数无法实现不同时间粒度的运行 要求,如在每个月的某几天运行,或者每天的某几个小时运行;
(三) cron 触发
- 时间粒度
1)此处 schedule.add_job 的参数
trigger配置为cron;
2)时间粒度参数:
year (int|str)–- 4digit yearmonth (int|str)– month (1-12)day (int|str)– day of month (1-31)week (int|str)– ISO week (1-53)day_of_week (int|str)– number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)hour (int|str)– hour (0-23)minute (int|str)– minute (0-59)second (int|str)– second (0-59)start_date (datetime|str)– earliest possible date/time to trigger on (inclusive)end_date (datetime|str)– latest possible date/time to trigger on (inclusive)timezone (datetime.tzinfo|str)– time zone to use for the date/time calculations (defaults to scheduler timezone)jitter (int|None)– delay the job execution by seconds at mostjitter
3)当时间粒度参数的具体值是单个数值时,可以使用 int 类型,当是某个区间或其他序列时,使用 str 格式;
- 参数阈值
1)如下列表中的表达式适用于 年 到 秒,在一个时间粒度的参数中,阈值可以使用多个表达式的组合,以逗号分割;
2)每个时间粒度的参数设置了后,都起作用;如同时设置了day、hour、minute、second,那么任务将在对应时间粒度的时间内运行;
| 表达式 | 时间参数 | 描述 |
|---|---|---|
* | any | 所有时间粒度的每个值; |
*/a | any | 所有时间粒度的值域的 a 的倍数,从 0 开始,如果时间粒度是 second=“*/13”,则运行时间为 每分钟的 00,13,26,39,52,00 秒; |
a-b | any | 所有时间粒度的值域,满足 a <= t <= b; |
a-b/c | any | 所有时间粒度的值域,从a 开始,间隔 c,到 b 结束; |
xth y | day | 每个月 第 x 个 周 y 运行; |
last x | day | 每个月的最后一个 周 x 运行; |
last | day | 每个月的最后一天运行; |
x,y,z | any | 值域时间符合 x,y,z 时运行; |
英文的第几全程简写对照如下:
first / 1st,second / 2nd,third /3rd,fourth /4th,
fifth / 5th,sixth / 6th,seventh / 7th,eighth / 8th,
ninth /9th,tenth / 10th,eleventh / 11th,twelfth / 12th
星期的周几英文简写和数字对照如下:
mon(0),tue(1),wed(2),thu(3),fri(4),sat(5),sun(6)
- 代码实现
if __name__ == "__main__":apSchedule = BlockingScheduler()apSchedule.add_job(func=taskDetail,trigger="cron",day="5th thu",args=["task-A"])print("mainThread-->", "currTime:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))apSchedule.start()
说明:上述设置为 每个月的第二个周四运行;
- 时区切换
当时区由标准时区切换为夏令时,或者由夏令时切换为标准时区时,任务调度的时间时间将提前半小时或滞后半小时,可以通过禁用夏令时来规避该问题的出现;
可以设置时区参数:timezone="UTC";
4) 组合触发器
对于单个触发器,无法满足某些场景:
如:在每周的周二,2:00:00运行;或者 每周三的 13:00:00运行;
此时可以使用多个触发器组合来实现特定的运行要求;
组合触发器分为2类:
A:与触发器(AndTrigger);
B:或触发器(OrTrigger);
如下代码实现每周周四 2:00:00、7:00:00 或者 每周日 15:00:00、19:00:00任务运行;
- 或触发组合
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTriggerdef taskDetail(taskName: str):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"{taskName}-->", "currTime:", currTime)trigList = OrTrigger([CronTrigger(day_of_week="thu", hour="2,7", minute=00, second=00),CronTrigger(day_of_week="sun", hour="15,19", minute=30, second=00)])apscheduler = BlockingScheduler()
apscheduler.add_job(func=taskDetail, trigger=trigList, args=["task-A"])
apscheduler.start()
- 与触发器
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.combining import AndTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTriggerdef taskDetail(taskName: str):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"{taskName}-->", "currTime:", currTime)trigList = AndTrigger([CronTrigger(day_of_week="thu", hour="2,7", minute=00, second=00),CronTrigger(day_of_week="sun", hour="15,19", minute=30, second=00)])apscheduler = BlockingScheduler()
apscheduler.add_job(func=taskDetail, trigger=trigList, args=["task-A"])
apscheduler.start()
说明:此处没有实现 AndTrigger 触发逻辑;
2.4.1、作业存储(JobStore)
2.4.1、执行器(Executor)
2.4.1、调度器(Schedule)
BlockingScheduler
在前台运行的调度器,开始执行后永远不会返回;所以 schedule.start()执行后面的代码将无法执行;
BackgroundScheduler
在后台运行的调度器,执行 schedule.start()会立刻返回,可以执行后面的代码逻辑;
- 后台线程相比于前台线程多了一个参数
daemon,用来设置是否为守护线程,默认为 True;- 当该 schedule 设置为守护线程后,在主线程结束后,执行schedule 的独立线程也将立刻停止;
scheduler = BackgroundScheduler(daemon=True)
如下类型的调度器供了解,只有特定场景下需要;
- AsyncIOScheduler
- GeventScheduler
- TornadoScheduler
- TwistedScheduler
- QtScheduler
添加作业
A:schedule.add_job()
1)方式:使用schedule实例来调用 add_job方法;;
2)一个schedule 实例可以调用多次 add_job方法,func参数可以是同一个函数,也可以是不同的函数;
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingSchedulerdef taskDetail(taskName: str):print(f"{taskName}-->", "currTime:", datetime.now())if __name__ == "__main__":scheduler = BlockingScheduler(daemon=True)scheduler.add_job(func=taskDetail, trigger='cron', second="*/2", args=["task-A"])scheduler.add_job(func=taskDetail, trigger='cron', second="*/3", args=["task-B"])scheduler.start()
B:$schedule.scheduled_job()
使用 schedule实例调用 scheduled_job方法来修饰目标函数,即装饰器的使用;
代码实现:
import time
from apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()@scheduler.scheduled_job(trigger='cron', second="*/13", args=["task-A"])
def taskDetail(taskName: str):currTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"{taskName}-->", "currTime:", currTime)if __name__ == "__main__":print("mainThread-->", "currTime:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))scheduler.start()
说明:此处的调度器变量申明需要放到 函数申请的前面,否则装饰器无法使用,变量不识别;
移除作业
-
方式A:通过使用作业的 ID 和作业存储别名调用 remove_job();
说明:对于装饰器方式添加的作业,只有该方法可以移除作业; -
方式B:通过在从 add_job() 获取的作业实例上调用 remove();
-
代码实现
if __name__ == "__main__":scheduler = BackgroundScheduler(daemon=True)scheduler.add_job(func=taskDetail, trigger='cron', second="*/2", args=["task-A"],id="job_A")job_B= scheduler.add_job(func=taskDetail, trigger='cron', second="*/3", args=["task-B"])scheduler.start()time.sleep(10)print("方式[scheduler.remove_job]移除作业-->job_A")scheduler.remove_job(job_id="job_A");time.sleep(10)print("方式[jobinstance.remove]移除作业-->job_B")job_B.remove();time.sleep(10)print("主线程和scheduler独立线程执行结束")
结果输出
D:\SoftWare\Python\python.exe E:/PythonProject/pythonProject/PyQt5/apscheduleDemo.py
D:\SoftWare\Python\lib\site-packages\apscheduler\util.py:436: PytzUsageWarning: The localize method is no longer necessary, as this time zone supports the fold attribute (PEP 495). For more details on migrating to a PEP 495-compliant implementation, see https://pytz-deprecation-shim.readthedocs.io/en/latest/migration.htmlreturn tzinfo.localize(dt)
task-B--> currTime: 2022-06-07 22:13:03.014156
task-A--> currTime: 2022-06-07 22:13:04.003080
task-B--> currTime: 2022-06-07 22:13:06.001904
task-A--> currTime: 2022-06-07 22:13:06.001904
task-A--> currTime: 2022-06-07 22:13:08.010611
task-B--> currTime: 2022-06-07 22:13:09.013001
task-A--> currTime: 2022-06-07 22:13:10.012642
task-B--> currTime: 2022-06-07 22:13:12.017692
task-A--> currTime: 2022-06-07 22:13:12.017692
方式[scheduler.remove_job]移除作业-->job_A
task-B--> currTime: 2022-06-07 22:13:15.016496
task-B--> currTime: 2022-06-07 22:13:18.005886
task-B--> currTime: 2022-06-07 22:13:21.007383
方式[jobinstance.remove]移除作业-->job_B
主线程和scheduler独立线程执行结束Process finished with exit code 0
暂停/继续作业
-
方式A:
scheduler.pause_job(job_id=" ")或scheduler.resume_job(job_id=" ")
方式B:
job.pause()或job.resume() -
代码实现
if __name__ == "__main__":scheduler = BackgroundScheduler(daemon=True)scheduler.add_job(func=taskDetail, trigger='cron', second="*/2", args=["task-A"],id="job_A")job_B= scheduler.add_job(func=taskDetail, trigger='cron', second="*/3", args=["task-B"])scheduler.start()time.sleep(10)print("[scheduler.pause_job]方式暂停作业{}-->job_A".format(10))scheduler.pause_job(job_id="job_A");print("[jobInstance.pause]方式暂停作业{}-->job_B".format(10))job_B.pause()time.sleep(10)print("[job_A,job_B]暂停运行10s结束")print("[scheduler.resume_job]方式继续作业{pause_time}-->job_A")scheduler.resume_job(job_id="job_A");print("[jobInstance.resume]方式继续作业-->job_B")job_B.resume()while True:# print("mainThread-->线程循环休眠1s")time.sleep(1)
2.4.1、接口参考(Interface)
1)执行器相关
apscheduler.eventsapscheduler.executors.asyncioapscheduler.executors.baseapscheduler.executors.debugapscheduler.executors.geventapscheduler.executors.poolapscheduler.executors.twisted
2)作业存储相关
apscheduler.jobapscheduler.jobstores.baseapscheduler.jobstores.memoryapscheduler.jobstores.mongodbapscheduler.jobstores.redisapscheduler.jobstores.rethinkdbapscheduler.jobstores.sqlalchemyapscheduler.jobstores.zookeeper
3)调度器相关
apscheduler.schedulersapscheduler.schedulers.asyncioapscheduler.schedulers.backgroundapscheduler.schedulers.baseapscheduler.schedulers.blockingapscheduler.schedulers.geventapscheduler.schedulers.tornadoapscheduler.schedulers.twisted
4)触发器相关
apscheduler.triggers.baseapscheduler.triggers.combiningapscheduler.triggers.cronapscheduler.triggers.dateapscheduler.triggers.interval
====================================== over ==================================
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
