Source code for sni.api.routers.corporation

"""
Corporation management paths
"""

from datetime import datetime
from typing import Dict, Iterator, List, Optional

from fastapi import APIRouter, Depends
import pydantic as pdt

from sni.esi.scope import EsiScope, esi_scope_set_to_hex
from sni.esi.token import tracking_status, TrackingStatus
from sni.uac.clearance import assert_has_clearance
from sni.uac.token import (
    create_state_code,
    from_authotization_header_nondyn,
    Token,
)
from sni.user.models import Alliance, Corporation, User
from sni.user.user import ensure_corporation

from .user import GetUserShortOut

router = APIRouter()


[docs]class GetAllianceShortOut(pdt.BaseModel): """ Short alliance description """ alliance_id: int alliance_name: str
[docs] @staticmethod def from_record(alliance: Alliance) -> "GetAllianceShortOut": """ Converts an instance of :class:`sni.user.models.Alliance` to :class:`sni.api.routers.alliance.GetAllianceShortOut` """ return GetAllianceShortOut( alliance_id=alliance.alliance_id, alliance_name=alliance.alliance_name, )
[docs]class GetCorporationOut(pdt.BaseModel): """ Corporation data """ alliance: Optional[GetAllianceShortOut] authorized_to_login: Optional[bool] ceo: GetUserShortOut corporation_id: int corporation_name: str cumulated_mandatory_esi_scopes: List[EsiScope] mandatory_esi_scopes: List[EsiScope] ticker: str updated_on: datetime
[docs] @staticmethod def from_record(corporation: Corporation) -> "GetCorporationOut": """ Converts an instance of :class:`sni.user.models.Corporation` to :class:`sni.api.routers.corporation.GetCorporationOut` """ return GetCorporationOut( alliance=GetAllianceShortOut.from_record(corporation.alliance) if corporation.alliance is not None else None, authorized_to_login=corporation.authorized_to_login, ceo=GetUserShortOut.from_record(corporation.ceo), corporation_id=corporation.corporation_id, corporation_name=corporation.corporation_name, cumulated_mandatory_esi_scopes=list( corporation.cumulated_mandatory_esi_scopes() ), mandatory_esi_scopes=corporation.mandatory_esi_scopes, ticker=corporation.ticker, updated_on=corporation.updated_on, )
[docs]class PostCorporationGuestOut(pdt.BaseModel): """ Model for ``POST /corporation/{corporation_id}/guest`` reponses. """ state_code: str
[docs]class GetCorporationShortOut(pdt.BaseModel): """ Short corporation description """ corporation_id: int corporation_name: str
[docs] @staticmethod def from_record(corporation: Corporation) -> "GetCorporationShortOut": """ Converts an instance of :class:`sni.user.models.Corporation` to :class:`sni.api.routers.corporation.GetCorporationShortOut` """ return GetCorporationShortOut( corporation_id=corporation.corporation_id, corporation_name=corporation.corporation_name, )
[docs]class GetTrackingOut(pdt.BaseModel): """ Represents a corporation tracking response. """ invalid_refresh_token: List[GetUserShortOut] = [] no_refresh_token: List[GetUserShortOut] = [] valid_refresh_token: List[GetUserShortOut] = []
[docs] @staticmethod def from_user_iterator(iterator: Iterator[User]) -> "GetTrackingOut": """ Creates a tracking response from a user iterator. See :meth:`sni.esi.token.tracking_status` """ result = GetTrackingOut() ldict: Dict[int, List[GetUserShortOut]] = { TrackingStatus.HAS_NO_REFRESH_TOKEN: result.no_refresh_token, TrackingStatus.ONLY_HAS_INVALID_REFRESH_TOKEN: result.invalid_refresh_token, TrackingStatus.HAS_A_VALID_REFRESH_TOKEN: result.valid_refresh_token, } for usr in iterator: status = tracking_status(usr) ldict[status].append(GetUserShortOut.from_record(usr)) return result
[docs]class PutCorporationIn(pdt.BaseModel): """ Model for ``PUT /corporation/{corporation_id}`` requests """ authorized_to_login: Optional[bool] mandatory_esi_scopes: Optional[List[EsiScope]]
[docs]@router.get( "", response_model=List[GetCorporationShortOut], summary="Get the list of corporations", ) def get_corporations(tkn: Token = Depends(from_authotization_header_nondyn),): """ Gets the list of corporations registered in this instance. Requires a clearance level of 0 or more. """ assert_has_clearance(tkn.owner, "sni.read_corporation") return [ GetCorporationShortOut.from_record(corporation) for corporation in Corporation.objects( corporation_id__gte=2000000 ).order_by("corporation_name") ]
[docs]@router.get( "/{corporation_id}", response_model=GetCorporationOut, summary="Get informations about a corporation", ) def get_corporation( corporation_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Get informations about a corporation. Note that this corporation must be registered on SNI """ assert_has_clearance(tkn.owner, "sni.read_corporation") corporation = Corporation.objects(corporation_id=corporation_id).get() return GetCorporationOut.from_record(corporation)
[docs]@router.post( "/{corporation_id}", response_model=GetCorporationOut, summary="Manually fetch a corporation from the ESI", ) def post_corporation( corporation_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Manually fetches a corporation from the ESI. Requires a clearance level of 8 or more. """ assert_has_clearance(tkn.owner, "sni.fetch_corporation") corporation = ensure_corporation(corporation_id) return GetCorporationOut.from_record(corporation)
[docs]@router.put( "/{corporation_id}", response_model=GetCorporationOut, summary="Modify a corporation registered on SNI", ) def put_corporation( corporation_id: int, data: PutCorporationIn, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Modify a corporation registered on SNI. Note that it does not modify it on an ESI level. Requires a clearance level of 2 or more. """ corporation: Corporation = Corporation.objects( corporation_id=corporation_id ).get() assert_has_clearance(tkn.owner, "sni.update_corporation", corporation.ceo) corporation.authorized_to_login = data.authorized_to_login if data.mandatory_esi_scopes is not None: corporation.mandatory_esi_scopes = data.mandatory_esi_scopes corporation.save() return GetCorporationOut.from_record(corporation)
[docs]@router.delete( "/{corporation_id}/guest/{character_id}", summary="Deletes a corporation guest", ) def delete_corporation_guest( corporation_id: int, character_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Deletes a corporation guest """ corporation: Corporation = Corporation.objects( corporation_id=corporation_id ).get() assert_has_clearance( tkn.owner, "sni.delete_corporation_guest", corporation.ceo ) guest: User = User.objects( character_id=character_id, clearance_level__lt=0, corporation=corporation, ).get() guest.delete()
[docs]@router.get( "/{corporation_id}/guest", response_model=List[GetUserShortOut], summary="Corporation guests", ) def get_corporation_guests( corporation_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Returns the list of guests in this corporation. """ corporation: Corporation = Corporation.objects( corporation_id=corporation_id ).get() assert_has_clearance( tkn.owner, "sni.read_corporation_guests", corporation.ceo ) return [ GetUserShortOut.from_record(guest) for guest in corporation.guest_iterator() ]
[docs]@router.post( "/{corporation_id}/guest", response_model=PostCorporationGuestOut, summary="Creates a state code for a new guest to this corporation", ) def post_corporation_guest( corporation_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Creates a state code for a new guest to this corporation. The user then has to login with this state code to be considered a guest. """ corporation: Corporation = Corporation.objects( corporation_id=corporation_id ).get() assert_has_clearance( tkn.owner, "sni.create_corporation_guest", corporation.ceo ) state_code = create_state_code( tkn.parent, inviting_corporation=corporation, code_prefix=esi_scope_set_to_hex( corporation.cumulated_mandatory_esi_scopes() ), ) return PostCorporationGuestOut(state_code=str(state_code.uuid))
[docs]@router.get( "/{corporation_id}/tracking", response_model=GetTrackingOut, summary="Corporation tracking", ) def get_corporation_tracking( corporation_id: int, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Reports which member (of a given corporation) have a valid refresh token attacked to them, and which do not. Requires a clearance level of 1 and having authority over this corporation. """ corporation: Corporation = Corporation.objects( corporation_id=corporation_id ).get() assert_has_clearance(tkn.owner, "sni.track_corporation", corporation.ceo) return GetTrackingOut.from_user_iterator(corporation.user_iterator())