-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
174 lines (146 loc) · 5.91 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from flask import Flask, render_template, request, flash, send_file, url_for, jsonify, redirect
import spotify_get_song_names as spt
import youtube_downloader as yt
from config import Prod, Dev
from zipfile import ZipFile
import os
from os.path import basename
from io import BytesIO
import shutil
import time
import spotify_get_song_names as spt
from celery import Celery
import os
import boto3
# Initialize Flask App
app = Flask(__name__)
app.config.from_object(Prod)
# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('AWS_REGION')
AWS_BUCKET_NAME = os.getenv('AWS_BUCKET_NAME')
local_download_directory = os.path.join(os.getcwd(), 'static', 'music_files')
@app.route('/')
def index():
return render_template('index.html')
@app.route('/playlist_check', methods=('GET', 'POST'))
def playlist_check():
if request.method == 'POST':
playlist_id = request.json["playlist_id"]
print(playlist_id)
if not playlist_id:
flash('Playlist Link is required!')
playlist_too_long = spt.playlist_too_long(playlist_id)
return jsonify({'response': playlist_too_long})
@app.route('/download', methods=('GET', 'POST'))
def download():
if request.method == 'POST':
playlist_id = request.json["playlist_id"]
filetype = request.json["filetype"]
print(playlist_id)
if not playlist_id:
flash('Playlist Link is required!')
# First third of progress bar would be song title retrieval, second third would be download, last would be zipping
task = background_process.apply_async(args=(playlist_id, filetype))
result = jsonify({}), 202, {'Location': url_for('progress', task_id=task.id)}
return result
def zipping(data, dirName):
# create a ZipFile object
with ZipFile(data, 'w') as zipObj:
# Iterate over all the files in directory
for folderName, subfolders, filenames in os.walk(dirName):
for filename in filenames:
#create complete filepath of file in directory
filePath = os.path.join(folderName, filename)
# Add file to zip
zipObj.write(filePath, basename(filePath))
@app.route('/send_zip_file', methods=('GET', 'POST'))
def send_zip_file():
path = local_download_directory
shutil.rmtree(path, ignore_errors=True)
os.makedirs(path)
# initialize active storage
s3 = boto3.client(
's3',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
# download files into local directory
for key in s3.list_objects(Bucket=AWS_BUCKET_NAME)['Contents']:
key_filename = key["Key"].split("public/", 1)[1]
download_filename = os.path.join(path, key_filename)
s3.download_file(Bucket=AWS_BUCKET_NAME, Key=key['Key'], Filename=download_filename)
s3.delete_object(Bucket=AWS_BUCKET_NAME, Key=key['Key'])
print('downloaded song: ', key_filename)
data = BytesIO()
zipping(data, path)
data.seek(0)
shutil.rmtree(path, ignore_errors=True)
return send_file(data, mimetype='application/zip', as_attachment=True, download_name='music_playlist.zip')
@celery.task(bind=True)
def background_process(self, playlist_link, filetype):
path = local_download_directory
shutil.rmtree(path, ignore_errors=True)
process = 0
song_titles = spt.track_data_extractor(playlist_link)
process = 30
single_song_percent = int(70 / len(song_titles))
self.update_state(state='PROGRESS', meta={'current': process, 'total': 100, 'status': 'downloading songs'})
# initialize active storage
s3 = boto3.client(
's3',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
for song in song_titles:
filename = yt.download_from_link(song, filetype)
self.update_state(state='PROGRESS', meta={'current': process, 'total': 100, 'status': 'downloading songs'})
filepath = os.path.join(path, filename)
if os.path.exists(filepath):
# upload to active storage here
key = 'public/' + filename
s3.upload_file(Filename=filepath, Bucket=AWS_BUCKET_NAME, Key=key)
process += single_song_percent
self.update_state(state='PROGRESS', meta={'current': process, 'total': 100, 'status': 'downloading songs'})
shutil.rmtree(path, ignore_errors=True)
self.update_state(state='SUCCESS', meta={'current': 100, 'total': 100, 'status': 'Finished downloads! Zipping now...','result': 42})
time.sleep(2)
return {'current': 100, 'total': 100, 'status': 'Finished downloads! Zipping now...',
'result': 42}
@app.route('/progress/<task_id>', methods=('GET', 'POST'))
def progress(task_id):
task = background_process.AsyncResult(task_id)
if task.state == 'PENDING':
# job did not start yet
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Scanning through playlist...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
json_response = jsonify(response)
return json_response
if __name__ == "__main__":
app.run(host=app.config.get("DOMAIN"))