From 0f0e35755eee0662283ae28269debaf91c468f02 Mon Sep 17 00:00:00 2001 From: Aliaksei Yaletski Date: Wed, 5 Jun 2024 22:23:11 +0200 Subject: [PATCH] gh-213 `Native tags` tags processor periodically fails with deadlock in the database (#215) close gh-213 --- CHANGELOG.md | 2 +- changes/unreleased.md | 2 +- ffun/ffun/ontology/entities.py | 3 +- ffun/ffun/ontology/operations.py | 30 +++++---- ffun/ffun/ontology/tests/test_operations.py | 73 +++++++++++++++++++++ 5 files changed, 93 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ee7c485..778d6d6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ Generated by [Changy](https://github.com/Tiendil/changy/tree/main). ## Unreleased -No changes. +- [gh-213](https://github.com/Tiendil/feeds.fun/issues/213) Fixed: `Native tags` tags processor periodically fails with deadlock in the database. ## 1.7.0 on 2024-06-02 diff --git a/changes/unreleased.md b/changes/unreleased.md index 373eb997..850d3f69 100644 --- a/changes/unreleased.md +++ b/changes/unreleased.md @@ -1,2 +1,2 @@ -No changes. +- [gh-213](https://github.com/Tiendil/feeds.fun/issues/213) Fixed: `Native tags` tags processor periodically fails with deadlock in the database. diff --git a/ffun/ffun/ontology/entities.py b/ffun/ffun/ontology/entities.py index 8e77dbed..2522d20a 100644 --- a/ffun/ffun/ontology/entities.py +++ b/ffun/ffun/ontology/entities.py @@ -4,6 +4,7 @@ import pydantic from ffun.core import utils +from ffun.core.entities import BaseEntity class TagPropertyType(int, enum.Enum): @@ -16,7 +17,7 @@ class TagCategory(str, enum.Enum): feed_tag = "feed-tag" -class TagProperty(pydantic.BaseModel): +class TagProperty(BaseEntity): tag_id: int type: TagPropertyType value: str diff --git a/ffun/ffun/ontology/operations.py b/ffun/ffun/ontology/operations.py index 193f7e55..e783d58f 100644 --- a/ffun/ffun/ontology/operations.py +++ b/ffun/ffun/ontology/operations.py @@ -2,6 +2,7 @@ from typing import Iterable from bidict import bidict +from pypika import PostgreSQLQuery from ffun.core import logging from ffun.core.postgresql import ExecuteType, execute @@ -131,24 +132,25 @@ async def tech_copy_relations(execute: ExecuteType, entry_from_id: uuid.UUID, en async def apply_tags_properties(execute: ExecuteType, properties: Iterable[TagProperty]) -> None: - sql = """ - INSERT INTO o_tags_properties (tag_id, type, value, processor_id, created_at) - VALUES (%(tag_id)s, %(type)s, %(value)s, %(processor_id)s, %(created_at)s) - ON CONFLICT (tag_id, type, processor_id) DO UPDATE SET value = %(value)s - """ + + if not properties: + return + + query = PostgreSQLQuery.into("o_tags_properties").columns("tag_id", "type", "value", "processor_id", "created_at") for property in properties: - await execute( - sql, - { - "tag_id": property.tag_id, - "type": property.type, - "value": property.value, - "processor_id": property.processor_id, - "created_at": property.created_at, - }, + query = query.insert( + property.tag_id, + property.type, + property.value, + property.processor_id, + property.created_at, ) + query = query.on_conflict("tag_id", "type", "processor_id").do_update("value") + + await execute(str(query)) + async def get_tags_for_entries(execute: ExecuteType, entries_ids: list[uuid.UUID]) -> dict[uuid.UUID, set[int]]: sql = """SELECT * FROM o_relations WHERE entry_id = ANY(%(entries_ids)s)""" diff --git a/ffun/ffun/ontology/tests/test_operations.py b/ffun/ffun/ontology/tests/test_operations.py index f1038672..305cdd3b 100644 --- a/ffun/ffun/ontology/tests/test_operations.py +++ b/ffun/ffun/ontology/tests/test_operations.py @@ -5,11 +5,14 @@ from ffun.core.postgresql import execute, transaction from ffun.core.tests.helpers import TableSizeDelta, TableSizeNotChanged from ffun.library.entities import Entry +from ffun.ontology.entities import ProcessorTag from ffun.ontology.operations import ( _get_relations_for_entry_and_tags, _register_relations_processors, _save_tags, apply_tags, + apply_tags_properties, + get_tags_properties, remove_relations_for_entries, tech_copy_relations, ) @@ -256,3 +259,73 @@ async def test_success( three_tags_ids[2]: {fake_processor_id, another_fake_processor_id}, }, ) + + +class TestApplyTagsProperties: + + @pytest.mark.asyncio + async def test_no_properties(self) -> None: + async with TableSizeNotChanged("o_tags_properties"): + await apply_tags_properties(execute, []) + + @pytest.mark.asyncio + async def test_first_time_save( + self, + three_tags_by_uids: dict[str, int], + three_processor_tags: tuple[ProcessorTag, ProcessorTag, ProcessorTag], + fake_processor_id: int, + ) -> None: + + properties = [] + + for tag in three_processor_tags: + tag.link = f"https://example.com?{tag.raw_uid}" + properties.extend( + tag.build_properties_for(tag_id=three_tags_by_uids[tag.raw_uid], processor_id=fake_processor_id) + ) + + async with TableSizeDelta("o_tags_properties", delta=3): + await apply_tags_properties(execute, properties) + + loaded_tags_properties = await get_tags_properties(three_tags_by_uids.values()) + + loaded_tags_properties.sort(key=lambda x: x.tag_id) + properties.sort(key=lambda x: x.tag_id) + + assert loaded_tags_properties == properties + + @pytest.mark.asyncio + async def test_save_duplicated( + self, + three_tags_by_uids: dict[str, int], + three_processor_tags: tuple[ProcessorTag, ProcessorTag, ProcessorTag], + fake_processor_id: int, + ) -> None: + + properties = [] + + for tag in three_processor_tags: + tag.link = f"https://example.com?{tag.raw_uid}" + properties.extend( + tag.build_properties_for(tag_id=three_tags_by_uids[tag.raw_uid], processor_id=fake_processor_id) + ) + + async with TableSizeDelta("o_tags_properties", delta=3): + await apply_tags_properties(execute, properties) + + changed_properties = [ + properties[0].replace(), + properties[1].replace(value="https://example.com?another-uid"), + properties[2].replace(), + ] + + async with TableSizeNotChanged("o_tags_properties"): + await apply_tags_properties(execute, changed_properties) + + loaded_tags_properties = await get_tags_properties(three_tags_by_uids.values()) + + loaded_tags_properties.sort(key=lambda x: x.tag_id) + properties.sort(key=lambda x: x.tag_id) + + assert loaded_tags_properties != properties + assert loaded_tags_properties == changed_properties