Source code for sni.uac.token
"""
Token management
"""
import logging
from typing import Optional
from uuid import uuid4
import fastapi
import jwt
import jwt.exceptions as jwt_exceptions
from sni.user.models import Corporation, User
from sni.conf import CONFIGURATION as conf
import sni.utils as utils
from .models import StateCode, Token
[docs]def create_dynamic_app_token(
owner: User,
*,
callback: Optional[str] = None,
comments: Optional[str] = None,
parent: Optional[Token] = None,
) -> Token:
"""
Creates a new dynamic app token for that user.
Warning: Does not check that the user is allowed to actually do that.
"""
new_token = Token(
callback=callback,
comments=comments,
created_on=utils.now(),
owner=owner,
parent=parent,
token_type=Token.TokenType.dyn,
uuid=str(uuid4()),
)
new_token.save()
logging.info("Created dynamic app token %s", new_token.uuid)
return new_token
[docs]def create_permanent_app_token(
owner: User,
*,
callback: Optional[str] = None,
comments: Optional[str] = None,
parent: Optional[Token] = None,
) -> Token:
"""
Creates a new permanent app token for that user.
Warning: Does not check that the user is allowed to actually do that.
"""
new_token = Token(
callback=callback,
comments=comments,
created_on=utils.now(),
owner=owner,
parent=parent,
token_type=Token.TokenType.per,
uuid=str(uuid4()),
)
new_token.save()
logging.info("Created permanent app token %s", new_token.uuid)
return new_token
[docs]def create_state_code(
app_token: Optional[Token],
*,
inviting_corporation: Optional[Corporation] = None,
code_prefix: Optional[str] = None,
) -> StateCode:
"""
Creates a new state code.
See also:
:class:`sni.uac.token.StateCode`
"""
code = str(uuid4())
if code_prefix is not None:
code = code_prefix + ":" + code
state_code = StateCode(
app_token=app_token,
created_on=utils.now(),
inviting_corporation=inviting_corporation,
uuid=code,
)
state_code.save()
logging.info(
"Created state code %s deriving from app token %s",
state_code.uuid,
app_token.uuid if app_token is not None else "N/A",
)
return state_code
[docs]def create_user_token(app_token: Token, owner: User) -> Token:
"""
Derives a new user token from an existing app token, and set the owner to
be the user given in argument.
"""
if app_token.token_type == Token.TokenType.dyn:
new_token = Token( # nosec
created_on=utils.now(),
expires_on=utils.now_plus(days=1),
owner=owner,
parent=app_token,
token_type="use",
uuid=str(uuid4()),
)
elif app_token.token_type == Token.TokenType.per:
new_token = Token( # nosec
created_on=utils.now(),
owner=owner,
parent=app_token,
token_type="use",
uuid=str(uuid4()),
)
else:
raise ValueError("Expected an app token")
new_token.save()
logging.info(
"Created user token %s for app %s", new_token.uuid, app_token.uuid
)
return new_token
# pylint: disable=too-many-branches
[docs]def from_authotization_header(
authorization: str = fastapi.Header(None),
) -> Token:
"""
Validates an ``Authorization: Bearer`` header and returns a
:class:`sni.uac.token.Token`. If the token string is invalid, raises a
``401``. Should be used as a FastAPI dependency.
"""
bearer = "Bearer "
if not authorization or not authorization.startswith(bearer):
raise fastapi.HTTPException(
fastapi.status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = get_token_from_jwt(authorization[len(bearer) :])
logging.debug("Successfully validated token %s", token.uuid)
return token
except jwt_exceptions.InvalidSignatureError:
error_msg = "Failed to validate token: invalid signature"
except jwt_exceptions.ExpiredSignatureError:
error_msg = "Failed to validate token: expired signature"
except jwt_exceptions.InvalidAudienceError:
error_msg = "Failed to validate token: invalid audience"
except jwt_exceptions.InvalidIssuerError:
error_msg = "Failed to validate token: invalid issuer"
except jwt_exceptions.InvalidIssuedAtError:
error_msg = "Failed to validate token: invalid issuance date"
except jwt_exceptions.ImmatureSignatureError:
error_msg = "Failed to validate token: immature signature"
except jwt_exceptions.InvalidKeyError:
error_msg = "Failed to validate token: invalid key"
except jwt_exceptions.InvalidAlgorithmError:
error_msg = "Failed to validate token: invalid algorithm"
except jwt_exceptions.MissingRequiredClaimError:
error_msg = "Failed to validate token: missing claim"
except jwt_exceptions.DecodeError:
error_msg = "Failed to validate token: token could not be decoded"
except jwt_exceptions.InvalidTokenError:
error_msg = "Failed to validate token: token is invalid"
except KeyError:
error_msg = "Failed to validate token: payload is invalid"
logging.error(error_msg)
raise fastapi.HTTPException(
fastapi.status.HTTP_401_UNAUTHORIZED,
detail=error_msg,
headers={"WWW-Authenticate": "Bearer"},
)
[docs]def from_authotization_header_nondyn(
tkn: Token = fastapi.Depends(from_authotization_header),
) -> Token:
"""
Validates an ``Authorization: Bearer`` header and returns a
:class:`sni.uac.token.Token`. If the token string is invalid, or if the
token is dynamic, raises a ``401``. Should be used as a FastAPI dependency.
"""
if tkn.token_type == Token.TokenType.dyn:
raise fastapi.HTTPException(
fastapi.status.HTTP_401_UNAUTHORIZED,
detail="Cannot use a dynamic app token for this path.",
)
return tkn
[docs]def get_token_from_jwt(token_str: str) -> Token:
"""
Retrieves a token from its JWT string.
"""
payload = jwt.decode(
token_str,
conf.jwt.secret.get_secret_value(),
algorithm=conf.jwt.algorithm,
issuer=conf.general.root_url,
verify_exp=True,
verify_iss=True,
verify=True,
)
token_uuid = payload["jti"]
return Token.objects(uuid=token_uuid).first()
[docs]def to_jwt(model: Token) -> str:
"""
Derives a JWT token byte array from the a token model.
"""
payload = {
"iat": int(model.created_on.timestamp()),
"iss": conf.general.root_url,
"jti": str(model.uuid),
"nbf": int(model.created_on.timestamp()),
"own": model.owner.character_id,
"typ": model.token_type,
}
if model.expires_on:
payload["exp"] = int(model.expires_on.timestamp())
return jwt.encode(
payload,
conf.jwt.secret.get_secret_value(),
algorithm=conf.jwt.algorithm,
).decode()