Source code for sni.api.routers.teamspeak

"""
Teamspeak related paths
"""

from datetime import datetime
from typing import Optional

from fastapi import (
    APIRouter,
    Depends,
    HTTPException,
    status,
)
import mongoengine as me
import pydantic as pdt

from sni.teamspeak.teamspeak import (
    close_teamspeak_connection,
    complete_authentication_challenge,
    new_authentication_challenge,
    new_teamspeak_connection,
)
import sni.utils as utils
from sni.uac.clearance import assert_has_clearance
from sni.uac.token import (
    from_authotization_header_nondyn,
    Token,
)

from .user import GetUserShortOut

router = APIRouter()


[docs]class PostAuthStartOut(pdt.BaseModel): """ Model for `POST /teamspeak/auth/start` responses. """ expiration_datetime: datetime challenge_nickname: str user: GetUserShortOut
[docs]@router.post( "/auth/start", response_model=PostAuthStartOut, summary="Starts a teamspeak authentication challenge", ) def port_auth_start(tkn: Token = Depends(from_authotization_header_nondyn)): """ Starts a new authentication challenge for the owner of the token. A random nickname is returned (see `PostAuthStartOut` for more details), and the user has 1 minute to update its teamspeak nickname to it, and then call `POST /teamspeak/auth/complete`. """ assert_has_clearance(tkn.owner, "sni.teamspeak.auth") return PostAuthStartOut( expiration_datetime=utils.now_plus(minutes=2), challenge_nickname=new_authentication_challenge(tkn.owner), user=GetUserShortOut.from_record(tkn.owner), )
[docs]@router.post( "/auth/complete", status_code=status.HTTP_201_CREATED, summary="Completes a teamspeak authentication challenge", ) def post_auth_complete(tkn: Token = Depends(from_authotization_header_nondyn)): """ Completes an authentication challenge for the owner of the token. See the `POST /teamspeak/auth/start` documentation. """ assert_has_clearance(tkn.owner, "sni.teamspeak.auth") exception: Optional[HTTPException] = None try: connection = new_teamspeak_connection() complete_authentication_challenge(connection, tkn.owner) except LookupError: exception = HTTPException( status.HTTP_404_NOT_FOUND, detail="Could not find corresponding teamspeak client", ) except me.DoesNotExist: exception = HTTPException( status.HTTP_404_NOT_FOUND, detail="Could not find challenge for user", ) close_teamspeak_connection(connection) if exception is not None: raise exception