Skip to content

Commit

Permalink
gh-213 Native tags tags processor periodically fails with deadlock …
Browse files Browse the repository at this point in the history
…in the database (#215)

close gh-213
  • Loading branch information
Tiendil authored Jun 5, 2024
1 parent 73747f1 commit 0f0e357
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Generated by [Changy](https://github.com/Tiendil/changy/tree/main).

## Unreleased

No changes.
- [gh-213](https://github.com/Tiendil/feeds.fun/issues/213) Fixed: `Native tags` tags processor periodically fails with deadlock in the database.

## 1.7.0 on 2024-06-02

Expand Down
2 changes: 1 addition & 1 deletion changes/unreleased.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

No changes.
- [gh-213](https://github.com/Tiendil/feeds.fun/issues/213) Fixed: `Native tags` tags processor periodically fails with deadlock in the database.
3 changes: 2 additions & 1 deletion ffun/ffun/ontology/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pydantic

from ffun.core import utils
from ffun.core.entities import BaseEntity


class TagPropertyType(int, enum.Enum):
Expand All @@ -16,7 +17,7 @@ class TagCategory(str, enum.Enum):
feed_tag = "feed-tag"


class TagProperty(pydantic.BaseModel):
class TagProperty(BaseEntity):
tag_id: int
type: TagPropertyType
value: str
Expand Down
30 changes: 16 additions & 14 deletions ffun/ffun/ontology/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Iterable

from bidict import bidict
from pypika import PostgreSQLQuery

from ffun.core import logging
from ffun.core.postgresql import ExecuteType, execute
Expand Down Expand Up @@ -131,24 +132,25 @@ async def tech_copy_relations(execute: ExecuteType, entry_from_id: uuid.UUID, en


async def apply_tags_properties(execute: ExecuteType, properties: Iterable[TagProperty]) -> None:
sql = """
INSERT INTO o_tags_properties (tag_id, type, value, processor_id, created_at)
VALUES (%(tag_id)s, %(type)s, %(value)s, %(processor_id)s, %(created_at)s)
ON CONFLICT (tag_id, type, processor_id) DO UPDATE SET value = %(value)s
"""

if not properties:
return

query = PostgreSQLQuery.into("o_tags_properties").columns("tag_id", "type", "value", "processor_id", "created_at")

for property in properties:
await execute(
sql,
{
"tag_id": property.tag_id,
"type": property.type,
"value": property.value,
"processor_id": property.processor_id,
"created_at": property.created_at,
},
query = query.insert(
property.tag_id,
property.type,
property.value,
property.processor_id,
property.created_at,
)

query = query.on_conflict("tag_id", "type", "processor_id").do_update("value")

await execute(str(query))


async def get_tags_for_entries(execute: ExecuteType, entries_ids: list[uuid.UUID]) -> dict[uuid.UUID, set[int]]:
sql = """SELECT * FROM o_relations WHERE entry_id = ANY(%(entries_ids)s)"""
Expand Down
73 changes: 73 additions & 0 deletions ffun/ffun/ontology/tests/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from ffun.core.postgresql import execute, transaction
from ffun.core.tests.helpers import TableSizeDelta, TableSizeNotChanged
from ffun.library.entities import Entry
from ffun.ontology.entities import ProcessorTag
from ffun.ontology.operations import (
_get_relations_for_entry_and_tags,
_register_relations_processors,
_save_tags,
apply_tags,
apply_tags_properties,
get_tags_properties,
remove_relations_for_entries,
tech_copy_relations,
)
Expand Down Expand Up @@ -256,3 +259,73 @@ async def test_success(
three_tags_ids[2]: {fake_processor_id, another_fake_processor_id},
},
)


class TestApplyTagsProperties:

@pytest.mark.asyncio
async def test_no_properties(self) -> None:
async with TableSizeNotChanged("o_tags_properties"):
await apply_tags_properties(execute, [])

@pytest.mark.asyncio
async def test_first_time_save(
self,
three_tags_by_uids: dict[str, int],
three_processor_tags: tuple[ProcessorTag, ProcessorTag, ProcessorTag],
fake_processor_id: int,
) -> None:

properties = []

for tag in three_processor_tags:
tag.link = f"https://example.com?{tag.raw_uid}"
properties.extend(
tag.build_properties_for(tag_id=three_tags_by_uids[tag.raw_uid], processor_id=fake_processor_id)
)

async with TableSizeDelta("o_tags_properties", delta=3):
await apply_tags_properties(execute, properties)

loaded_tags_properties = await get_tags_properties(three_tags_by_uids.values())

loaded_tags_properties.sort(key=lambda x: x.tag_id)
properties.sort(key=lambda x: x.tag_id)

assert loaded_tags_properties == properties

@pytest.mark.asyncio
async def test_save_duplicated(
self,
three_tags_by_uids: dict[str, int],
three_processor_tags: tuple[ProcessorTag, ProcessorTag, ProcessorTag],
fake_processor_id: int,
) -> None:

properties = []

for tag in three_processor_tags:
tag.link = f"https://example.com?{tag.raw_uid}"
properties.extend(
tag.build_properties_for(tag_id=three_tags_by_uids[tag.raw_uid], processor_id=fake_processor_id)
)

async with TableSizeDelta("o_tags_properties", delta=3):
await apply_tags_properties(execute, properties)

changed_properties = [
properties[0].replace(),
properties[1].replace(value="https://example.com?another-uid"),
properties[2].replace(),
]

async with TableSizeNotChanged("o_tags_properties"):
await apply_tags_properties(execute, changed_properties)

loaded_tags_properties = await get_tags_properties(three_tags_by_uids.values())

loaded_tags_properties.sort(key=lambda x: x.tag_id)
properties.sort(key=lambda x: x.tag_id)

assert loaded_tags_properties != properties
assert loaded_tags_properties == changed_properties

0 comments on commit 0f0e357

Please sign in to comment.