Package sanic_discord

sanic-discord

Discord

Deploy document Downloads Downloads Downloads

Install

pip install sanic-discord

or

pip install "sanic-discord @ git+https://github.com/tuna2134/sanic-discord.git@rewrite"
Expand source code
"""
.. include:: ../README.md
"""
from .oauth import *
from .interaction import *

__version__ = "2.0.2a"
__author__ = "tuna2134"
__all__ = (
    "Oauth2",
    "OauthException",
    "HttpException",
    "NotFoundException",
    "AccessToken",
    "AccessTokenType",
    "StateError",
    "exchange_code",

    "InteractionClient"
)

Sub-modules

sanic_discord.interaction
sanic_discord.oauth

sanic-discord.oauth …

sanic_discord.rest

Functions

def exchange_code(*args, **kwargs) ‑> 

Exchanges a code for an access token.

Expand source code
def exchange_code(*args, **kwargs) -> callable:
    """
    Exchanges a code for an access token.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args_wrapper, **kwargs_wrapper) -> HTTPResponse:
            results = (await request.app.ctx.oauth2._exchange_code(request, *args, **kwargs)) + args_wrapper
            return await func(request, *results, **kwargs_wrapper)
        return wrapper
    return decorator

Classes

class AccessToken (data: AccessTokenType, http: HttpClient)

Accesstoken object.

Args

data : AccessTokenType
The access token data.
http : HttpClient
The HTTP client to use.

Attributes

access_token : str
The access token.
token_type : str
The token type.
refresh_token : str
The refresh token.
scope : str
The scope.
expires_in : datetime
The expiration date.
Expand source code
class AccessToken:
    """
    Accesstoken object.

    Args:
        data (AccessTokenType): The access token data.
        http (HttpClient): The HTTP client to use.

    Attributes:
        access_token (str): The access token.
        token_type (str): The token type.
        refresh_token (str): The refresh token.
        scope (str): The scope.
        expires_in (datetime): The expiration date."""

    def __init__(self, data: AccessTokenType, http: HttpClient):
        self.http = http
        self.data = data
        self.access_token = data["access_token"]
        self.token_type = data["token_type"]
        self.refresh_token = data["refresh_token"]
        self.scope = data["scope"]
        self.expires_in = datetime.now(
        ) + timedelta(seconds=self.data["expires_in"])

    async def fetch_user(self) -> dict:
        """
        Fetches the user's profile using an access token.

        Returns:
            dict: The user's profile."""
        return await self.http.fetch_user(self.access_token)
    
    async def fetch_guilds(self) -> List[dict]:
        return await self.http.fetch_guilds(self.access_token)

    async def add_guild(self, guildid: int):
        pass

Methods

async def add_guild(self, guildid: int)
Expand source code
async def add_guild(self, guildid: int):
    pass
async def fetch_guilds(self) ‑> List[dict]
Expand source code
async def fetch_guilds(self) -> List[dict]:
    return await self.http.fetch_guilds(self.access_token)
async def fetch_user(self) ‑> dict

Fetches the user's profile using an access token.

Returns

dict
The user's profile.
Expand source code
async def fetch_user(self) -> dict:
    """
    Fetches the user's profile using an access token.

    Returns:
        dict: The user's profile."""
    return await self.http.fetch_user(self.access_token)
class AccessTokenType (*args, **kwargs)

The type of the access token.

Expand source code
class AccessTokenType(TypedDict):
    """
    The type of the access token."""
    access_token: str
    token_type: str
    expires_in: int
    refresh_token: str
    scope: str

Ancestors

  • builtins.dict

Class variables

var access_token : str
var expires_in : int
var refresh_token : str
var scope : str
var token_type : str
class HttpException (message: Union[str, bytes, ForwardRef(None)] = None, status_code: Optional[int] = None, quiet: Optional[bool] = None, context: Optional[Dict[str, Any]] = None, extra: Optional[Dict[str, Any]] = None)

The base exception for all HTTP exceptions.

Expand source code
class HttpException(SanicException):
    """
    The base exception for all HTTP exceptions.
    """
    status_code = 500

Ancestors

  • sanic.exceptions.SanicException
  • builtins.Exception
  • builtins.BaseException

Subclasses

Class variables

var status_code
class InteractionClient (app: sanic.app.Sanic, token: str, public_key: str)

interaction client

Args

token : str
The bot token.
public_key : str
The public key.

Attributes

http : HttpClient
The http client.
verify_key : nacl.signing.VerifyKey
The public key.
Expand source code
class InteractionClient:
    """
    interaction client
    
    Args:
        token (str): The bot token.
        public_key (str): The public key.
        
    Attributes:
        http (HttpClient): The http client.
        verify_key (nacl.signing.VerifyKey): The public key."""
    def __init__(self, app: Sanic, token: str, public_key: str):
        self.app = app
        self.http = HttpClient(token, public_key)
        self.verify_key = VerifyKey(bytes.fromhex(public_key))
        self.interaction_event: callable = None

    async def close(self) -> None:
        """
        Closes the client.
        """
        await self.http.close()

    def on_interaction(self, func: callable) -> callable:
        """
        Decorator for handling interaction requests.
        Interaction event cacher."""
        self.interaction_event = func
        return func

    async def handle_interaction(self, request: Request) -> HTTPResponse:
        """
        Handles an interaction request.
        """
        signature = request.headers["X-Signature-Ed25519"]
        timestamp = request.headers["X-Signature-Timestamp"]
        body = request.body.decode("utf-8")
        try:
            self.verify_key.verify(f'{timestamp}{body}'.encode(), bytes.fromhex(signature))
        except BadSignatureError:
            raise InvaildSignatureError("Invalid signature")
        else:
            return await self.interaction_event(request.json)

