-
Notifications
You must be signed in to change notification settings - Fork 50
Description
On every stream attempted the log returns "[PlayStream] CHANNEL_KEY not found."
Is this a known issue, or is there a workaround in place?
Looking through the addon.py script, I can see that it attempts to find the channel key via consts whose names refer to some sort of channel_key.
The channel key has now moved to obfuscated variables, which can easily be recovered by trial and error against the server_lookup API call: https://chevy.giokko.ru/server_lookup?channel_id=[var value]
After getting the correct server key I then noticed that the authorisation is also failing...
Which after viewing the iframe video source has also moved to obfuscated variables... nice.
With a little fucking around you can still get the full auth, client token and client country.
This returns a valid mono.css link. A request to this final HLS link does return 200, yet I cannot get it to play, nor the m3u8 playlist link. So is this fully broken? I did ask someone at daddylive about this, since only certain players work for different streams. As we know, the addon only looks at player2, which I couldn't get to work for any stream. I was told the player would be fixed "soon", so is this possibly a daddylive issue?
Anyway, here's my full testing script, run against a single well-known channel:
import requests
import re
import base64
import json
from urllib.parse import urlparse, urljoin, quote_plus
import sys
class MockKodi:
    """Minimal stand-in exposing the one ``xbmc`` call this script needs."""

    @staticmethod
    def getLanguage(format_code):
        """Return the fixed language code ``'en'``; *format_code* is ignored."""
        language_code = 'en'
        return language_code


# Module-level instance so code written against ``xbmc.getLanguage`` keeps working.
xbmc = MockKodi()
def extract_obfuscated_token(page):
    """Recover the value assigned to ``window.SESSION_TOKEN`` from obfuscated JS.

    The function follows this chain of declarations inside *page*::

        const DATA = ['chunk', 'chunk', ...];
        const DECODER = (s) => atob(s);
        const TOKEN = DECODER(DATA.join(''));
        window.SESSION_TOKEN = TOKEN;

    Args:
        page: HTML/JavaScript source of the player page.

    Returns:
        The decoded token when the decoder looks like base64 (its body
        mentions ``atob`` or ``b64``); the raw concatenated chunks when the
        decoder is not recognised; ``None`` when a link of the chain is
        missing or the base64/UTF-8 decoding fails.
    """
    # Which variable is assigned to window.SESSION_TOKEN?
    session_match = re.search(r"window\.SESSION_TOKEN\s*=\s*(\w+);", page)
    if not session_match:
        return None
    token_var = session_match.group(1)

    # That variable is defined as DECODER(DATA.join('')).
    var_match = re.search(
        rf"const\s+{token_var}\s*=\s*(\w+)\((\w+)\.join\(''\)\);",
        page,
    )
    if not var_match:
        return None
    decoder_func, data_var = var_match.groups()

    # Resolve what the decoder does: first the simple `(s) => name(s);` form,
    # then any inline arrow body.  Fall back to atob when neither is found.
    decode_func = "atob"
    for body_rx in (r"(\w+)\(s\)", r"([^;]+)"):
        decoder_match = re.search(
            rf"const\s+{decoder_func}\s*=\s*\(s\)\s*=>\s*{body_rx};",
            page,
        )
        if decoder_match:
            decode_func = decoder_match.group(1)
            break

    # Locate the array literal holding the encoded chunks.
    array_match = re.search(
        rf"const\s+{data_var}\s*=\s*\[(.*?)\];",
        page,
        re.DOTALL,
    )
    if not array_match:
        return None

    # Pull every quoted string out of the array literal.
    chunks = re.findall(r'["\']([^"\']+)["\']', array_match.group(1))
    if not chunks:
        return None
    combined = ''.join(chunks)

    decoder_kind = decode_func.lower()
    if 'atob' not in decoder_kind and 'b64' not in decoder_kind:
        # Unknown decoder: hand back the undecoded data rather than guessing.
        log(f"Unknown decoder function: {decode_func}")
        return combined

    # JavaScript's atob() accepts input without trailing '=' padding, while
    # base64.b64decode() rejects it, so restore the padding first.
    padded = combined + '=' * (-len(combined) % 4)
    try:
        return base64.b64decode(padded).decode('utf-8')
    except ValueError as e:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        log(f"Decode error: {e}")
        return None
