Skip to content

Utils

Contains helper functions that is used throughout the project that doesn't pertain to a specific file or module.

from aioauth import utils

build_error_response(exc, request, skip_redirect_on_exc=(OAuth2Error,))

Generate an OAuth HTTP response from the given exception

Parameters:

Name Type Description Default
exc Exception

Exception used to generate HTTP response

required
request Request

oauth request object

required
skip_redirect_on_exc Tuple[Type[OAuth2Error], ...]

Exception types to skip redirect on

(OAuth2Error,)

Returns:

Type Description
Response

OAuth HTTP response

Source code in aioauth/utils.py
def build_error_response(
    exc: Exception,
    request: Request,
    skip_redirect_on_exc: Tuple[Type[OAuth2Error], ...] = (OAuth2Error,),
) -> Response:
    """
    Generate an OAuth HTTP response from the given exception

    Args:
        exc: Exception used to generate HTTP response
        request: oauth request object
        skip_redirect_on_exc: Exception types to skip redirect on

    Returns:
        OAuth HTTP response
    """
    error: Union[TemporarilyUnavailableError, ServerError]
    if isinstance(exc, skip_redirect_on_exc):
        content = ErrorResponse(error=exc.error, description=exc.description)
        log.debug("%s %r", exc, request)
        return Response(
            content=asdict(content),
            status_code=exc.status_code,
            headers=exc.headers,
        )
    if isinstance(exc, OAuth2Error):
        log.debug("%s %r", exc, request)
        query: Dict[str, str] = {"error": exc.error}
        if exc.description:
            query["error_description"] = exc.description
        if request.settings.ERROR_URI:
            query["error_uri"] = request.settings.ERROR_URI
        if exc.state:
            query["state"] = exc.state
        location = build_uri(request.query.redirect_uri, query)
        return Response(
            status_code=HTTPStatus.FOUND,
            headers=HTTPHeaderDict({"location": location}),
        )
    error = ServerError(request=request)
    log.exception("Exception caught while processing request.", exc_info=exc)
    content = ErrorResponse(error=error.error, description=error.description)
    return Response(
        content=asdict(content),
        status_code=error.status_code,
        headers=error.headers,
    )

build_uri(url, query_params=None, fragment=None)

Builds an URI string from passed url, query_params, and fragment.

Parameters:

Name Type Description Default
url str

URL string.

required
query_params Optional[Dict]

Paramaters that contain the query.

None
fragment Optional[Dict]

Fragment of the page.

None

Returns:

Type Description
str

URL containing the original url, and the added

str

query_params and fragment.

Source code in aioauth/utils.py
def build_uri(
    url: str, query_params: Optional[Dict] = None, fragment: Optional[Dict] = None
) -> str:
    """
    Builds an URI string from passed `url`, `query_params`, and
    ``fragment``.

    Args:
        url: URL string.
        query_params: Paramaters that contain the query.
        fragment: Fragment of the page.

    Returns:
        URL containing the original `url`, and the added
        `query_params` and `fragment`.
    """
    if query_params is None:
        query_params = {}

    if fragment is None:
        fragment = {}

    parsed_url = urlparse(url)
    parsed_params = {k: v[0] for k, v in parse_qs(parsed_url.query or "").items()}
    query_params = {**parsed_params, **query_params}

    uri = urlunsplit(
        (
            parsed_url.scheme,
            parsed_url.netloc,
            parsed_url.path,
            urlencode(query_params, quote_via=quote),
            urlencode(fragment, quote_via=quote),
        )
    )
    return uri

catch_errors_and_unavailability(skip_redirect_on_exc=(OAuth2Error,))

Decorator that adds error catching to the function passed.

Parameters:

Name Type Description Default
f

A callable.

required

Returns:

Type Description
Callable[..., Callable[..., Coroutine[Any, Any, Response]]]

A callable with error catching capabilities.

