Source code for aioauth.server

"""
.. code-block:: python

    from aioauth import server

Memory object and interface used to initialize an OAuth2.0 server instance.

Warning:
    Note that :py:class:`aioauth.server.AuthorizationServer` is not
    depedent on any server framework, nor serves at any specific
    endpoint. Instead, it is used to create an interface that can be
    used in conjunction with a server framework like ``FastAPI`` or
    ``aiohttp`` to create a fully functional OAuth 2.0 server.
    Check out the *Examples* portion of the documentation to understand
    how it can be leveraged in your own project.

----
"""
import sys
from dataclasses import asdict
from http import HTTPStatus
from typing import Any, Dict, Generic, List, Optional, Tuple, Type, Union


if sys.version_info >= (3, 8):
    from typing import get_args
else:
    from typing_extensions import get_args

from .collections import HTTPHeaderDict
from .constances import default_headers
from .errors import (
    InsecureTransportError,
    InvalidClientError,
    InvalidRedirectURIError,
    InvalidRequestError,
    MethodNotAllowedError,
    TemporarilyUnavailableError,
    UnsupportedGrantTypeError,
    UnsupportedResponseTypeError,
    UnsupportedTokenTypeError,
)
from .grant_type import (
    AuthorizationCodeGrantType,
    ClientCredentialsGrantType,
    GrantTypeBase,
    PasswordGrantType,
    RefreshTokenGrantType,
)
from .requests import TRequest
from .response_type import (
    ResponseTypeAuthorizationCode,
    ResponseTypeIdToken,
    ResponseTypeNone,
    ResponseTypeToken,
)
from .responses import (
    Response,
    TokenActiveIntrospectionResponse,
    TokenInactiveIntrospectionResponse,
)
from .storage import TStorage
from .types import (
    GrantType,
    RequestMethod,
    ResponseType,
    TokenType,
)
from .utils import (
    build_uri,
    catch_errors_and_unavailability,
    decode_auth_headers,
    enforce_list,
)