Methods

async def close(self) ‑> None

Closes the client.

Expand source code
async def close(self) -> None:
    """
    Closes the client.
    """
    await self.http.close()
async def handle_interaction(self, request: sanic.request.Request) ‑> sanic.response.HTTPResponse

Handles an interaction request.

Expand source code
async def handle_interaction(self, request: Request) -> HTTPResponse:
    """
    Handles an interaction request.
    """
    signature = request.headers["X-Signature-Ed25519"]
    timestamp = request.headers["X-Signature-Timestamp"]
    body = request.body.decode("utf-8")
    try:
        self.verify_key.verify(f'{timestamp}{body}'.encode(), bytes.fromhex(signature))
    except BadSignatureError:
        raise InvaildSignatureError("Invalid signature")
    else:
        return await self.interaction_event(request.json)
def on_interaction(self, func: ) ‑> 

Decorator for handling interaction requests. Interaction event cacher.

Expand source code
def on_interaction(self, func: callable) -> callable:
    """
    Decorator for handling interaction requests.
    Interaction event cacher."""
    self.interaction_event = func
    return func
class NotFoundException (message: Union[str, bytes, ForwardRef(None)] = None, status_code: Optional[int] = None, quiet: Optional[bool] = None, context: Optional[Dict[str, Any]] = None, extra: Optional[Dict[str, Any]] = None)

If the request returns a 404.

Expand source code
class NotFoundException(HttpException):
    """
    If the request returns a 404."""
    status_code = 404

Ancestors

  • HttpException
  • sanic.exceptions.SanicException
  • builtins.Exception
  • builtins.BaseException

Class variables

var status_code
class Oauth2 (app: sanic.app.Sanic, client_id: int, client_secret: str, redirect_uri: str)

The following methods are used to generate the URL to redirect to.

Args

app : Sanic
The Sanic app.
client_id : int
The client ID.
client_secret : str
The client secret.
redirect_uri : str
The redirect URI.

Attributes

app : Sanic
The Sanic app.
client_id : int
The client ID.
client_secret : str
The client secret.
redirect_uri : str
The redirect URI.
http : HttpClient
The client used to make requests.
Expand source code
class Oauth2:
    """
    The following methods are used to generate the URL to redirect to.

    Args:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.

    Attributes:
        app (Sanic): The Sanic app.
        client_id (int): The client ID.
        client_secret (str): The client secret.
        redirect_uri (str): The redirect URI.
        http (HttpClient): The client used to make requests.
    """

    def __init__(
        self, app: Sanic, client_id: int, client_secret: str, redirect_uri: str
    ):
        self.app = app
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.http: HttpClient = HttpClient()
        self.app.ctx.oauth2: Oauth2 = self

    async def close(self) -> None:
        """
        Closes the client.
        """
        await self.http.close()

    def exchange_code(self, state: bool = False):
        """
        Exchanges a code for an access token.

        Args:
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(request: Request, *args, **kwargs) -> HTTPResponse:
                results = (await self._exchange_code(request, state)) + args
                return await func(request, *results, **kwargs)
            return wrapper
        return decorator

    async def _exchange_code(self, request: Request, state: bool = False) -> Tuple[AccessToken, ...]:
        """
        Exchanges a code for an access token.

        Args:
            request (Request): The request.
            state (bool): If you use state in oauth url, you must do True.

        Raises:
            StateError: state is invalid.
        """
        args = []
        if state:
            if request.args.get("state"):
                args.append(request.args.get("state"))
            else:
                raise StateError("state is required.")
        code = request.args.get("code")
        if code is None:
            raise OauthException("code is invaild.")
        args.insert(0, AccessToken(await self.http.exchange_code(
            code, self.redirect_uri, self.client_id, self.client_secret
        ), self.http))
        return tuple(args)

    async def fetch_user(self, access_token: str) -> dict:
        """
        Fetches the user's profile using an access token.

        Args:
            access_token (str): The access token to use.

        Returns:
            dict: The user's profile.
        """
        return await self.http.fetch_user(access_token)
    
    async def fetch_guilds(self, access_token: str) -> List[dict]:
        """
        Get a guilds
        
        Args:
            access_token (str): The access token
        
        Returns:
            List[dict]
        """
        return await self.http.fetch_guilds(access_token)

    async def refresh_token(self, refresh_token: str) -> AccessToken:
        """
        Refreshes an access token using a refresh token.

        Args:
            refresh_token (str): The refresh token to use.

        Returns:
            AccessToken: The new access token.
        """
        return AccessToken(await self.http.refresh_token(
            refresh_token, self.client_id, self.client_secret
        ), self.http)

    def get_authorize_url(self, scope: List[str] = ["identify"], *, state: Optional[str] = None) -> str:
        """
        Generates a URL to authorize the application.

        Args:
            scope (Optional[List[str]]): The scope to request. Defaults to ["identify"].
            state (Optional[str]): The state to use. Defaults to None.

        Returns:
            str: The URL to authorize the application.
        """
        payload = {
            "client_id": self.client_id,
            "scope": ' '.join(scope),
            "response_type": "code",
            "redirect_uri": self.redirect_uri
        }
        if state is not None:
            payload["state"] = state
        return f"{self.http.BASEURL}/oauth2/authorize?" + parse.urlencode(payload)

Methods

async def close(self) ‑> None

Closes the client.

Expand source code
async def close(self) -> None:
    """
    Closes the client.
    """
    await self.http.close()
def exchange_code(self, state: bool = False)

Exchanges a code for an access token.

Args

state : bool
If you use state in oauth url, you must do True.

Raises

StateError
state is invalid.
Expand source code
def exchange_code(self, state: bool = False):
    """
    Exchanges a code for an access token.

    Args:
        state (bool): If you use state in oauth url, you must do True.

    Raises:
        StateError: state is invalid.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs) -> HTTPResponse:
            results = (await self._exchange_code(request, state)) + args
            return await func(request, *results, **kwargs)
        return wrapper
    return decorator
