接外包,有相关需求的可以联系我:Telegram | Email

Apscheduler定时任务以及Flask和Django的配置

该文章创建(更新)于09/16/2022,请注意文章的时效性!

最近项目涉及到定时任务的配置,最初用的是celery,后来发现celery在任务配置上不够友好,特别是在一个任务未运行完毕时再启动一个Worker或者新的Task会造成任务上的冲突。如果限制并发数或者利用锁机制去解决就更不好了,前者本身会限制celery异步框架的优势,后者实际上还是运行了,只是忽略掉了核心代码,并未对任务本身就行保存或延迟。最后在同事的建议下选择了Apscheduler组件来操作,用着确实挺爽的。

Simple Case

  • 安装
pip install apscheduler
  • 测试
from apscheduler.executors.pool import ThreadPoolExecutor  
from apscheduler.schedulers.blocking import BlockingScheduler  
from apscheduler.jobstores.memory import MemoryJobStore  
import time  
count = 0  

def exception_maker():  
    '''  
    异常制造器,用来模拟任务执行被中断  
    :return:  
    '''    
    return 1 / 0  


 def job_printer(text):  
     '''  
     死循环,用来模拟长时间执行的任务  
     :param text:  
     :return:  
     '''  
     global count  
     count += 1  
     # while True:  
     #     time.sleep(2)  
     #     print("job text:{} count:{}".format(text,count))  
     print("\n\n\n")  
     print("*"*50)  
     for x in range(3):  
         print(schedule.get_jobs())  
         print("jobtext:{} count:{}".format(text,count))  
         time.sleep(10)  


if __name__ == '__main__':  
    # 使用默认的作业存储器  
    JOBSTORE = {  
        'default': MemoryJobStore()  
    }  

    EXECUTOR = {  
        'default': ThreadPoolExecutor(1) # 1个线程  
    }  

    JOB_DEFAULTS = {  
        'coalesce': False, # 不合并任务  
        'max_instances': 1 # 新作业的默认最大实例限制为 1    }  

    schedule = BlockingScheduler(jobstores=JOBSTORE,  
                                 executors=EXECUTOR,  
                                 job_defaults=JOB_DEFAULTS,  
                                 )  
    schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])  
    schedule.add_job(exception_maker, "interval",hours=1)  

    schedule.print_jobs()  
    schedule.start()

Flask 配置

  • 安装
pip install Flask-APScheduler
  • 配置

习惯配置一个settings.py

# 使用默认的作业存储器  
JOBSTORE = {  
    'default': MemoryJobStore()  
}  

EXECUTOR = {  
    'default': ThreadPoolExecutor(1)  # 1个线程  
}  

JOB_DEFAULTS = {  
    'coalesce': False,  # # 关闭新job的合并,当job延误或者异常原因未执行时  
    'max_instances': 1  # 新作业的默认最大实例限制为 1}  

# 配置的默认任务列表  
JOBS = [  
    {  
        "id": "task_name",  
        "func": "tast:task_file.task_func_name",  # 任务函数 scheduler类:文件名.函数名
        "trigger": "interval",  
        "hours": 3,  
        'replace_existing': True,  # 如果存在相同的job_id,是否替换  
    }  
]  

# 是否开启API  
SCHEDULER_API_ENABLED = True  
# api前缀(默认是/scheduler)  
SCHEDULER_API_PREFIX = '/task'  
# 配置允许执行定时任务的主机名  
SCHEDULER_ALLOWED_HOSTS = ['*']  
# auth验证。默认是关闭的,  
# SCHEDULER_AUTH = HTTPBasicAuth()
  • Flask加载配置
from flask import Flask
from flask_apscheduler import APScheduler

def create_app(object_name="settings"):
    app = Flask(__name__)
    app.config.from_object(object_name)

    scheduler = APScheduler()  
    scheduler.init_app(app)  
    scheduler.start()  
    # 配置api权限验证的回调函数  
    # 如果需要配置权限验证,需要在config中配置  
    # SCHEDULER_AUTH = HTTPBasicAuth()  
    # @scheduler.authenticate  
    # def authenticate(auth):  
    #     return auth['username'] == 'userName' and auth['password'] == "passsWord"
    return app
  • API 配置

具体使用自己去配置。

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

Django配置Apscheduler

  • 安装
pip install django_apscheduler
  • 配置
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_apscheduler', # 定时任务
]
  • 配置后要配置一下数据库
python manage.py migrate
  • 使用

在views.py中配置即可。

from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job

@register_job(scheduler, "interval", minutes=1o, id='test', max_instances=1,
              args=['get_data', 1, "scheduler", random.choice(['1', 'dd', 'tres', 'ccc'])],
              replace_existing=True)
def test(a,b,c,d):
    print(a,b,c,d)


register_events(scheduler)
scheduler.start()
  • 管理员后台也可查看到具体的job以及相关运行日志。

参考



👇 Share | 分享 👇


要不赞赏一下?

微信
支付宝
PayPal
Bitcoin

版权声明 | Copyright

除非特别说明,本博客所有作品均采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。转载请注明转自-
https://www.emperinter.info/2022/09/16/apscheduler/


要不聊聊?

我相信你准备留下的内容是经过思考的!【勾选防爬虫,未勾选无法留言】

*

*



微信公众号

👉 NewsLetter ❤️ 邮箱订阅 👈

优惠码


阿里云国际版20美元
Vultr10美元
搬瓦工 | Bandwagon应该有折扣吧?
域名 | namesiloemperinter(1美元)