Skip to content

Commit

Permalink
Merge pull request #556 from ckan/clenanup-clear-action
Browse files Browse the repository at this point in the history
Clean up harvest source clear command
  • Loading branch information
amercader authored Oct 31, 2024
2 parents bf849f1 + 4c40f6b commit ffc7600
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 122 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@ jobs:
needs: lint
strategy:
matrix:
ckan-version: ["2.11", "2.10", 2.9]
include:
- ckan-version: "2.11"
ckan-image: "ckan/ckan-dev:2.11-py3.10"
- ckan-version: "2.10"
ckan-image: "ckan/ckan-dev:2.10-py3.10"
- ckan-version: "2.9"
ckan-image: "ckan/ckan-dev:2.9-py3.9"
fail-fast: false

name: CKAN ${{ matrix.ckan-version }}
runs-on: ubuntu-latest
container:
image: ckan/ckan-dev:${{ matrix.ckan-version }}
image: ${{ matrix.ckan-image }}
services:
solr:
image: ckan/ckan-solr:${{ matrix.ckan-version }}-solr9
Expand Down Expand Up @@ -54,6 +60,10 @@ jobs:
pip install -e .
# Replace default path to CKAN core config file with the one on the container
sed -i -e 's/use = config:.*/use = config:\/srv\/app\/src\/ckan\/test-core.ini/' test.ini
- name: Install requirements (2.9)
run: |
pip install -U pytest-rerunfailures
if: ${{ matrix.ckan-version == '2.9' }}
- name: Setup extension (CKAN >= 2.9)
run: |
ckan -c test.ini db init
Expand Down
214 changes: 94 additions & 120 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,146 +92,114 @@ def harvest_source_update(context, data_dict):


def harvest_source_clear(context, data_dict):
'''
"""
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear
:type id: string
'''
"""

check_access('harvest_source_clear', context, data_dict)
check_access("harvest_source_clear", context, data_dict)

harvest_source_id = data_dict.get('id')
harvest_source_id = data_dict.get("id")

source = HarvestSource.get(harvest_source_id)
if not source:
log.error('Harvest source %s does not exist', harvest_source_id)
raise NotFound('Harvest source %s does not exist' % harvest_source_id)
log.error("Harvest source %s does not exist", harvest_source_id)
raise NotFound("Harvest source %s does not exist" % harvest_source_id)

harvest_source_id = source.id

# Clear all datasets from this source from the index
harvest_source_index_clear(context, data_dict)

model = context['model']

# CKAN-2.6 or above: related don't exist any more
if toolkit.check_ckan_version(max_version='2.5.99'):

sql = '''select id from related where id in (
select related_id from related_dataset where dataset_id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}'));'''.format(
harvest_source_id=harvest_source_id)
result = model.Session.execute(sql)
ids = []
for row in result:
ids.append(row[0])
related_ids = "('" + "','".join(ids) + "')"

sql = '''begin;
update package set state = 'to_delete' where id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}');'''.format(
harvest_source_id=harvest_source_id)

# CKAN-2.3 or above: delete resource views, resource revisions & resources
if toolkit.check_ckan_version(min_version='2.3'):
sql += '''
delete from resource_view where resource_id in (
select id from resource where package_id in (
select id from package where state = 'to_delete'));
delete from resource_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource where package_id in (
select id from package where state = 'to_delete');
'''
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
else:
sql += '''
delete from resource_revision where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource_group_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource_group where package_id in (
select id from package where state = 'to_delete');
'''
# CKAN pre-2.5: authz models were removed in migration 078
if toolkit.check_ckan_version(max_version='2.4.99'):
sql += '''
delete from package_role where package_id in (
select id from package where state = 'to_delete');
delete from user_object_role where id not in (
select user_object_role_id from package_role)
and context = 'Package';
'''

sql += '''
delete from harvest_object_error where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (
select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
delete from package_tag_revision where package_id in (
select id from package where state = 'to_delete');
delete from member_revision where table_id in (
select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (
select id from package where state = 'to_delete');
delete from package_revision where id in (
select id from package where state = 'to_delete');
delete from package_tag where package_id in (
select id from package where state = 'to_delete');
delete from package_extra where package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where object_package_id in (
select id from package where state = 'to_delete');
delete from member where table_id in (
select id from package where state = 'to_delete');
'''.format(
harvest_source_id=harvest_source_id)

if toolkit.check_ckan_version(max_version='2.5.99'):
sql += '''
delete from related_dataset where dataset_id in (
select id from package where state = 'to_delete');
delete from related where id in {related_ids};
delete from package where id in (
select id from package where state = 'to_delete');
'''.format(related_ids=related_ids)
else:
# CKAN-2.6 or above: related don't exist any more
sql += '''
delete from package where id in (
select id from package where state = 'to_delete');
'''

sql += '''
commit;
'''
model = context["model"]

sql = "BEGIN;"

sql += f"""
UPDATE package set state = 'to_delete' WHERE id IN (
SELECT package_id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');"""

sql += """
CREATE TEMP TABLE package_ids_to_delete AS (
SELECT id FROM package WHERE state = 'to_delete');
"""

sql += f"""
DELETE FROM harvest_object_error WHERE harvest_object_id IN (
SELECT id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id IN (
SELECT id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id IN (
SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
"""

if toolkit.check_ckan_version(max_version="2.10.99"):
# Revision tables only exist up to CKAN 2.10 (they were dropped in CKAN 2.11)

sql += """
DELETE FROM package_tag_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM member_revision WHERE table_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_extra_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_revision WHERE id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship_revision WHERE subject_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship_revision WHERE object_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM resource_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

if toolkit.check_ckan_version(max_version="2.11.99"):
# The package_extra table was dropped in CKAN 2.12
sql += """
DELETE FROM package_extra WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
DELETE FROM package_tag WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship WHERE subject_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship WHERE object_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM member WHERE table_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM resource_view WHERE resource_id IN (
SELECT id FROM resource WHERE package_id IN (
SELECT id FROM package_ids_to_delete));
DELETE FROM resource WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
DELETE FROM package WHERE id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
COMMIT;
"""
model.Session.execute(sql)

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
get_action("harvest_source_reindex")(context, {"id": harvest_source_id})

return {'id': harvest_source_id}
return {"id": harvest_source_id}


def harvest_abort_failed_jobs(context, data_dict):
Expand Down Expand Up @@ -974,7 +942,10 @@ def harvest_source_reindex(context, data_dict):

if 'extras_as_string' in context:
del context['extras_as_string']
context.update({'ignore_auth': True})
context.update({
'ignore_auth': True,
'validate': False,
})
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
log.debug('Updating search index for harvest source: %s',
Expand All @@ -991,6 +962,9 @@ def harvest_source_reindex(context, data_dict):
if key not in config:
new_dict[key] = value

# TODO: workaround for extras exception caused by convert_from_extras
new_dict.pop("extras", None)

package_index = PackageSearchIndex()
package_index.index_package(new_dict, defer_commit=defer_commit)

Expand Down

0 comments on commit ffc7600

Please sign in to comment.