Skip to content

Commit 91ea9b2

Browse files
committed
Task queue, Vagrant changes
1 parent 04d0cfc commit 91ea9b2

15 files changed

+191
-10
lines changed

Vagrantfile

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
VAGRANT_VERSION = "2"
22
VM_BOX = "bento/ubuntu-16.04"
3+
USE_GIT_VERSION = false
34

45
hosts = [
56
{name: "microblog", ip: "192.168.33.10"}
@@ -16,7 +17,12 @@ Vagrant.configure(VAGRANT_VERSION) do |config|
1617

1718
hosts.each_with_index do |elem, index|
1819
config.vm.define elem[:name] do |machine|
20+
if USE_GIT_VERSION
1921
machine.vm.synced_folder ".", "/vagrant", disabled: true
22+
else
23+
machine.vm.synced_folder ".", "/opt/microblog", type: "rsync",
24+
rsync__exclude: File.readlines('.gitignore').each(&:chomp!)
25+
end
2026
machine.vm.synced_folder "./vagrant", "/automated"
2127
machine.vm.network :private_network, ip: elem[:ip]
2228
machine.vm.hostname = elem[:name]

app/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from flask_babel import Babel, lazy_gettext as _l
1212
from elasticsearch import Elasticsearch
1313
from config import Config
14+
from redis import Redis
15+
import rq
1416

1517
db = SQLAlchemy()
1618
migrate = Migrate()
@@ -37,6 +39,9 @@ def create_app(config_class=Config):
3739
app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
3840
if app.config['ELASTICSEARCH_URL'] else None
3941

42+
app.redis = Redis.from_url(app.config['REDIS_URL'])
43+
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
44+
4045
if app.config['ERROR_HANDLER'] and app.config['ERROR_HANDLER'] == "simple":
4146
from app.errors import simple as errors_handler
4247
errors_handler.set_error_handler(app)

app/cli.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from app import db
2-
from app.models import User, Post, Message, Notification
2+
from app.models import User, Post, Message, Notification, Task
33
from datetime import datetime, timedelta
44
import click
55
import os
@@ -83,4 +83,5 @@ def show_all_attrs(value):
8383

8484
@app.shell_context_processor
8585
def make_shell_context():
86-
return {'db': db, 'User': User, 'Post': Post, 'Message': Message, 'Notification': Notification}
86+
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
87+
'Notification': Notification, 'Task': Task}

app/email.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ def send_async_email(app, msg):
77
with app.app_context():
88
mail.send(msg)
99

10-
def send_email(subject, sender, recipients, text_body, html_body):
10+
def send_email(subject, sender, recipients, text_body, html_body, attachments=None, sync=False):
1111
msg = Message(subject, sender=sender, recipients=recipients)
1212
msg.body = text_body
1313
msg.html = html_body
14-
Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()
14+
if attachments:
15+
for attachment in attachments:
16+
msg.attach(*attachment)
17+
if sync:
18+
mail.send(msg)
19+
else:
20+
Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()

app/main/routes.py

+10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ def before_request():
1818
g.search_form = SearchForm() if current_app.elasticsearch else None
1919
g.locale = str(get_locale())
2020

21+
@bp.route("/export_posts")
22+
@login_required
23+
def export_posts():
24+
if current_user.get_task_in_progress('export_posts'):
25+
flash(_("An export task is in progress already"))
26+
else:
27+
current_user.launch_task('export_posts', _("Exporting posts..."))
28+
db.session.commit()
29+
return redirect(url_for('main.user', username=current_user.username))
30+
2131
@bp.route("/notifications")
2232
@login_required
2333
def notifications():

app/models.py

+33
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from hashlib import md5
88
import jwt
99
import json
10+
import redis
11+
import rq
1012
from app.search import add_to_index, query_index, remove_from_index
1113

1214
followers = db.Table('followers',
@@ -78,10 +80,23 @@ class User(SearchableMixin, UserMixin, db.Model):
7880
backref='recipient', lazy='dynamic')
7981
last_message_read_time = db.Column(db.DateTime)
8082
notifications = db.relationship('Notification', backref='user', lazy='dynamic')
83+
tasks = db.relationship('Task', backref='user', lazy='dynamic')
8184

