forked from zayn303/tv-program-web-scraper
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflask_now_playing.py
More file actions
183 lines (161 loc) · 6.55 KB
/
flask_now_playing.py
File metadata and controls
183 lines (161 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from flask import Flask, jsonify, request, redirect
from datetime import datetime, timedelta
import os, time, threading
from werkzeug.middleware.proxy_fix import ProxyFix
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
FILES = ["tv_programs_Disc.txt", "tv_programs_BBC.txt", "tv_programs_NatGeo.txt"]
now_playing = []
# --- optional HTTPS redirect when SSL is enabled ---
FORCE_HTTPS = os.getenv("FORCE_HTTPS", "1") == "1"
@app.before_request
def _force_https():
# Only redirect if we're actually running with SSL (set below)
if getattr(app, "_ssl_enabled", False) and FORCE_HTTPS:
# Respect X-Forwarded-Proto when behind a proxy
proto = request.headers.get("X-Forwarded-Proto", request.scheme)
if proto != "https":
url = request.url.replace("http://", "https://", 1)
return redirect(url, code=301)
# ---------------- your existing code ----------------
def parse_programs(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
blocks = content.strip().split('----------------------------------------')
programs = []
for block in blocks:
lines = block.strip().split('\n')
record = {}
for line in lines:
if ': ' in line:
key, value = line.split(': ', 1)
record[key.strip()] = value.strip()
if record:
programs.append(record)
return programs
def _as_dt(date_str, hm_str):
if not date_str or not hm_str:
return None
try:
d = datetime.strptime(date_str, '%d.%m.%Y').date()
t = datetime.strptime(hm_str, '%H:%M').time()
return datetime.combine(d, t)
except ValueError:
return None
def get_current_programs():
now = datetime.now()
today = now.date()
yesterday = today - timedelta(days=1)
tomorrow = today + timedelta(days=1)
current_programs = []
for filename in FILES:
if not os.path.exists(filename):
continue
programs = parse_programs(filename)
on_air = None
next_up = None
next_up_start = None
for p in programs:
ds = p.get('Date')
st = p.get('Start Time')
et = p.get('End Time')
start_dt = _as_dt(ds, st)
end_dt = _as_dt(ds, et)
if not start_dt or not end_dt:
continue
if end_dt <= start_dt:
end_dt += timedelta(days=1)
if start_dt.date() not in (today, yesterday):
continue
if start_dt <= now < end_dt:
on_air = {
"channel": p.get("Channel", filename),
"title": p.get("Title", ""),
"start": start_dt.strftime('%H:%M'),
"end": end_dt.strftime('%H:%M'),
"description": p.get("Description", ""),
"original name": p.get("Original Name", ""),
"year": p.get("Year", ""),
"genre": p.get("Genre", ""),
"score": p.get("Score", "")
}
elif start_dt > now:
if next_up_start is None or start_dt < next_up_start:
next_up_start = start_dt
next_up = {
"channel": p.get("Channel", filename),
"title": p.get("Title", ""),
"start": start_dt.strftime('%H:%M'),
"end": end_dt.strftime('%H:%M'),
"description": p.get("Description", ""),
"original name": p.get("Original Name", ""),
"year": p.get("Year", ""),
"genre": p.get("Genre", ""),
"score": p.get("Score", "")
}
if not on_air and not next_up:
soonest = None
soonest_p = None
for p in programs:
if p.get('Date') != tomorrow.strftime('%d.%m.%Y'):
continue
start_dt = _as_dt(p.get('Date'), p.get('Start Time'))
end_dt = _as_dt(p.get('Date'), p.get('End Time'))
if not start_dt or not end_dt:
continue
if end_dt <= start_dt:
end_dt += timedelta(days=1)
if soonest is None or start_dt < soonest:
soonest = start_dt
soonest_p = {
"channel": p.get("Channel", filename),
"title": p.get("Title", ""),
"start": start_dt.strftime('%H:%M'),
"end": end_dt.strftime('%H:%M'),
"description": p.get("Description", ""),
"original name": p.get("Original Name", ""),
"year": p.get("Year", ""),
"genre": p.get("Genre", ""),
"score": p.get("Score", "")
}
if soonest_p:
next_up = soonest_p
if on_air:
current_programs.append(on_air)
elif next_up:
current_programs.append(next_up)
return current_programs
def scheduler_loop():
global now_playing
while True:
now_playing = get_current_programs()
now = datetime.now()
targets = []
for p in now_playing:
try:
target_hm = p.get('end') or p.get('start')
hh, mm = map(int, target_hm.split(':')[:2])
target = now.replace(hour=hh, minute=mm, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
targets.append(target)
except Exception:
pass
wait_seconds = max((min(targets) - now).total_seconds(), 10) if targets else 60
time.sleep(wait_seconds)
@app.route('/now-playing')
def now_playing_api():
return jsonify(now_playing)
if __name__ == '__main__':
threading.Thread(target=scheduler_loop, daemon=True).start()
# Provide cert+key via env vars or files in cwd
cert_file = os.getenv('SSL_CERT_FILE', 'cert.pem')
key_file = os.getenv('SSL_KEY_FILE', 'key.pem')
ssl_ctx = None
if os.path.exists(cert_file) and os.path.exists(key_file):
ssl_ctx = (cert_file, key_file)
app._ssl_enabled = True
else:
app._ssl_enabled = False # will run HTTP only
app.run(host='0.0.0.0', port=int(os.getenv('PORT', 5001)), threaded=True,
ssl_context=ssl_ctx)