diff --git a/app/messages/notification_service.py b/app/messages/notification_service.py index 0038d83..e7237cd 100644 --- a/app/messages/notification_service.py +++ b/app/messages/notification_service.py @@ -1,6 +1,10 @@ import base64 +import hashlib +import hmac import re import smtplib +import time +from email.utils import parsedate_to_datetime from email.header import Header from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText @@ -14,6 +18,45 @@ class NotificationService: def __init__(self): self.headers = {"Content-Type": "application/json"} + self._feishu_time_cache: dict[str, float] = {"timestamp": 0.0, "fetched_at": 0.0} + + async def _get_feishu_timestamp(self) -> str: + """Get Feishu-compatible timestamp with multi-source fallback and sanity checks.""" + now = time.time() + cached_ts = self._feishu_time_cache.get("timestamp", 0.0) + fetched_at = self._feishu_time_cache.get("fetched_at", 0.0) + if cached_ts and (now - fetched_at) < 300: + return str(int(cached_ts)) + + time_sources = [ + "https://open.feishu.cn/", + "https://www.feishu.cn/", + "https://www.cloudflare.com/", + ] + try: + async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: + for url in time_sources: + for method in ("HEAD", "GET"): + try: + resp = await client.request(method, url) + date_header = resp.headers.get("Date") or resp.headers.get("date") + if not date_header: + continue + dt = parsedate_to_datetime(date_header) + server_ts = dt.timestamp() + if abs(server_ts - now) > 3600: + continue + self._feishu_time_cache["timestamp"] = server_ts + self._feishu_time_cache["fetched_at"] = now + return str(int(server_ts)) + except Exception: + continue + except Exception: + pass + + self._feishu_time_cache["timestamp"] = now + self._feishu_time_cache["fetched_at"] = now + return str(int(now)) async def _async_post(self, url: str, json_data: dict[str, Any], proxy: str | None = None) -> dict[str, Any]: try: @@ -26,6 +69,60 @@ async def _async_post(self, url: str, json_data: dict[str, Any], proxy: str | No logger.info(f"Push failed, push address: {url}, Error message: {e}") return {"error": str(e)} + @staticmethod + def _build_feishu_payload(title: str, content: str, msg_type: str, language: str) -> dict[str, Any]: + if msg_type == "post": + lines = [line for line in content.splitlines() if line.strip()] + if not lines: + lines = [content] + post_content = [[{"tag": "text", "text": line}] for line in lines] + return { + "msg_type": "post", + "content": {"post": {language: {"title": title, "content": post_content}}}, + } + + return {"msg_type": "text", "content": {"text": content}} + + @staticmethod + def _generate_feishu_sign(secret: str, timestamp: str) -> str: + key = f"{timestamp}\n{secret}".encode("utf-8") + msg = b"" + digest = hmac.new(key, msg, hashlib.sha256).digest() + return base64.b64encode(digest).decode("utf-8") + + async def send_to_feishu( + self, + url: str, + title: str, + content: str, + msg_type: str = "text", + sign_secret: str | None = None, + language: str = "zh_cn", + ) -> dict[str, Any]: + results = {"success": [], "error": []} + sign_secret = sign_secret.strip() if sign_secret else None + timestamp: str | None = None + if sign_secret: + timestamp = await self._get_feishu_timestamp() + api_list = [u.strip() for u in url.replace(",", ",").split(",") if u.strip()] + for api in api_list: + payload = self._build_feishu_payload(title=title, content=content, msg_type=msg_type, language=language) + if sign_secret: + payload["timestamp"] = timestamp + payload["sign"] = self._generate_feishu_sign(secret=sign_secret, timestamp=timestamp) + + resp = await self._async_post(api, payload) + if resp.get("StatusCode") == 0 or resp.get("code") == 0: + results["success"].append(api) + else: + results["error"].append(api) + if "error" in resp: + logger.info(f"Feishu push failed, push address: {api}, Failure message: {resp.get('error')}") + else: + logger.info(f"Feishu push failed, push address: {api}, Failure message: {resp}") + + return results + async def send_to_dingtalk( self, url: str, content: str, number: Optional[str] = None, is_atall: bool = False ) -> dict[str, list[str]]: @@ -234,20 +331,3 @@ async def send_to_serverchan( logger.info(f"ServerChan push failed, SCKEY/SendKey: {key}, Error message: {resp.get('message')}") return results - - async def send_to_feishu( - self, url: str, content: str - ) -> dict[str, list[str]]: - results = {"success": [], "error": []} - api_list = [u.strip() for u in url.replace(",", ",").split(",") if u.strip()] - for api in api_list: - json_data = { - "msg_type": "text", - "content": {"text": content} - } - resp = await self._async_post(api, json_data) - if resp.get("msg") == 'success': - results["success"].append(api) - else: - results["error"].append(api) - return results