8285
def __repr__(self):
8386
return '<User {}>'.format(self.username)
8487

88+
def launch_task(self, name, description, *args, **kwargs):
89+
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, *args, **kwargs)
90+
task = Task(id=rq_job.get_id(), name=name, description=description, user=self)
91+
db.session.add(task)
92+
return task
93+
94+
def get_tasks_in_progress(self):
95+
return Task.query.filter_by(user=self, complete=False).all()
96+
97+
def get_task_in_progress(self, name):
98+
return Task.query.filter_by(name=name, user=self, complete=False).first()
99+
85100
def add_notification(self, name, data):
86101
self.notifications.filter_by(name=name).delete()
87102
n = Notification(name=name, payload_json=json.dumps(data), user=self)
@@ -182,6 +197,24 @@ class Notification(db.Model):
182197
def get_data(self):
183198
return json.loads(str(self.payload_json))
184199

200+
class Task(db.Model):
201+
id = db.Column(db.String(36), primary_key=True)
202+
name = db.Column(db.String(128), index=True)
203+
description = db.Column(db.String(128))
204+
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
205+
complete = db.Column(db.Boolean, default=False)
206+
207+
def get_rq_job(self):
208+
rq_job = None
209+
try:
210+
rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
211+
except expression as identifier:
212+
rq_job = None
213+
return rq_job
214+
215+
def get_progress(self):
216+
job = get_rq_job()
217+
return job.meta.get('progress', 0) if job is not None else 100
185218
#
186219

187220
db.event.listen(db.session, 'before_commit', Post.before_commit)

