Source code for sni.db.migration

"""
Database migration utilities
"""

import logging
from typing import Any, Dict, Optional

from pymongo.collection import Collection

from .mongodb import get_pymongo_collection


[docs]def ensure_minimum_version(collection: Collection, version: int) -> None: """ For all documents in the collection, if its ``_version`` field is less than the given version, sets it to that version. """ collection.update_many( {"_version": {"$lt": version}}, {"$set": {"_version": version}}, )
[docs]def finalize_migration(mongoengine_model_class) -> None: """ Finalizes the migration process for a collection. Example:: collection = start_migration(EsiRefreshToken) if collection is None: return ... finalize_migration(EsiRefreshToken) """ mongoengine_model_class.ensure_indexes()
[docs]def has_outdated_documents( collection: Collection, schema_version: int ) -> bool: """ Tells wether a (pymongo) collection has documents whose ``_version`` field have value less than the given schema version. """ return ( collection.count_documents({"_version": {"$ne": schema_version}}) > 0 )
[docs]def set_if_not_exist( collection: Collection, field_name: str, value: Any, *, version: Optional[int] = None, ) -> None: """ Creates a field with a given value in all documents, if that field does not already exist. If the version kwargs if specified, only update documents of that version. """ query: Dict[str, Any] = {field_name: {"$exists": False}} if version is not None: query["_version"] = version update = {"$set": {field_name: value}} collection.update_many(query, update)
[docs]def start_migration(mongoengine_model_class) -> Optional[Collection]: """ Starts the migration of a collection. If the returned value is ``None``, then the collection is up to date. Example:: collection = start_migration(EsiRefreshToken) if collection is None: return ... finalize_migration(EsiRefreshToken) """ # pylint: disable=protected-access collection_name = mongoengine_model_class._get_collection_name() collection = get_pymongo_collection(collection_name) if not has_outdated_documents( collection, mongoengine_model_class.SCHEMA_VERSION ): return None logging.info( 'Migrating collection "%s" to version %d', collection_name, mongoengine_model_class.SCHEMA_VERSION, ) collection.drop_indexes() return collection