Source code for sni.sde.jobs
"""
Recurrent SDE jobs
"""
import logging
from sni.scheduler import scheduler
from sni.db.redis import new_redis_connection
import sni.utils as utils
from .models import EsiObjectName
from .sde import (
download_latest_sde,
get_latest_sde_md5,
import_sde_dump,
)
[docs]@scheduler.scheduled_job(
"interval", days=1, start_date=utils.now_plus(minutes=10)
)
def update_sde() -> None:
"""
Checks the hash of the SDE, and if needed, downloads and imports it.
"""
redis = new_redis_connection()
latest_sde_md5 = (
get_latest_sde_md5() + "/" + str(EsiObjectName.SCHEMA_VERSION)
)
current_sde_md5 = redis.get("sde_md5")
if current_sde_md5 is not None:
current_sde_md5 = current_sde_md5.decode()
if latest_sde_md5 == current_sde_md5:
logging.debug("SDE is up to date")
return
logging.debug("SDE is out of date")
dump_path = "sde.sqlite"
download_latest_sde(dump_path)
import_sde_dump(dump_path)
redis.set("sde_md5", latest_sde_md5)