Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/community_manager/actions/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,10 @@ def get_updated_chat_members(self) -> TargetChatMembersDTO:

target_chat_members: set[tuple[int, int]] = set()

logger.info(
f"Retrieved {len(wallets)} wallets and {len(sticker_owners_telegram_ids)} sticker owners from Redis."
)

if wallets:
chat_members = self.telegram_chat_user_service.get_all_by_linked_wallet(
addresses=wallets
Expand Down
29 changes: 10 additions & 19 deletions backend/core/src/core/actions/sticker/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
StickerItemDTO,
ExternalStickerItemDTO,
)
from core.services.sticker.character import StickerCharacterService
from core.services.sticker.collection import StickerCollectionService
from core.services.sticker.item import StickerItemService
from core.services.superredis import RedisService


Expand All @@ -24,18 +22,12 @@ def __init__(self, db_session: Session) -> None:
self.sticker_collection_service = StickerCollectionService(
db_session=db_session
)
self.sticker_character_service = StickerCharacterService(db_session=db_session)
self.sticker_item_service = StickerItemService(db_session=db_session)
self.redis_service = RedisService()

@staticmethod
def get_metadata_cache_key(collection_id: int) -> str:
return f"sticker-dom::{collection_id}::ownership-metadata"

# @staticmethod
# def get_data_cache_key(collection_id: int) -> str:
# return f"sticker-dom::{collection_id}::ownership-data"

