Source code for aioauth.grant_type

"""
.. code-block:: python

    from aioauth import grant_type

Different OAuth 2.0 grant types.

----
"""
from typing import Generic, Optional
from .errors import (
    InvalidClientError,
    InvalidGrantError,
    InvalidRedirectURIError,
    InvalidRequestError,
    InvalidScopeError,
    MismatchingStateError,
    UnauthorizedClientError,
)
from .models import Client
from .requests import TRequest
from .responses import TokenResponse
from .storage import TStorage
from .utils import enforce_list, enforce_str, generate_token


class GrantTypeBase(Generic[TRequest, TStorage]):
    """Shared behaviour for every OAuth 2.0 grant type in this module.

    An instance keeps the storage backend and the client credentials it was
    constructed with.  ``validate_request()`` records the scope to grant in
    ``self.scope``; ``create_token_response()`` refuses to run until that
    has happened.
    """

    def __init__(
        self, storage: TStorage, client_id: str, client_secret: Optional[str]
    ):
        self.storage = storage
        self.client_id = client_id
        self.client_secret = client_secret
        # Stays None until validate_request() has accepted a request.
        self.scope: Optional[str] = None

    async def create_token_response(
        self, request: TRequest, client: Client
    ) -> TokenResponse:
        """Issue a token through the storage backend and wrap it in a response.

        Args:
            request: The token request being answered.
            client: The client returned by ``validate_request()``.

        Returns:
            A ``TokenResponse`` populated from the token that storage created.

        Raises:
            RuntimeError: If ``self.scope`` is still ``None``, i.e. the request
                has not been validated yet.
        """
        granted_scope = self.scope
        if granted_scope is None:
            raise RuntimeError("validate_request() must be called first")

        issued = await self.storage.create_token(
            request,
            client.client_id,
            granted_scope,
            generate_token(42),  # access token value
            generate_token(48),  # refresh token value
        )

        return TokenResponse(
            expires_in=issued.expires_in,
            refresh_token_expires_in=issued.refresh_token_expires_in,
            access_token=issued.access_token,
            refresh_token=issued.refresh_token,
            scope=issued.scope,
            token_type=issued.token_type,
        )

    async def validate_request(self, request: TRequest) -> Client:
        """Check the client, its grant type and the requested scope.

        On success the requested scope is stored in ``self.scope``.

        Args:
            request: The incoming token request.

        Returns:
            The client that storage returned for the configured credentials.

        Raises:
            InvalidClientError: Storage returned no client.
            UnauthorizedClientError: The client rejects the requested grant type.
            InvalidScopeError: The client rejects the requested scope.
        """
        found_client = await self.storage.get_client(
            request, client_id=self.client_id, client_secret=self.client_secret
        )
        if not found_client:
            raise InvalidClientError[TRequest](
                request=request, description="Invalid client_id parameter value."
            )

        if not found_client.check_grant_type(request.post.grant_type):
            raise UnauthorizedClientError[TRequest](request=request)

        requested_scope = request.post.scope
        if not found_client.check_scope(requested_scope):
            raise InvalidScopeError[TRequest](request=request)

        self.scope = requested_scope
        return found_client
class AuthorizationCodeGrantType(GrantTypeBase[TRequest, TStorage]):
    """
    Grant type that exchanges an authorization code for an access token.

    It is used by confidential and public clients.  After the user returns
    to the client via the redirect URL, the application reads the
    authorization code from the URL and uses it to request an access token.

    It is recommended that all clients also use the
    `RFC 7636 <https://tools.ietf.org/html/rfc7636>`_ Proof Key for Code
    Exchange extension with this flow to provide better security.

    Note:
        ``aioauth`` implements RFC 7636 out-of-the-box.

    See `RFC 6749 section 1.3.1 <https://tools.ietf.org/html/rfc6749#section-1.3.1>`_.
    """

    async def validate_request(self, request: TRequest) -> Client:
        """Validate the redirect URI, the authorization code and, if set, PKCE.

        Runs the base-class validation first.  On success ``self.scope`` is
        replaced with the scope stored on the authorization code.

        Raises:
            InvalidRedirectURIError: The redirect URI is missing or the client
                rejects it.
            InvalidRequestError: The code is missing, or the stored code has a
                code challenge but no code verifier was sent.
            InvalidGrantError: Storage has no such code, or the code is expired.
            MismatchingStateError: The code verifier fails the challenge check.
        """
        client = await super().validate_request(request)

        redirect_uri = request.post.redirect_uri
        if not redirect_uri:
            raise InvalidRedirectURIError[TRequest](
                request=request, description="Mismatching redirect URI."
            )
        if not client.check_redirect_uri(redirect_uri):
            raise InvalidRedirectURIError[TRequest](
                request=request, description="Invalid redirect URI."
            )

        code = request.post.code
        if not code:
            raise InvalidRequestError[TRequest](
                request=request, description="Missing code parameter."
            )

        stored_code = await self.storage.get_authorization_code(
            request, client.client_id, code
        )
        if not stored_code:
            raise InvalidGrantError[TRequest](request=request)

        # The verifier is only enforced when the stored code carries both a
        # challenge and a challenge method.
        if stored_code.code_challenge and stored_code.code_challenge_method:
            verifier = request.post.code_verifier
            if not verifier:
                raise InvalidRequestError[TRequest](
                    request=request, description="Code verifier required."
                )
            if not stored_code.check_code_challenge(verifier):
                raise MismatchingStateError[TRequest](request=request)

        # Expiry is checked last, after the PKCE verification above.
        if stored_code.is_expired:
            raise InvalidGrantError[TRequest](request=request)

        self.scope = stored_code.scope
        return client

    async def create_token_response(
        self, request: TRequest, client: Client
    ) -> TokenResponse:
        """Issue the token, then delete the authorization code that was used.

        The code is removed from storage only after the base class has
        produced the token response.
        """
        response = await super().create_token_response(request, client)

        await self.storage.delete_authorization_code(
            request, client.client_id, request.post.code
        )

        return response
