Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 167 additions & 7 deletions src/core/upload/sub2api_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,165 @@
将账号以 sub2api-data 格式批量导入到 Sub2API 平台
"""

import json
import logging
from datetime import datetime, timezone
from typing import List, Tuple, Optional
from typing import Any, List, Optional, Tuple

from curl_cffi import requests as cffi_requests

from ...database.session import get_db
from ...database.models import Account
from ...database.session import get_db

logger = logging.getLogger(__name__)


def _extract_sub2api_data(response) -> Any:
try:
payload = response.json()
except Exception:
payload = None

if response.status_code >= 400:
if isinstance(payload, dict) and payload.get("message"):
raise ValueError(str(payload["message"]))
response_text = (response.text or "").strip()
raise ValueError(
f"HTTP {response.status_code}" + (f" - {response_text[:200]}" if response_text else "")
)

if isinstance(payload, dict):
if payload.get("code") not in (None, 0):
raise ValueError(str(payload.get("message") or "Sub2API 返回错误"))
return payload.get("data", payload)

if payload is None:
raise ValueError("Sub2API 返回了无法解析的响应")

return payload


def _normalize_remote_sub2api_proxy(proxy: dict) -> dict:
protocol = str(proxy.get("protocol") or "").strip()
host = str(proxy.get("host") or "").strip()
try:
port = int(proxy.get("port") or 0)
except (TypeError, ValueError):
port = 0

normalized = {
"id": proxy.get("id"),
"name": str(proxy.get("name") or "").strip(),
"protocol": protocol,
"host": host,
"port": port,
"username": str(proxy.get("username") or "").strip(),
"password": str(proxy.get("password") or "").strip(),
"status": str(proxy.get("status") or "inactive").strip() or "inactive",
}
if not normalized["name"]:
if normalized["id"] is not None:
normalized["name"] = f"Proxy {normalized['id']}"
else:
normalized["name"] = f"{protocol}://{host}:{port}"
return normalized


def _build_sub2api_proxy_key(proxy: dict) -> str:
normalized = _normalize_remote_sub2api_proxy(proxy)
return (
f"{normalized['protocol']}|{normalized['host']}|{normalized['port']}|"
f"{normalized['username']}|{normalized['password']}"
)


def _build_sub2api_proxy_payload(proxy: dict) -> dict:
normalized = _normalize_remote_sub2api_proxy(proxy)
if not normalized["protocol"] or not normalized["host"] or normalized["port"] <= 0:
raise ValueError(f"远端 Sub2API 代理 {normalized['id']} 配置不完整,无法生成上传数据")

return {
"proxy_key": _build_sub2api_proxy_key(normalized),
"name": normalized["name"],
"protocol": normalized["protocol"],
"host": normalized["host"],
"port": normalized["port"],
"username": normalized["username"],
"password": normalized["password"],
"status": normalized["status"],
}


def fetch_remote_sub2api_proxies(api_url: str, api_key: str) -> List[dict]:
if not api_url:
raise ValueError("Sub2API URL 未配置")
if not api_key:
raise ValueError("Sub2API API Key 未配置")

url = api_url.rstrip("/") + "/api/v1/admin/proxies/all"

try:
response = cffi_requests.get(
url,
headers={"x-api-key": api_key},
proxies=None,
timeout=15,
impersonate="chrome110",
)
data = _extract_sub2api_data(response)
if not isinstance(data, list):
raise ValueError("远端 Sub2API 代理列表格式异常")
return [_normalize_remote_sub2api_proxy(item) for item in data if isinstance(item, dict)]
except ValueError:
raise
except cffi_requests.exceptions.ConnectionError as e:
raise ValueError(f"无法连接到远端 Sub2API 服务: {e}") from e
except cffi_requests.exceptions.Timeout as e:
raise ValueError("拉取远端 Sub2API 代理列表超时") from e
except Exception as e:
logger.error(f"拉取远端 Sub2API 代理列表异常: {e}")
raise ValueError(f"拉取远端 Sub2API 代理列表失败: {e}") from e


def fetch_remote_sub2api_proxy(api_url: str, api_key: str, proxy_id: int) -> dict:
if proxy_id is None:
raise ValueError("远端 Sub2API 代理 ID 不能为空")
if not api_url:
raise ValueError("Sub2API URL 未配置")
if not api_key:
raise ValueError("Sub2API API Key 未配置")

url = api_url.rstrip("/") + f"/api/v1/admin/proxies/{proxy_id}"

try:
response = cffi_requests.get(
url,
headers={"x-api-key": api_key},
proxies=None,
timeout=15,
impersonate="chrome110",
)
data = _extract_sub2api_data(response)
if not isinstance(data, dict):
raise ValueError("远端 Sub2API 代理详情格式异常")
return _normalize_remote_sub2api_proxy(data)
except ValueError:
raise
except cffi_requests.exceptions.ConnectionError as e:
raise ValueError(f"无法连接到远端 Sub2API 服务: {e}") from e
except cffi_requests.exceptions.Timeout as e:
raise ValueError("拉取远端 Sub2API 代理详情超时") from e
except Exception as e:
logger.error(f"拉取远端 Sub2API 代理详情异常: {e}")
raise ValueError(f"拉取远端 Sub2API 代理详情失败: {e}") from e


def upload_to_sub2api(
accounts: List[Account],
api_url: str,
api_key: str,
concurrency: int = 3,
priority: int = 50,
proxy_id: Optional[int] = None,
) -> Tuple[bool, str]:
"""
上传账号列表到 Sub2API 平台(不走代理)
Expand All @@ -32,6 +172,7 @@ def upload_to_sub2api(
api_key: Admin API Key(x-api-key header)
concurrency: 账号并发数,默认 3
priority: 账号优先级,默认 50
proxy_id: 远端 Sub2API 代理 ID,用于生成 Sub2API 识别的 proxy_key(可选)

Returns:
(成功标志, 消息)
Expand All @@ -47,12 +188,20 @@ def upload_to_sub2api(

exported_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

proxy_payload = None
if proxy_id is not None:
try:
remote_proxy = fetch_remote_sub2api_proxy(api_url, api_key, proxy_id)
proxy_payload = _build_sub2api_proxy_payload(remote_proxy)
except ValueError as e:
return False, str(e)

account_items = []
for acc in accounts:
if not acc.access_token:
continue
expires_at = int(acc.expires_at.timestamp()) if acc.expires_at else 0
account_items.append({
account_item = {
"name": acc.email,
"platform": "openai",
"type": "oauth",
Expand Down Expand Up @@ -82,7 +231,10 @@ def upload_to_sub2api(
"priority": priority,
"rate_multiplier": 1,
"auto_pause_on_expired": True,
})
}
if proxy_payload is not None:
account_item["proxy_key"] = proxy_payload["proxy_key"]
account_items.append(account_item)

if not account_items:
return False, "所有账号均缺少 access_token,无法上传"
Expand All @@ -92,7 +244,7 @@ def upload_to_sub2api(
"type": "sub2api-data",
"version": 1,
"exported_at": exported_at,
"proxies": [],
"proxies": [proxy_payload] if proxy_payload is not None else [],
"accounts": account_items,
},
"skip_default_group_bind": True,
Expand Down Expand Up @@ -138,6 +290,7 @@ def batch_upload_to_sub2api(
api_key: str,
concurrency: int = 3,
priority: int = 50,
proxy_id: Optional[int] = None,
) -> dict:
"""
批量上传指定 ID 的账号到 Sub2API 平台
Expand Down Expand Up @@ -169,7 +322,14 @@ def batch_upload_to_sub2api(
if not accounts:
return results

success, message = upload_to_sub2api(accounts, api_url, api_key, concurrency, priority)
success, message = upload_to_sub2api(
accounts,
api_url,
api_key,
concurrency,
priority,
proxy_id=proxy_id,
)

if success:
for acc in accounts:
Expand Down
4 changes: 3 additions & 1 deletion src/database/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,15 @@ def create_sub2api_service(
api_url: str,
api_key: str,
enabled: bool = True,
priority: int = 0
priority: int = 0,
default_remote_proxy_id: Optional[int] = None,
) -> Sub2ApiService:
"""创建 Sub2API 服务配置"""
svc = Sub2ApiService(
name=name,
api_url=api_url,
api_key=api_key,
default_remote_proxy_id=default_remote_proxy_id,
enabled=enabled,
priority=priority,
)
Expand Down
1 change: 1 addition & 0 deletions src/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class Sub2ApiService(Base):
name = Column(String(100), nullable=False) # 服务名称
api_url = Column(String(500), nullable=False) # API URL (host)
api_key = Column(Text, nullable=False) # x-api-key
default_remote_proxy_id = Column(Integer, nullable=True) # 默认远端代理 ID
enabled = Column(Boolean, default=True)
priority = Column(Integer, default=0) # 优先级
created_at = Column(DateTime, default=datetime.utcnow)
Expand Down
1 change: 1 addition & 0 deletions src/database/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def migrate_tables(self):
("accounts", "subscription_at", "DATETIME"),
("accounts", "cookies", "TEXT"),
("proxies", "is_default", "BOOLEAN DEFAULT 0"),
("sub2api_services", "default_remote_proxy_id", "INTEGER"),
]

# 确保新表存在(create_tables 已处理,此处兜底)
Expand Down
26 changes: 19 additions & 7 deletions src/web/routes/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ class Sub2ApiUploadRequest(BaseModel):
service_id: Optional[int] = None
concurrency: int = 3
priority: int = 50
proxy_id: Optional[int] = None # 远端 Sub2API 代理 ID


class BatchSub2ApiUploadRequest(BaseModel):
Expand All @@ -808,6 +809,7 @@ class BatchSub2ApiUploadRequest(BaseModel):
service_id: Optional[int] = None # 指定 Sub2API 服务 ID,不传则使用第一个启用的
concurrency: int = 3
priority: int = 50
proxy_id: Optional[int] = None # 远端 Sub2API 代理 ID


@router.post("/batch-upload-sub2api")
Expand All @@ -817,6 +819,7 @@ async def batch_upload_accounts_to_sub2api(request: BatchSub2ApiUploadRequest):
# 解析指定的 Sub2API 服务
api_url = None
api_key = None
svc = None
if request.service_id:
with get_db() as db:
svc = crud.get_sub2api_service_by_id(db, request.service_id)
Expand All @@ -828,10 +831,11 @@ async def batch_upload_accounts_to_sub2api(request: BatchSub2ApiUploadRequest):
with get_db() as db:
svcs = crud.get_sub2api_services(db, enabled=True)
if svcs:
api_url = svcs[0].api_url
api_key = svcs[0].api_key
svc = svcs[0]
api_url = svc.api_url
api_key = svc.api_key

if not api_url or not api_key:
if not api_url or not api_key or not svc:
raise HTTPException(status_code=400, detail="未找到可用的 Sub2API 服务,请先在设置中配置")

with get_db() as db:
Expand All @@ -840,10 +844,13 @@ async def batch_upload_accounts_to_sub2api(request: BatchSub2ApiUploadRequest):
request.status_filter, request.email_service_filter, request.search_filter
)

selected_proxy_id = request.proxy_id if "proxy_id" in request.model_fields_set else svc.default_remote_proxy_id

results = batch_upload_to_sub2api(
ids, api_url, api_key,
concurrency=request.concurrency,
priority=request.priority,
proxy_id=selected_proxy_id,
)
return results

Expand All @@ -855,9 +862,11 @@ async def upload_account_to_sub2api(account_id: int, request: Optional[Sub2ApiUp
service_id = request.service_id if request else None
concurrency = request.concurrency if request else 3
priority = request.priority if request else 50
proxy_id = request.proxy_id if request and "proxy_id" in request.model_fields_set else None

api_url = None
api_key = None
svc = None
if service_id:
with get_db() as db:
svc = crud.get_sub2api_service_by_id(db, service_id)
Expand All @@ -869,10 +878,11 @@ async def upload_account_to_sub2api(account_id: int, request: Optional[Sub2ApiUp
with get_db() as db:
svcs = crud.get_sub2api_services(db, enabled=True)
if svcs:
api_url = svcs[0].api_url
api_key = svcs[0].api_key
svc = svcs[0]
api_url = svc.api_url
api_key = svc.api_key

if not api_url or not api_key:
if not api_url or not api_key or not svc:
raise HTTPException(status_code=400, detail="未找到可用的 Sub2API 服务,请先在设置中配置")

with get_db() as db:
Expand All @@ -884,7 +894,9 @@ async def upload_account_to_sub2api(account_id: int, request: Optional[Sub2ApiUp

success, message = upload_to_sub2api(
[account], api_url, api_key,
concurrency=concurrency, priority=priority
concurrency=concurrency,
priority=priority,
proxy_id=proxy_id if request and "proxy_id" in request.model_fields_set else svc.default_remote_proxy_id,
)
if success:
return {"success": True, "message": message}
Expand Down
7 changes: 6 additions & 1 deletion src/web/routes/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,12 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
if not _svc:
continue
log_callback(f"[Sub2API] 正在把账号发往服务站: {_svc.name}")
_ok, _msg = upload_to_sub2api([saved_account], _svc.api_url, _svc.api_key)
_ok, _msg = upload_to_sub2api(
[saved_account],
_svc.api_url,
_svc.api_key,
proxy_id=_svc.default_remote_proxy_id,
)
log_callback(f"[Sub2API] {'成功' if _ok else '失败'}({_svc.name}): {_msg}")
except Exception as _e:
log_callback(f"[Sub2API] 异常({_sid}): {_e}")
Expand Down
Loading
Loading