Module sanic_discord.oauth.oauth

Oauth2 client for sanic.

Expand source code
"Oauth2 client for sanic."

from sanic import Sanic, Request, HTTPResponse

from typing import List, Optional, Tuple

from functools import wraps
from urllib import parse

from .errors import OauthException, StateError
from .access_token import AccessToken
from .http import HttpClient


class Oauth2:
    """
    The following methods are used to generate the URL to redirect to.

    Args:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.

    Attributes:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.
        http (HttpClient): The client used to make requests.
    """

    def __init__(
        self, app: Sanic, client_id: int, client_secret: str, redirect_uri: str
    ):
        self.app = app
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.http: HttpClient = HttpClient()
        self.app.ctx.oauth2: Oauth2 = self

    async def close(self) -> None:
        """
        Closes the client.
        """
        await self.http.close()

    def exchange_code(self, state: bool = False):
        """
        Exchanges a code for an access token.

        Args:
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs) -> HTTPResponse:
                results = (await self._exchange_code(request, state)) + args
                return await func(request, *results, **kwargs)
            return wrapper
        return decorator

    async def _exchange_code(self, request: Request, state: bool = False) -> Tuple[AccessToken, ...]:
        """
        Exchanges a code for an access token.

        Args:
            request (Request): The request.
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        args = []
        if state:
            if request.args.get("state"):
                args.append(request.args.get("state"))
            else:
                raise StateError("state is required.")
        code = request.args.get("code")
        if code is None:
            raise OauthException("code is invaild.")
        args.insert(0, AccessToken(await self.http.exchange_code(
            code, self.redirect_uri, self.client_id, self.client_secret
        ), self.http))
        return tuple(args)

    async def fetch_user(self, access_token: str) -> dict:
        """
        Fetches the user's profile using an access token.

        Args:
            access_token (str): The access token to use.

        Returns:
            dict: The user's profile.
        """
        return await self.http.fetch_user(access_token)
    
    async def fetch_guilds(self, access_token: str) -> List[dict]:
        """
        Get a guilds
        
        Args:
            access_token (str): The access token
        
        Returns:
            List[dict]
        """
        return await self.http.fetch_guilds(access_token)

    async def refresh_token(self, refresh_token: str) -> AccessToken:
        """
        Refreshes an access token using a refresh token.

        Args:
            refresh_token (str): The refresh token to use.

        Returns:
            AccessToken: The new access token.
        """
        return AccessToken(await self.http.refresh_token(
            refresh_token, self.client_id, self.client_secret
        ), self.http)

    def get_authorize_url(self, scope: List[str] = ["identify"], *, state: Optional[str] = None) -> str:
        """
        Generates a URL to authorize the application.

        Args:
            scope (Optional[List[str]]): The scope to request. Defaults to ["identify"].
            state (Optional[str]): The state to use. Defaults to None.

        Returns:
            str: The URL to authorize the application.
        """
        payload = {
            "client_id": self.client_id,
            "scope": ' '.join(scope),
            "response_type": "code",
            "redirect_uri": self.redirect_uri
        }
        if state is not None:
            payload["state"] = state
        return f"{self.http.BASEURL}/oauth2/authorize?" + parse.urlencode(payload)

Classes

class Oauth2 (app: sanic.app.Sanic, client_id: int, client_secret: str, redirect_uri: str)

The following methods are used to generate the URL to redirect to.

Args

app : Sanic
The Sanic app.
client_id : int
The client ID.
client_secret : str
The client secret.
redirect_uri : str
The redirect URI.

Attributes