[docs]class AuthorizationServer(Generic[TRequest, TStorage]): """Interface for initializing an OAuth 2.0 server.""" response_types: Dict[ResponseType, Any] = { "token": ResponseTypeToken[TRequest, TStorage], "code": ResponseTypeAuthorizationCode[TRequest, TStorage], "none": ResponseTypeNone[TRequest, TStorage], "id_token": ResponseTypeIdToken[TRequest, TStorage], } grant_types: Dict[GrantType, Any] = { "authorization_code": AuthorizationCodeGrantType[TRequest, TStorage], "client_credentials": ClientCredentialsGrantType[TRequest, TStorage], "password": PasswordGrantType[TRequest, TStorage], "refresh_token": RefreshTokenGrantType[TRequest, TStorage], } def __init__( self, storage: TStorage, response_types: Optional[Dict] = None, grant_types: Optional[Dict] = None, ): self.storage = storage if response_types is not None: self.response_types = response_types if grant_types is not None: self.grant_types = grant_types
[docs] def is_secure_transport(self, request: TRequest) -> bool: """ Verifies the request was sent via a protected SSL tunnel. Note: This method simply checks if the request URL contains ``https://`` at the start of it. It does **not** ensure if the SSL certificate is valid. Args: request: :py:class:`aioauth.requests.Request` object. Returns: Flag representing whether or not the transport is secure. """ if request.settings.INSECURE_TRANSPORT: return True return request.url.lower().startswith("https://")
[docs] def validate_request(self, request: TRequest, allowed_methods: List[RequestMethod]): if not request.settings.AVAILABLE: raise TemporarilyUnavailableError[TRequest](request=request) if not self.is_secure_transport(request): raise InsecureTransportError[TRequest](request=request) if request.method not in allowed_methods: headers = HTTPHeaderDict( {**default_headers, "allow": ", ".join(allowed_methods)} ) raise MethodNotAllowedError[TRequest](request=request, headers=headers)
[docs] @catch_errors_and_unavailability() async def create_token_introspection_response(self, request: TRequest) -> Response: """ Returns a response object with introspection of the passed token. For more information see `RFC7662 section 2.1 <https://tools.ietf.org/html/rfc7662#section-2.1>`_. Note: The API endpoint that leverages this function is usually ``/introspect``. Example: Below is an example utilizing FastAPI as the server framework. .. code-block:: python from aioauth_fastapi.utils import to_oauth2_request, to_fastapi_response @app.get("/token/introspect") async def introspect(request: fastapi.Request) -> fastapi.Response: # Converts a fastapi.Request to an aioauth.Request. oauth2_request: aioauth.Request = await to_oauth2_request(request) # Creates the response via this function call. oauth2_response: aioauth.Response = await server.create_token_introspection_response(oauth2_request) # Converts an aioauth.Response to a fastapi.Response. response: fastapi.Response = await to_fastapi_response(oauth2_response) return response Args: request: An :py:class:`aioauth.requests.Request` object. Returns: response: An :py:class:`aioauth.responses.Response` object. """ self.validate_request(request, ["POST"]) client_id, _ = self.get_client_credentials(request) token_types: Tuple[TokenType, ...] = get_args(TokenType) token_type: TokenType = "refresh_token" access_token = None refresh_token = request.post.token if request.post.token_type_hint in token_types: token_type = request.post.token_type_hint if token_type == "access_token": access_token = request.post.token refresh_token = None token = await self.storage.get_token( request=request, client_id=client_id, access_token=access_token, refresh_token=refresh_token, token_type=token_type, ) token_response: Union[ TokenInactiveIntrospectionResponse, TokenActiveIntrospectionResponse ] if token and not token.is_expired and not token.revoked: token_response = TokenActiveIntrospectionResponse( scope=token.scope, client_id=token.client_id, expires_in=token.expires_in, token_type=token.token_type, ) else: token_response = TokenInactiveIntrospectionResponse() content = asdict(token_response) return Response( content=content, status_code=HTTPStatus.OK, headers=default_headers )
[docs] def get_client_credentials(self, request: TRequest) -> Tuple[str, str]: client_id = request.post.client_id client_secret = request.post.client_secret if client_id is None or client_secret is None: authorization = request.headers.get("Authorization", "") # Get client credentials from the Authorization header. try: client_id, client_secret = decode_auth_headers(authorization) except ValueError as exc: raise InvalidClientError[TRequest]( description="Invalid client_id parameter value.", request=request, ) from exc return client_id, client_secret
[docs] @catch_errors_and_unavailability() async def create_token_response(self, request: TRequest) -> Response: """Endpoint to obtain an access and/or ID token by presenting an authorization grant or refresh token. Validates a token request and creates a token response. For more information see `RFC6749 section 4.1.3 <https://tools.ietf.org/html/rfc6749#section-4.1.3>`_. Note: The API endpoint that leverages this function is usually ``/token``. Example: Below is an example utilizing FastAPI as the server framework. .. code-block:: python from aioauth_fastapi.utils import to_oauth2_request, to_fastapi_response @app.post("/token") async def token(request: fastapi.Request) -> fastapi.Response: # Converts a fastapi.Request to an aioauth.Request. oauth2_request: aioauth.Request = await to_oauth2_request(request) # Creates the response via this function call. oauth2_response: aioauth.Response = await server.create_token_response(oauth2_request) # Converts an aioauth.Response to a fastapi.Response. response: fastapi.Response = await to_fastapi_response(oauth2_response) return response Args: request: An :py:class:`aioauth.requests.Request` object. Returns: response: An :py:class:`aioauth.responses.Response` object. """ self.validate_request(request, ["POST"]) client_secret: Optional[str] = None if request.post.grant_type == "client_credentials": # client_secret is required for the client_credentials grant type # https://www.oauth.com/oauth2-servers/access-tokens/client-credentials/ client_id, client_secret = self.get_client_credentials(request) else: # for other grant types, client_secret is required if the client has one: # If the client type is confidential or the client was issued client credentials # (or assigned other authentication requirements), the client MUST authenticate # with the authorization server as described in Section 3.2.1. # https://www.rfc-editor.org/rfc/rfc6749#section-4.1.3 try: client_id, client_secret = self.get_client_credentials(request) except InvalidClientError as exc: # When InvalidClientError is raised here it probably means that # client_secret could not be found and the basic auth header # had no useful data. client_secret is optional for the password # grant type, so make sure we have a client_id and try to proceed. client_id = request.post.client_id # client_secret must not be None. When client_secret is None, # storage.get_client will not run standard checks on the client_secret client_secret = request.post.client_secret or "" if not client_id: raise exc if not request.post.grant_type: # grant_type request value is empty raise InvalidRequestError[TRequest]( request=request, description="Request is missing grant type." ) GrantTypeClass: Type[ Union[ GrantTypeBase[TRequest, TStorage], AuthorizationCodeGrantType[TRequest, TStorage], PasswordGrantType[TRequest, TStorage], RefreshTokenGrantType[TRequest, TStorage], ClientCredentialsGrantType[TRequest, TStorage], ] ] try: GrantTypeClass = self.grant_types[request.post.grant_type] except KeyError as exc: # grant_type request value is invalid raise UnsupportedGrantTypeError[TRequest](request=request) from exc grant_type = GrantTypeClass( storage=self.storage, client_id=client_id, client_secret=client_secret ) client = await grant_type.validate_request(request) response = await grant_type.create_token_response(request, client) content = asdict(response) return Response( content=content, status_code=HTTPStatus.OK, headers=default_headers )
[docs] @catch_errors_and_unavailability( skip_redirect_on_exc=( MethodNotAllowedError, InvalidClientError, InvalidRedirectURIError, ) ) async def create_authorization_response(self, request: TRequest) -> Response: """ Endpoint to interact with the resource owner and obtain an authorization grant. Validate authorization request and create authorization response. For more information see `RFC6749 section 4.1.1 <https://tools.ietf.org/html/rfc6749#section-4.1.1>`_. Note: The API endpoint that leverages this function is usually ``/authorize``. Example: Below is an example utilizing FastAPI as the server framework. .. code-block:: python from aioauth.fastapi.utils import to_oauth2_request, to_fastapi_response @app.post("/authorize") async def authorize(request: fastapi.Request) -> fastapi.Response: # Converts a fastapi.Request to an aioauth.Request. oauth2_request: aioauth.Request = await to_oauth2_request(request) # Creates the response via this function call. oauth2_response: aioauth.Response = await server.create_authorization_response(oauth2_request) # Converts an aioauth.Response to a fastapi.Response. response: fastapi.Response = await to_fastapi_response(oauth2_response) return response Args: request: An :py:class:`aioauth.requests.Request` object. Returns: response: An :py:class:`aioauth.responses.Response` object. """ self.validate_request(request, ["GET", "POST"]) response_type_list = enforce_list(request.query.response_type) response_type_classes = set() # Combined responses responses = {} # URI fragment fragment = {} # URI query params query = {} # Response content content = {} if not response_type_list: raise InvalidRequestError[TRequest]( request=request, description="Missing response_type parameter." ) if request.query.state: responses["state"] = request.query.state for response_type in response_type_list: ResponseTypeClass = self.response_types.get(response_type) if ResponseTypeClass: response_type_classes.add(ResponseTypeClass) if not response_type_classes: raise UnsupportedResponseTypeError(request=request) for ResponseTypeClass in response_type_classes: response_type = ResponseTypeClass(storage=self.storage) client = await response_type.validate_request(request) response = await response_type.create_authorization_response( request, client ) responses.update(asdict(response)) # See: https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#Combinations if "code" in response_type_list: """ The TYPE_CODE has lowest priority. The response will be placed in query. """ query = responses if "token" in response_type_list: """ The TYPE_TOKEN has middle priority. The response will be placed in fragment. """ query = {} fragment = responses if "id_token" in response_type_list: """ The TYPE_ID_TOKEN has highest priority. The response can be placed in query, fragment or content depending on the response_mode. """ if request.query.response_mode == "form_post": query = {} fragment = {} content = responses elif request.query.response_mode == "fragment": query = {} content = {} fragment = responses elif request.query.response_mode == "query": content = {} fragment = {} query = responses location = build_uri(request.query.redirect_uri, query, fragment) return Response( status_code=HTTPStatus.FOUND, headers=HTTPHeaderDict({"location": location}), content=content, )
[docs] @catch_errors_and_unavailability() async def revoke_token(self, request: TRequest) -> Response: """Endpoint to revoke an access token or refresh token. For more information see `RFC7009 <https://tools.ietf.org/html/rfc7009>`_. Note: The API endpoint that leverages this function is usually ``/revoke``. Example: Below is an example utilizing FastAPI as the server framework. .. code-block:: python from aioauth_fastapi.utils import to_oauth2_request, to_fastapi_response @app.post("/revoke") async def revoke(request: fastapi.Request) -> fastapi.Response: # Converts a fastapi.Request to an aioauth.Request. oauth2_request: aioauth.Request = await to_oauth2_request(request) # Creates the response via this function call. oauth2_response: aioauth.Response = await server.revoke_token(oauth2_request) # Converts an aioauth.Response to a fastapi.Response. response: fastapi.Response = await to_fastapi_response(oauth2_response) return response Args: request: An :py:class:`aioauth.requests.Request` object. Returns: response: An :py:class:`aioauth.responses.Response` object. """ self.validate_request(request, ["POST"]) client_id, _ = self.get_client_credentials(request) if not request.post.token: raise InvalidRequestError[TRequest]( request=request, description="Request is missing token." ) if request.post.token_type_hint and request.post.token_type_hint not in { "refresh_token", "access_token", }: raise UnsupportedTokenTypeError[TRequest](request=request) access_token = ( request.post.token if request.post.token_type_hint != "refresh_token" else None ) refresh_token = ( request.post.token if request.post.token_type_hint != "access_token" else None ) token = await self.storage.get_token( request=request, client_id=client_id, access_token=access_token, refresh_token=refresh_token, token_type=request.post.token_type_hint, ) if token: await self.storage.revoke_token( request=request, access_token=access_token, refresh_token=refresh_token, token_type=request.post.token_type_hint, ) return Response(status_code=HTTPStatus.NO_CONTENT)