def extract_auth_data_modern(page):
    """Collect auth-related values from the (obfuscated) player page.

    Args:
        page: HTML/JavaScript source of the player page.

    Returns:
        A dict with the keys ``'token'``, ``'country'``, ``'ts'`` and
        ``'heartbeat'``.  Every key is always present; a value stays ``None``
        when nothing matching was found in *page*.
    """
    auth_data = {
        'token': None,
        'country': None,
        'ts': None,
        'heartbeat': None,
    }

    # Session token hidden behind the obfuscated SESSION_TOKEN chain.
    session_token = extract_obfuscated_token(page)
    if session_token:
        auth_data['token'] = session_token
        log(f"Extracted SESSION_TOKEN: {session_token[:50]}...")

    # Plain key/value patterns.  Every pattern is tried, so when several
    # patterns for the same key match, the one listed last wins.
    patterns = [
        (r'["\']country["\']\s*:\s*["\']([^"\']+)["\']', 'country'),
        (r'country\s*=\s*["\']([^"\']+)["\']', 'country'),
        (r'["\']ts["\']\s*:\s*["\']([^"\']+)["\']', 'ts'),
        (r'timestamp["\']?\s*:\s*["\']([^"\']+)["\']', 'ts'),
        (r'["\']time["\']\s*:\s*["\']([^"\']+)["\']', 'ts'),
    ]
    for pattern, key in patterns:
        match = re.search(pattern, page, re.I)
        if match:
            auth_data[key] = match.group(1)
            log(f"Found {key}: {auth_data[key]}")

    # Heartbeat URL: the first matching pattern wins.  Each pattern carries
    # the group holding the value: the bare-URL pattern is taken whole
    # (group 0), the key/value patterns yield only the quoted value (group 1).
    # Deciding by "'https://' in match.group(0)" would store the key and the
    # quotes too whenever a key/value pattern matched an absolute URL.
    hb_patterns = [
        (r'https://([^/]+)/heartbeat', 0),
        (r'["\']heartbeat["\']\s*:\s*["\']([^"\']+)["\']', 1),
        (r'heartbeat\s*=\s*["\']([^"\']+)["\']', 1),
    ]
    for pattern, value_group in hb_patterns:
        match = re.search(pattern, page, re.I)
        if match:
            auth_data['heartbeat'] = match.group(value_group)
            log(f"Found heartbeat: {auth_data['heartbeat']}")
            break

    return auth_data
def get_active_base():
    """Return the hard-coded site base URL (mock of the real lookup)."""
    active_base = "https://daddyhd.com"
    return active_base
def log(msg):
    """Print *msg* to stdout with a ``[TEST]`` prefix."""
    line = f"[TEST] {msg}"
    print(line)
def find_all_const_variables(page):
    """List every ``const NAME = "value"`` declaration found in *page*.

    Only string-literal values (single or double quoted) are matched, and
    matching is case-insensitive.

    Returns:
        A list of ``(name, value)`` tuples in order of appearance.
    """
    string_const_rx = re.compile(r'const\s+(\w+)\s*=\s*["\']([^"\']+)["\']', re.I)
    return string_const_rx.findall(page)
