APScheduler学习

发布时间 2023-07-17 15:34:54作者: 蕝戀

官方文档:

https://apscheduler.readthedocs.io

APScheduler的4个组件

  • triggers【触发器】
  • job stores【任务存储】
  • executors【执行器】
  • schedulers【调度器】

scheduler调度器

常用的2个调度器:

  • BlockingScheduler 【阻塞调度器】调度器在当前主线程运行,会阻塞主线程。

    start()后不会返回,程序会卡住在start(),因为调用start() ,会创建子线程,由子线程执行,此时主线程处于阻塞。

    应用场景:

    需要单独写一个定时任务程序,独立其他程序业务,只是为了完成定时任务而已,防止主程序退出。
    
  • BackgroundScheduler【 后台调度器】当你没有使用下面的任何框架,并且希望调度器在你的应用程序中在后台运行时使用。

    应用场景:

    如Flask、django这类WEB框架。因为他们的本身主线程就不会退出。
    

其他调度器:

  • AsyncIOScheduler:如果你的应用程序使用了asyncio模块,那么就使用它。
  • GeventScheduler:如果你的应用程序使用了gevent,就可以使用它。
  • TornadoScheduler:如果你正在构建一个Tornado应用程序,那么就使用它。
  • TwistedScheduler:如果你正在构建一个Twisted应用程序,请使用它。
  • QtScheduler:如果你正在构建一个Qt应用程序,请使用该模块。

job store任务存储

要选择合适的作业存储,你需要确定你将job任务持久化。

如果你总是在应用程序开始时重新创建job任务,那么你可以选择默认的MemoryJobStore将作业状态存在内存)。

但如果你需要你的jobs作业在调度器重启或应用程序崩溃时仍然存在,那么一般可以根据的你的开发环境和开发应用选择合适的JobStore。

使用Redis当作JobStore

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore

# 定义两个JobStore
jobstores = {
    'default': RedisJobStore(db=0, host="192.168.2.6", port=6379),   # 
}

#  timezone=utc 用于指定时区?
scheduler = BackgroundScheduler(jobstores=jobstores, timezone=utc)

executor执行器

默认情况下的执行器为ThreadPoolExecutor(多线程池执行器),一般可以满足大部分用途。

如果你的job任务是CPU密集型的,可以考虑使用 ProcessPoolExecutor (多程池执行器)来充分利用多核CPU。

注意:executor是可以同时使用多个的,在添加任务的时候可以选择你想要的executor执行器。

# 定义执行器
executors = {
    # 线程池(20个线程)
    "defualt": ThreadPoolExecutor(20),
    # 进程池(5个进程)
    "process": ProcessPoolExecutor(5)
}

# 1、创建调度器
# 在创建调度器的时候可以给他指定执行器字典,然后在添加任务时可以选择任意执行器来决定如何执行同一个时间发生多个任务时的方式
scheduler = BlockingScheduler(executors=executors)

# 2、还可以通过configure方法传递执行器
# scheduler.configure(executors=executors)

trigger触发器

触发器说白了就是指定job任务执行的时间

APScheduler 带有三种内置触发器类型:

import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def job_func(text):
    print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])

scheduler = BackgroundScheduler()

# 在1-3、7-9月份中的每个星期一、二中的 0, 1, 2 , 3 执行 job_func 任务
scheduler.add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')

scheduler.start()

当然,也可以将多个类似的触发器组合成一个触发器:

一个任务多个triggers

官方参考连接:https://apscheduler.readthedocs.io/en/stable/modules/triggers/combining.html#module-apscheduler.triggers.combining

AndTrigger:(逻辑与运算,将多个触发器的条件合并成一个,也就是触发器的条件都要一起满足才运行。)

from apscheduler.triggers.combining import AndTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

# 使用AndTrigger将多个trigger组成条件
# 每2小时运行一次job_function,但只在周六和周日运行
trigger = AndTrigger([IntervalTrigger(hours=2),CronTrigger(day_of_week='sat,sun')])

scheduler.add_job(job_function, trigger)

OrTrigger:(逻辑或运算,这个就很好理解了....,其实都是很好理解...)

# 每周的周一2点运行 或者 每周的周四15点运行
trigger = OrTrigger([CronTrigger(day_of_week='mon', hour=2),
                     CronTrigger(day_of_week='tue', hour=15)])
scheduler.add_job(job_function, trigger)

APScheduler操作实例

BackgroundScheduler默认情况下,不给指定JobStore任务存储参数,

它使用的名为 "default "的MemoryJobStore和一个名为 "default "的ThreadPoolExecutor,默认最大线程数为10

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 定义两个JobStore
jobstores = {
    'mongo': MongoDBJobStore(),  # 一个是mogodb来存储job作业
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # 另外一个是用SQLAlchemy来使用sqlite来存储,注意default是表示默认。
}
# 定义两个执行器
executors = {
    'default': ThreadPoolExecutor(20),  # 一个是多线程的,最大线程为20
    'processpool': ProcessPoolExecutor(5)  # 这个是多进程池,最大5个进程
}

job_defaults = {
    'coalesce': False,  # 这个参数不知道啥作用,回头看看官方文档...
    'max_instances': 3  # 这个参数在interval类型的任务时有用,比如你的job每2秒运行一次,到那时job的任务需要5秒,此时到定时任务后会跳过下次调度,而等待这次5秒的job运行完才会执行,当设置了这个参数值,就表示允许同时执行的job实例。
}


#  timezone=utc 用于指定时区?
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

MemoryJobStore