文章目录
- 安装 pip install apscheduler 测试 from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.memory import MemoryJobStore import time count = 0 def exception_maker(): ''' 异常制造器,用来模拟任务执行被中断 :return: ''' return 1 / 0 def job_printer(text): ''' 死循环,用来模拟长时间执行的任务 :param text: :return: ''' global count count += 1 # while True: # time.sleep(2) # print("job text:{} count:{}".format(text,count)) print("\n\n\n") print("*"*50) for x in range(3): print(schedule.get_jobs()) print("jobtext:{} count:{}".format(text,count)) time.sleep(10) if __name__ == '__main__': # 使用默认的作业存储器 JOBSTORE = { 'default': MemoryJobStore() } EXECUTOR = { 'default': ThreadPoolExecutor(1) # 1个线程 } JOB_DEFAULTS = { 'coalesce': False, # 不合并任务 'max_instances': 1 # 新作业的默认最大实例限制为 1 } schedule = BlockingScheduler(jobstores=JOBSTORE, executors=EXECUTOR, job_defaults=JOB_DEFAULTS, ) schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!']) schedule.add_job(exception_maker, "interval",hours=1) schedule.print_jobs() schedule.start()
- 安装 pip install Flask-APScheduler 配置 习惯配置一个settings.py # 使用默认的作业存储器 JOBSTORE = { 'default': MemoryJobStore() } EXECUTOR = { 'default': ThreadPoolExecutor(1) # 1个线程 } JOB_DEFAULTS = { 'coalesce': False, # # 关闭新job的合并,当job延误或者异常原因未执行时 'max_instances': 1 # 新作业的默认最大实例限制为 1} # 配置的默认任务列表 JOBS = [ { "id": "task_name", "func": "tast:task_file.task_func_name", # 任务函数 scheduler类:文件名.函数名 "trigger": "interval", "hours": 3, 'replace_existing': True, # 如果存在相同的job_id,是否替换 } ] # 是否开启API SCHEDULER_API_ENABLED = True # api前缀(默认是/scheduler) SCHEDULER_API_PREFIX = '/task' # 配置允许执行定时任务的主机名 SCHEDULER_ALLOWED_HOSTS = ['*'] # auth验证。默认是关闭的, # SCHEDULER_AUTH = HTTPBasicAuth() Flask加载配置 from flask import Flask from flask_apscheduler import APScheduler def create_app(object_name="settings"): app = Flask(__name__) app.config.from_object(object_name) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() # 配置api权限验证的回调函数 # 如果需要配置权限验证,需要在config中配置 # SCHEDULER_AUTH = HTTPBasicAuth() # @scheduler.authenticate # def authenticate(auth): # return auth['username'] == 'userName' and auth['password'] == "passsWord" return app API 配置 具体使用自己去配置。 def _load_api(self): """ Add the routes for the scheduler API. """ self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET') self._add_url_route('add_job', '/jobs', api.add_job, 'POST') self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET') self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET') self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE') self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH') self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST') self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST') self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
- 安装 pip install django_apscheduler 配置 INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'django_apscheduler', # 定时任务 ] 配置后要配置一下数据库 python manage.py migrate 使用 在views.py中配置即可。 from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job @register_job(scheduler, "interval", minutes=1o, id='test', max_instances=1, args=['get_data', 1, "scheduler", random.choice(['1', 'dd', 'tres', 'ccc'])], replace_existing=True) def test(a,b,c,d): print(a,b,c,d) register_events(scheduler) scheduler.start() 管理员后台也可查看到具体的job以及相关运行日志。
- https://blog.csdn.net/cui_yonghua/article/details/121255401 https://apscheduler.readthedocs.io/en/latest/ https://blog.csdn.net/qq_15969343/article/details/125565204 https://segmentfault.com/a/1190000039111644
最近项目涉及到定时任务的配置,最初用的是celery,后来发现celery在任务配置上不够友好,特别是在一个任务未运行完毕时再启动一个Worker或者新的Task会造成任务上的冲突。如果限制并发数或者利用锁机制去解决就更不好了,前者本身会限制celery异步框架的优势,后者实际上还是运行了,只是忽略掉了核心代码,并未对任务本身就行保存或延迟。最后在同事的建议下选择了Apscheduler组件来操作,用着确实挺爽的。
- 安装
pip install apscheduler
- 测试
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
import time
count = 0
def exception_maker():
'''
异常制造器,用来模拟任务执行被中断
:return:
'''
return 1 / 0
def job_printer(text):
'''
死循环,用来模拟长时间执行的任务
:param text:
:return:
'''
global count
count += 1
# while True:
# time.sleep(2)
# print("job text:{} count:{}".format(text,count))
print("\n\n\n")
print("*"*50)
for x in range(3):
print(schedule.get_jobs())
print("jobtext:{} count:{}".format(text,count))
time.sleep(10)
if __name__ == '__main__':
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # 不合并任务
'max_instances': 1 # 新作业的默认最大实例限制为 1 }
schedule = BlockingScheduler(jobstores=JOBSTORE,
executors=EXECUTOR,
job_defaults=JOB_DEFAULTS,
)
schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
schedule.add_job(exception_maker, "interval",hours=1)
schedule.print_jobs()
schedule.start()
pip install apscheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
import time
count = 0
def exception_maker():
'''
异常制造器,用来模拟任务执行被中断
:return:
'''
return 1 / 0
def job_printer(text):
'''
死循环,用来模拟长时间执行的任务
:param text:
:return:
'''
global count
count += 1
# while True:
# time.sleep(2)
# print("job text:{} count:{}".format(text,count))
print("\n\n\n")
print("*"*50)
for x in range(3):
print(schedule.get_jobs())
print("jobtext:{} count:{}".format(text,count))
time.sleep(10)
if __name__ == '__main__':
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # 不合并任务
'max_instances': 1 # 新作业的默认最大实例限制为 1 }
schedule = BlockingScheduler(jobstores=JOBSTORE,
executors=EXECUTOR,
job_defaults=JOB_DEFAULTS,
)
schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
schedule.add_job(exception_maker, "interval",hours=1)
schedule.print_jobs()
schedule.start()
- 安装
pip install Flask-APScheduler
- 配置
习惯配置一个settings.py
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # # 关闭新job的合并,当job延误或者异常原因未执行时
'max_instances': 1 # 新作业的默认最大实例限制为 1}
# 配置的默认任务列表
JOBS = [
{
"id": "task_name",
"func": "tast:task_file.task_func_name", # 任务函数 scheduler类:文件名.函数名
"trigger": "interval",
"hours": 3,
'replace_existing': True, # 如果存在相同的job_id,是否替换
}
]
# 是否开启API
SCHEDULER_API_ENABLED = True
# api前缀(默认是/scheduler)
SCHEDULER_API_PREFIX = '/task'
# 配置允许执行定时任务的主机名
SCHEDULER_ALLOWED_HOSTS = ['*']
# auth验证。默认是关闭的,
# SCHEDULER_AUTH = HTTPBasicAuth()
- Flask加载配置
from flask import Flask
from flask_apscheduler import APScheduler
def create_app(object_name="settings"):
app = Flask(__name__)
app.config.from_object(object_name)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 配置api权限验证的回调函数
# 如果需要配置权限验证,需要在config中配置
# SCHEDULER_AUTH = HTTPBasicAuth()
# @scheduler.authenticate
# def authenticate(auth):
# return auth['username'] == 'userName' and auth['password'] == "passsWord"
return app
- API 配置
具体使用自己去配置。
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
pip install Flask-APScheduler
习惯配置一个settings.py
# 使用默认的作业存储器
JOBSTORE = {
'default': MemoryJobStore()
}
EXECUTOR = {
'default': ThreadPoolExecutor(1) # 1个线程
}
JOB_DEFAULTS = {
'coalesce': False, # # 关闭新job的合并,当job延误或者异常原因未执行时
'max_instances': 1 # 新作业的默认最大实例限制为 1}
# 配置的默认任务列表
JOBS = [
{
"id": "task_name",
"func": "tast:task_file.task_func_name", # 任务函数 scheduler类:文件名.函数名
"trigger": "interval",
"hours": 3,
'replace_existing': True, # 如果存在相同的job_id,是否替换
}
]
# 是否开启API
SCHEDULER_API_ENABLED = True
# api前缀(默认是/scheduler)
SCHEDULER_API_PREFIX = '/task'
# 配置允许执行定时任务的主机名
SCHEDULER_ALLOWED_HOSTS = ['*']
# auth验证。默认是关闭的,
# SCHEDULER_AUTH = HTTPBasicAuth()
from flask import Flask
from flask_apscheduler import APScheduler
def create_app(object_name="settings"):
app = Flask(__name__)
app.config.from_object(object_name)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 配置api权限验证的回调函数
# 如果需要配置权限验证,需要在config中配置
# SCHEDULER_AUTH = HTTPBasicAuth()
# @scheduler.authenticate
# def authenticate(auth):
# return auth['username'] == 'userName' and auth['password'] == "passsWord"
return app
具体使用自己去配置。
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
- 安装
pip install django_apscheduler
- 配置
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_apscheduler', # 定时任务
]
- 配置后要配置一下数据库
python manage.py migrate
- 使用
在views.py中配置即可。
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
@register_job(scheduler, "interval", minutes=1o, id='test', max_instances=1,
args=['get_data', 1, "scheduler", random.choice(['1', 'dd', 'tres', 'ccc'])],
replace_existing=True)
def test(a,b,c,d):
print(a,b,c,d)
register_events(scheduler)
scheduler.start()
- 管理员后台也可查看到具体的job以及相关运行日志。
pip install django_apscheduler
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_apscheduler', # 定时任务
]
python manage.py migrate
在views.py中配置即可。
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
@register_job(scheduler, "interval", minutes=1o, id='test', max_instances=1,
args=['get_data', 1, "scheduler", random.choice(['1', 'dd', 'tres', 'ccc'])],
replace_existing=True)
def test(a,b,c,d):
print(a,b,c,d)
register_events(scheduler)
scheduler.start()
