diff --git a/udio_wrapper/__init__.py b/udio_wrapper/__init__.py index d4ed8fe..8b3102b 100644 --- a/udio_wrapper/__init__.py +++ b/udio_wrapper/__init__.py @@ -1,14 +1,15 @@ """ Udio Wrapper -Author: Flowese -Version: 0.0.3 -Date: 2024-04-15 +Author: Flowese (modified by beth) +Version: 0.0.4 Description: Generates songs using the Udio API using textual prompts. +Fixes: #7 - Auto-solve hCaptcha on 500 Server Error """ import requests import os import time +from .hcaptcha_solver import detect_hcaptcha, solve_hcaptcha, ensure_token, clear_cache class UdioWrapper: API_BASE_URL = "https://www.udio.com/api" @@ -16,18 +17,70 @@ class UdioWrapper: def __init__(self, auth_token): self.auth_token = auth_token self.all_track_ids = [] + self._captcha_token = None - def make_request(self, url, method, data=None, headers=None): - try: - if method == 'POST': - response = requests.post(url, headers=headers, json=data) - else: - response = requests.get(url, headers=headers) - response.raise_for_status() - return response - except requests.exceptions.RequestException as e: - print(f"Error making {method} request to {url}: {e}") - return None + def _get_captcha_header(self): + """Get hCaptcha token, solving if needed. Returns dict or empty dict.""" + token = ensure_token() + if token: + return {"h-captcha-response": token} + return {} + + def make_request(self, url, method, data=None, headers=None, max_retries=2): + for retry in range(max_retries + 1): + try: + # Add captcha token to headers if we have one + if headers is None: + headers = {} + if "h-captcha-response" not in headers and method == "POST": + captcha_token = ensure_token() + if captcha_token: + headers["h-captcha-response"] = captcha_token + + if method == "POST": + response = requests.post(url, headers=headers, json=data) + else: + response = requests.get(url, headers=headers) + + # Check for captcha challenge in 500 responses + if response.status_code == 500 and retry < max_retries: + print(f"[hCaptcha] 500 error detected, attempting captcha refresh (attempt {retry+1})...") + clear_cache() # force new captcha token + new_token = solve_hcaptcha() + if new_token: + headers["h-captcha-response"] = new_token + time.sleep(2) + continue + + # Check for other captcha indicators + if detect_hcaptcha(response) and retry < max_retries: + print(f"[hCaptcha] Challenge detected, refreshing token (attempt {retry+1})...") + clear_cache() + new_token = solve_hcaptcha() + if new_token: + headers["h-captcha-response"] = new_token + time.sleep(1) + continue + + response.raise_for_status() + return response + + except requests.exceptions.HTTPError as e: + if retry < max_retries and e.response is not None and e.response.status_code >= 500: + time.sleep((retry + 1) * 3) + continue + print(f"Error making {method} request to {url}: {e}") + return None + + except requests.exceptions.RequestException as e: + if retry < max_retries: + time.sleep((retry + 1) * 2) + continue + print(f"Error making {method} request to {url}: {e}") + return None + + print(f"Error: All {max_retries + 1} attempts failed for {url}") + return None def get_headers(self, get_request=False): headers = { @@ -39,14 +92,14 @@ def get_headers(self, get_request=False): "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Dest": "empty" + "Sec-Fetch-Dest": "empty", } if not get_request: headers.update({ "sec-ch-ua": '"Google Chrome";v="123", "Not:A-Brand";v="8", "Chromium";v="123"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', - "sec-fetch-dest": "empty" + "sec-fetch-dest": "empty", }) return headers @@ -69,13 +122,12 @@ def create_complete_song(self, short_prompt, extend_prompts, outro_prompt, seed= prompt = extend_prompts[i] lyrics = custom_lyrics_extend[i] if custom_lyrics_extend and i < len(custom_lyrics_extend) else None else: - prompt = extend_prompts[-1] # Reuse the last prompt if not enough are provided + prompt = extend_prompts[-1] lyrics = custom_lyrics_extend[-1] if custom_lyrics_extend else None print(f"Generating extend song {i + 1}...") extend_song_result = self.extend( - prompt, - seed, + prompt, seed, audio_conditioning_path=last_song_result[0]['song_path'], audio_conditioning_song_id=last_song_result[0]['id'], custom_lyrics=lyrics @@ -90,8 +142,7 @@ def create_complete_song(self, short_prompt, extend_prompts, outro_prompt, seed= # Generate the outro print("Generating the outro...") outro_song_result = self.add_outro( - outro_prompt, - seed, + outro_prompt, seed, audio_conditioning_path=last_song_result[0]['song_path'], audio_conditioning_song_id=last_song_result[0]['id'], custom_lyrics=custom_lyrics_outro @@ -219,4 +270,3 @@ def download_song(self, song_url, song_title, folder="downloaded_songs"): print(f"Downloaded {song_title} with url {song_url} to {file_path}") except requests.exceptions.RequestException as e: print(f"Failed to download the song. Error: {e}") - diff --git a/udio_wrapper/hcaptcha_solver.py b/udio_wrapper/hcaptcha_solver.py new file mode 100644 index 0000000..79320b5 --- /dev/null +++ b/udio_wrapper/hcaptcha_solver.py @@ -0,0 +1,187 @@ +""" +hCaptcha auto-solver for UdioWrapper. + +Auto-detects hCaptcha challenges from Udio's API (500 errors), +extracts the site key dynamically, and solves via: + 1. Capsolver (CAPSOLVER_API_KEY env var) + 2. 2captcha (CAPTCHA_API_KEY env var) + 3. Manual (paste a token you solved in your browser) + +Usage: + from .hcaptcha_solver import solve_hcaptcha, get_sitekey, detect_hcaptcha +""" + +import os +import re +import time +import requests as req + +_cache = {"token": None, "token_ttl": 0, "sitekey": None, "sitekey_ttl": 0} +CACHE_TTL = 300 +SITEKEY_URL = "https://www.udio.com/" + + +def _find_sitekey_in_html(html): + patterns = [ + r'data-sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']', + r'websiteKey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']', + r'sitekey["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']', + r'hcaptcha.*?key["\']?\s*[:=]\s*["\']([a-f0-9-]{20,})["\']', + r'["\']([a-f0-9-]{36})["\'].*?hcaptcha', + r'hcaptcha.*?([a-f0-9-]{36})', + ] + for pat in patterns: + m = re.search(pat, html, re.IGNORECASE) + if m: + return m.group(1) + return None + + +def get_sitekey(): + now = time.time() + if _cache["sitekey"] and now < _cache["sitekey_ttl"]: + return _cache["sitekey"] + + try: + resp = req.get(SITEKEY_URL, timeout=15, + headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36"}) + html = resp.text + key = _find_sitekey_in_html(html) + if key: + _cache["sitekey"] = key + _cache["sitekey_ttl"] = now + CACHE_TTL + print(f"[hCaptcha] Site key: {key}") + return key + except Exception as exc: + print(f"[hCaptcha] Failed to fetch site key: {exc}") + + fallback = "a2b4c6d8-e5f7-4908-a123-b456c789d012" + print(f"[hCaptcha] Using fallback site key: {fallback}") + _cache["sitekey"] = fallback + _cache["sitekey_ttl"] = now + CACHE_TTL + return fallback + + +def detect_hcaptcha(response): + if response.status_code in (403, 429, 503): + return True + if response.status_code == 500: + text = (response.text or "").lower().strip() + if not text or text in ("", "{}", "[]"): + return True + if any(k in text for k in ["captcha", "hcaptcha", "challenge", + "blocked", "denied", "rate limit"]): + return True + return False + + +def solve_hcaptcha(site_key=None, page_url="https://www.udio.com"): + now = time.time() + if _cache["token"] and now < _cache["token_ttl"]: + print("[hCaptcha] Using cached token") + return _cache["token"] + + if site_key is None: + site_key = get_sitekey() + + api_key = os.environ.get("CAPSOLVER_API_KEY") or os.environ.get("CAPTCHA_API_KEY") + + if api_key: + token = _solve_via_capsolver(site_key, page_url, api_key) + if token: + _cache["token"] = token + _cache["token_ttl"] = now + CACHE_TTL + return token + + print() + print("[hCaptcha] No CAPSOLVER_API_KEY set. To auto-solve, set the env var.") + print("[hCaptcha] Manual: visit https://www.udio.com, solve the captcha,") + print(" then copy the h-captcha-response token.") + try: + token = input("Paste hCaptcha token (or Enter to skip): ").strip() + if token: + _cache["token"] = token + _cache["token_ttl"] = now + CACHE_TTL + return token + except (EOFError, KeyboardInterrupt): + pass + return None + + +def _solve_via_capsolver(site_key, page_url, api_key): + is_2captcha = bool(os.environ.get("CAPTCHA_API_KEY")) and not bool(os.environ.get("CAPSOLVER_API_KEY")) + if is_2captcha: + return _solve_via_2captcha(site_key, page_url, api_key) + + task_url = "https://api.capsolver.com/createTask" + try: + resp = req.post(task_url, json={ + "clientKey": api_key, + "task": {"type": "HCaptchaTaskProxyLess", "websiteURL": page_url, "websiteKey": site_key} + }, timeout=15) + result = resp.json() + task_id = result.get("taskId") + if not task_id: + print(f"[hCaptcha] Capsolver error: {result.get('errorDescription', resp.text[:200])}") + return None + + for _ in range(45): + time.sleep(2) + poll = req.post(task_url, json={"clientKey": api_key, "taskId": task_id}, timeout=10) + status = poll.json() + if status.get("status") == "ready": + sol = status.get("solution", {}) + token = sol.get("gRecaptchaResponse") or sol.get("token") + if token: + print("[hCaptcha] Solved via Capsolver") + return token + if status.get("errorId", 0) != 0: + print(f"[hCaptcha] Capsolver error: {status.get('errorDescription', status)}") + return None + print("[hCaptcha] Capsolver timed out") + except Exception as exc: + print(f"[hCaptcha] Capsolver error: {exc}") + return None + + +def _solve_via_2captcha(site_key, page_url, api_key): + try: + submit = req.post("https://2captcha.com/in.php", data={ + "key": api_key, "method": "hcaptcha", + "sitekey": site_key, "pageurl": page_url, "json": 1, + }, timeout=15) + sid = submit.json().get("request") + if not sid or sid == "ERROR_NO_SLOT_AVAILABLE": + print(f"[hCaptcha] 2captcha error: {submit.text[:200]}") + return None + + for _ in range(60): + time.sleep(5) + poll = req.get("https://2captcha.com/res.php", params={ + "key": api_key, "action": "get", "id": sid, "json": 1, + }, timeout=10) + data = poll.json() + if data.get("status") == 1: + token = data.get("request") + if token: + print("[hCaptcha] Solved via 2captcha") + return token + if data.get("request") == "ERROR_CAPTCHA_UNSOLVABLE": + print("[hCaptcha] 2captcha: unsolvable") + return None + print("[hCaptcha] 2captcha timed out") + except Exception as exc: + print(f"[hCaptcha] 2captcha error: {exc}") + return None + + +def ensure_token(site_key=None): + return solve_hcaptcha(site_key) + + +def clear_cache(): + _cache["token"] = None + _cache["token_ttl"] = 0 + _cache["sitekey"] = None + _cache["sitekey_ttl"] = 0