@staticmethod
def get_collections_cache_key() -> str:
return "sticker-dom::collections"
Expand Down Expand Up @@ -63,31 +55,30 @@ def get_collection_cache_value(
hash_object = hashlib.sha256(collections_raw.encode())
return hash_object.hexdigest()

@staticmethod
def map_external_data_to_internal(
self, collection_id: int, items: list[ExternalStickerItemDTO]
collection_id: int,
items: list[ExternalStickerItemDTO],
characters_id_by_external_id: dict[int, int],
) -> list[StickerItemDTO]:
"""
Maps external data to internal data format for stickers by integrating external data items
with internal user and character references. The method filters out items that lack a corresponding
internal user or character mapping, logging the appropriate debug or warning messages in such cases.

:param collection_id: Unique identifier for the sticker collection.
:type collection_id: int
:param items: List of external sticker items to be mapped to the internal format.
:type items: list[ExternalStickerItemDTO]
:param characters_id_by_external_id: Dictionary mapping external character IDs to their corresponding internal ID.
:return: A list of StickerItemDTO instances representing the mapped internal sticker items.
:rtype: list[StickerItemDTO]
"""
characters = self.sticker_character_service.get_all(collection_id=collection_id)
characters_by_external_id = {
character.external_id: character for character in characters
}

internally_mapped_items = []

for item in items:
if not (character := characters_by_external_id.get(item.character_id)):
# There is a desynchronization between Sticker Dom and the database.
if not (
character_id := characters_id_by_external_id.get(item.character_id)
):
# It would mean there is a desynchronization between Sticker Dom and the database.
logger.warning(
f"Missing character {item.character_id!r} for collection {collection_id!r}. Skipping item."
)
Expand All @@ -97,7 +88,7 @@ def map_external_data_to_internal(
StickerItemDTO(
id=item.id,
collection_id=collection_id,
character_id=character.id,
character_id=character_id,
telegram_user_id=item.telegram_user_id,
instance=item.instance,
)
Expand Down
4 changes: 3 additions & 1 deletion backend/core/src/core/dtos/sticker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ def from_raw(cls, raw: bytes | str, collection_id: int) -> Self:
json_data = json.loads(raw)
ownership_data = [
ExternalStickerItemDTO(
id=f"{collection_id}_{character_id}_{instance_id}_{user_id}",
# ID should not contain ID of the owner (user_id) to ensure
# that after changing the owner it'll stay the same
id=f"{collection_id}_{character_id}_{instance_id}",
collection_id=collection_id,
character_id=character_id,
telegram_user_id=user_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""migrate sticker ownership data

Revision ID: 42065519a495
Revises: 1b167f412dfd
Create Date: 2025-11-21 09:52:47.531079

"""
from typing import Sequence

from alembic import op
import sqlalchemy as sa
import logging
from sqlalchemy import exc as sa_exc
import redis as _redis

# Import Redis service and redis-set constant so migration can perform one-time enqueue
from core.services.superredis import RedisService
from core.constants import UPDATED_STICKERS_USER_IDS

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# revision identifiers, used by Alembic.
revision: str = "42065519a495"
down_revision: str | None = "1b167f412dfd"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
# New simpler migration: delete only old-format rows that already have a
# corresponding new-format record, then enqueue managed users to trigger a
# refresh. This assumes your background refresh task has created the new
# records already.
conn = op.get_bind()

# Require that new-format records exist before proceeding.
count_new_format = conn.execute(
sa.text(
"SELECT COUNT(*) FROM sticker_item WHERE (length(id) - length(replace(id, '_', ''))) = 2"
)
).scalar_one()
count_new_format = int(count_new_format)

if count_new_format == 0:
logger.error(
f"{count_new_format} new-format sticker_item rows found; aborting migration"
)
raise Exception("Aborting migration due to insufficient data")

# Detect old-format IDs by counting underscores (old has 3 underscores: collection_character_instance_user)
count_old = conn.execute(
sa.text(
"SELECT COUNT(*) FROM sticker_item WHERE (length(id) - length(replace(id, '_', ''))) = 3"
)
).scalar()
count_old = int(count_old)

if count_old == 0:
logger.warning("No old-format sticker_item rows found; migration complete")
else:
logger.info(
"Found %d old-format sticker_item rows; they will be removed", count_old
)
conn.execute(
sa.text(
"""
DELETE FROM sticker_item si
WHERE (length(si.id) - length(replace(si.id, '_', ''))) = 3;
"""
)
)

# After deletion, enqueue all managed users to the Redis set to trigger double-checks
try:
redis = RedisService()
rows = conn.execute(
sa.text(
"""
SELECT DISTINCT u.telegram_id
FROM telegram_chat_user tcu
JOIN telegram_chat_sticker_collection tcs ON tcu.chat_id = tcs.chat_id
JOIN "user" u ON tcu.user_id = u.id
WHERE tcu.is_managed = true AND u.telegram_id IS NOT NULL;
"""
)
).fetchall()

telegram_ids = [str(r[0]) for r in rows if r and r[0] is not None]
if telegram_ids:
chunk_size = 1000
for i in range(0, len(telegram_ids), chunk_size):
batch = telegram_ids[i : i + chunk_size]
try:
redis.add_to_set(UPDATED_STICKERS_USER_IDS, *batch)
except _redis.RedisError as r_e:
logger.exception(
"Redis error while adding telegram ids during migration: %s",
r_e,
)
# continue trying remaining batches
except sa_exc.SQLAlchemyError as db_e:
logger.exception(
"Database error while selecting telegram ids during migration: %s", db_e
)
except _redis.RedisError as r:
logger.exception(
"Redis connection/setup error (non-fatal) while trying to enqueue telegram ids during migration: %s",
r,
)
except Exception as e:
logger.exception(
"Non-fatal unexpected error while trying to enqueue telegram ids into Redis during migration: %s",
e,
)


def downgrade() -> None:
# This migration is destructive (deletes old-format rows). Downgrade is a no-op.
logger.warning(
"Downgrade called for migration 42065519a495: no-op (deleted rows cannot be restored by this migration)"
)
return
54 changes: 50 additions & 4 deletions backend/core/src/core/services/sticker/item.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Iterable
from typing import Iterable, Sequence, Any

from sqlalchemy import delete
from sqlalchemy.orm import load_only, QueryableAttribute

from core.models.sticker import StickerItem
from core.services.base import BaseService
Expand All @@ -16,7 +19,28 @@ def get_all(
collection_id: int | None = None,
character_id: int | None = None,
item_ids: Iterable[str] | None = None,
_load_attributes: list[QueryableAttribute[Any]] | None = None,
) -> list[StickerItem]:
"""
Retrieve all sticker items based on provided filter criteria.

This method filters sticker items from the database query based on the
specified parameters.
If a filter parameter is omitted, it will not restrict the query for that parameter.
The function returns a list of all matching sticker items.

:param telegram_user_id: The Telegram user ID to filter by.
If None, this filter is not applied.
:param collection_id: The collection ID to filter by.
If None, this filter is not applied.
:param character_id: The character ID to filter by.
If None, this filter is not applied.
:param item_ids: An iterable of item IDs to filter by.
If None, this filter is not applied.
:param _load_attributes: A list of attribute names to load for each StickerItem.
If None, all attributes are loaded.
:return: A list of `StickerItem` instances that match the specified criteria.
"""
query = self.db_session.query(StickerItem)
if telegram_user_id is not None:
query = query.filter(StickerItem.telegram_user_id == telegram_user_id)
Expand All @@ -27,6 +51,9 @@ def get_all(
if item_ids is not None:
query = query.filter(StickerItem.id.in_(item_ids))

if _load_attributes:
query = query.options(load_only(*_load_attributes))

return query.all()

def create(
Expand Down Expand Up @@ -65,7 +92,26 @@ def delete(self, item_id: str) -> None:
def bulk_delete(
self,
item_ids: Iterable[str],
) -> None:
self.db_session.query(StickerItem).filter(StickerItem.id.in_(item_ids)).delete(
synchronize_session="fetch"
) -> Sequence[int]:
"""
Deletes multiple sticker items from the database based on the provided item IDs.

This function executes a batch deletion operation for sticker items using the
given list of IDs and returns the corresponding `telegram_user_id` values for
the deleted items.

:param item_ids: An iterable of strings representing the IDs of the sticker
items to be deleted.
:return: A sequence of integers representing the `telegram_user_id` values
of the deleted sticker items.
"""
telegram_user_ids = (
self.db_session.execute(
delete(StickerItem)
.where(StickerItem.id.in_(item_ids))
.returning(StickerItem.telegram_user_id)
)
.scalars()
.all()
)
return telegram_user_ids
13 changes: 10 additions & 3 deletions backend/core/src/core/services/user.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
from typing import Iterable
from typing import Iterable, Any

from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload, load_only, QueryableAttribute

from core.dtos.user import TelegramUserDTO
from core.models.user import User
from core.services.base import BaseService


class UserService(BaseService):
def get_all(self, telegram_ids: Iterable[int] | None = None) -> list[User]:
def get_all(
self,
telegram_ids: Iterable[int] | None = None,
_load_attributes: list[QueryableAttribute[Any]] | None = None,
) -> list[User]:
query = self.db_session.query(User)
if telegram_ids:
query = query.filter(User.telegram_id.in_(telegram_ids))

if _load_attributes:
query = query.options(load_only(*_load_attributes))

return query.all()

def get_by_telegram_id(self, telegram_id: int) -> User:
Expand Down
19 changes: 19 additions & 0 deletions backend/core/src/core/utils/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from itertools import islice
from typing import Iterable, Any


def batched(it: Iterable[Any], size: int) -> Iterable[list[Any]]:
"""
Splits an iterable into batches of the specified size. Each batch is returned
as a list. The function continues splitting until the iterable is exhausted.
If the number of remaining items is less than the requested batch
size, the final batch will contain all remaining items.

:param it: The input iterable to split into batches.
:param size: The desired size of each batch.
:return: An iterable that yields lists, where each list represents a batch
of the specified size.
"""
it = iter(it)
while chunk := list(islice(it, size)):
yield chunk
Loading