Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion src/core/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uuid
from typing import Optional, Dict, Any, Tuple, Callable, List
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone

from curl_cffi import requests as cffi_requests

Expand Down Expand Up @@ -1858,6 +1858,52 @@ def _extract_account_id_from_access_token(self, access_token: str) -> str:
except Exception:
return ""

def _extract_token_timestamps(self, token: str) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
从 JWT(access_token / id_token)payload 提取 iat / exp,返回 naive UTC datetime。
"""
try:
raw = str(token or "").strip()
if raw.count(".") < 2:
return None, None
payload = raw.split(".")[1]
import base64
pad = "=" * ((4 - (len(payload) % 4)) % 4)
decoded = base64.urlsafe_b64decode((payload + pad).encode("ascii"))
claims = json.loads(decoded.decode("utf-8"))
if not isinstance(claims, dict):
return None, None

iat = claims.get("iat")
exp = claims.get("exp")

issued_at = (
datetime.fromtimestamp(int(iat), tz=timezone.utc).replace(tzinfo=None)
if iat not in (None, "")
else None
)
expires_at = (
datetime.fromtimestamp(int(exp), tz=timezone.utc).replace(tzinfo=None)
if exp not in (None, "")
else None
)
return issued_at, expires_at
except Exception:
return None, None

@staticmethod
def _parse_iso_datetime_to_naive_utc(value: str) -> Optional[datetime]:
try:
text = str(value or "").strip()
if not text:
return None
parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
if parsed.tzinfo is None:
return parsed
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
except Exception:
return None

def _ensure_native_required_tokens(self, result: RegistrationResult) -> bool:
"""
原生注册入口要求拿齐:
Expand Down Expand Up @@ -2995,6 +3041,17 @@ def save_to_database(
try:
# 获取默认 client_id
settings = get_settings()
token_issued_at, token_expires_at = self._extract_token_timestamps(
result.access_token or result.id_token
)
metadata_expires_at = None
if isinstance(result.metadata, dict):
metadata_expires_at = self._parse_iso_datetime_to_naive_utc(
str(result.metadata.get("expires") or "")
)
final_expires_at = token_expires_at or metadata_expires_at
now_utc_naive = datetime.now(timezone.utc).replace(tzinfo=None)
final_last_refresh = token_issued_at or (now_utc_naive if result.access_token else None)

with get_db() as db:
# 保存账户信息
Expand All @@ -3013,6 +3070,8 @@ def save_to_database(
refresh_token=result.refresh_token,
id_token=result.id_token,
proxy_used=self.proxy_url,
last_refresh=final_last_refresh,
expires_at=final_expires_at,
extra_data=result.metadata,
source=result.source,
account_label=account_label,
Expand Down
116 changes: 108 additions & 8 deletions src/core/upload/cpa_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import json
import logging
import base64
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime
from urllib.parse import quote
Expand All @@ -14,11 +15,58 @@
from ...database.session import get_db
from ...database.models import Account
from ...config.settings import get_settings
from ..timezone_utils import utcnow_naive
from ..timezone_utils import UTC, SHANGHAI_TZ, utcnow_naive

logger = logging.getLogger(__name__)


def _decode_jwt_payload_unverified(token: str) -> Dict[str, Any]:
text = str(token or "").strip()
if not text or text.count(".") < 2:
return {}
try:
payload_b64 = text.split(".")[1]
padding = "=" * (-len(payload_b64) % 4)
raw = base64.urlsafe_b64decode((payload_b64 + padding).encode("utf-8"))
data = json.loads(raw.decode("utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}


def _parse_unix_ts_to_naive_utc(value: Any) -> Optional[datetime]:
try:
if value in (None, ""):
return None
ts = int(value)
return datetime.fromtimestamp(ts, tz=UTC).replace(tzinfo=None)
except Exception:
return None


def _parse_iso_to_naive_utc(value: Any) -> Optional[datetime]:
try:
text = str(value or "").strip()
if not text:
return None
dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
if dt.tzinfo is None:
return dt
return dt.astimezone(UTC).replace(tzinfo=None)
except Exception:
return None


def _format_cpa_datetime(dt: Optional[datetime]) -> str:
if dt is None:
return ""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=UTC)
else:
dt = dt.astimezone(UTC)
return dt.astimezone(SHANGHAI_TZ).strftime("%Y-%m-%dT%H:%M:%S+08:00")


def _normalize_cpa_auth_files_url(api_url: str) -> str:
"""将用户填写的 CPA 地址规范化为 auth-files 接口地址。"""
normalized = (api_url or "").strip().rstrip("/")
Expand Down Expand Up @@ -100,15 +148,67 @@ def generate_token_json(account: Account) -> dict:
Returns:
CPA 格式的 Token 字典
"""
access_token = str(account.access_token or "").strip()
id_token_raw = str(account.id_token or "").strip()
# 兼容历史下游:部分工具只从 id_token 提取 chatgpt_account_id。
# 当注册链路仅拿到 access_token 时,使用 access_token 作为兜底 id_token 供解析。
id_token = id_token_raw or access_token
refresh_token = str(account.refresh_token or "").strip()

claims = _decode_jwt_payload_unverified(access_token) or _decode_jwt_payload_unverified(id_token)
auth_claims = claims.get("https://api.openai.com/auth") if isinstance(claims, dict) else {}
auth_claims = auth_claims if isinstance(auth_claims, dict) else {}
profile_claims = claims.get("https://api.openai.com/profile") if isinstance(claims, dict) else {}
profile_claims = profile_claims if isinstance(profile_claims, dict) else {}

account_id = str(
account.account_id
or auth_claims.get("chatgpt_account_id")
or claims.get("chatgpt_account_id")
or ""
).strip()
workspace_id = str(account.workspace_id or account_id).strip()
chatgpt_user_id = str(
auth_claims.get("chatgpt_user_id")
or auth_claims.get("user_id")
or claims.get("chatgpt_user_id")
or claims.get("user_id")
or ""
).strip()

email = str(
account.email
or profile_claims.get("email")
or ""
).strip()

token_issued_at = _parse_unix_ts_to_naive_utc(claims.get("iat"))
token_expires_at = _parse_unix_ts_to_naive_utc(claims.get("exp"))
extra_data = account.extra_data if isinstance(account.extra_data, dict) else {}
metadata_expires_at = _parse_iso_to_naive_utc(extra_data.get("expires"))

last_refresh = account.last_refresh or token_issued_at or account.registered_at
expires_at = account.expires_at or token_expires_at or metadata_expires_at

client_id = str(
claims.get("client_id")
or account.client_id
or ""
).strip()

return {
"type": "codex",
"email": account.email,
"expired": account.expires_at.strftime("%Y-%m-%dT%H:%M:%S+08:00") if account.expires_at else "",
"id_token": account.id_token or "",
"account_id": account.account_id or "",
"access_token": account.access_token or "",
"last_refresh": account.last_refresh.strftime("%Y-%m-%dT%H:%M:%S+08:00") if account.last_refresh else "",
"refresh_token": account.refresh_token or "",
"email": email,
"expired": _format_cpa_datetime(expires_at),
"id_token": id_token,
"account_id": account_id,
"workspace_id": workspace_id,
"chatgpt_account_id": account_id,
"chatgpt_user_id": chatgpt_user_id,
"client_id": client_id,
"access_token": access_token,
"last_refresh": _format_cpa_datetime(last_refresh),
"refresh_token": refresh_token,
}


Expand Down
2 changes: 2 additions & 0 deletions src/database/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def create_account(
id_token: Optional[str] = None,
cookies: Optional[str] = None,
proxy_used: Optional[str] = None,
last_refresh: Optional['datetime'] = None,
expires_at: Optional['datetime'] = None,
extra_data: Optional[Dict[str, Any]] = None,
status: Optional[str] = None,
Expand Down Expand Up @@ -88,6 +89,7 @@ def create_account(
id_token=id_token,
cookies=cookies,
proxy_used=proxy_used,
last_refresh=last_refresh,
expires_at=expires_at,
extra_data=extra_data or {},
status=status or 'active',
Expand Down
89 changes: 89 additions & 0 deletions tests/test_cpa_upload_token_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
import json
from datetime import datetime

from src.core.upload.cpa_upload import generate_token_json
from src.database.models import Account


def _jwt_with_payload(payload: dict) -> str:
header = {"alg": "none", "typ": "JWT"}

def _b64(obj: dict) -> str:
raw = json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

return f"{_b64(header)}.{_b64(payload)}."


def test_generate_token_json_backfills_time_and_account_id_from_access_token():
payload = {
"iat": 1775019821, # 2026-04-01T05:03:41Z
"exp": 1775883821, # 2026-04-11T05:03:41Z
"https://api.openai.com/auth": {
"chatgpt_account_id": "acct_from_jwt",
},
"https://api.openai.com/profile": {
"email": "jwt@example.com",
},
}

account = Account(
email="ericvillegas9964@outlook.com",
email_service="luckmail",
access_token=_jwt_with_payload(payload),
account_id="",
last_refresh=None,
expires_at=None,
)

token_data = generate_token_json(account)

assert token_data["account_id"] == "acct_from_jwt"
assert token_data["email"] == "ericvillegas9964@outlook.com"
assert token_data["last_refresh"] == "2026-04-01T13:03:41+08:00"
assert token_data["expired"] == "2026-04-11T13:03:41+08:00"
assert token_data["id_token"] == account.access_token
assert token_data["workspace_id"] == "acct_from_jwt"
assert token_data["chatgpt_account_id"] == "acct_from_jwt"


def test_generate_token_json_prefers_db_timestamps_over_jwt_claims():
payload = {
"iat": 1775019821,
"exp": 1775883821,
"https://api.openai.com/auth": {"chatgpt_account_id": "acct_from_jwt"},
}

account = Account(
email="db@example.com",
email_service="luckmail",
access_token=_jwt_with_payload(payload),
account_id="acct_from_db",
last_refresh=datetime(2026, 4, 2, 0, 0, 0), # naive UTC
expires_at=datetime(2026, 4, 3, 0, 0, 0), # naive UTC
)

token_data = generate_token_json(account)

assert token_data["account_id"] == "acct_from_db"
assert token_data["last_refresh"] == "2026-04-02T08:00:00+08:00"
assert token_data["expired"] == "2026-04-03T08:00:00+08:00"
assert token_data["chatgpt_account_id"] == "acct_from_db"


def test_generate_token_json_uses_metadata_expires_when_db_and_jwt_missing():
account = Account(
email="meta@example.com",
email_service="luckmail",
access_token="",
account_id="acct_meta",
last_refresh=None,
expires_at=None,
extra_data={"expires": "2026-06-30T05:03:46.241Z"},
)

token_data = generate_token_json(account)

assert token_data["expired"] == "2026-06-30T13:03:46+08:00"
assert token_data["id_token"] == ""
12 changes: 12 additions & 0 deletions webui.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ def main():
parser.add_argument("--access-password", help="Web UI 访问密钥 (也可通过 WEBUI_ACCESS_PASSWORD 环境变量设置)")
args = parser.parse_args()

# 先准备运行目录与数据库,再应用命令行/环境变量覆盖。
# 否则 update_settings 会在数据库未初始化时退回默认内存配置,
# 导致像 registration_entry_flow 这类已保存设置在重启后看起来“被重置”。
_load_dotenv()
data_dir = project_root / "data"
logs_dir = project_root / "logs"
data_dir.mkdir(exist_ok=True)
logs_dir.mkdir(exist_ok=True)
os.environ.setdefault("APP_DATA_DIR", str(data_dir))
os.environ.setdefault("APP_LOGS_DIR", str(logs_dir))
initialize_database()

# 更新配置
from src.config.settings import update_settings

Expand Down