Source code in aioauth/utils.py
def catch_errors_and_unavailability(
    skip_redirect_on_exc: Tuple[Type[OAuth2Error], ...] = (OAuth2Error,)
) -> Callable[..., Callable[..., Coroutine[Any, Any, Response]]]:
    """
    Decorator that adds error catching to the function passed.

    Args:
        f: A callable.

    Returns:
        A callable with error catching capabilities.
    """

    def decorator(
        f: Callable[..., Coroutine[Any, Any, Response]]
    ) -> Callable[..., Coroutine[Any, Any, Response]]:
        @functools.wraps(f)
        async def wrapper(self, request: Request, *args, **kwargs) -> Response:
            try:
                response = await f(self, request, *args, **kwargs)
            except Exception as exc:
                response = build_error_response(
                    exc=exc, request=request, skip_redirect_on_exc=skip_redirect_on_exc
                )
            return response

        return wrapper

    return decorator

create_s256_code_challenge(code_verifier)

Create S256 code challenge with the passed code_verifier.

Note

This function implements: base64url(sha256(ascii(code_verifier))).

Args: code_verifier: Code verifier string.

Returns:

Type Description
str

Representation of the S256 code challenge with the passed

str

code_verifier.

Source code in aioauth/utils.py
def create_s256_code_challenge(code_verifier: str) -> str:
    """
    Create S256 code challenge with the passed `code_verifier`.

    Note:
        This function implements: `base64url(sha256(ascii(code_verifier)))`.
    Args:
        code_verifier: Code verifier string.

    Returns:
        Representation of the S256 code challenge with the passed
        `code_verifier`.
    """
    code_verifier_bytes = code_verifier.encode("utf-8")
    data = hashlib.sha256(code_verifier_bytes).digest()
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

decode_auth_headers(authorization)

Decodes an encoded HTTP basic authentication string. Returns a tuple of the form (client_id, client_secret), and raises a aioauth.errors.InvalidClientError exception if nothing could be decoded.

Parameters:

Name Type Description Default
authorization str

Authorization header string.

required

Returns:

Type Description
Tuple[str, str]

Tuple of the form (client_id, client_secret).

Raises:

Type Description
ValueError

Invalid authorization header string.

Source code in aioauth/utils.py
def decode_auth_headers(authorization: str) -> Tuple[str, str]:
    """
    Decodes an encoded HTTP basic authentication string.
    Returns a tuple of the form ``(client_id, client_secret)``, and
    raises a `aioauth.errors.InvalidClientError` exception if nothing
    could be decoded.

    Args:
        authorization: Authorization header string.

    Returns:
        Tuple of the form `(client_id, client_secret)`.

    Raises:
        ValueError: Invalid `authorization` header string.
    """
    scheme, param = get_authorization_scheme_param(authorization)
    if not authorization or scheme.lower() != "basic":
        raise ValueError("Invalid authorization header string.")

    try:
        data = b64decode(param).decode("ascii")
    except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
        raise ValueError("Invalid base64 encoding.") from exc

    client_id, separator, client_secret = data.partition(":")

    if not separator:
        raise ValueError("Separator was not provided.")

    return client_id, client_secret

encode_auth_headers(client_id, client_secret)

Encodes the authentication header using base64 encoding.

Parameters:

Name Type Description Default
client_id str

The client's id.

required
client_secret str

The client's secret.

required

Returns:

Type Description
HTTPHeaderDict

A case insensitive dictionary that contains the

HTTPHeaderDict

Authorization header set to basic and the authorization

HTTPHeaderDict

header.

Source code in aioauth/utils.py
def encode_auth_headers(client_id: str, client_secret: str) -> HTTPHeaderDict:
    """
    Encodes the authentication header using base64 encoding.

    Args:
        client_id: The client's id.
        client_secret: The client's secret.

    Returns:
        A case insensitive dictionary that contains the
        `Authorization` header set to `basic` and the authorization
        header.
    """
    authorization = b64encode(f"{client_id}:{client_secret}".encode("ascii"))
    return HTTPHeaderDict(Authorization=f"basic {authorization.decode()}")