class PasswordGrantType(GrantTypeBase[TRequest, TStorage]):
    """
    Grant type that exchanges a user's credentials for an access token.

    Because the client application has to collect the user's password and
    send it to the authorization server, it is not recommended that this
    grant be used at all anymore.

    See `RFC 6749 section 1.3.3 <https://tools.ietf.org/html/rfc6749#section-1.3.3>`_.
    The latest `OAuth 2.0 Security Best Current Practice <https://tools.ietf.org/html/draft-ietf-oauth-security-topics-13#section-3.4>`_
    disallows the password grant entirely.
    """

    async def validate_request(self, request: TRequest) -> Client:
        """Run the base validation, then authenticate the user via storage.

        Raises:
            InvalidRequestError: The username or password is missing, or
                ``storage.authenticate`` returns a falsy value.
        """
        client = await super().validate_request(request)

        # Both fields must be present before storage is asked to authenticate.
        if not (request.post.username and request.post.password):
            raise InvalidRequestError[TRequest](
                request=request, description="Invalid credentials given."
            )

        authenticated = await self.storage.authenticate(request)
        if not authenticated:
            raise InvalidRequestError[TRequest](
                request=request, description="Invalid credentials given."
            )

        return client
class RefreshTokenGrantType(GrantTypeBase[TRequest, TStorage]):
    """
    The Refresh Token grant type is used by clients to exchange a refresh
    token for an access token when the access token has expired. This allows
    clients to continue to have a valid access token without further
    interaction with the user.

    See `RFC 6749 section 1.5 <https://tools.ietf.org/html/rfc6749#section-1.5>`_.
    """

    async def create_token_response(
        self, request: TRequest, client: Client
    ) -> TokenResponse:
        """Validate the refresh token, revoke it and issue a replacement.

        The new token gets the old token's scope.  If the request names a
        scope, the new scope is narrowed to the entries present in both,
        kept in the order they appear on the old token.

        Args:
            request: The token request carrying ``refresh_token`` and,
                optionally, ``scope``.
            client: The client returned by ``validate_request()``.

        Returns:
            A ``TokenResponse`` populated from the newly created token.

        Raises:
            InvalidGrantError: Storage has no token for this refresh token, or
                the token is revoked or its refresh token has expired.
        """
        old_token = await self.storage.get_token(
            request=request,
            client_id=client.client_id,
            refresh_token=request.post.refresh_token,
        )

        if not old_token or old_token.revoked or old_token.refresh_token_expired:
            raise InvalidGrantError[TRequest](request=request)

        # Revoke old token
        await self.storage.revoke_token(
            request=request, refresh_token=old_token.refresh_token
        )

        # new token should have at max the same scope as the old token
        # (see https://www.oauth.com/oauth2-servers/making-authenticated-requests/refreshing-an-access-token/)
        new_scope = old_token.scope
        if request.post.scope:
            # Restrict the new token to requested scopes that the old token
            # already had.  A plain set intersection iterates in arbitrary
            # order, which for strings varies between processes, so filter the
            # old scopes in their original order instead.  dict.fromkeys drops
            # duplicates while keeping first-seen order.
            requested = set(enforce_list(request.post.scope))
            new_scope = enforce_str(
                [
                    item
                    for item in dict.fromkeys(enforce_list(old_token.scope))
                    if item in requested
                ]
            )

        token = await self.storage.create_token(
            request, client.client_id, new_scope, generate_token(42), generate_token(48)
        )

        return TokenResponse(
            expires_in=token.expires_in,
            refresh_token_expires_in=token.refresh_token_expires_in,
            access_token=token.access_token,
            refresh_token=token.refresh_token,
            scope=token.scope,
            token_type=token.token_type,
        )

    async def validate_request(self, request: TRequest) -> Client:
        """Run the base validation and require a ``refresh_token`` parameter.

        Raises:
            InvalidRequestError: The request carries no refresh token.
        """
        client = await super().validate_request(request)

        if not request.post.refresh_token:
            raise InvalidRequestError[TRequest](
                request=request, description="Missing refresh token parameter."
            )

        return client
class ClientCredentialsGrantType(GrantTypeBase[TRequest, TStorage]):
    """
    Grant type used by clients to obtain an access token outside of the
    context of a user.

    This is typically used by clients to access resources about themselves
    rather than to access a user's resources.

    See `RFC 6749 section 4.4 <https://tools.ietf.org/html/rfc6749#section-4.4>`_.
    """

    async def validate_request(self, request: TRequest) -> Client:
        """Require a client secret, then defer to the base validation.

        Raises:
            InvalidClientError: No client secret was supplied to the grant type.
        """
        # client_credentials grant requires a client_secret
        has_secret = self.client_secret is not None
        if not has_secret:
            raise InvalidClientError[TRequest](request=request)

        client = await super().validate_request(request)
        return client