Skip to content

Commit 7e39fcb

Browse files
author
Pablo Panero
committed
event bus: initial implementation
1 parent 76274aa commit 7e39fcb

File tree

10 files changed

+290
-2
lines changed

10 files changed

+290
-2
lines changed

invenio_records_resources/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22
#
3-
# Copyright (C) 2020 CERN.
3+
# Copyright (C) 2020-2022 CERN.
44
# Copyright (C) 2020 Northwestern University.
55
#
66
# Invenio-Records-Resources is free software; you can redistribute it and/or
@@ -16,3 +16,7 @@
1616
SITE_UI_URL = "https://127.0.0.1:5000"
1717

1818
SITE_API_URL = "https://127.0.0.1:5000/api"
19+
20+
RECORDS_RESOURCES_EVENTS_HANDLERS = {}
21+
22+
RECORDS_RESOURCES_EVENTS_QUEUE = "events"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Module for event driven actions support."""
10+
11+
from .bus import EventBus
12+
from .events import Event
13+
14+
__all__ = (
15+
"Event",
16+
"EventBus"
17+
)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events bus module."""
10+
11+
from pickle import dumps, loads
12+
13+
from flask import current_app
14+
from invenio_queues.proxies import current_queues
15+
16+
17+
class EventBus:
18+
"""Event bus."""
19+
20+
def __init__(self, queue_name=None):
21+
"""Constructor."""
22+
self._queue_name = queue_name or \
23+
current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"]
24+
self._queue = None
25+
26+
for name, queue in current_queues.queues.items():
27+
if name == self._queue_name:
28+
self._queue = queue
29+
break
30+
31+
def publish(self, event):
32+
"""Publish an event to the bus queue."""
33+
return self._queue.publish([dumps(event)])
34+
35+
def consume(self):
36+
"""Consume an event from the bus queue."""
37+
for event in self._queue.consume(): # consume() returns a generator
38+
yield loads(event)
39+
40+
def active_consumer(self):
41+
"""Returns a consumer that stays open."""
42+
# TODO: see usage in handlers.py
43+
pass
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Events module."""
10+
11+
from dataclasses import dataclass
12+
from datetime import datetime
13+
from typing import ClassVar
14+
15+
16+
@dataclass
17+
class Event:
18+
"""Base event."""
19+
20+
created: datetime
21+
type: str
22+
action: str
23+
handling_key: str
24+
25+
26+
@dataclass
27+
class RecordEvent(Event):
28+
"""Record related events."""
29+
30+
recid: str
31+
type: ClassVar[str] = "RECORD"
32+
handling_key: ClassVar[str] = "RECORD"
33+
34+
35+
@dataclass
36+
class RecordCreatedEvent(RecordEvent):
37+
"""Record related events."""
38+
39+
action: ClassVar[str] = "PUBLISHED"
40+
handling_key: ClassVar[str] = f"{RecordEvent.type}.{action}"
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event handlers module."""
10+
11+
from dataclasses import asdict
12+
from datetime import datetime
13+
14+
from celery import shared_task
15+
from flask import current_app
16+
17+
from .bus import EventBus
18+
19+
20+
def _handlers_for_key(key):
21+
"""Returns the handlers for a key."""
22+
config_handlers = current_app.config["RECORDS_RESOURCES_EVENTS_HANDLERS"]
23+
keys_parts = key.split(".")
24+
25+
event_handlers = []
26+
curr_key = ""
27+
for part in keys_parts:
28+
curr_key = f"{curr_key}.{part}"
29+
try:
30+
event_handlers.expand(config_handlers[curr_key])
31+
except KeyError:
32+
current_app.logger.warning(f"No handler for key {curr_key}")
33+
34+
return event_handlers
35+
36+
37+
def _handle_event(event, handler=None):
38+
"""Executes the handlers configured for an event."""
39+
handlers = _handlers_for_key(event.handling_key)
40+
41+
for handler in handlers:
42+
func = handler
43+
async_ = True
44+
if isinstance(handler, tuple):
45+
func = handler[0]
46+
async_ = handler[1]
47+
48+
if async_:
49+
func.delay(**asdict(event))
50+
else:
51+
func(**asdict(event))
52+
53+
# audit logging
54+
current_app.logger.info(
55+
f"{event.type}-{event.action} handled successfully."
56+
)
57+
58+
59+
@shared_task(ignore_result=True)
60+
def handle_events(queue_name=None, max_events=1000, ttl=300):
61+
"""Handle events queue.
62+
63+
:param max_events: maximum number of events to process by the task.
64+
:param ttl: time to live (in seconds) for the task.
65+
"""
66+
bus = EventBus(queue_name)
67+
start = datetime.timestamp(datetime.now())
68+
end = start
69+
spawn_new = False
70+
with bus.active_consumer() as consumer:
71+
while max_events > 0 and (start + ttl) > end:
72+
spawn_new = False
73+
event = consumer.consume() # blocking
74+
_handle_event(event) # execute all handlers
75+
end = datetime.timestamp(datetime.now())
76+
spawn_new = True
77+
78+
if spawn_new:
79+
handle_events.delay(
80+
queue_name=queue_name, max_events=max_events, ttl=ttl
81+
)