def test_channel_key_candidate(candidate_value, page_url, session, UA):
"""Test if a candidate value works as a channel key"""
# Try server lookup with this candidate
server_lookup_url = f"https://chevy.giokko.ru/server_lookup?channel_id={quote_plus(candidate_value)}"
headers = {
'User-Agent': UA,
'Referer': page_url,
'Origin': urlparse(page_url).scheme + '://' + urlparse(page_url).netloc,
}
try:
response = session.get(server_lookup_url, headers=headers, timeout=5)
if response.status_code == 200:
data = response.json()
if 'server_key' in data:
return True, data['server_key']
except:
pass
return False, None
def find_channel_key_in_page(page, page_url, session, UA):
    """Locate the channel key in a player page, trying three strategies in order.

    1. A ``const`` whose name looks like CHANNEL_KEY / CHANNEL_ID.
    2. Every underscore-prefixed string ``const``, each probed against the
       server_lookup endpoint.
    3. Quoted literals such as ``premium35`` or ``channel35``, probed the
       same way.

    Args:
        page: HTML/JavaScript source to search.
        page_url: URL *page* was fetched from (passed on to the probe).
        session: HTTP session passed on to the probe.
        UA: User-Agent string passed on to the probe.

    Returns:
        The channel key string, or ``None`` when no strategy succeeds.
    """
    # Strategy 1: explicitly named constant.
    named = re.search(
        r'const\s+(?:CHANNEL_KEY|CHANNEL_ID|CH(?:ANNEL)?_?KEY?)\s*=\s*["\']([^"\']+)["\']',
        page,
        re.I,
    )
    if named:
        key = named.group(1).strip()
        log(f"Found with original pattern: {key}")
        return key

    # Strategy 2: probe every plausible obfuscated constant.
    log("Trying all const variables...")
    # Substrings that mark a value as a URL, file name or JS keyword.
    skip_patterns = (
        'http://', 'https://', '.php', '.js', '.css',
        'function', 'undefined', 'null', 'true', 'false',
    )
    for var_name, raw_value in find_all_const_variables(page):
        var_value = raw_value.strip()
        # Keep only values of 2..50 characters.
        if not 2 <= len(var_value) <= 50:
            continue
        lowered = var_value.lower()
        if any(marker in lowered for marker in skip_patterns):
            continue
        # Only underscore-prefixed names are probed.
        if not var_name.startswith("_"):
            continue
        log(f"Testing const {var_name} = '{var_value}'")
        works, server_key = test_channel_key_candidate(var_value, page_url, session, UA)
        if not works:
            continue
        log(f"✅ SUCCESS! Channel key: {var_value} (from var {var_name})")
        log(f"Server key: {server_key}")
        return var_value

    # Strategy 3: well-known literal shapes such as "premium35".
    literal_hits = re.findall(
        r'["\'](premium\d+|channel\d+|stream\d+|vid\d+|live\d+)["\']',
        page,
        re.I,
    )
    for match in literal_hits:
        log(f"Testing pattern match: {match}")
        works, server_key = test_channel_key_candidate(match, page_url, session, UA)
        if not works:
            continue
        log(f"✅ SUCCESS! Channel key: {match}")
        log(f"Server key: {server_key}")
        return match

    return None