enforce_list(scope)

Converts a space separated string to a list of scopes.

Note

If an iterable is passed to this method it will return a list representation of the iterable. Use enforce_str to convert iterables to a scope string.

Parameters:

Name Type Description Default
scope Optional[Union[str, List, Set, Tuple]]

An iterable or string that contains scopes.

required

Returns:

Type Description
List

A list of scopes.

Source code in aioauth/utils.py
def enforce_list(scope: Optional[Union[str, List, Set, Tuple]]) -> List:
    """
    Converts a space separated string to a list of scopes.

    Note:
        If an iterable is passed to this method it will return a list
        representation of the iterable. Use `enforce_str` to
        convert iterables to a scope string.

    Args:
        scope: An iterable or string that contains scopes.

    Returns:
        A list of scopes.
    """
    if isinstance(scope, (tuple, list, set)):
        return [str(s) for s in scope]
    elif scope is None:
        return []
    else:
        return scope.strip().split(" ")

enforce_str(scope)

Converts a list of scopes to a space separated string.

Note

If a string is passed to this method it will simply return an empty string back. Use enforce_list to convert strings to scope lists.

Parameters:

Name Type Description Default
scope List

An iterable or string that contains a list of scope.

required

Returns:

Type Description
str

A string of scopes seperated by spaces.

Raises:

Type Description
TypeError

The scope value passed is not of the proper type.

Source code in aioauth/utils.py
def enforce_str(scope: List) -> str:
    """
    Converts a list of scopes to a space separated string.

    Note:
        If a string is passed to this method it will simply return an
        empty string back. Use `enforce_list` to convert
        strings to scope lists.

    Args:
        scope: An iterable or string that contains a list of scope.

    Returns:
        A string of scopes seperated by spaces.

    Raises:
        TypeError: The `scope` value passed is not of the proper type.
    """
    if isinstance(scope, (set, tuple, list)):
        return " ".join([str(s) for s in scope])

    return ""

generate_token(length=30, chars=UNICODE_ASCII_CHARACTER_SET)

Generates a non-guessable OAuth token. OAuth (1 and 2) does not specify the format of tokens except that they should be strings of random characters. Tokens should not be guessable and entropy when generating the random characters is important. Which is why SystemRandom is used instead of the default random.choice method.

Parameters:

Name Type Description Default
length int

Length of the generated token.

30
chars str

The characters to use to generate the string.

UNICODE_ASCII_CHARACTER_SET

Returns:

Type Description
str

Random string of length length and characters in chars.

Source code in aioauth/utils.py
def generate_token(length: int = 30, chars: str = UNICODE_ASCII_CHARACTER_SET) -> str:
    """Generates a non-guessable OAuth token.
    OAuth (1 and 2) does not specify the format of tokens except that
    they should be strings of random characters. Tokens should not be
    guessable and entropy when generating the random characters is
    important. Which is why SystemRandom is used instead of the default
    random.choice method.

    Args:
        length: Length of the generated token.
        chars: The characters to use to generate the string.

    Returns:
        Random string of length `length` and characters in `chars`.
    """
    rand = random.SystemRandom()
    return "".join(rand.choice(chars) for _ in range(length))

get_authorization_scheme_param(authorization_header_value)

Retrieves the authorization schema parameters from the authorization header.

Parameters:

Name Type Description Default
authorization_header_value str

Value of the authorization header.

required

Returns:

Type Description
Tuple[str, str]

Tuple of the format (scheme, param).

Source code in aioauth/utils.py
def get_authorization_scheme_param(
    authorization_header_value: str,
) -> Tuple[str, str]:
    """
    Retrieves the authorization schema parameters from the authorization
    header.

    Args:
        authorization_header_value: Value of the authorization header.

    Returns:
        Tuple of the format `(scheme, param)`.
    """
    if not authorization_header_value:
        return "", ""
    scheme, _, param = authorization_header_value.partition(" ")
    return scheme, param