invenio_records_resources/services/uow.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ def on_commit(self, uow):
106106

107107
from invenio_db import db
108108

109+
from .events import EventBus
110+
109111

110112
#
111113
# Unit of work operations
@@ -199,6 +201,21 @@ def on_post_commit(self, uow):
199201
self._celery_task.delay(*self._args, **self._kwargs)
200202

201203

204+
class EventOp(Operation):
205+
"""A task to send an event.
206+
207+
All events will be sent after the commit phase.
208+
"""
209+
210+
def __init__(self, event, *args, **kwargs):
211+
"""Constructor."""
212+
self._event = event
213+
214+
def on_post_commit(self, uow):
215+
"""Publish the event to the bus."""
216+
uow._event_bus.publish(self._event)
217+
218+
202219
#
203220
# Unit of work context manager
204221
#
@@ -215,6 +232,7 @@ def __init__(self, session=None):
215232
"""Initialize unit of work context."""
216233
self._session = session or db.session
217234
self._operations = []
235+
self._event_bus = EventBus()
218236
self._dirty = False
219237

220238
def __enter__(self):

run-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ trap cleanup EXIT
3131

3232
python -m check_manifest --ignore ".*-requirements.txt"
3333
python -m sphinx.cmd.build -qnNW docs docs/_build/html
34-
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)"
34+
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)"
3535
python -m pytest $@
3636
tests_exit_code=$?
3737
python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
"invenio-indexer>=1.2.1",
6868
"invenio-jsonschemas>=1.1.3",
6969
"invenio-pidstore>=1.2.2",
70+
"invenio-queues>=1.0.0a4",
7071
"invenio-records-permissions>=0.13.0,<0.14.0",
7172
"invenio-records>=1.6.0",
7273
"luqum>=0.11.0",

tests/services/events/conftest.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Pytest configuration.
10+
11+
See https://pytest-invenio.readthedocs.io/ for documentation on which test
12+
fixtures are available.
13+
"""
14+
15+
from datetime import timedelta
16+
17+
import pytest
18+
from kombu import Exchange
19+
20+
from invenio_records_resources.services.events.events import \
21+
RecordEvent, RecordCreatedEvent
22+
23+
24+
@pytest.fixture(scope="module")
25+
def app_config(app_config):
26+
"""Application configuration."""
27+
# handlers
28+
app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = {
29+
RecordEvent.handling_key: [],
30+
RecordCreatedEvent.handling_key: [
31+
# (sync_handler_task, True),
32+
# (explicit_asyn_handler_task, False),
33+
# implicit_asyn_handler_task,
34+
],
35+
}
36+
37+
# events queue
38+
queue_name = "test-events"
39+
exchange = Exchange(
40+
queue=queue_name,
41+
type="direct",
42+
delivery_mode="persistent", # in-memory and disk
43+
)
44+
45+
app_config["RECORDS_RESOURCES_EVENT_QUEUE"] = queue_name
46+
app_config["QUEUES_DEFINITIONS"] = [
47+
{"name": queue_name, "exchange": exchange}
48+
]
49+
50+
# celery config
51+
app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"]
52+
app_config["CELERYBEAT_SCHEDULE"] = {
53+
'event_handling': {
54+
'task': 'invenio_records_resources.services.events.handle_events',
55+
'schedule': timedelta(minutes=5),
56+
},
57+
}
58+
59+
return app_config

tests/services/events/test_bus.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2022 CERN.
4+
#
5+
# Invenio-Records-Resources is free software; you can redistribute it and/or
6+
# modify it under the terms of the MIT License; see LICENSE file for more
7+
# details.
8+
9+
"""Event bus test."""
10+
11+
from datetime import datetime
12+
from time import sleep
13+
14+
from invenio_records_resources.services.events import EventBus
15+
from invenio_records_resources.services.events.events import RecordCreatedEvent
16+
17+
18+
def test_bus_publish_consume(app):
19+
bus = EventBus("test-events")
20+
event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")
21+
22+
bus.publish(event)
23+
sleep(10)
24+
consumed_event = bus.consume()
25+
assert event == next(consumed_event)

0 commit comments

Comments
 (0)