def test_playstream(stream_link):
    """Test PlayStream function with ALL security checks and return final HLS URL

    Walks the whole resolution chain for one stream page: entry page ->
    player iframe(s) -> channel key -> auth data -> cookie/BUNDLE/AUTH2
    calls -> server lookup -> final ``mono.css`` URL.

    Args:
        stream_link: URL of the stream entry page to resolve.

    Returns:
        ``'<hls_url>|<urlencoded headers>'`` on success, or ``None`` when any
        required step fails. Exceptions are caught and logged rather than
        propagated.

    NOTE(review): the pasted source had lost its indentation; the nesting
    below is reconstructed. Places where more than one nesting would be
    syntactically valid are flagged with NOTE(review) comments.
    """
    UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    # One session for the whole flow so cookies set along the way are reused.
    session = requests.Session()
    try:
        base = get_active_base()
        print(f"\n{'='*60}")
        print(f"Testing stream link: {stream_link}")
        print('='*60)

        def _origin(u):
            # Reduce a URL to "scheme://host"; returns '' if parsing raises.
            try:
                p = urlparse(u)
                return f'{p.scheme}://{p.netloc}'
            except:
                return ''

        def _fetch(u, ref=None, note=''):
            # GET *u* with Referer/Origin derived from *ref* (default: site
            # base) and return (body text, final URL after redirects).
            # Raises on HTTP error statuses via raise_for_status().
            headers = {
                'User-Agent': UA,
                'Referer': base,
                'Origin': _origin(base),
            }
            if ref:
                headers['Referer'] = ref
                headers['Origin'] = _origin(ref)
            log(f'GET {u} (ref={headers.get("Referer")}) {note}')
            r = session.get(u, headers=headers, timeout=10)
            r.raise_for_status()
            return r.text, r.url

        # Step 1: Fetch initial page
        html1, url1 = _fetch(stream_link, ref=base, note='entry')
        # Step 2: Try to find player URLs with fallbacks
        player_urls = []
        # First, try Player 2
        m = re.search(r'data-url="([^"]+)"\s+title="PLAYER\s*2"', html1, re.I)
        if m:
            # Collapse an accidental double slash before "cast" in the path.
            url2 = urljoin(url1, m.group(1).replace('//cast', '/cast'))
            player_urls.append(('player2', url2))
        # Try other players (1, 3, 4, 5)
        for player_num in [1, 3, 4, 5]:
            m = re.search(rf'data-url="([^"]+)"\s+title="PLAYER\s*{player_num}"', html1, re.I)
            if m:
                url = urljoin(url1, m.group(1).replace('//cast', '/cast'))
                player_urls.append((f'player{player_num}', url))
        # Fallback to iframe
        if not player_urls:
            m = re.search(r'iframe\s+src="([^"]+)"', html1, re.I)
            if m:
                url2 = urljoin(url1, m.group(1))
                player_urls.append(('iframe', url2))
        if not player_urls:
            log('No iframe/player found after entry page.')
            return None
        # Step 3: Try each player URL until we find channel key
        channel_key = None
        page = None
        page_url = None
        last_error = None
        for player_name, player_url in player_urls:
            try:
                log(f"Trying {player_name}: {player_url}")
                # Fetch player iframe
                html2, url2_final = _fetch(player_url, ref=url1, note=f'{player_name}/outer iframe')
                # Check for inner iframe
                m_if = re.search(r'iframe\s+src="([^"]+)"', html2, re.I)
                if m_if:
                    url3 = urljoin(url2_final, m_if.group(1))
                    html3, url3_final = _fetch(url3, ref=url2_final, note='inner iframe')
                    test_page = html3
                    test_page_url = url3_final
                else:
                    test_page = html2
                    test_page_url = url2_final
                # Try to find channel key with new method
                test_channel_key = find_channel_key_in_page(test_page, test_page_url, session, UA)
                if test_channel_key:
                    # Remember the page the key came from; later steps parse it.
                    channel_key = test_channel_key
                    page = test_page
                    page_url = test_page_url
                    log(f"✓ Found CHANNEL_KEY with {player_name}: {channel_key}")
                    break
                else:
                    log(f"✗ No CHANNEL_KEY with {player_name}")
            except Exception as e:
                # A failing player must not abort the scan; keep the last error
                # for the final diagnostic message.
                last_error = f"{player_name} error: {e}"
                log(f"✗ {last_error}")
                continue
        # If no player worked, try alternative source paths
        if not channel_key and player_urls:
            log("Trying alternative source paths...")
            original_url = player_urls[0][1] # First player URL
            # Extract channel ID from URL or use default
            channel_id_match = re.search(r'stream-(\d+)', original_url)
            channel_id = channel_id_match.group(1) if channel_id_match else '35'
            # Generate alternative URLs
            source_paths = ['stream', 'cast', 'watch', 'plus', 'casting', 'player']
            for source in source_paths:
                # Create alternative URL
                alt_url = re.sub(r'/(stream|cast|watch|plus|casting|player)/', f'/{source}/', original_url)
                if alt_url == original_url: # If no replacement happened
                    base_path = original_url.split('/stream-')[0] if '/stream-' in original_url else f'https://daddyhd.com'
                    alt_url = f'{base_path}/{source}/stream-{channel_id}.php'
                try:
                    log(f"Trying alternative: {source} -> {alt_url}")
                    html2, url2_final = _fetch(alt_url, ref=url1, note=f'{source}/iframe')
                    # Check for inner iframe
                    m_if = re.search(r'iframe\s+src="([^"]+)"', html2, re.I)
                    if m_if:
                        url3 = urljoin(url2_final, m_if.group(1))
                        html3, url3_final = _fetch(url3, ref=url2_final, note='inner iframe')
                        test_page = html3
                        test_page_url = url3_final
                    else:
                        test_page = html2
                        test_page_url = url2_final
                    test_channel_key = find_channel_key_in_page(test_page, test_page_url, session, UA)
                    if test_channel_key:
                        channel_key = test_channel_key
                        page = test_page
                        page_url = test_page_url
                        log(f"✓ Found CHANNEL_KEY with {source} path: {channel_key}")
                        break
                    else:
                        log(f"✗ No CHANNEL_KEY with {source} path")
                except Exception as e:
                    log(f"✗ {source} path error: {e}")
                    continue
        if not channel_key:
            log(f'CHANNEL_KEY not found after trying all sources. {last_error}')
            return None
        log(f"Final page URL: {page_url}")
        log(f"Page size: {len(page)} bytes")
        # ====== ADDED FROM PRODUCTION FUNCTION ======
        # Step 4: Extract auth data
        auth_token = ''
        auth_country = ''
        auth_ts = ''
        # Default domain for the final stream host; replaced below when a
        # heartbeat URL reveals a different one.
        base_domain = 'giokko.ru'
        #with open('debug_page.html', 'w', encoding='utf-8') as f:
        # f.write(page)
        # Never assigned a non-empty value in this function, so the
        # "if client_token" branches below are currently not taken.
        client_token = ''
        try:
            # Extract auth data using modern method
            auth_data = extract_auth_data_modern(page)
            # The helper always returns these keys, so a missing value comes
            # back as None rather than the '' default given here.
            auth_token = auth_data.get('token', '')
            auth_country = auth_data.get('country', '')
            auth_ts = auth_data.get('ts', '')
            # Fallback to old patterns if new method fails
            if not auth_token:
                m_tok = re.search(r'const\s+AUTH_TOKEN\s*=\s*"([^"]+)"', page)
                if m_tok:
                    auth_token = m_tok.group(1).strip()
            if not auth_country:
                m_cty = re.search(r'const\s+AUTH_COUNTRY\s*=\s*"([^"]+)"', page)
                if m_cty:
                    auth_country = m_cty.group(1).strip()
            if not auth_ts:
                m_ts = re.search(r'const\s+AUTH_TS\s*=\s*"([^"]+)"', page)
                if m_ts:
                    auth_ts = m_ts.group(1).strip()
            # Log what we found
            if auth_token:
                log(f'AUTH_TOKEN/SESSION_TOKEN found (len={len(auth_token)})')
                # Short tokens are logged whole, long ones only as a sample.
                if len(auth_token) < 100:
                    log(f'Token: {auth_token}')
                else:
                    log(f'Token sample: {auth_token[:50]}...')
            if auth_country:
                log(f'AUTH_COUNTRY = {auth_country}')
            if auth_ts:
                log(f'AUTH_TS = {auth_ts}')
            # Heartbeat URL
            heartbeat_host = None
            if auth_data.get('heartbeat'):
                heartbeat_url = auth_data['heartbeat']
                log(f'heartbeat URL: {heartbeat_url}')
                # Extract host from URL
                if heartbeat_url.startswith('http'):
                    parsed = urlparse(heartbeat_url)
                    heartbeat_host = parsed.netloc
                    # Keep the last two labels of the host as the base domain.
                    base_domain = '.'.join(parsed.netloc.split('.')[-2:])
                    log(f'heartbeat host: {heartbeat_host}, base_domain: {base_domain}')
            # NOTE(review): this else is paired with "if auth_data.get('heartbeat')";
            # it could instead belong to the inner startswith('http') check —
            # confirm against the original indentation.
            else:
                # Fallback to old heartbeat detection
                m_hb = re.search(r"https://([^/]+)/heartbeat", page)
                if m_hb:
                    heartbeat_host = m_hb.group(1).strip()
                    m_dom = re.search(r"\.([A-Za-z0-9-]+\.[A-Za-z]{2,})$", heartbeat_host)
                    if m_dom:
                        base_domain = m_dom.group(1).lower()
                    log(f'heartbeat host: {heartbeat_host}, base_domain: {base_domain}')
                else:
                    log('heartbeat URL not found; using legacy base_domain.')
        except Exception as e:
            # Auth parsing is best-effort; later steps run with whatever was found.
            log(f'AUTH/heartbeat parse error: {e}')
        # Step 7: Cookie.php call for giokko.ru
        try:
            if 'giokko.ru' in page:
                try:
                    cookie_headers = {
                        'User-Agent': UA,
                        'Referer': page_url,
                        'Origin': _origin(page_url)
                    }
                    session.get('https://security.giokko.ru/cookie.php', headers=cookie_headers, timeout=10)
                    # Set the "access" cookie explicitly for both domains,
                    # regardless of what cookie.php returned.
                    session.cookies.set('access', 'true', domain='giokko.ru')
                    session.cookies.set('access', 'true', domain='security.giokko.ru')
                    log('cookie.php called and access cookie set in session (legacy giokko).')
                except Exception as e:
                    log(f'cookie.php / cookie set failed: {e}')
            # Check for BUNDLE
            bundle_rx = re.compile(r'const\s+(?:BUNDLE|IJXX|XKZK)\s*=\s*["\']([^"\']+)["\']', re.I)
            m_b = bundle_rx.search(page)
            if m_b:
                log('BUNDLE found, using legacy auth flow.')
                try:
                    # BUNDLE is base64 of a JSON object whose values are
                    # themselves base64-encoded.
                    bundle_raw = base64.b64decode(m_b.group(1)).decode('utf-8', 'ignore')
                    parts = json.loads(bundle_raw)
                    for k, v in list(parts.items()):
                        try:
                            parts[k] = base64.b64decode(v).decode('utf-8', 'ignore')
                        except:
                            # Values that fail to decode are left as they are.
                            pass
                except Exception as e:
                    log(f'Failed to decode BUNDLE: {e}')
                    parts = {}
                b_host = (parts.get('b_host') or '').strip()
                b_script = (parts.get('b_script') or '').strip()
                b_ts = (parts.get('b_ts') or '').strip()
                b_rnd = (parts.get('b_rnd') or '').strip()
                b_sig = (parts.get('b_sig') or '').strip()
                # A script name ending in "a.php" is rewritten to "auth.php".
                if b_script and re.search(r'(?i)\ba\.php$', b_script):
                    b_script = re.sub(r'(?i)\ba\.php$', 'auth.php', b_script)
                if b_host and b_script and b_ts and b_rnd and b_sig:
                    # Use b_host when it is absolute; otherwise resolve the
                    # script against the player page's origin.
                    auth_base = urljoin(b_host if b_host.startswith('http') else _origin(page_url), b_script)
                    auth_url = (
                        f'{auth_base}?channel_id={quote_plus(channel_key)}'
                        f'&ts={quote_plus(b_ts)}&rnd={quote_plus(b_rnd)}&sig={quote_plus(b_sig)}'
                    )
                    try:
                        # The response is not inspected; the call is made for
                        # its effect on the session.
                        session.get(
                            auth_url,
                            headers={'User-Agent': UA, 'Referer': page_url, 'Origin': _origin(page_url)},
                            timeout=10
                        )
                        log(f'Legacy auth called: {auth_url}')
                    except Exception as e:
                        log(f'Legacy auth failed: {e}')
            else:
                log('BUNDLE not found, trying AUTH2.')
        except Exception as e:
            log(f'BUNDLE block error: {e}')
        # Step 8: AUTH2 formData processing
        try:
            # Map form field name -> JS variable name from formData.append(...) calls.
            fields = dict(
                re.findall(
                    r"formData\.append\('([^']+)'\s*,\s*(var_[A-Za-z0-9]+)\)",
                    page
                )
            )
            # Map JS variable name -> its string value from const declarations.
            values = dict(
                re.findall(
                    r"const\s+(var_[A-Za-z0-9]+)\s*=\s*\"([^\"]+)\"",
                    page
                )
            )
            # Resolve each field to its value ('' when the variable is unknown).
            final = {field: values.get(varname, '') for field, varname in fields.items()}
            if final:
                auth2_url = 'https://security.giokko.ru/auth2.php'
                log(f'AUTH2 URL = {auth2_url}')
                log(f'AUTH2 payload keys = {list(final.keys())}')
                # (None, value) tuples make requests send multipart/form-data
                # fields without file names.
                files = {k: (None, v) for k, v in final.items()}
                auth_headers = {
                    'User-Agent': UA,
                    'Accept': '*/*',
                    'Accept-Language': 'en-US,en;q=0.9',
                    'Origin': _origin(page_url),
                    'Referer': page_url,
                    'Sec-Fetch-Site': 'cross-site',
                    'Sec-Fetch-Mode': 'cors',
                    'Sec-Fetch-Dest': 'empty',
                }
                auth_headers['Cookie'] = 'access=true'
                r_auth = session.post(
                    auth2_url,
                    headers=auth_headers,
                    files=files,
                    timeout=10
                )
                log(f'auth2.php status={r_auth.status_code}')
                log(f'auth2.php body={r_auth.text[:150]}')
                if r_auth.status_code != 200:
                    log('auth2.php did not return 200; stream may be blocked.')
            else:
                log('AUTH2: no formData/var_* pattern found.')
        except Exception as e:
            log(f'AUTH2 error: {e}')
        # ====== END OF ADDED SECURITY CHECKS ======
        host_raw = _origin(page_url)
        # Step 9: Try server lookup
        server_lookup_url = None
        # Try to find server lookup URL from page
        try:
            m_srv = re.search(r"fetchWithRetry\('([^']*server_lookup[^']*)", page)
            if m_srv:
                base_srv = m_srv.group(1)
                if not base_srv.startswith('http'):
                    base_srv = urljoin(host_raw + '/', base_srv)
                if 'channel_id=' in base_srv:
                    # A trailing "=" means the page appends the key at runtime.
                    if base_srv.endswith('='):
                        server_lookup_url = base_srv + quote_plus(channel_key)
                    else:
                        server_lookup_url = base_srv
                else:
                    sep = '&' if '?' in base_srv else '?'
                    server_lookup_url = f'{base_srv}{sep}channel_id={quote_plus(channel_key)}'
        except Exception as e:
            log(f'server_lookup parse error: {e}')
        # Fall back to the hard-coded lookup endpoint.
        if not server_lookup_url:
            server_lookup_url = f'https://chevy.giokko.ru/server_lookup?channel_id={quote_plus(channel_key)}'
        log(f'Server lookup: {server_lookup_url}')
        # Step 10: Call server lookup with all headers
        lookup_headers = {
            'User-Agent': UA,
            'Referer': page_url,
            'Origin': host_raw
        }
        # NOTE(review): X-Channel-Key and X-User-Agent are reconstructed as
        # unconditional; they may originally have been nested under the
        # preceding "if" — confirm against the original indentation.
        if auth_token:
            lookup_headers['Authorization'] = f'Bearer {auth_token}'
        lookup_headers['X-Channel-Key'] = channel_key
        if client_token:
            lookup_headers['X-Client-Token'] = client_token
        lookup_headers['X-User-Agent'] = UA
        lookup_resp = session.get(server_lookup_url, headers=lookup_headers, timeout=10)
        try:
            data = lookup_resp.json()
        except:
            log(f'Server lookup JSON decode failed: {lookup_resp.text[:200]}')
            return None
        server_key = (data.get('server_key') or '').strip()
        log(f'server_key from lookup: {server_key}')
        if not server_key:
            log('No server_key returned')
            return None
        # Step 11: Build final HLS URL
        # 'top1/cdn' is special-cased to a fixed host; every other key becomes
        # both the "<key>new" host prefix and the first path segment.
        if server_key == 'top1/cdn':
            hls_url = f'https://top1.{base_domain}/top1/cdn/{channel_key}/mono.css'
        else:
            hls_url = f'https://{server_key}new.{base_domain}/{server_key}/{channel_key}/mono.css'
        # Build headers string
        cookie_val = 'access=true'
        if auth_token:
            cookie_val = f'access=true; eplayer_session={auth_token}'
        header_dict = {
            'Referer': f'{host_raw}/',
            'Origin': host_raw,
            'Connection': 'Keep-Alive',
            'User-Agent': UA,
            'Cookie': cookie_val,
        }
        # NOTE(review): same reconstruction caveat as for lookup_headers above.
        if auth_token:
            header_dict['Authorization'] = f'Bearer {auth_token}'
        header_dict['X-Channel-Key'] = channel_key
        if client_token:
            header_dict['X-Client-Token'] = client_token
        header_dict['X-User-Agent'] = UA
        print(header_dict)
        # Headers are appended after "|" as URL-encoded key=value pairs
        # joined by "&".
        header_str = '&'.join(f'{k}={quote_plus(v)}' for k, v in header_dict.items())
        final_m3u8 = f'{hls_url}|{header_str}'
        print(f"\n{'='*60}")
        print("✅ SUCCESS! Final HLS URL generated:")
        print('='*60)
        print(f"\n{final_m3u8}")
        # Also print clean version for testing
        clean_url = hls_url.replace('.css', '.m3u8')
        print(f"\n{'='*60}")
        print("Clean m3u8 URL (for VLC testing):")
        print('='*60)
        print(f"\n{clean_url}")
        print(f"\nRequired headers:")
        for k, v in header_dict.items():
            print(f" {k}: {v}")
        return final_m3u8
    except Exception as e:
        # Top-level boundary: log the full traceback and report failure as None.
        import traceback
        log(f"Exception: {e}\n{traceback.format_exc()}")
        return None
# Test function
def quick_test():
    """Resolve the built-in sample links, stopping at the first that works.

    Prints a success or failure line per link; returns nothing.
    """
    sample_links = ["https://daddyhd.com/watch.php?id=35"]
    for sample in sample_links:
        resolved = test_playstream(sample)
        if not resolved:
            print(f"\n❌ Failed: {sample}")
            continue
        print(f"\n✅ Working link: {sample}")
        print(f"Final URL length: {len(resolved)} chars")
        break
# Script entry point: resolve the link given on the command line, or fall
# back to the built-in sample link when no argument is supplied.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Test specific link from command line
        stream_link = sys.argv[1]
        test_playstream(stream_link)
    else:
        # Run quick test (the stray markdown fence that was fused onto this
        # call has been removed; it was a syntax error)
        quick_test()