Skip to content

Commit

Permalink
Rework class generation for eventsub and additional data
Browse files Browse the repository at this point in the history
This reworks the class generation for eventsub.
It also adds the Metadata, Header and NotificationSubscription to the BaseClass for all event notifications.
  • Loading branch information
chillymosh committed Jan 10, 2025
1 parent bfff39e commit b06aa3f
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 13 deletions.
2 changes: 1 addition & 1 deletion twitchio/eventsub/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ async def _process_notification(self, data: NotificationMessage) -> None:
event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_")

try:
payload_class = create_event_instance(sub_type, data["payload"]["event"], http=self._http)
payload_class = create_event_instance(sub_type, data, http=self._http)
except ValueError:
logger.warning("Websocket '%s' received an unhandled eventsub event: '%s'.", self, event)
return
Expand Down
236 changes: 230 additions & 6 deletions twitchio/models/eventsub_.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast

from twitchio.assets import Asset
Expand All @@ -38,11 +39,17 @@


if TYPE_CHECKING:
import datetime

from twitchio.http import HTTPAsyncIterator, HTTPClient
from twitchio.models.channel_points import CustomRewardRedemption
from twitchio.types_.conduits import Condition, RevocationSubscription, RevocationTransport
from twitchio.types_.conduits import (
Condition,
NotificationMessage,
NotificationMetaData,
NotificationSubscription as _NotificationSubscription,
NotificationTransport,
RevocationSubscription,
RevocationTransport,
)
from twitchio.types_.eventsub import *
from twitchio.types_.responses import (
EventsubSubscriptionResponse,
Expand All @@ -55,17 +62,234 @@ class BaseEvent:
_registry: ClassVar[dict[str, type]] = {}
subscription_type: ClassVar[str | None] = None

def __init__(
self,
*,
subscription_data: Any,
metadata: NotificationMetaData | None = None,
headers: EventSubHeaders | None = None,
) -> None:
self._metadata = metadata
self._headers = headers
self._sub_data = subscription_data

def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
if cls.subscription_type is not None:
BaseEvent._registry[cls.subscription_type] = cls

@property
def timestamp(self) -> datetime.datetime:
"""The timestamp of the eventsub notification from Twitch in UTC.
If the notification Twitch sends is missing this data, then it will fall back to current UTC time.
Returns
-------
datetime.datetime
The datetime in UTC of the eventsub notification from Twitch.
"""
if self._metadata and (timestamp := self._metadata.get("message_timestamp")):
return parse_timestamp(timestamp)

if self._headers and (timestamp := self._headers.get("Twitch-Eventsub-Message-Timestamp")):
return parse_timestamp(timestamp)

return datetime.datetime.now(datetime.UTC)

@property
def metadata(self) -> Metadata | None:
"""Returns the metadata of a websocket event notification.
Returns
-------
Metadata | None
"""
if self._metadata is not None:
return Metadata(self._metadata)
return None

def create_event_instance(event_type: str, payload: dict[str, Any], http: HTTPClient | None = None) -> Any:
@property
def headers(self) -> Headers | None:
"""Returns eventsub webhook headers as a structured Headers object.
Returns
-------
Headers | None
"""
if self._headers is not None:
return Headers(self._headers)
return None

@property
def subscription_data(self) -> NotificationSubscription:
"""Returns the subscription data of the eventsub notification.
Returns
-------
NotificationSubscription
"""
return NotificationSubscription(self._sub_data)


def create_event_instance(
event_type: str,
raw_data: NotificationMessage | Any,
*,
http: HTTPClient | None = None,
headers: EventSubHeaders | None = None,
) -> Any:
event_cls = BaseEvent._registry.get(event_type)
if event_cls is None:
if not event_cls:
raise ValueError(f"No class registered for event type {event_type}")
return event_cls(payload) if http is None else event_cls(payload, http=http)

payload = raw_data["payload"]["event"] if "payload" in raw_data else raw_data["event"]
metadata = raw_data.get("metadata")
sub_data = raw_data["payload"]["subscription"] if "payload" in raw_data else raw_data["subscription"]
instance = event_cls(payload, http=http)

if isinstance(instance, BaseEvent):
instance._sub_data = sub_data
instance._metadata = metadata
instance._headers = headers

return instance


class Metadata:
"""
Represents the metadata returned from a websocket eventsub notification.
Attributes
-----------
message_id: str
An ID that uniquely identifies the message.
message_type: typing.Literal["notification"]
The type of message, which is set to `notification`.
message_timestamp: datetime.datetime
The UTC date and time that the message was sent.
subscription_type: str
The type of subscription. See `Subscription Types <https://dev.twitch.tv/docs/eventsub/eventsub-subscription-types/#subscription-types>`_.
subscription_version: typing.Literal["1", "2"]
The version number of the subscription type's definition. This is the same value specified in the subscription request.
"""

__slots__ = ("message_id", "message_timestamp", "message_type", "subscription_type", "subscription_version")

def __init__(self, data: NotificationMetaData) -> None:
self.message_id: str = data["message_id"]
self.message_type: Literal["notification"] = data["message_type"]
self.message_timestamp: datetime.datetime = parse_timestamp(data["message_timestamp"])
self.subscription_type: str = data["subscription_type"]
self.subscription_version: Literal["1", "2"] = data["subscription_version"]

def __repr__(self) -> str:
return f"<Metadata message_id={self.message_id}, message_type={self.message_type} subscription_type={self.subscription_type}>"


class Headers:
"""
Represents the headers received from a webhook notification.
Attributes
-----------
message_id: str
An ID that uniquely identifies this message. This is an opaque ID, and is not required to be in any particular format.
message_retry: str
Twitch sends you a notification at least once. If Twitch is unsure of whether you received a notification, it'll resend the event, which means you may receive a notification twice.
message_type: typing.Literal["notification", "webhook_callback_verification", "revocation"]
The type of notification. Possible values are:
- notification — Contains the event's data.
- webhook_callback_verification — Contains the challenge used to verify that you own the event handler.
- revocation — Contains the reason why Twitch revoked your subscription.
message_signature: str
The HMAC signature that you use to verify that Twitch sent the message.
message_timestamp: datetime.datetime: str
The UTC date and time that Twitch sent the notification.
subscription_type: str
The subscription type you subscribed to. For example, `channel.follow`.
subscription_version: str
The version number that identifies the definition of the subscription request. This version matches the version number that you specified in your subscription request.
raw_data: dict[str, str]
The headers as a raw dictionary, as there are additional fields that are not Twitch specific. You can utilise the `.get()` method to retrieve specific headers.
"""

__slots__ = (
"message_id",
"message_retry",
"message_signature",
"message_timestamp",
"message_type",
"raw_data",
"subscription_type",
"subscription_version",
)

def __init__(self, data: EventSubHeaders) -> None:
self.message_id: str = data.get("Twitch-Eventsub-Message-Id", "")
self.message_retry: str = data.get("Twitch-Eventsub-Message-Retry", "")
self.message_type: Literal["notification", "webhook_callback_verification", "revocation"] = data.get(
"Twitch-Eventsub-Message-Type", "notification"
)
self.message_signature: str = data.get("Twitch-Eventsub-Message-Signature", "")
timestamp = data.get("Twitch-Eventsub-Message-Timestamp", datetime.datetime.now(tz=datetime.UTC).isoformat())
self.message_timestamp: datetime.datetime = parse_timestamp(timestamp)
self.subscription_type: str = data.get("Twitch-Eventsub-Subscription-Type", "")
self.subscription_version: str = data.get("Twitch-Eventsub-Subscription-Version", "")

self.raw_data: EventSubHeaders = data

def get(self, key: str) -> str | None:
"""Retrieve a header value by key."""
return self.raw_data.get(key)

def __repr__(self) -> str:
return f"<Headers message_id={self.message_id}, message_type={self.message_type} subscription_type={self.subscription_type}>"


class NotificationSubscription:
"""
Represents the metadata returned from a websocket eventsub notification.
Attributes
-----------
id: str
An ID that uniquely identifies this subscription.
status: str
The subscription's status.
type: str
The notification's subscription type.
version: typing.Literal["1", "2"]
The version number of the subscription type's definition.
cost: int
How much the subscription counts against your limit. See `Subscription Limits <https://dev.twitch.tv/docs/eventsub/manage-subscriptions#subscription-limits>`_.
condition: Condition
This is a TypedDict that contains the conditions under which the event fires.
transport: NotificationTransport
This is a TypedDict that contains information about the transport used for notifications.
created_at: datetime.datetime
The UTC date and time that the subscription was created.
"""

__slots__ = ("condition", "cost", "created_at", "id", "status", "transport", "type", "version")

def __init__(self, data: _NotificationSubscription) -> None:
self.id: str = data["id"]
self.status: str = data["status"]
self.type: str = data["type"]
self.version: Literal["1", "2"] = data["version"]
self.cost: int = data["cost"]
self.condition: Condition = data["condition"]
self.transport: NotificationTransport = data["transport"]
self.created_at: datetime.datetime = parse_timestamp(data["created_at"])

def __repr__(self) -> str:
return (
f"<NotificationSubscription id={self.id}, type={self.type}, status={self.status} created_at={self.created_at}>"
)


class Boundary(NamedTuple):
Expand Down
8 changes: 4 additions & 4 deletions twitchio/types_/conduits.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class NotificationMetaData(TypedDict):
message_type: Literal["notification"]
message_timestamp: str
subscription_type: str
subscription_version: str
subscription_version: Literal["1", "2"]


class ReconnectMetaData(TypedDict):
Expand Down Expand Up @@ -156,15 +156,15 @@ class Condition(TypedDict, total=False):


class NotificationTransport(TypedDict):
method: Literal["websocket"]
method: Literal["websocket", "webhook"]
session_id: str


class NotificationSubscription(TypedDict):
id: str
status: Literal["enabled"]
status: str
type: str
version: str
version: Literal["1", "2"]
cost: int
condition: Condition
transport: NotificationTransport
Expand Down
1 change: 1 addition & 0 deletions twitchio/types_/eventsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
"ChatSubGiftData",
"ChatUserMessageHoldEvent",
"ChatUserMessageUpdateEvent",
"EventSubHeaders",
"GoalBeginEvent",
"GoalEndEvent",
"GoalProgressEvent",
Expand Down
2 changes: 1 addition & 1 deletion twitchio/web/aio_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async def eventsub_callback(self, request: web.Request) -> web.Response:
event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_")

try:
payload_class = create_event_instance(sub_type, data["event"], http=self.client._http)
payload_class = create_event_instance(sub_type, data, http=self.client._http, headers=headers)
except ValueError:
logger.warning("Webhook '%s' received an unhandled eventsub event: '%s'.", self, event)
return web.Response(status=200)
Expand Down
2 changes: 1 addition & 1 deletion twitchio/web/starlette_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ async def eventsub_callback(self, request: Request) -> Response:
event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_")

try:
payload_class = create_event_instance(sub_type, data["event"], http=self.client._http)
payload_class = create_event_instance(sub_type, data, http=self.client._http, headers=headers)
except ValueError:
logger.warning("Webhook '%s' received an unhandled eventsub event: '%s'.", self, event)
return Response(status_code=200)
Expand Down

0 comments on commit b06aa3f

Please sign in to comment.