-
Notifications
You must be signed in to change notification settings - Fork 30
Bug fixes and improvements #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9b00c13
478c018
c32d1ec
7df618a
466787e
9f8f593
2256cc5
9ede86a
605e2e0
80fd30b
ab54b60
8acb0c2
c6b9b02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| .idea | ||
| output |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,23 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| import argparse | ||
| import json | ||
| import os | ||
| import re | ||
| import string | ||
| import subprocess | ||
| import sys | ||
| import json | ||
| import time | ||
| import argparse | ||
|
|
||
| from base64 import b64decode | ||
| from lxml.html import fromstring | ||
|
|
||
| import requests | ||
|
|
||
| XOR_KEY = 'bla_bla_bla' | ||
|
|
||
| OUTPUT_PATH = "output" | ||
|
|
||
| VALID_FILENAME_CHARS = set(f" -_.(){string.ascii_letters}{string.digits}") | ||
|
|
||
| headers = { | ||
| 'authority': 'play.boomstream.com', | ||
| 'pragma': 'no-cache', | ||
|
|
@@ -26,27 +31,38 @@ | |
| 'sec-fetch-dest': 'document', | ||
| 'accept-language': 'en-US,en;q=0.9,ru;q=0.8,es;q=0.7,de;q=0.6'} | ||
|
|
||
| class App(): | ||
| def valid_filename(s): | ||
| return ''.join(c for c in s if c in VALID_FILENAME_CHARS or c.isalpha()) | ||
|
|
||
| def output_path(path): | ||
| return os.path.join(OUTPUT_PATH, path) | ||
|
|
||
| def ensure_folder_exists(path): | ||
| if not os.path.exists(path): | ||
| os.mkdir(path) | ||
|
|
||
| def run_bash(command): | ||
| exit_code, output = subprocess.getstatusoutput(command) | ||
| if exit_code != 0: | ||
| print(output) | ||
| raise ValueError(f'failed with exit code {exit_code}') | ||
| return output | ||
|
|
||
| class App(object): | ||
|
|
||
| def __init__(self): | ||
| parser = argparse.ArgumentParser(description='boomstream.com downloader') | ||
| parser.add_argument('--url', type=str, required=True) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would say |
||
| parser.add_argument('--pin', type=str, required=True) | ||
| parser.add_argument('--entity', type=str, required=True) | ||
| parser.add_argument('--pin', type=str, required=False) | ||
| parser.add_argument('--use-cache', action='store_true', required=False) | ||
| parser.add_argument('--resolution', type=str, required=False) | ||
| self.args = parser.parse_args() | ||
|
|
||
| def get_token(self): | ||
| if 'records' in self.config['mediaData'] and len(self.config['mediaData']['records']) > 0: | ||
| return b64decode(self.config['mediaData']['records'][0]['token']).decode('utf-8') | ||
| else: | ||
| return b64decode(self.config['mediaData']['token']).decode('utf-8') | ||
| return b64decode(self.config['mediaData']['token']).decode('utf-8') | ||
|
Comment on lines
-40
to
+62
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When records are not available yet, the token can be retrieved from 'mediaData' directly. The same works for the m3u8 playlist. |
||
|
|
||
| def get_m3u8_url(self): | ||
| if 'records' in self.config['mediaData'] and len(self.config['mediaData']['records']) > 0: | ||
| return b64decode(self.config['mediaData']['records'][0]['links']['hls']).decode('utf-8') | ||
| else: | ||
| return b64decode(self.config['mediaData']['links']['hls']).decode('utf-8') | ||
| return b64decode(self.config['mediaData']['links']['hls']).decode('utf-8') | ||
|
|
||
| def get_boomstream_config(self, page): | ||
| """ | ||
|
|
@@ -65,19 +81,20 @@ def get_boomstream_config(self, page): | |
| if result is None: | ||
| raise Exception("Could not get boomstreamConfig from the main page") | ||
|
|
||
| with open('boomstream.config.json', 'wt') as f: | ||
| with open(output_path('boomstream.config.json'), 'wt') as f: | ||
| del result["translations"] | ||
| f.write(json.dumps(result, ensure_ascii=False, indent=4)) | ||
|
|
||
| return result | ||
|
|
||
| def get_playlist(self, url): | ||
| if self.args.use_cache and os.path.exists('boomstream.playlist.m3u8'): | ||
| return open('boomstream.playlist.m3u8').read() | ||
| if self.args.use_cache and os.path.exists(output_path('boomstream.playlist.m3u8')): | ||
| with open(output_path('boomstream.playlist.m3u8')) as f: | ||
| return f.read() | ||
|
|
||
| r = requests.get(url, headers=headers) | ||
|
|
||
| with open('boomstream.playlist.m3u8', 'wt') as f: | ||
| with open(output_path('boomstream.playlist.m3u8'), 'wt') as f: | ||
| f.write(r.text) | ||
|
|
||
| return r.text | ||
|
|
@@ -111,8 +128,7 @@ def extract_chunklist_urls(self, playlist): | |
|
|
||
| def get_chunklist(self, playlist): | ||
| all_chunklists = self.extract_chunklist_urls(playlist) | ||
| print("This video is available in the following resolutions: %s" % \ | ||
| ", ".join(i[0] for i in all_chunklists)) | ||
| print(f"This video is available in the following resolutions: {', '.join(i[0] for i in all_chunklists)}") | ||
|
|
||
| if self.args.resolution is not None: | ||
| url = None | ||
|
|
@@ -127,17 +143,18 @@ def get_chunklist(self, playlist): | |
| # If the resolution is not specified in args, pick the best one | ||
| url = sorted(all_chunklists, key=lambda x: x[2])[-1][1] | ||
|
|
||
| print("URL: %s" % url) | ||
| print(f"URL: {url}") | ||
|
|
||
| if url is None: | ||
| raise Exception("Could not find chunklist in playlist data") | ||
|
|
||
| if self.args.use_cache and os.path.exists('boomstream.chunklist.m3u8'): | ||
| return open('boomstream.chunklist.m3u8').read() | ||
| if self.args.use_cache and os.path.exists(output_path('boomstream.chunklist.m3u8')): | ||
| with open(output_path('boomstream.chunklist.m3u8')) as f: | ||
| return f.read() | ||
|
|
||
| r = requests.get(url, headers=headers) | ||
|
|
||
| with open('boomstream.chunklist.m3u8', 'wt') as f: | ||
| with open(output_path('boomstream.chunklist.m3u8'), 'wt') as f: | ||
| f.write(r.text) | ||
|
|
||
| return r.text | ||
|
|
@@ -171,7 +188,7 @@ def encrypt(self, source_text, key): | |
| key += key | ||
|
|
||
| for i in range(0, len(source_text)): | ||
| result += '%0.2x' % (ord(source_text[i]) ^ ord(key[i])) | ||
| result += f'{ord(source_text[i]) ^ ord(key[i]):02x}' | ||
|
|
||
| return result | ||
|
|
||
|
|
@@ -180,91 +197,123 @@ def get_aes_key(self, xmedia_ready): | |
| Returns IV and 16-byte key which will be used to decrypt video chunks | ||
| """ | ||
| decr = self.decrypt(xmedia_ready, XOR_KEY) | ||
| print('Decrypted X-MEDIA-READY: %s' % decr) | ||
| print(f'Decrypted X-MEDIA-READY: {decr}') | ||
|
|
||
| key = None | ||
| iv = ''.join(['%0.2x' % ord(c) for c in decr[20:36]]) | ||
| iv = ''.join([f'{ord(c):02x}' for c in decr[20:36]]) | ||
|
|
||
| key_url = 'https://play.boomstream.com/api/process/' + \ | ||
| self.encrypt(decr[0:20] + self.token, XOR_KEY) | ||
|
|
||
| print('key url = %s' % key_url) | ||
| print(f'key url = {key_url}') | ||
|
|
||
| r = requests.get(key_url, headers=headers) | ||
| key = r.text | ||
| print("IV = %s" % iv) | ||
| print("Key = %s" % key) | ||
| print(f"IV = {iv}") | ||
| print(f"Key = {key}") | ||
| return iv, key | ||
|
|
||
| def download_chunks(self, chunklist, iv, key): | ||
| i = 0 | ||
|
|
||
| if not os.path.exists(key): | ||
| os.mkdir(key) | ||
| ensure_folder_exists(output_path(key)) | ||
|
|
||
| # Convert the key to format suitable for openssl command-line tool | ||
| hex_key = ''.join(['%0.2x' % ord(c) for c in key]) | ||
| hex_key = ''.join([f'{ord(c):02x}' for c in key]) | ||
|
|
||
| filenames = [] | ||
|
|
||
| i = 0 | ||
| for line in chunklist.split('\n'): | ||
| if not line.startswith('https://'): | ||
| continue | ||
| outf = os.path.join(key, "%0.5d" % i) + ".ts" | ||
| if os.path.exists(outf): | ||
| outf = output_path(os.path.join(key, f"{i:05d}.ts")) | ||
| filenames.append(outf) | ||
| if os.path.exists(outf) and os.path.getsize(outf) > 0: | ||
| i += 1 | ||
| print("Chunk #%s exists [%s]" % (i, outf)) | ||
| print(f"Chunk #{i} exists [{outf}]") | ||
| continue | ||
| print("Downloading chunk #%s" % i) | ||
| os.system('curl -s "%s" | openssl aes-128-cbc -K "%s" -iv "%s" -d > %s' % \ | ||
| (line, hex_key, iv, outf)) | ||
| print(f"Downloading chunk #{i}") | ||
| run_bash(f'curl -s "{line}" | openssl aes-128-cbc -K "{hex_key}" -iv "{iv}" -d > {outf}') | ||
| i += 1 | ||
| return filenames | ||
|
|
||
| def merge_chunks(self, key): | ||
| def merge_chunks(self, filenames, key, expected_result_duration): | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good idea with |
||
| """ | ||
| Merges all chunks into one file and encodes it to MP4 | ||
| """ | ||
| print("Merging chunks...") | ||
| os.system("cat %s/*.ts > %s.ts" % (key, key,)) | ||
| run_bash(f"cat {' '.join(filenames)} > {output_path(key)}.ts") | ||
| print("Encoding to MP4") | ||
| os.system('ffmpeg -i %s.ts -c copy "%s".mp4' % (key, self.get_title(),)) | ||
| run_bash(f'ffmpeg -nostdin -y -i {output_path(key)}.ts -c copy {output_path(key)}.mp4') | ||
|
|
||
| result_format = run_bash(f'ffprobe -i {output_path(key)}.mp4 -show_format') | ||
| result_duration = float([line[len("duration="):] for line in result_format.split('\n') if line.startswith("duration=")][0]) | ||
| print(f"Result duration: {result_duration:.2f}") | ||
| print(f"Expected duration: {expected_result_duration:.2f}") | ||
| if abs(result_duration - expected_result_duration) > 2: | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is nice, but depends on |
||
| raise ValueError(f"unexpected result duration: {expected_result_duration:.2f} != {result_duration:.2f}") | ||
|
|
||
| ensure_folder_exists(output_path("results")) | ||
| result_filename = output_path(os.path.join("results", f"{valid_filename(self.get_title())}.mp4")) | ||
| os.rename(f'{output_path(key)}.mp4', result_filename) | ||
|
|
||
| def get_title(self): | ||
| return self.config['entity']['title'] | ||
|
|
||
| def get_access_cookies(self): | ||
| pin = self.args.pin | ||
| if pin is None: | ||
| return {} | ||
| r = requests.post("https://play.boomstream.com/api/subscriptions/recovery", | ||
| headers={'content-type': 'application/json;charset=UTF-8'}, | ||
| data=f'{{"entity":"{self.args.entity}","code":"{pin}"}}') | ||
| response = json.loads(r.text) | ||
| if "data" not in response or "cookie" not in response["data"]: | ||
| if "errors" not in response or "code" not in response: | ||
| raise ValueError(f"unexpected response on authorization: {r.text}") | ||
| else: | ||
| raise ValueError(f"authorization failed: {response['code']} {response['errors']}") | ||
| cookie = response["data"]["cookie"] | ||
| return {cookie["name"]: cookie["value"]} | ||
|
|
||
| def run(self): | ||
| if self.args.use_cache and os.path.exists('result.html'): | ||
| page = open('result.html').read() | ||
| ensure_folder_exists(OUTPUT_PATH) | ||
|
|
||
| cookies = self.get_access_cookies() | ||
|
|
||
| result_path = output_path('result.html') | ||
|
|
||
| if self.args.use_cache and os.path.exists(result_path): | ||
| page = open(result_path).read() | ||
| else: | ||
| r = requests.get(self.args.url, headers=headers) | ||
| r = requests.get(f'https://play.boomstream.com/{self.args.entity}', headers=headers, cookies=cookies) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it would be easier for the user to copy the URL from the browser's address bar than to extract and copy |
||
|
|
||
| with open('result.html', 'wt') as f: | ||
| with open(result_path, 'wt') as f: | ||
| f.write(r.text) | ||
|
|
||
| page = r.text | ||
|
|
||
| self.config = self.get_boomstream_config(page) | ||
| if len(self.config['mediaData']['records']) == 0: | ||
| print("Video record is not available. Probably, the live streaming" \ | ||
| "has not finished yet. Please, try to download once the translation" \ | ||
| "is finished." \ | ||
| "If you're sure that translation is finished, please create and issue" \ | ||
| "in project github tracker and attach your boomstream.config.json file") | ||
| return 1 | ||
|
|
||
| if "mediaData" not in self.config or "duration" not in self.config['mediaData']: | ||
| raise ValueError( | ||
| "Video config is not available. Probably, the live streaming has not finished yet, or you use " | ||
| "an incorrect pin code. If you're sure that translation is finished and pin code is correct, please " | ||
| "create an issue in project github tracker and attach your boomstream.config.json file.") | ||
| self.token = self.get_token() | ||
| self.m3u8_url = self.get_m3u8_url() | ||
| self.expected_result_duration = float(self.config['mediaData']['duration']) | ||
|
|
||
| print("Token = %s" % self.token) | ||
| print("Playlist: %s" % self.m3u8_url) | ||
| print(f"Token = {self.token}") | ||
| print(f"Playlist: {self.m3u8_url}") | ||
|
|
||
| playlist = self.get_playlist(self.m3u8_url) | ||
| chunklist = self.get_chunklist(playlist) | ||
|
|
||
| xmedia_ready = self.get_xmedia_ready(chunklist) | ||
|
|
||
| print('X-MEDIA-READY: %s' % xmedia_ready) | ||
| print(f'X-MEDIA-READY: {xmedia_ready}') | ||
| iv, key = self.get_aes_key(xmedia_ready) | ||
| self.download_chunks(chunklist, iv, key) | ||
| self.merge_chunks(key) | ||
| filenames = self.download_chunks(chunklist, iv, key) | ||
| self.merge_chunks(filenames, key, self.expected_result_duration) | ||
|
|
||
| if __name__ == '__main__': | ||
| app = App() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is important.