async def fetch_guilds(self, access_token: str) ‑> List[dict]

Get a guilds

Args

access_token : str
The access token

Returns

List[dict]

Expand source code
async def fetch_guilds(self, access_token: str) -> List[dict]:
    """
    Get a guilds
    
    Args:
        access_token (str): The access token
    
    Returns:
        List[dict]
    """
    return await self.http.fetch_guilds(access_token)
async def fetch_user(self, access_token: str) ‑> dict

Fetches the user's profile using an access token.

Args

access_token : str
The access token to use.

Returns

dict
The user's profile.
Expand source code
async def fetch_user(self, access_token: str) -> dict:
    """
    Fetches the user's profile using an access token.

    Args:
        access_token (str): The access token to use.

    Returns:
        dict: The user's profile.
    """
    return await self.http.fetch_user(access_token)
def get_authorize_url(self, scope: List[str] = ['identify'], *, state: Optional[str] = None) ‑> str

Generates a URL to authorize the application.

Args

scope : Optional[List[str]]
The scope to request. Defaults to ["identify"].
state : Optional[str]
The state to use. Defaults to None.

Returns

str
The URL to authorize the application.
Expand source code
def get_authorize_url(self, scope: List[str] = ["identify"], *, state: Optional[str] = None) -> str:
    """
    Generates a URL to authorize the application.

    Args:
        scope (Optional[List[str]]): The scope to request. Defaults to ["identify"].
        state (Optional[str]): The state to use. Defaults to None.

    Returns:
        str: The URL to authorize the application.
    """
    payload = {
        "client_id": self.client_id,
        "scope": ' '.join(scope),
        "response_type": "code",
        "redirect_uri": self.redirect_uri
    }
    if state is not None:
        payload["state"] = state
    return f"{self.http.BASEURL}/oauth2/authorize?" + parse.urlencode(payload)
async def refresh_token(self, refresh_token: str) ‑> AccessToken

Refreshes an access token using a refresh token.

Args

refresh_token : str
The refresh token to use.

Returns

AccessToken
The new access token.
Expand source code
async def refresh_token(self, refresh_token: str) -> AccessToken:
    """
    Refreshes an access token using a refresh token.

    Args:
        refresh_token (str): The refresh token to use.

    Returns:
        AccessToken: The new access token.
    """
    return AccessToken(await self.http.refresh_token(
        refresh_token, self.client_id, self.client_secret
    ), self.http)
class OauthException (message: Union[str, bytes, ForwardRef(None)] = None, status_code: Optional[int] = None, quiet: Optional[bool] = None, context: Optional[Dict[str, Any]] = None, extra: Optional[Dict[str, Any]] = None)

The base exception for all Oauth2 exceptions.

Expand source code
class OauthException(SanicException):
    """
    The base exception for all Oauth2 exceptions.
    """
    status_code = 403

Ancestors

  • sanic.exceptions.SanicException
  • builtins.Exception
  • builtins.BaseException

Subclasses

Class variables

var status_code
class StateError (message: Union[str, bytes, ForwardRef(None)] = None, status_code: Optional[int] = None, quiet: Optional[bool] = None, context: Optional[Dict[str, Any]] = None, extra: Optional[Dict[str, Any]] = None)

The exception raised when the state is invalid.

Expand source code
class StateError(OauthException):
    """
    The exception raised when the state is invalid.
    """
    status_code = 400

Ancestors

  • OauthException
  • sanic.exceptions.SanicException
  • builtins.Exception
  • builtins.BaseException

Class variables

var status_code