Source code for sni.api.routers.callback

"""
Callback paths
"""

import logging
from typing import List, Sequence

from fastapi import APIRouter, status
from fastapi.responses import (
    HTMLResponse,
    PlainTextResponse,
    RedirectResponse,
)
import requests

from sni.user.models import User

from sni.esi.sso import (
    decode_access_token,
    EsiTokenError,
    get_access_token_from_callback_code,
)
from sni.esi.scope import EsiScope
from sni.esi.token import save_esi_tokens, token_has_enough_scopes
from sni.uac.token import create_user_token, StateCode, to_jwt
from sni.uac.uac import is_authorized_to_login

router = APIRouter()


[docs]def token_manipulation_error_response() -> HTMLResponse: """ Returns an HTML response explaing that the instance ran in an ESI token manipulation error. """ return HTMLResponse( content="""<h1>ESI token manipulation error</h1> <p>Sorry, something went wrong while manipulating your ESI token. It could be a temporary outage of the EVE API, or a bug in SeAT Navy Issue.</p> <p>If the problem persists, please contact the instance administrator.</p>""", status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, )
[docs]def not_authorized_to_login_response( character_name: str, character_id: int ) -> HTMLResponse: """ Returns an HTML response explaing that the user is not allowed to login on this instance. """ return HTMLResponse( content=f"""<h1>Not authorized to login</h1> <p>Character <strong>{character_name}</strong> ({character_id}) is not allowed to login. This is due to the fact that your character and/or corporation and/or alliance is not whitelisted.</p> <p>If your alliance recently joined a coalition registered on this instance, ask the coalition managed to add your alliance, and try again.</p> <p>If the problem persists, please contact the instance administrator.</p> """, status_code=status.HTTP_401_UNAUTHORIZED, )
[docs]def string_list_to_html(string_list: Sequence[str]) -> str: """ Converts a string list to an HTML bullet list. """ if not string_list: return "<p>None</p>" item_list = [f"<li><code>{item}</code></li>" for item in string_list] return "<ul>" + "".join(item_list) + "</ul>"
[docs]def not_enough_scopes_response( character_name: str, character_id: int, required_scopes: List[EsiScope], provided_scopes: List[EsiScope], ) -> HTMLResponse: """ Returns an HTML response explaing that the user did not provide enough ESI scopes. """ return HTMLResponse( content=f"""<h1>Not enough ESI scopes</h1> <p>The ESI token does not provide enough scopes for character <strong>{character_name}</strong> ({character_id}):</p> <h3>Required</h3> {string_list_to_html(required_scopes)} <h3>Provided</h3> {string_list_to_html(provided_scopes)} """, status_code=status.HTTP_401_UNAUTHORIZED, )
[docs]@router.get( "/esi", summary="ESI callback", ) async def get_callback_esi(code: str, state: str): """ ESI callback. You should not manually call this. Upon receiving a notification from EVE SSO, SNI redirects the client to the appropriate frontend callback, with the state code and user token in URL parameters. Reference: `OAuth 2.0 for Web Based Applications <https://docs.esi.evetech.net/docs/sso/web_based_sso_flow.html>`_ """ logging.info("Received callback from ESI for state %s", state) state_code: StateCode = StateCode.objects.get(uuid=state) state_code.delete() try: esi_response = get_access_token_from_callback_code(code) access_token = decode_access_token(esi_response.access_token) save_esi_tokens(esi_response) except EsiTokenError: return token_manipulation_error_response() usr: User = User.objects.get(character_id=access_token.character_id) if state_code.inviting_corporation is not None: usr.corporation = state_code.inviting_corporation usr.clearance_level = -1 usr.save() if not is_authorized_to_login(usr): return not_authorized_to_login_response( usr.character_name, usr.character_id ) if not token_has_enough_scopes(access_token, usr): return not_enough_scopes_response( usr.character_name, usr.character_id, list(usr.cumulated_mandatory_esi_scopes()), access_token.scp, ) user_token = create_user_token(state_code.app_token, usr) user_jwt_str = to_jwt(user_token) logging.info( "Issuing token %s to app %s", user_jwt_str, state_code.app_token.uuid ) if state_code.app_token.callback is None: return { "created_on": user_token.created_on, "expires_on": user_token.expires_on, "jwt": user_jwt_str, "owner": { "character_id": user_token.owner.character_id, "character_name": user_token.owner.character_name, "clearance_level": user_token.owner.clearance_level, }, } request = requests.Request( "GET", state_code.app_token.callback, params={ "state_code": str(state_code.uuid), "user_token": user_jwt_str, }, ) url = request.prepare().url if url is None: logging.error( "Failed to redirect user to %s", state_code.app_token.callback, ) return PlainTextResponse( f"Failed to redirect to {state_code.app_token.callback}" ) return RedirectResponse(url=url)