Source code for sni.scheduler
"""
Asyncronous / concurrent / schedules job management.
Simply use the global member ``sni.scheduler.scheduler``.
See also:
`APScheduler documentation <https://apscheduler.readthedocs.io/en/stable/>`_
"""
from typing import Any, Callable
import logging
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from sni.db.redis import new_redis_connection
from sni.conf import CONFIGURATION as conf
import sni.utils as utils
JOBS_KEY: str = "scheduler:default:jobs"
"""The redis key for the job list"""
RUN_TIMES_KEY: str = "scheduler:default:run_times"
"""The redis key for the job run times"""
scheduler = BackgroundScheduler(
executors={
"default": ThreadPoolExecutor(conf.general.scheduler_thread_count,),
},
job_defaults={
"coalesce": True,
"executor": "default",
"jitter": 60,
"jobstore": "default",
"max_instances": 3,
"misfire_grace_time": None,
},
jobstores={
"default": RedisJobStore(
db=conf.redis.database,
host=conf.redis.host,
jobs_key=JOBS_KEY,
port=conf.redis.port,
run_times_key=RUN_TIMES_KEY,
),
},
timezone=utils.utc,
)
[docs]def run_scheduled(function: Callable) -> Callable:
"""
Decorator that makes a function scheduled to run immediately when called.
Example::
@signals.post_save.connect_via(User)
@run_scheduled
def test(_sender: Any, **kwargs):
usr = kwargs['document']
status = 'created' if kwargs.get('created', False) else 'updated'
logging.debug('User %s has been %s', status, usr.character_name)
"""
def wrapper(sender: Any, **kwargs):
scheduler.add_job(function, args=(sender,), kwargs=kwargs)
return wrapper
[docs]def start_scheduler() -> None:
"""
Clears the job store and starts the scheduler.
"""
scheduler.start()
logging.debug("Started scheduler")
[docs]def stop_scheduler() -> None:
"""
Stops the scheduler and cleans up things
"""
scheduler.shutdown()
logging.debug("Stopped scheduler")
redis = new_redis_connection()
scheduler.remove_all_jobs()
redis.delete(JOBS_KEY, RUN_TIMES_KEY)
logging.debug("Flushed jobstore")
[docs]def _test_tick() -> None:
"""
Test function to check that the scheduler is really running.
Schedule like this::
scheduler.add_job(_test_tick, 'interval', seconds=3, jitter=0)
"""
logging.debug("Tick!")
scheduler.add_job(_test_tock)
[docs]def _test_tock() -> None:
"""
Test function to check that the scheduler is really running.
"""
logging.debug("Tock!")