app/tasks.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import time
2+
import sys
3+
import json
4+
from rq import get_current_job
5+
from app import create_app, db
6+
from app.models import Task, User, Post
7+
from app.email import send_email
8+
from flask import render_template
9+
10+
app = create_app()
11+
app.app_context().push()
12+
13+
def example(seconds):
14+
job = get_current_job()
15+
print('Starting task')
16+
for i in range(seconds):
17+
job.meta['progress'] = 100.0 * i / seconds
18+
job.save_meta()
19+
print(i)
20+
time.sleep(1)
21+
job.meta['progress'] = 100
22+
job.save_meta()
23+
print('Task completed')
24+
25+
def _set_task_progress(progress):
26+
job = get_current_job()
27+
if job:
28+
job.meta['progress'] = progress
29+
job.save_meta()
30+
task = Task.query.get(job.get_id())
31+
task.user.add_notification('task_progress', {'task_id': job.get_id(), 'progress': progress})
32+
if progress >= 100:
33+
task.complete = True
34+
db.session.commit()
35+
36+
def export_posts(user_id):
37+
try:
38+
user = User.query.get(user_id)
39+
_set_task_progress(0)
40+
data = []
41+
i = 0
42+
total_posts = user.posts.count()
43+
for post in user.posts.order_by(Post.timestamp.asc()):
44+
data.append({'body': post.body, 'timestamp': post.timestamp.isoformat() + 'Z'})
45+
time.sleep(5)
46+
i += 1
47+
_set_task_progress(100 * i // total_posts)
48+
send_email('[Microblog] Your blog posts',
49+
sender=app.config['ADMINS'][0],
50+
recipients=[user.email],
51+
text_body=render_template('email/export_posts.txt', user=user),
52+
html_body=render_template('email/export_posts.html', user=user),
53+
attachments=[('posts.json', 'application/json', json.dumps({'posts': data}, indent=4))],
54+
sync=True)
55+
except:
56+
_set_task_progress(100)
57+
app.logger.error('Unhandled exception', exc_info=sys.exc_info())

app/templates/email/export_posts.html

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
<p>Dear {{ user.username }},</p>
2+
<p>Please find attached the archive of your posts that you requested.</p>
3+
<p>Sincerely,</p>
4+
<p>The Microblog Team</p>

app/templates/email/export_posts.txt

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Dear {{ user.username }},
2+
3+
Please find attached the archive of your posts that you requested.
4+
5+
Sincerely,
6+
7+
The Microblog Team

app/templates/main/user.html

+7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ <h1>{{ _("User: %(username)s", username=user.username) }}</h1>
2727
<span>
2828
<a href="{{ url_for('main.edit_profile') }}">{{ _("Edit profile") }}</a>
2929
</span>
30+
{% if not current_user.get_task_in_progress('export_posts') %}
31+
<span>
32+
<a href="{{ url_for('main.export_posts') }}">
33+
{{ _("Export posts") }}
34+
</a>
35+
</span>
36+
{% endif %}
3037
{% else %}
3138
{% if not current_user.is_following(user) %}
3239
<span>

app/templates/msg/messages.html

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
<div class="row">
55
<div class="col-md-8 col-md-offset-2">
66
<h1>{{ _("Messages") }}</h1>
7-
{% if messages %}
87
{% for post in messages %}
98
{% include 'main/_post.html' %}
109
{% else %}

automated.sh

+10-4
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@ DEBIAN_FRONTEND=noninteractive apt-get -y install python3 python3-venv python3-d
2020
DB_ROOT=$(python3 -c "import uuid; print(uuid.uuid4().hex)")
2121
debconf-set-selections <<< "mysql-server mysql-server/root_password password ${DB_ROOT}"
2222
debconf-set-selections <<< "mysql-server mysql-server/root_password_again password ${DB_ROOT}"
23-
DEBIAN_FRONTEND=noninteractive apt-get -y install mysql-server elasticsearch postfix supervisor nginx
23+
DEBIAN_FRONTEND=noninteractive apt-get -y install mysql-server elasticsearch postfix supervisor nginx redis-server
2424

2525
sed -i 's/#START_DAEMON/START_DAEMON/' /etc/default/elasticsearch
2626
systemctl restart elasticsearch
2727

28-
mkdir -p /opt/microblog
29-
cd /opt/microblog
30-
git clone https://github.com/avoidik/microblog .
28+
if [[ ! -d "/opt/microblog" ]]; then
29+
mkdir -p /opt/microblog
30+
cd /opt/microblog
31+
git clone https://github.com/avoidik/microblog .
32+
else
33+
cd /opt/microblog
34+
fi
3135
pip3 install -U pip setuptools wheel
3236
python3 -m venv venv
3337
source venv/bin/activate
@@ -71,6 +75,8 @@ flask seed --no-destructive
7175
chown -R vagrant:vagrant /opt/microblog
7276

7377
echo "export FLASK_APP=\"microblog.py\"" >> /home/vagrant/.profile
78+
echo "export LC_ALL=C.UTF-8" >> /home/vagrant/.profile
79+
echo "export LANG=C.UTF-8" >> /home/vagrant/.profile
7480

7581
supervisorctl reload
7682
service nginx reload

config.py

+1
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ class Config(object):
2121
ERROR_HANDLER = os.environ.get("ERROR_HANDLER") or None
2222
LOG_TO_STDOUT = os.environ.get("LOG_TO_STDOUT")
2323
SESSION_COOKIE_NAME = "microblog"
24+
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""tasks
2+
3+
Revision ID: a93eeb179002
4+
Revises: c8af8ee4d608
5+
Create Date: 2018-01-27 23:31:36.434246
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = 'a93eeb179002'
14+
down_revision = 'c8af8ee4d608'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.create_table('task',
22+
sa.Column('id', sa.String(length=36), nullable=False),
23+
sa.Column('name', sa.String(length=128), nullable=True),
24+
sa.Column('description', sa.String(length=128), nullable=True),
25+
sa.Column('user_id', sa.Integer(), nullable=True),
26+
sa.Column('complete', sa.Boolean(), nullable=True),
27+
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
28+
sa.PrimaryKeyConstraint('id')
29+
)
30+
op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False)
31+
# ### end Alembic commands ###
32+
33+
34+
def downgrade():
35+
# ### commands auto generated by Alembic - please adjust! ###
36+
op.drop_index(op.f('ix_task_name'), table_name='task')
37+
op.drop_table('task')
38+
# ### end Alembic commands ###

requirements.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ python-dotenv
1414
elasticsearch
1515
gunicorn
1616
pymysql
17-
psycopg2
17+
psycopg2
18+
rq

0 commit comments

Comments
 (0)