文章目录[隐藏]
最近项目涉及到定时任务的配置,最初用的是celery,后来发现celery在任务配置上不够友好,特别是在一个任务未运行完毕时再启动一个Worker或者新的Task会造成任务上的冲突。如果限制并发数或者利用锁机制去解决就更不好了,前者本身会限制celery异步框架的优势,后者实际上还是运行了,只是忽略掉了核心代码,并未对任务本身就行保存或延迟。最后在同事的建议下选择了Apscheduler组件来操作,用着确实挺爽的。
Simple Case
- 安装
pip install apscheduler
- 测试
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
import time
count = 0
def exception_maker():
'''
异常制造器,用来模拟任务执行被中断
:return:
'''
return 1 / 0
def job_printer(text):
'''
死循环,用来模拟长时间执行的任务
:param text:
:return:
'''
global count
count += 1
# while True:
# time.sleep(2)
# print("job text:{} count:{}".format(text,count))
print("\n\n\n")
print("*"*50)
for x in range(3):
print(schedule.get_jobs())
print("jobtext:{} count:{}".format(text,count))
time.sleep(10)
if __name__ == '__main__':
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # 不合并任务
'max_instances': 1 # 新作业的默认最大实例限制为 1 }
schedule = BlockingScheduler(jobstores=JOBSTORE,
executors=EXECUTOR,
job_defaults=JOB_DEFAULTS,
)
schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
schedule.add_job(exception_maker, "interval",hours=1)
schedule.print_jobs()
schedule.start()
Flask 配置
- 安装
pip install Flask-APScheduler
- 配置
习惯配置一个
settings.py
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # # 关闭新job的合并,当job延误或者异常原因未执行时
'max_instances': 1 # 新作业的默认最大实例限制为 1}
# 配置的默认任务列表
JOBS = [
{
"id": "task_name",
"func": "tast:task_file.task_func_name", # 任务函数 scheduler类:文件名.函数名
"trigger": "interval",
"hours": 3,
'replace_existing': True, # 如果存在相同的job_id,是否替换
}
]
# 是否开启API
SCHEDULER_API_ENABLED = True
# api前缀(默认是/scheduler)
SCHEDULER_API_PREFIX = '/task'
# 配置允许执行定时任务的主机名
SCHEDULER_ALLOWED_HOSTS = ['*']
# auth验证。默认是关闭的,
# SCHEDULER_AUTH = HTTPBasicAuth()
- Flask加载配置
from flask import Flask
from flask_apscheduler import APScheduler
def create_app(object_name="settings"):
app = Flask(__name__)
app.config.from_object(object_name)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 配置api权限验证的回调函数
# 如果需要配置权限验证,需要在config中配置
# SCHEDULER_AUTH = HTTPBasicAuth()
# @scheduler.authenticate
# def authenticate(auth):
# return auth['username'] == 'userName' and auth['password'] == "passsWord"
return app
- API 配置
具体使用自己去配置。
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
Django配置Apscheduler
- 安装
pip install django_apscheduler
- 配置
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_apscheduler', # 定时任务
]
- 配置后要配置一下数据库
python manage.py migrate
- 使用
在views.py中配置即可。
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
@register_job(scheduler, "interval", minutes=1o, id='test', max_instances=1,
args=['get_data', 1, "scheduler", random.choice(['1', 'dd', 'tres', 'ccc'])],
replace_existing=True)
def test(a,b,c,d):
print(a,b,c,d)
register_events(scheduler)
scheduler.start()
- 管理员后台也可查看到具体的job以及相关运行日志。