From 13a0952a86e7db244edee5983c778f6374497276 Mon Sep 17 00:00:00 2001 From: zihao256 <2638779206@qq.com> Date: Sun, 22 Mar 2026 17:00:10 +0800 Subject: [PATCH] feat(accounts): add server-side concurrent batch operations --- src/core/openai/token_refresh.py | 16 +- src/web/routes/accounts.py | 83 ++++-- src/web/routes/payment.py | 73 ++++-- static/css/style.css | 55 ++++ static/js/accounts.js | 226 +++++++++++++---- templates/accounts.html | 28 +++ tests/test_account_batch_operation_routes.py | 252 +++++++++++++++++++ 7 files changed, 625 insertions(+), 108 deletions(-) create mode 100644 tests/test_account_batch_operation_routes.py diff --git a/src/core/openai/token_refresh.py b/src/core/openai/token_refresh.py index 394c56e2..f1d2faab 100644 --- a/src/core/openai/token_refresh.py +++ b/src/core/openai/token_refresh.py @@ -323,10 +323,24 @@ def validate_account_token(account_id: int, proxy_url: Optional[str] = None) -> with get_db() as db: account = crud.get_account_by_id(db, account_id) if not account: + logger.warning(f"验证账号失败,账号不存在: {account_id}") return False, "账号不存在" if not account.access_token: + logger.warning(f"验证账号 {account.email} 失败: 缺少 access_token") + crud.update_account(db, account_id, status="failed") return False, "账号没有 access_token" + logger.info(f"开始验证账号 Token: {account.email}") manager = TokenRefreshManager(proxy_url=proxy_url) - return manager.validate_token(account.access_token) + is_valid, error = manager.validate_token(account.access_token) + crud.update_account( + db, + account_id, + status="active" if is_valid else "failed", + ) + if is_valid: + logger.info(f"账号 Token 验证成功: {account.email}") + else: + logger.warning(f"账号 Token 验证失败: {account.email}, 原因: {error}") + return is_valid, error diff --git a/src/web/routes/accounts.py b/src/web/routes/accounts.py index a6a597fc..0b6c9b04 100644 --- a/src/web/routes/accounts.py +++ b/src/web/routes/accounts.py @@ -5,10 +5,11 @@ import json import logging import zipfile +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import List, Optional -from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body +from fastapi import APIRouter, HTTPException, Query, Body from fastapi.responses import StreamingResponse from pydantic import BaseModel @@ -99,6 +100,22 @@ class BatchUpdateRequest(BaseModel): # ============== Helper Functions ============== +def normalize_batch_concurrency(concurrency: Optional[int]) -> int: + """限制批处理并发数在 2-10 之间。""" + if concurrency is None: + return 4 + return max(2, min(10, concurrency)) + + +def run_batch_concurrently(items: List[int], concurrency: Optional[int], worker): + """使用本地线程池并发执行批处理任务。""" + if not items: + return [] + + max_workers = min(len(items), normalize_batch_concurrency(concurrency)) + with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="account_batch") as executor: + return list(executor.map(worker, items)) + def resolve_account_ids( db, ids: List[int], @@ -193,7 +210,6 @@ async def list_accounts( accounts=[account_to_response(acc) for acc in accounts] ) - @router.get("/{account_id}", response_model=AccountResponse) async def get_account(account_id: int): """获取单个账号详情""" @@ -576,6 +592,7 @@ class BatchRefreshRequest(BaseModel): """批量刷新请求""" ids: List[int] = [] proxy: Optional[str] = None + concurrency: Optional[int] = 4 select_all: bool = False status_filter: Optional[str] = None email_service_filter: Optional[str] = None @@ -591,6 +608,7 @@ class BatchValidateRequest(BaseModel): """批量验证请求""" ids: List[int] = [] proxy: Optional[str] = None + concurrency: Optional[int] = 4 select_all: bool = False status_filter: Optional[str] = None email_service_filter: Optional[str] = None @@ -598,7 +616,7 @@ class BatchValidateRequest(BaseModel): @router.post("/batch-refresh") -async def batch_refresh_tokens(request: BatchRefreshRequest, background_tasks: BackgroundTasks): +def batch_refresh_tokens(request: BatchRefreshRequest): """批量刷新账号 Token""" proxy = _get_proxy(request.proxy) @@ -614,23 +632,30 @@ async def batch_refresh_tokens(request: BatchRefreshRequest, background_tasks: B request.status_filter, request.email_service_filter, request.search_filter ) - for account_id in ids: + def worker(account_id: int): try: - result = do_refresh(account_id, proxy) - if result.success: - results["success_count"] += 1 - else: - results["failed_count"] += 1 - results["errors"].append({"id": account_id, "error": result.error_message}) + return {"id": account_id, "result": do_refresh(account_id, proxy)} except Exception as e: + return {"id": account_id, "error": str(e)} + + for item in run_batch_concurrently(ids, request.concurrency, worker): + if item.get("error") is not None: results["failed_count"] += 1 - results["errors"].append({"id": account_id, "error": str(e)}) + results["errors"].append({"id": item["id"], "error": item["error"]}) + continue + + result = item["result"] + if result.success: + results["success_count"] += 1 + else: + results["failed_count"] += 1 + results["errors"].append({"id": item["id"], "error": result.error_message}) return results @router.post("/{account_id}/refresh") -async def refresh_account_token(account_id: int, request: Optional[TokenRefreshRequest] = Body(default=None)): +def refresh_account_token(account_id: int, request: Optional[TokenRefreshRequest] = Body(default=None)): """刷新单个账号的 Token""" proxy = _get_proxy(request.proxy if request else None) result = do_refresh(account_id, proxy) @@ -649,7 +674,7 @@ async def refresh_account_token(account_id: int, request: Optional[TokenRefreshR @router.post("/batch-validate") -async def batch_validate_tokens(request: BatchValidateRequest): +def batch_validate_tokens(request: BatchValidateRequest): """批量验证账号 Token 有效性""" proxy = _get_proxy(request.proxy) @@ -665,31 +690,37 @@ async def batch_validate_tokens(request: BatchValidateRequest): request.status_filter, request.email_service_filter, request.search_filter ) - for account_id in ids: + def worker(account_id: int): try: is_valid, error = do_validate(account_id, proxy) - results["details"].append({ + return { "id": account_id, "valid": is_valid, - "error": error - }) - if is_valid: - results["valid_count"] += 1 - else: - results["invalid_count"] += 1 + "error": error, + } except Exception as e: - results["invalid_count"] += 1 - results["details"].append({ + return { "id": account_id, "valid": False, - "error": str(e) - }) + "error": str(e), + } + + for item in run_batch_concurrently(ids, request.concurrency, worker): + results["details"].append({ + "id": item["id"], + "valid": item["valid"], + "error": item["error"] + }) + if item["valid"]: + results["valid_count"] += 1 + else: + results["invalid_count"] += 1 return results @router.post("/{account_id}/validate") -async def validate_account_token(account_id: int, request: Optional[TokenValidateRequest] = Body(default=None)): +def validate_account_token(account_id: int, request: Optional[TokenValidateRequest] = Body(default=None)): """验证单个账号的 Token 有效性""" proxy = _get_proxy(request.proxy if request else None) is_valid, error = do_validate(account_id, proxy) diff --git a/src/web/routes/payment.py b/src/web/routes/payment.py index ef9ff6b3..9f7d78ba 100644 --- a/src/web/routes/payment.py +++ b/src/web/routes/payment.py @@ -13,7 +13,7 @@ from ...database.models import Account from ...database import crud from ...config.settings import get_settings -from .accounts import resolve_account_ids +from .accounts import resolve_account_ids, run_batch_concurrently from ...core.openai.payment import ( generate_plus_link, generate_team_link, @@ -50,12 +50,12 @@ class MarkSubscriptionRequest(BaseModel): class BatchCheckSubscriptionRequest(BaseModel): ids: List[int] = [] proxy: Optional[str] = None + concurrency: Optional[int] = 4 select_all: bool = False status_filter: Optional[str] = None email_service_filter: Optional[str] = None search_filter: Optional[str] = None - # ============== 支付链接生成 ============== @router.post("/generate-link") @@ -122,6 +122,40 @@ def open_browser_incognito(request: OpenIncognitoRequest): # ============== 订阅状态 ============== +def _check_subscription_for_account(account: Account, proxy: Optional[str], db) -> dict: + logger.info(f"开始检测账号订阅状态: {account.email}") + try: + status = check_subscription_status(account, proxy) + account.subscription_type = None if status == "free" else status + account.subscription_at = datetime.utcnow() if status != "free" else account.subscription_at + db.commit() + logger.info(f"账号订阅检测成功: {account.email}, 结果: {status}") + return { + "id": account.id, + "email": account.email, + "success": True, + "subscription_type": status, + } + except Exception as e: + db.rollback() + logger.warning(f"账号订阅检测失败: {account.email}, 原因: {e}") + return { + "id": account.id, + "email": account.email, + "success": False, + "error": str(e), + } + + +def _check_subscription_for_account_id(account_id: int, proxy: Optional[str]) -> dict: + with get_db() as db: + account = db.query(Account).filter(Account.id == account_id).first() + if not account: + return {"id": account_id, "email": None, "success": False, "error": "账号不存在"} + + return _check_subscription_for_account(account, proxy, db) + + @router.post("/accounts/batch-check-subscription") def batch_check_subscription(request: BatchCheckSubscriptionRequest): """批量检测账号订阅状态""" @@ -134,29 +168,16 @@ def batch_check_subscription(request: BatchCheckSubscriptionRequest): db, request.ids, request.select_all, request.status_filter, request.email_service_filter, request.search_filter ) - for account_id in ids: - account = db.query(Account).filter(Account.id == account_id).first() - if not account: - results["failed_count"] += 1 - results["details"].append( - {"id": account_id, "email": None, "success": False, "error": "账号不存在"} - ) - continue - - try: - status = check_subscription_status(account, proxy) - account.subscription_type = None if status == "free" else status - account.subscription_at = datetime.utcnow() if status != "free" else account.subscription_at - db.commit() - results["success_count"] += 1 - results["details"].append( - {"id": account_id, "email": account.email, "success": True, "subscription_type": status} - ) - except Exception as e: - results["failed_count"] += 1 - results["details"].append( - {"id": account_id, "email": account.email, "success": False, "error": str(e)} - ) + + def worker(account_id: int): + return _check_subscription_for_account_id(account_id, proxy) + + for detail in run_batch_concurrently(ids, request.concurrency, worker): + results["details"].append(detail) + if detail["success"]: + results["success_count"] += 1 + else: + results["failed_count"] += 1 return results @@ -178,5 +199,3 @@ def mark_subscription(account_id: int, request: MarkSubscriptionRequest): db.commit() return {"success": True, "subscription_type": request.subscription_type} - - diff --git a/static/css/style.css b/static/css/style.css index 4c40acc9..0b9a0289 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -776,6 +776,18 @@ body { box-shadow: 0 0 0 3px var(--primary-light); } +.batch-concurrency-control { + display: inline-flex; + align-items: center; + gap: 8px; + color: var(--text-secondary); + font-size: 0.875rem; +} + +.batch-concurrency-control .form-select { + min-width: 84px; +} + /* ============================================ 分页 ============================================ */ @@ -1065,6 +1077,40 @@ body { to { transform: translateX(100%); } } +.batch-progress-panel { + flex: 1 1 100%; + background: var(--surface-hover); + border: 1px solid var(--border); + border-radius: var(--radius-lg); + padding: var(--spacing-md); +} + +.batch-progress-header { + display: flex; + justify-content: space-between; + align-items: flex-start; + gap: var(--spacing-md); + margin-bottom: var(--spacing-sm); +} + +.batch-progress-title { + font-size: 0.95rem; + font-weight: 600; + color: var(--text-primary); +} + +.batch-progress-meta, +.batch-progress-stats { + font-size: 0.875rem; + color: var(--text-secondary); +} + +.batch-progress-percent { + font-size: 1rem; + font-weight: 700; + color: var(--primary-color); +} + /* ============================================ 空状态 ============================================ */ @@ -1260,6 +1306,15 @@ body { align-items: stretch; } + .batch-concurrency-control { + justify-content: space-between; + } + + .batch-progress-header { + align-items: stretch; + flex-direction: column; + } + .form-row { grid-template-columns: 1fr; } diff --git a/static/js/accounts.js b/static/js/accounts.js index fe9848c3..f99a0c1f 100644 --- a/static/js/accounts.js +++ b/static/js/accounts.js @@ -11,6 +11,13 @@ let selectedAccounts = new Set(); let isLoading = false; let selectAllPages = false; // 是否选中了全部页 let currentFilters = { status: '', email_service: '', search: '' }; // 当前筛选条件 +let batchOperationRunning = false; +let activeBatchButton = null; +let activeBatchButtonText = ''; + +const MIN_BATCH_CONCURRENCY = 2; +const MAX_BATCH_CONCURRENCY = 10; +const DEFAULT_BATCH_CONCURRENCY = 4; // DOM 元素 const elements = { @@ -28,6 +35,13 @@ const elements = { batchUploadBtn: document.getElementById('batch-upload-btn'), batchCheckSubBtn: document.getElementById('batch-check-sub-btn'), batchDeleteBtn: document.getElementById('batch-delete-btn'), + batchConcurrency: document.getElementById('batch-concurrency'), + batchProgressPanel: document.getElementById('batch-progress-panel'), + batchProgressTitle: document.getElementById('batch-progress-title'), + batchProgressMeta: document.getElementById('batch-progress-meta'), + batchProgressBar: document.getElementById('batch-progress-bar'), + batchProgressPercent: document.getElementById('batch-progress-percent'), + batchProgressStats: document.getElementById('batch-progress-stats'), exportBtn: document.getElementById('export-btn'), exportMenu: document.getElementById('export-menu'), selectAll: document.getElementById('select-all'), @@ -473,10 +487,11 @@ function selectAllPagesAction() { function updateBatchButtons() { const count = getEffectiveCount(); elements.batchDeleteBtn.disabled = count === 0; - elements.batchRefreshBtn.disabled = count === 0; - elements.batchValidateBtn.disabled = count === 0; + elements.batchRefreshBtn.disabled = count === 0 || batchOperationRunning; + elements.batchValidateBtn.disabled = count === 0 || batchOperationRunning; elements.batchUploadBtn.disabled = count === 0; - elements.batchCheckSubBtn.disabled = count === 0; + elements.batchCheckSubBtn.disabled = count === 0 || batchOperationRunning; + elements.batchConcurrency.disabled = batchOperationRunning; elements.exportBtn.disabled = count === 0; elements.batchDeleteBtn.textContent = count > 0 ? `🗑️ 删除 (${count})` : '🗑️ 批量删除'; @@ -484,6 +499,106 @@ function updateBatchButtons() { elements.batchValidateBtn.textContent = count > 0 ? `✅ 验证 (${count})` : '✅ 验证Token'; elements.batchUploadBtn.textContent = count > 0 ? `☁️ 上传 (${count})` : '☁️ 上传'; elements.batchCheckSubBtn.textContent = count > 0 ? `🔍 检测 (${count})` : '🔍 检测订阅'; + + if (batchOperationRunning && activeBatchButton) { + activeBatchButton.textContent = activeBatchButtonText; + } +} + +function getBatchConcurrency() { + const rawValue = parseInt(elements.batchConcurrency?.value || DEFAULT_BATCH_CONCURRENCY, 10); + const value = Number.isFinite(rawValue) ? rawValue : DEFAULT_BATCH_CONCURRENCY; + return Math.min(MAX_BATCH_CONCURRENCY, Math.max(MIN_BATCH_CONCURRENCY, value)); +} + +function setBatchOperationRunning(isRunning, button = null, busyText = '') { + batchOperationRunning = isRunning; + activeBatchButton = isRunning ? button : null; + activeBatchButtonText = isRunning ? busyText : ''; + updateBatchButtons(); +} + +function setBatchProgressIndeterminate(title, meta, statsText) { + elements.batchProgressPanel.style.display = 'block'; + elements.batchProgressTitle.textContent = title; + elements.batchProgressMeta.textContent = meta; + elements.batchProgressStats.textContent = statsText; + elements.batchProgressPercent.textContent = '--'; + elements.batchProgressBar.classList.add('indeterminate'); + elements.batchProgressBar.style.width = '100%'; +} + +function updateBatchProgress(state) { + const labels = state.labels || { success: '成功', failed: '失败' }; + const percent = state.total > 0 ? Math.round((state.completed / state.total) * 100) : 0; + + elements.batchProgressPanel.style.display = 'block'; + elements.batchProgressTitle.textContent = state.title; + elements.batchProgressMeta.textContent = state.meta; + elements.batchProgressPercent.textContent = `${percent}%`; + elements.batchProgressStats.textContent = + `总计 ${state.total} · 已完成 ${state.completed} · ${labels.success} ${state.success} · ${labels.failed} ${state.failed}`; + elements.batchProgressBar.classList.remove('indeterminate'); + elements.batchProgressBar.style.width = `${percent}%`; +} + +async function executeServerBatchOperation(options) { + const count = getEffectiveCount(); + if (count === 0) return; + + if (batchOperationRunning) { + toast.warning('已有批量任务正在执行,请等待当前任务完成'); + return; + } + + if (options.confirmMessage) { + const confirmed = await confirm(options.confirmMessage(count)); + if (!confirmed) return; + } + + setBatchOperationRunning(true, options.button, options.busyText); + const concurrency = getBatchConcurrency(); + setBatchProgressIndeterminate( + options.progressTitle, + `服务端执行中 · 并发 ${concurrency}`, + `已提交 ${count} 个账号` + ); + + try { + const result = await api.post( + options.endpoint, + buildBatchPayload({ concurrency }) + ); + const stats = options.extractStats(result, count); + + updateBatchProgress({ + title: options.progressTitle, + meta: `服务端批处理完成 · 并发 ${concurrency}`, + total: stats.total, + completed: stats.total, + success: stats.success, + failed: stats.failed, + labels: options.labels + }); + + toast[options.toastType || 'success'](options.summary(stats)); + if (options.afterComplete) { + await options.afterComplete(stats); + } + } catch (error) { + updateBatchProgress({ + title: options.progressTitle, + meta: `执行失败:${error.message}`, + total: 0, + completed: 0, + success: 0, + failed: 0, + labels: options.labels + }); + toast.error(`${options.errorPrefix}: ${error.message}`); + } finally { + setBatchOperationRunning(false); + } } // 刷新单个账号Token @@ -505,42 +620,46 @@ async function refreshToken(id) { // 批量刷新Token async function handleBatchRefresh() { - const count = getEffectiveCount(); - if (count === 0) return; - - const confirmed = await confirm(`确定要刷新选中的 ${count} 个账号的Token吗?`); - if (!confirmed) return; - - elements.batchRefreshBtn.disabled = true; - elements.batchRefreshBtn.textContent = '刷新中...'; - - try { - const result = await api.post('/accounts/batch-refresh', buildBatchPayload()); - toast.success(`成功刷新 ${result.success_count} 个,失败 ${result.failed_count} 个`); - loadAccounts(); - } catch (error) { - toast.error('批量刷新失败: ' + error.message); - } finally { - updateBatchButtons(); - } + await executeServerBatchOperation({ + button: elements.batchRefreshBtn, + busyText: '刷新中...', + progressTitle: '批量刷新 Token', + labels: { success: '成功', failed: '失败' }, + confirmMessage: (count) => `确定要刷新选中的 ${count} 个账号的Token吗?`, + endpoint: '/accounts/batch-refresh', + extractStats: (result, count) => ({ + total: result.success_count + result.failed_count || count, + success: result.success_count, + failed: result.failed_count + }), + summary: (stats) => `Token 刷新完成:成功 ${stats.success},失败 ${stats.failed}`, + errorPrefix: '批量刷新失败', + afterComplete: async () => { + await Promise.allSettled([loadStats(), loadAccounts()]); + } + }); } // 批量验证Token async function handleBatchValidate() { - if (getEffectiveCount() === 0) return; - - elements.batchValidateBtn.disabled = true; - elements.batchValidateBtn.textContent = '验证中...'; - - try { - const result = await api.post('/accounts/batch-validate', buildBatchPayload()); - toast.info(`有效: ${result.valid_count},无效: ${result.invalid_count}`); - loadAccounts(); - } catch (error) { - toast.error('批量验证失败: ' + error.message); - } finally { - updateBatchButtons(); - } + await executeServerBatchOperation({ + button: elements.batchValidateBtn, + busyText: '验证中...', + progressTitle: '批量验证 Token', + labels: { success: '有效', failed: '无效' }, + endpoint: '/accounts/batch-validate', + extractStats: (result, count) => ({ + total: result.valid_count + result.invalid_count || count, + success: result.valid_count, + failed: result.invalid_count + }), + summary: (stats) => `Token 验证完成:有效 ${stats.success},无效 ${stats.failed}`, + toastType: 'info', + errorPrefix: '批量验证失败', + afterComplete: async () => { + await Promise.allSettled([loadStats(), loadAccounts()]); + } + }); } // 查看账号详情 @@ -932,25 +1051,24 @@ async function markSubscription(id) { // 批量检测订阅状态 async function handleBatchCheckSubscription() { - const count = getEffectiveCount(); - if (count === 0) return; - const confirmed = await confirm(`确定要检测选中的 ${count} 个账号的订阅状态吗?`); - if (!confirmed) return; - - elements.batchCheckSubBtn.disabled = true; - elements.batchCheckSubBtn.textContent = '检测中...'; - - try { - const result = await api.post('/payment/accounts/batch-check-subscription', buildBatchPayload()); - let message = `成功: ${result.success_count}`; - if (result.failed_count > 0) message += `, 失败: ${result.failed_count}`; - toast.success(message); - loadAccounts(); - } catch (e) { - toast.error('批量检测失败: ' + e.message); - } finally { - updateBatchButtons(); - } + await executeServerBatchOperation({ + button: elements.batchCheckSubBtn, + busyText: '检测中...', + progressTitle: '批量检测订阅', + labels: { success: '成功', failed: '失败' }, + confirmMessage: (count) => `确定要检测选中的 ${count} 个账号的订阅状态吗?`, + endpoint: '/payment/accounts/batch-check-subscription', + extractStats: (result, count) => ({ + total: result.success_count + result.failed_count || count, + success: result.success_count, + failed: result.failed_count + }), + summary: (stats) => `订阅检测完成:成功 ${stats.success},失败 ${stats.failed}`, + errorPrefix: '批量检测失败', + afterComplete: async () => { + await loadAccounts(); + } + }); } // ============== Sub2API 上传 ============== diff --git a/templates/accounts.html b/templates/accounts.html index 32b67314..9c07115e 100644 --- a/templates/accounts.html +++ b/templates/accounts.html @@ -116,6 +116,20 @@