11"""Token 同步服务 - 按需启动浏览器"""
22import httpx
33from datetime import datetime
4- from typing import Optional , Dict , Any
4+ from typing import Optional , Dict , Any , List
55from .config import config
66from .browser import browser_manager
77from .database import profile_db
@@ -16,6 +16,53 @@ def __init__(self):
1616 self ._total_error_count = 0
1717 self ._last_batch_time : Optional [datetime ] = None
1818
19+ async def _check_tokens_status (self , emails : List [str ] = None ) -> Dict [str , Any ]:
20+ """从 Flow2API 查询 token 状态
21+
22+ Returns:
23+ {
24+ "success": True,
25+ "tokens": [...],
26+ "needs_refresh_emails": ["email1", "email2"]
27+ }
28+ """
29+ if not config .connection_token :
30+ return {"success" : False , "error" : "未配置 CONNECTION_TOKEN" }
31+
32+ url = f"{ config .flow2api_url } /api/plugin/check-tokens"
33+
34+ try :
35+ async with httpx .AsyncClient (timeout = 30 ) as client :
36+ payload = {}
37+ if emails :
38+ payload ["emails" ] = emails
39+
40+ response = await client .post (
41+ url ,
42+ json = payload ,
43+ headers = {
44+ "Content-Type" : "application/json" ,
45+ "Authorization" : f"Bearer { config .connection_token } "
46+ }
47+ )
48+
49+ if response .status_code == 200 :
50+ data = response .json ()
51+ tokens = data .get ("tokens" , [])
52+ needs_refresh_emails = [
53+ t ["email" ] for t in tokens
54+ if t .get ("needs_refresh" ) and t .get ("is_active" )
55+ ]
56+ return {
57+ "success" : True ,
58+ "tokens" : tokens ,
59+ "needs_refresh_emails" : needs_refresh_emails
60+ }
61+ else :
62+ return {"success" : False , "error" : f"HTTP { response .status_code } " }
63+ except Exception as e :
64+ return {"success" : False , "error" : str (e )}
65+
1966 async def sync_profile (self , profile_id : int ) -> Dict [str , Any ]:
2067 """同步单个 profile"""
2168 profile = await profile_db .get_profile (profile_id )
@@ -37,6 +84,9 @@ async def sync_profile(self, profile_id: int) -> Dict[str, Any]:
3784 self ._total_error_count += 1
3885 return {"success" : False , "error" : "无法提取 Token,请先登录" }
3986
87+ # Log token for debugging
88+ logger .info (f"[{ profile ['name' ]} ] 提取到 Token: { token [:20 ]} ...{ token [- 10 :]} " )
89+
4090 # 推送到 Flow2API
4191 result = await self ._push_to_flow2api (token )
4292
@@ -63,16 +113,83 @@ async def sync_profile(self, profile_id: int) -> Dict[str, Any]:
63113 return result
64114
65115 async def sync_all_profiles (self ) -> Dict [str , Any ]:
66- """同步所有活跃 profile"""
116+ """同步所有活跃 profile(智能模式:只刷新快过期的) """
67117 if not config .connection_token :
68118 return {"success" : False , "error" : "未配置 CONNECTION_TOKEN" }
69119
70120 logger .info ("=" * 40 )
71- logger .info ("开始批量同步 ..." )
121+ logger .info ("开始智能同步 ..." )
72122
73123 self ._last_batch_time = datetime .now ()
74124 profiles = await profile_db .get_active_profiles ()
75125
126+ if not profiles :
127+ logger .info ("没有活跃的 Profile" )
128+ return {"success" : True , "total" : 0 , "synced" : 0 , "skipped" : 0 }
129+
130+ # 获取所有 profile 的 email
131+ profile_emails = {p .get ("email" ): p for p in profiles if p .get ("email" )}
132+
133+ # 查询 Flow2API 哪些 token 需要刷新
134+ check_result = await self ._check_tokens_status (list (profile_emails .keys ()))
135+
136+ if not check_result ["success" ]:
137+ logger .warning (f"无法查询 token 状态: { check_result .get ('error' )} ,回退到全量同步" )
138+ # 回退到全量同步
139+ return await self ._sync_all_profiles_force ()
140+
141+ needs_refresh_emails = set (check_result .get ("needs_refresh_emails" , []))
142+
143+ if not needs_refresh_emails :
144+ logger .info ("所有 token 状态良好,无需刷新" )
145+ return {
146+ "success" : True ,
147+ "total" : len (profiles ),
148+ "synced" : 0 ,
149+ "skipped" : len (profiles ),
150+ "message" : "所有 token 未过期"
151+ }
152+
153+ logger .info (f"需要刷新的 token: { len (needs_refresh_emails )} 个" )
154+
155+ results = []
156+ success_count = 0
157+ error_count = 0
158+ skipped_count = 0
159+
160+ for profile in profiles :
161+ email = profile .get ("email" )
162+ if email and email in needs_refresh_emails :
163+ result = await self .sync_profile (profile ["id" ])
164+ results .append ({
165+ "profile_id" : profile ["id" ],
166+ "profile_name" : profile ["name" ],
167+ ** result
168+ })
169+ if result ["success" ]:
170+ success_count += 1
171+ else :
172+ error_count += 1
173+ else :
174+ skipped_count += 1
175+ logger .info (f"[{ profile ['name' ]} ] token 未过期,跳过" )
176+
177+ logger .info (f"智能同步完成: 成功 { success_count } , 失败 { error_count } , 跳过 { skipped_count } " )
178+
179+ return {
180+ "success" : True ,
181+ "total" : len (profiles ),
182+ "synced" : success_count + error_count ,
183+ "success_count" : success_count ,
184+ "error_count" : error_count ,
185+ "skipped" : skipped_count ,
186+ "results" : results
187+ }
188+
189+ async def _sync_all_profiles_force (self ) -> Dict [str , Any ]:
190+ """强制同步所有 profile(不检查过期状态)"""
191+ profiles = await profile_db .get_active_profiles ()
192+
76193 results = []
77194 success_count = 0
78195 error_count = 0
@@ -89,7 +206,7 @@ async def sync_all_profiles(self) -> Dict[str, Any]:
89206 else :
90207 error_count += 1
91208
92- logger .info (f"批量同步完成 : 成功 { success_count } , 失败 { error_count } " )
209+ logger .info (f"强制同步完成 : 成功 { success_count } , 失败 { error_count } " )
93210
94211 return {
95212 "success" : True ,
0 commit comments