Merge pull request #24 from dimagi/sr/celery-2
Import larger UCRs via celery
sravfeyn authored May 12, 2023
2 parents 977320f + 766140a commit d249269
Showing 12 changed files with 315 additions and 44 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/tests.yml
@@ -26,6 +26,16 @@ jobs:
          # Use custom ports for services to avoid accidentally connecting to
          # GitHub action runner's default installations
          - 5432:5432
      redis:
        # Docker Hub image
        image: redis
        # Set health checks to wait until redis has started
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
        ports:
          - 6379:6379
    steps:
      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
        uses: actions/checkout@v2
@@ -41,6 +51,9 @@ jobs:
          python -m pip install --upgrade pip
          pip install .
          pip install -r requirements_test.txt
      - name: Create shared_dir
        run: |
          mkdir shared_dir
      - name: Run Tests
        run: |
          pytest -W ignore::DeprecationWarning
1 change: 1 addition & 0 deletions .gitignore
@@ -115,4 +115,5 @@ tests/.superset/
*.sublime-project
*.sublime-workspace
superset_config.py
shared_dir/
*superset.db
13 changes: 13 additions & 0 deletions README.md
@@ -39,6 +39,19 @@ You should now be able to run superset using the `superset run` command from the
OAuth integration should now start working.


### Importing UCRs using Redis and Celery


Celery is used to import UCRs that are larger than
`hq_superset.views.ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES` (roughly 5 MB).
To import UCRs above this limit, a Celery worker must be running. Here is
how to run Celery locally:

- Install and run Redis.
- Copy the Redis and Celery config sections from `superset_config.example.py` into your local `superset_config.py` (see the sketch below).
- Run `celery --app=superset.tasks.celery_app:app worker --pool=prefork -O fair -c 4` in the Superset virtualenv.
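
For reference, a minimal sketch of what those config sections typically look
like. The `CACHE_CONFIG` below mirrors the test config in this PR, while the
`CeleryConfig` class follows Superset's standard Celery setup and is an
assumption -- prefer the actual sections in `superset_config.example.py`:

```python
# Sketch only -- copy the real sections from superset_config.example.py.
class CeleryConfig:
    broker_url = "redis://localhost:6379/0"
    result_backend = "redis://localhost:6379/0"
    # Make sure the worker can discover hq_superset's tasks.
    imports = ("superset.sql_lab", "hq_superset.tasks")

CELERY_CONFIG = CeleryConfig

CACHE_CONFIG = {
    "CACHE_TYPE": "RedisCache",
    "CACHE_DEFAULT_TIMEOUT": 300,
    "CACHE_KEY_PREFIX": "superset_",
    "CACHE_REDIS_URL": "redis://localhost:6379/0",
}
```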


### Testing

Tests use pytest, which is included in `requirements_dev.txt`:
18 changes: 18 additions & 0 deletions hq_superset/tasks.py
@@ -0,0 +1,18 @@
import logging
import os

from superset.extensions import celery_app

from .utils import AsyncImportHelper

logger = logging.getLogger(__name__)


@celery_app.task(name='refresh_hq_datasource_task')
def refresh_hq_datasource_task(domain, datasource_id, display_name,
                               export_path, datasource_defn, user_id):
    from .views import refresh_hq_datasource
    try:
        refresh_hq_datasource(domain, datasource_id, display_name,
                              export_path, datasource_defn, user_id)
    except Exception:
        AsyncImportHelper(domain, datasource_id).mark_as_complete()
        os.remove(export_path)
        raise
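
For context, the enqueueing helper `hq_superset.views.queue_refresh_task`
(patched in the tests but not shown in this diff) presumably pairs this task
with `AsyncImportHelper`. A hypothetical sketch, with the signature inferred
from the task above:

    def queue_refresh_task(domain, datasource_id, display_name, export_path,
                           datasource_defn, user_id):
        # Kick off the Celery task, then record its id so that
        # AsyncImportHelper.is_import_in_progress() returns True while the
        # task is pending.
        result = refresh_hq_datasource_task.delay(
            domain, datasource_id, display_name, export_path,
            datasource_defn, user_id)
        AsyncImportHelper(domain, datasource_id).mark_as_in_progress(result.task_id)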
8 changes: 8 additions & 0 deletions hq_superset/templates/hq_datasource_list.html
@@ -18,7 +18,11 @@ <h2>Import From CommCareHQ</h2>
          </td>
          {% if ucr_id_to_pks.get(ds.id, None) %}
          <td class="table-cell" role="cell">
            {% if ds.is_import_in_progress %}
            <p class="alert alert-warning" title="This is being imported in the background">Refreshing</p>
            {% else %}
            <a href="/hq_datasource/update/{{ds.id}}?name={{ ds.display_name | urlencode}}">Refresh</a>
            {% endif %}
          </td>
          <td class="table-cell" role="cell">
            <a href="/superset/explore/table/{{ ucr_id_to_pks.get(ds.id) }}/">{{ds.id}}</a>
@@ -29,7 +33,11 @@ <h2>Import From CommCareHQ</h2>
          </td>
          {% else %}
          <td class="table-cell" role="cell">
            {% if ds.is_import_in_progress %}
            <p class="alert alert-warning" title="This is being imported in the background">Importing</p>
            {% else %}
            <a href="/hq_datasource/update/{{ds.id}}?name={{ ds.display_name | urlencode}}">Import</a>
            {% endif %}
          </td>
          <td class="table-cell alert alert-warning" role="cell">
            Not Imported yet
9 changes: 8 additions & 1 deletion hq_superset/tests/config_for_tests.py
@@ -35,7 +35,14 @@
]

superset_config.SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"

superset_config.SHARED_DIR = "shared_dir"
superset_config.ENABLE_ASYNC_UCR_IMPORTS = True
superset_config.CACHE_CONFIG = {
    'CACHE_TYPE': 'RedisCache',
    'CACHE_DEFAULT_TIMEOUT': 300,
    'CACHE_KEY_PREFIX': 'superset_',
    'CACHE_REDIS_URL': 'redis://localhost:6379/0'
}

superset_config.AUTH_USER_REGISTRATION = True
superset_config.AUTH_USER_REGISTRATION_ROLE = "Gamma"
77 changes: 73 additions & 4 deletions hq_superset/tests/test_views.py
@@ -1,6 +1,8 @@
import datetime
import json
import jwt
import os
import pickle

from io import StringIO
from unittest.mock import patch, MagicMock
@@ -23,6 +25,16 @@ def __init__(self, json_data, status_code):
    def json(self):
        return self.json_data

    @property
    def content(self):
        return pickle.dumps(self.json_data)


class UserMock():
    user_id = '123'

    def get_id(self):
        return self.user_id


class OAuthMock():

@@ -77,6 +89,7 @@ def get(self, url, token):
            'a/test1/api/v0.5/ucr_data_source/': MockResponse(self.test1_datasources, 200),
            'a/test2/api/v0.5/ucr_data_source/': MockResponse(self.test2_datasources, 200),
            'a/test1/api/v0.5/ucr_data_source/test1_ucr1/': MockResponse(TEST_DATASOURCE, 200),
            'a/test1/configurable_reports/data_sources/export/test1_ucr1/?format=csv': MockResponse(TEST_UCR_CSV_V1, 200),
        }[url]


@@ -104,7 +117,7 @@ def setUp(self):
        self.app.appbuilder.sm.oauth_remotes = {"commcare": self.oauth_mock}

        gamma_role = self.app.appbuilder.sm.find_role('Gamma')
        self.app.appbuilder.sm.add_user(**self.oauth_mock.user_json, role=[gamma_role])
        self.user = self.app.appbuilder.sm.add_user(**self.oauth_mock.user_json, role=[gamma_role])

    def login(self, client):
        # bypass oauth-workflow by skipping login and oauth flow
@@ -205,7 +218,7 @@ def test_datasource_upload(self, *args):
        self.login(client)
        client.get('/domain/select/test1/', follow_redirects=True)
        ucr_id = self.oauth_mock.test1_datasources['objects'][0]['id']
        with patch("hq_superset.views.refresh_hq_datasource") as refresh_mock:
        with patch("hq_superset.views.trigger_datasource_refresh") as refresh_mock:
            refresh_mock.return_value = redirect("/tablemodelview/list/")
            client.get(f'/hq_datasource/update/{ucr_id}?name=ds1', follow_redirects=True)
            refresh_mock.assert_called_once_with(
@@ -214,6 +227,62 @@
                'ds1'
            )

    @patch('hq_superset.views.get_valid_cchq_oauth_token', return_value={})
    @patch('hq_superset.views.os.remove')
    def test_trigger_datasource_refresh(self, *args):
        from hq_superset.views import (
            ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES,
            trigger_datasource_refresh
        )
        domain = 'test1'
        ds_name = 'ds_name'
        file_path = '/file_path'
        ucr_id = self.oauth_mock.test1_datasources['objects'][0]['id']

        def _test_sync_or_async(ds_size, routing_method, user_id):
            with patch("hq_superset.views.download_datasource") as download_ds_mock, \
                 patch("hq_superset.views.get_datasource_defn") as ds_defn_mock, \
                 patch(routing_method) as refresh_mock, \
                 patch("hq_superset.views.g") as mock_g:
                mock_g.user = UserMock()
                download_ds_mock.return_value = file_path, ds_size
                ds_defn_mock.return_value = TEST_DATASOURCE
                trigger_datasource_refresh(domain, ucr_id, ds_name)
                refresh_mock.assert_called_once_with(
                    domain,
                    ucr_id,
                    ds_name,
                    file_path,
                    TEST_DATASOURCE,
                    user_id
                )

        # When datasource size is more than the limit, it should get
        # queued via celery
        _test_sync_or_async(
            ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES + 1,
            "hq_superset.views.queue_refresh_task",
            UserMock().user_id
        )
        # When datasource size is within the limit, it should get
        # refreshed directly
        _test_sync_or_async(
            ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES - 1,
            "hq_superset.views.refresh_hq_datasource",
            None
        )
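
(The view under test is not part of this diff. Based on the patches in this
test, the size-based routing in `hq_superset.views.trigger_datasource_refresh`
presumably looks roughly like the following hypothetical sketch; names and
signature are inferred from the test, not confirmed by the source.)

    def trigger_datasource_refresh(domain, datasource_id, display_name):
        token = get_valid_cchq_oauth_token()
        # provider: the configured OAuth remote for CommCare HQ (elided here)
        export_path, size = download_datasource(provider, token, domain, datasource_id)
        datasource_defn = get_datasource_defn(provider, token, domain, datasource_id)
        if size < ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES:
            # Small exports are imported synchronously within the request.
            return refresh_hq_datasource(domain, datasource_id, display_name,
                                         export_path, datasource_defn, None)
        # Large exports are handed off to the Celery worker.
        return queue_refresh_task(domain, datasource_id, display_name,
                                  export_path, datasource_defn, g.user.get_id())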

    @patch('hq_superset.views.get_valid_cchq_oauth_token', return_value={})
    def test_download_datasource(self, *args):
        from hq_superset.views import download_datasource
        ucr_id = self.oauth_mock.test1_datasources['objects'][0]['id']
        path, size = download_datasource(self.oauth_mock, '_', 'test1', ucr_id)
        with open(path, 'rb') as f:
            self.assertEqual(pickle.load(f), TEST_UCR_CSV_V1)
        self.assertEqual(size, len(pickle.dumps(TEST_UCR_CSV_V1)))
        os.remove(path)

    @patch('hq_superset.views.get_valid_cchq_oauth_token', return_value={})
    def test_refresh_hq_datasource(self, *args):

@@ -222,14 +291,14 @@ def test_refresh_hq_datasource(self, *args):

        ucr_id = self.oauth_mock.test1_datasources['objects'][0]['id']
        ds_name = "ds1"
        with patch("hq_superset.views.get_csv_file") as csv_mock, \
        with patch("hq_superset.views.get_datasource_file") as csv_mock, \
             self.app.test_client() as client:
            self.login(client)
            client.get('/domain/select/test1/', follow_redirects=True)

            def _test_upload(test_data, expected_output):
                csv_mock.return_value = StringIO(test_data)
                refresh_hq_datasource('test1', ucr_id, ds_name)
                refresh_hq_datasource('test1', ucr_id, ds_name, '_', TEST_DATASOURCE)
                datasets = json.loads(client.get('/api/v1/dataset/').data)
                self.assertEqual(len(datasets['result']), 1)
                self.assertEqual(datasets['result'][0]['schema'], get_schema_name_for_domain('test1'))
73 changes: 69 additions & 4 deletions hq_superset/utils.py
@@ -1,18 +1,25 @@
import os
import pandas
import sqlalchemy


from contextlib import contextmanager
from datetime import date, datetime
from superset.extensions import cache_manager
from flask_login import current_user
from zipfile import ZipFile

import pandas
import sqlalchemy


DOMAIN_PREFIX = "hqdomain_"
SESSION_USER_DOMAINS_KEY = "user_hq_domains"
SESSION_OAUTH_RESPONSE_KEY = "oauth_response"
HQ_DB_CONNECTION_NAME = "HQ Data"

# ~5MB
ASYNC_DATASOURCE_IMPORT_LIMIT_IN_BYTES = 5000000

def get_datasource_export_url(domain, datasource_id):
    return f"a/{domain}/configurable_reports/data_sources/export/{datasource_id}?format=csv"
    return f"a/{domain}/configurable_reports/data_sources/export/{datasource_id}/?format=csv"


def get_datasource_list_url(domain):
@@ -99,6 +106,33 @@ def parse_date(date_str):
        return date_str


class AsyncImportHelper:
    def __init__(self, domain, datasource_id):
        self.domain = domain
        self.datasource_id = datasource_id

    @property
    def progress_key(self):
        return f"{self.domain}_{self.datasource_id}_import_task_id"

    @property
    def task_id(self):
        return cache_manager.cache.get(self.progress_key)

    def is_import_in_progress(self):
        if not self.task_id:
            return False
        from celery.result import AsyncResult
        res = AsyncResult(self.task_id)
        return not res.ready()

    def mark_as_in_progress(self, task_id):
        cache_manager.cache.set(self.progress_key, task_id)

    def mark_as_complete(self):
        cache_manager.cache.delete(self.progress_key)


class DomainSyncUtil:

    def __init__(self, security_manager):
@@ -146,3 +180,34 @@ def sync_domain_role(self, domain):
        current_user.roles = self.re_eval_roles(current_user.roles, role)
        self.sm.get_session.add(current_user)
        self.sm.get_session.commit()




@contextmanager
def get_datasource_file(path):
    with ZipFile(path) as zipfile:
        filename = zipfile.namelist()[0]
        yield zipfile.open(filename)
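
(A hypothetical usage sketch; the consuming code in views.py is not part of
this hunk, and the chunked `pandas.read_csv` loop is an assumption:)

    with get_datasource_file(export_path) as f:
        # Stream the CSV export out of the zip without loading it all at once.
        for chunk in pandas.read_csv(f, chunksize=10000):
            ...  # append each chunk to the dataset's table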


def download_datasource(provider, oauth_token, domain, datasource_id):
    import superset
    datasource_url = get_datasource_export_url(domain, datasource_id)
    response = provider.get(datasource_url, token=oauth_token)
    if response.status_code != 200:
        raise CCHQApiException("Error downloading the UCR export from HQ")

    filename = f"{datasource_id}_{datetime.now()}.zip"
    path = os.path.join(superset.config.SHARED_DIR, filename)
    with open(path, "wb") as f:
        f.write(response.content)

    return path, len(response.content)

def get_datasource_defn(provider, oauth_token, domain, datasource_id):
    url = get_datasource_details_url(domain, datasource_id)
    response = provider.get(url, token=oauth_token)
    if response.status_code != 200:
        raise CCHQApiException("Error downloading the UCR definition from HQ")
    return response.json()