Source code for sni.api.routers.group

"""
Group management paths
"""

from datetime import datetime
import logging
from typing import List, Optional

from fastapi import (
    APIRouter,
    Depends,
    status,
)
import pydantic as pdt

from sni.user.models import Group, User

from sni.uac.clearance import assert_has_clearance, has_clearance
from sni.uac.token import (
    from_authotization_header_nondyn,
    Token,
)

from .user import GetUserShortOut
from .common import BSONObjectId

router = APIRouter()


[docs]class GetGroupShortOut(pdt.BaseModel): """ Model for an element of `GET /group` responses """ group_id: str group_name: str
[docs]class GetGroupOut(pdt.BaseModel): """ Model for `GET /group/{group_id}` responses. """ authorized_to_login: Optional[bool] created_on: datetime description: str group_id: str group_name: str is_autogroup: bool members: List[GetUserShortOut] owner: Optional[GetUserShortOut] updated_on: datetime
[docs] @staticmethod def from_record(grp: Group) -> "GetGroupOut": """ Converts a group database record to a response. """ return GetGroupOut( authorized_to_login=grp.authorized_to_login, created_on=grp.created_on, description=grp.description, group_id=str(grp.pk), group_name=grp.group_name, is_autogroup=grp.is_autogroup, members=[ GetUserShortOut.from_record(member) for member in grp.members ], owner=GetUserShortOut.from_record(grp.owner) if grp.owner is not None else None, updated_on=grp.updated_on, )
[docs]class PostGroupIn(pdt.BaseModel): """ Model for `POST /group` responses. """ description: str = "" group_name: str
[docs]class PutGroupIn(pdt.BaseModel): """ Model for `POST /group` responses. """ add_members: Optional[List[str]] = None authorized_to_login: Optional[bool] description: Optional[str] = None members: Optional[List[str]] = None owner: Optional[str] = None remove_members: Optional[List[str]] = None
[docs]@router.delete( "/{group_id}", summary="Delete a group", ) def delete_group( group_id: BSONObjectId, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Deletes a group. Requires a clearance level of 9 or more. """ assert_has_clearance(tkn.owner, "sni.delete_group") grp: Group = Group.objects.get(pk=group_id) logging.debug("Deleting group %s (%s)", grp.group_name, group_id) grp.delete()
[docs]@router.get( "", response_model=List[GetGroupShortOut], summary="List all group names", ) def get_group(tkn: Token = Depends(from_authotization_header_nondyn),): """ Lists all the group names. Requires a clearance level of 0 or more. """ assert_has_clearance(tkn.owner, "sni.read_group") return [ GetGroupShortOut(group_id=str(grp.pk), group_name=grp.group_name) for grp in Group.objects().order_by("group_name") ]
[docs]@router.get( "/{group_id}", response_model=GetGroupOut, summary="Get basic informations about a group", ) def get_group_name( group_id: BSONObjectId, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Returns details about a given group. Requires a clearance level of 0 or more. """ assert_has_clearance(tkn.owner, "sni.read_group") return GetGroupOut.from_record(Group.objects(pk=group_id).get())
[docs]@router.post( "", response_model=GetGroupOut, status_code=status.HTTP_201_CREATED, summary="Create a group", ) def post_groups( data: PostGroupIn, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Creates a group. Requires a clearance level of 9 or more. """ assert_has_clearance(tkn.owner, "sni.create_group") grp = Group( description=data.description, members=[tkn.owner], group_name=data.group_name, owner=tkn.owner, ).save() logging.debug( "Created group %s (%s) owned by %s", data.group_name, str(grp.pk), tkn.owner.character_name, ) return GetGroupOut.from_record(grp)
[docs]@router.put( "/{group_id}", response_model=GetGroupOut, summary="Update a group", ) def put_group( group_id: BSONObjectId, data: PutGroupIn, tkn: Token = Depends(from_authotization_header_nondyn), ): """ Updates a group. All fields in the request body are optional. The `add_members` and `remove_members` fields can be used together, but the `members` cannot be used in conjunction with `add_members` and `remove_members`. Requires a clearance level of 9 or more of for the user to be the owner of the group. """ grp: Group = Group.objects.get(pk=group_id) if not ( tkn.owner == grp.owner or has_clearance(tkn.owner, "sni.update_group") ): raise PermissionError logging.debug("Updating group %s (%s)", grp.group_name, group_id) if data.add_members is not None: grp.members += [ User.objects.get(character_name=member_name) for member_name in set(data.add_members) ] if data.authorized_to_login is not None: assert_has_clearance(tkn.owner, "sni.set_authorized_to_login") grp.authorized_to_login = data.authorized_to_login if data.description is not None: grp.description = data.description if data.members is not None: grp.members = [ User.objects.get(character_name=member_name) for member_name in set(data.members) ] if data.owner is not None: grp.owner = User.objects.get(character_name=data.owner) if data.remove_members is not None: grp.members = [ member for member in grp.members if member.character_name not in data.remove_members ] grp.members = list(set(grp.members + [grp.owner])) grp.save() return GetGroupOut.from_record(grp)