|
| 1 | +import logging |
| 2 | + |
| 3 | +from sqlalchemy.orm import Session |
| 4 | + |
| 5 | +from community_manager.dtos.chat import TargetChatMembersDTO |
| 6 | +from community_manager.entrypoint import init_client |
| 7 | +from community_manager.settings import community_manager_settings |
| 8 | +from core.actions.authorization import AuthorizationAction |
| 9 | +from core.constants import UPDATED_WALLETS_SET_NAME, UPDATED_STICKERS_USER_IDS |
| 10 | +from core.services.chat.rule.sticker import TelegramChatStickerCollectionService |
| 11 | +from core.services.chat.user import TelegramChatUserService |
| 12 | +from core.services.superredis import RedisService |
| 13 | + |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | + |
| 18 | +class CommunityManagerChatAction: |
| 19 | + def __init__(self, db_session: Session): |
| 20 | + self.db_session = db_session |
| 21 | + self.telegram_chat_user_service = TelegramChatUserService(db_session) |
| 22 | + self.telegram_chat_sticker_collection_service = ( |
| 23 | + TelegramChatStickerCollectionService(db_session) |
| 24 | + ) |
| 25 | + self.redis_service = RedisService() |
| 26 | + |
| 27 | + def get_updated_chat_members(self) -> TargetChatMembersDTO: |
| 28 | + """ |
| 29 | + Fetches and updates the target chat members based on specific criteria including |
| 30 | + linked wallets and sticker owners. The method retrieves updated wallet addresses |
| 31 | + and sticker owner IDs from the Redis service, identifies relevant chat members |
| 32 | + from the Telegram chat services, and compiles the processed data into a |
| 33 | + `TargetChatMembersDTO` object. |
| 34 | +
|
| 35 | + :raises ValueError: If unexpected data types are retrieved or processed in the logic. |
| 36 | +
|
| 37 | + :return: A `TargetChatMembersDTO` object containing the updated wallet addresses, |
| 38 | + sticker owner IDs, and the compiled target chat members. |
| 39 | + """ |
| 40 | + wallets = self.redis_service.pop_from_set( |
| 41 | + name=UPDATED_WALLETS_SET_NAME, |
| 42 | + count=community_manager_settings.items_per_task, |
| 43 | + ) |
| 44 | + if isinstance(wallets, str): |
| 45 | + wallets = [wallets] |
| 46 | + |
| 47 | + sticker_owners_ids = ( |
| 48 | + self.redis_service.pop_from_set( |
| 49 | + name=UPDATED_STICKERS_USER_IDS, |
| 50 | + count=community_manager_settings.items_per_task, |
| 51 | + ) |
| 52 | + or [] |
| 53 | + ) |
| 54 | + if isinstance(sticker_owners_ids, str): |
| 55 | + sticker_owners_ids = [sticker_owners_ids] |
| 56 | + sticker_owners_ids = set(map(int, sticker_owners_ids)) |
| 57 | + |
| 58 | + target_chat_members: set[tuple[int, int]] = set() |
| 59 | + |
| 60 | + if wallets: |
| 61 | + chat_members = self.telegram_chat_user_service.get_all_by_linked_wallet( |
| 62 | + addresses=wallets |
| 63 | + ) |
| 64 | + target_chat_members.update( |
| 65 | + {(cm.chat_id, cm.user_id) for cm in chat_members} |
| 66 | + ) |
| 67 | + |
| 68 | + if sticker_owners_ids: |
| 69 | + rules = self.telegram_chat_sticker_collection_service.get_all( |
| 70 | + enabled_only=True |
| 71 | + ) |
| 72 | + unique_chat_ids = {r.chat_id for r in rules} |
| 73 | + target_chat_members.update( |
| 74 | + { |
| 75 | + (chat_id, user_id) |
| 76 | + for chat_id in unique_chat_ids |
| 77 | + for user_id in sticker_owners_ids |
| 78 | + } |
| 79 | + ) |
| 80 | + |
| 81 | + return TargetChatMembersDTO( |
| 82 | + wallets=wallets, |
| 83 | + sticker_owners_ids=sticker_owners_ids, |
| 84 | + target_chat_members=target_chat_members, |
| 85 | + ) |
| 86 | + |
| 87 | + async def sanity_chat_checks(self) -> None: |
| 88 | + """ |
| 89 | + Performs sanity checks on chat members and validates their eligibility. If there are |
| 90 | + any chat members to validate, it initiates the validation process with the help of |
| 91 | + a Telegram service client. Ineligible members are removed based on the validation |
| 92 | + logic. If an error occurs during validation, a fallback mechanism is triggered |
| 93 | + to add wallets and users back to the redis database to try again later. |
| 94 | +
|
| 95 | + The method logs the progress at various stages and handles exceptions to ensure |
| 96 | + fallback processes are executed if needed. |
| 97 | +
|
| 98 | + :raises Exception: If validation of chat members fails during execution. |
| 99 | + """ |
| 100 | + dto = self.get_updated_chat_members() |
| 101 | + if target_chat_members := dto.target_chat_members: |
| 102 | + try: |
| 103 | + logger.info(f"Validating chat members for {target_chat_members}") |
| 104 | + chat_members = self.telegram_chat_user_service.get_all_pairs( |
| 105 | + chat_member_pairs=target_chat_members |
| 106 | + ) |
| 107 | + |
| 108 | + if not chat_members: |
| 109 | + logger.info("No chats to validate. Skipping") |
| 110 | + return |
| 111 | + else: |
| 112 | + logger.info(f"Found {len(chat_members)} chat members to validate") |
| 113 | + |
| 114 | + telethon_service = init_client() |
| 115 | + authorization_action = AuthorizationAction( |
| 116 | + self.db_session, telethon_client=telethon_service.client |
| 117 | + ) |
| 118 | + await authorization_action.kick_ineligible_chat_members( |
| 119 | + chat_members=chat_members |
| 120 | + ) |
| 121 | + logger.info( |
| 122 | + f"Successfully validated {len(chat_members)} chat members. " |
| 123 | + ) |
| 124 | + except Exception as exc: |
| 125 | + logger.error(f"Failed to validate chat members: {exc}", exc_info=True) |
| 126 | + self.fallback_update_chat_members(dto=dto) |
| 127 | + raise exc |
| 128 | + else: |
| 129 | + logger.info("No users to validate. Skipping") |
| 130 | + |
| 131 | + def fallback_update_chat_members(self, dto: TargetChatMembersDTO) -> None: |
| 132 | + """ |
| 133 | + Activates a fallback mechanism to update chat members by storing provided wallets |
| 134 | + and sticker owner IDs in Redis sets. This ensures that the required updates are |
| 135 | + persisted and managed separately if the primary update mechanism fails. |
| 136 | +
|
| 137 | + :param dto: A data transfer object containing the wallets and sticker owner IDs |
| 138 | + to be updated in Redis sets. |
| 139 | + """ |
| 140 | + logger.warning("Activating fallback method for chat members.") |
| 141 | + self.redis_service.add_to_set(UPDATED_WALLETS_SET_NAME, *dto.wallets) |
| 142 | + self.redis_service.add_to_set( |
| 143 | + UPDATED_STICKERS_USER_IDS, *map(str, dto.sticker_owners_ids) |
| 144 | + ) |
0 commit comments