app : Sanic
The Sanic app.
client_id : int
The client ID.
client_secret : str
The client secret.
redirect_uri : str
The redirect URI.
http : HttpClient
The client used to make requests.
Expand source code
class Oauth2:
    """
    The following methods are used to generate the URL to redirect to.

    Args:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.

    Attributes:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.
        http (HttpClient): The client used to make requests.
    """

    def __init__(
        self, app: Sanic, client_id: int, client_secret: str, redirect_uri: str
    ):
        self.app = app
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.http: HttpClient = HttpClient()
        self.app.ctx.oauth2: Oauth2 = self

    async def close(self) -> None:
        """
        Closes the client.
        """
        await self.http.close()

    def exchange_code(self, state: bool = False):
        """
        Exchanges a code for an access token.

        Args:
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs) -> HTTPResponse:
                results = (await self._exchange_code(request, state)) + args
                return await func(request, *results, **kwargs)
            return wrapper
        return decorator

    async def _exchange_code(self, request: Request, state: bool = False) -> Tuple[AccessToken, ...]:
        """
        Exchanges a code for an access token.

        Args:
            request (Request): The request.
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        args = []
        if state:
            if request.args.get("state"):
                args.append(request.args.get("state"))
            else:
                raise StateError("state is required.")
        code = request.args.get("code")
        if code is None:
            raise OauthException("code is invaild.")
        args.insert(0, AccessToken(await self.http.exchange_code(
            code, self.redirect_uri, self.client_id, self.client_secret
        ), self.http))
        return tuple(args)

    async def fetch_user(self, access_token: str) -> dict:
        """
        Fetches the user's profile using an access token.

        Args:
            access_token (str): The access token to use.

        Returns:
            dict: The user's profile.
        """
        return await self.http.fetch_user(access_token)
    
    async def fetch_guilds(self, access_token: str) -> List[dict]:
        """
        Get a guilds
        
        Args:
            access_token (str): The access token
        
        Returns:
            List[dict]
        """
        return await self.http.fetch_guilds(access_token)

    async def refresh_token(self, refresh_token: str) -> AccessToken:
        """
        Refreshes an access token using a refresh token.

        Args:
            refresh_token (str): The refresh token to use.

        Returns:
            AccessToken: The new access token.
        """
        return AccessToken(await self.http.refresh_token(
            refresh_token, self.client_id, self.client_secret
        ), self.http)

    def get_authorize_url(self, scope: List[str] = ["identify"], *, state: Optional[str] = None) -> str:
        """
        Generates a URL to authorize the application.

        Args:
            scope (Optional[List[str]]): The scope to request. Defaults to ["identify"].
            state (Optional[str]): The state to use. Defaults to None.

        Returns:
            str: The URL to authorize the application.
        """
        payload = {
            "client_id": self.client_id,
            "scope": ' '.join(scope),
            "response_type": "code",
            "redirect_uri": self.redirect_uri
        }
        if state is not None:
            payload["state"] = state
        return f"{self.http.BASEURL}/oauth2/authorize?" + parse.urlencode(payload)

Methods

async def close(self) ‑> None

Closes the client.

Expand source code
async def close(self) -> None:
    """
    Closes the client.
    """
    await self.http.close()
def exchange_code(self, state: bool = False)

Exchanges a code for an access token.

Args

state : bool
If you use state in oauth url, you must do True.

Raises

StateError
state is invalid.
Expand source code
def exchange_code(self, state: bool = False):
    """
    Exchanges a code for an access token.

    Args:
        state (bool): If you use state in oauth url, you must do True.

    Raises:
        StateError: state is invalid.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs) -> HTTPResponse:
            results = (await self._exchange_code(request, state)) + args
            return await func(request, *results, **kwargs)
        return wrapper
    return decorator
async def fetch_guilds(self, access_token: str) ‑> List[dict]

Get a guilds

Args

access_token : str
The access token

Returns

List[dict]

Expand source code
async def fetch_guilds(self, access_token: str) -> List[dict]:
    """
    Get a guilds
    
    Args:
        access_token (str): The access token
    
    Returns:
        List[dict]
    """
    return await self.http.fetch_guilds(access_token)
async def fetch_user(self, access_token: str) ‑> dict

Fetches the user's profile using an access token.

Args

access_token : str
The access token to use.

Returns

dict
The user's profile.
Expand source code
async def fetch_user(self, access_token: str) -> dict:
    """
    Fetches the user's profile using an access token.

    Args:
        access_token (str): The access token to use.

    Returns:
        dict: The user's profile.
    """
    return await self.http.fetch_user(access_token)
def get_authorize_url(self, scope: List[str] = ['identify'], *, state: Optional[str] = None) ‑> str

Generates a URL to authorize the application.

Args

scope : Optional[List[str]]
The scope to request. Defaults to ["identify"].
state : Optional[str]
The state to use. Defaults to None.

Returns

str
The URL to authorize the application.
Expand source code
def get_authorize_url(self, scope: List[str] = ["identify"], *, state: Optional[str] = None) -> str:
    """
    Generates a URL to authorize the application.

    Args:
        scope (Optional[List[str]]): The scope to request. Defaults to ["identify"].
        state (Optional[str]): The state to use. Defaults to None.

    Returns:
        str: The URL to authorize the application.
    """
    payload = {
        "client_id": self.client_id,
        "scope": ' '.join(scope),
        "response_type": "code",
        "redirect_uri": self.redirect_uri
    }
    if state is not None:
        payload["state"] = state
    return f"{self.http.BASEURL}/oauth2/authorize?" + parse.urlencode(payload)
async def refresh_token(self, refresh_token: str) ‑> AccessToken

Refreshes an access token using a refresh token.

Args

refresh_token : str
The refresh token to use.

Returns

AccessToken
The new access token.
Expand source code
async def refresh_token(self, refresh_token: str) -> AccessToken:
    """
    Refreshes an access token using a refresh token.

    Args:
        refresh_token (str): The refresh token to use.

    Returns:
        AccessToken: The new access token.
    """
    return AccessToken(await self.http.refresh_token(
        refresh_token, self.client_id, self.client_secret
    ), self.http)