Skip to content

Commit

Permalink
added cache to device managers. devices are now created once, and cac…
Browse files Browse the repository at this point in the history
…hed (until close is called)
  • Loading branch information
Avivsalem committed Nov 9, 2023
1 parent 1592c41 commit 3fa390d
Show file tree
Hide file tree
Showing 35 changed files with 288 additions and 133 deletions.
7 changes: 4 additions & 3 deletions docs/tutorial/devices.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ rabbitmq_device_manager = RabbitMQOutputDeviceManager(hosts=['my_rabbit_host'],
user='USERNAME',
password='PASSWORD')

rabbitmq_queue = rabbitmq_device_manager.get_output_device('MY_QUEUE_NAME') # get the output device (rabbitmq queue)
rabbitmq_queue = rabbitmq_device_manager.get_output_device(
'MY_QUEUE_NAME') # get the output device (rabbitmq queue)
message = Message(b'data to send')
rabbitmq_queue.send_message(message) # sends the message to the queue
```
Expand Down Expand Up @@ -73,8 +74,8 @@ rabbitmq_failover = RabbitMQOutputDeviceManager(hosts=['my_rabbit_failover_host'
device_manager = FailoverOutputDeviceManager(inner_device_manager=rabbitmq_primary,
failover_device_manager=rabbitmq_failover)


rabbitmq_queue = device_manager.get_output_device('MY_QUEUE_NAME') # this is now a FailoverOutputDevice
message = Message(b'data to send')
rabbitmq_queue.send_message(message) # sends the message to the primary queue. if there's an error, then send to the failover queue
rabbitmq_queue.send_message(
message) # sends the message to the primary queue. if there's an error, then send to the failover queue
```
51 changes: 43 additions & 8 deletions messageflux/iodevices/base/input_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading
from abc import ABCMeta, abstractmethod
from time import perf_counter
from typing import Optional, List, TypeVar, Generic
from typing import Optional, List, TypeVar, Generic, Dict

from messageflux.iodevices.base.common import MessageBundle, Message, DeviceHeaders
from messageflux.iodevices.base.input_transaction import InputTransaction, NULLTransaction
Expand Down Expand Up @@ -105,7 +105,7 @@ def close(self):
"""
and optional method that cleans device resources if necessary
"""
pass
self._manager.delete_input_device_from_cache(self.name)


class AggregatedInputDevice(InputDevice[TManagerType]):
Expand All @@ -120,7 +120,9 @@ def __init__(self, manager: TManagerType, inner_devices: List[InputDevice]):
:param manager: the input device manager that created this device
:param inner_devices: the list of input devices to read from
"""
super().__init__(manager=manager, name="AggregateInputDevice")
super().__init__(manager=manager,
name="AggregateInputDevice")

self._inner_devices_iterator: StatefulListIterator[InputDevice] = StatefulListIterator(inner_devices)
self._last_read_device: Optional[InputDevice] = None
self._logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -179,6 +181,7 @@ def close(self):
"""
tries to close underlying devices
"""
super().close()
for inner_device in self._inner_devices_iterator:
try:
inner_device.close()
Expand All @@ -191,6 +194,10 @@ class InputDeviceManager(Generic[TInputDeviceType], metaclass=ABCMeta):
this is the base class for input device managers. this class is used to create input devices.
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._input_device_cache: Dict[str, TInputDeviceType] = {}

def __enter__(self):
self.connect()
return self
Expand All @@ -210,8 +217,32 @@ def disconnect(self):
"""
pass

@abstractmethod
def get_input_device(self, name: str) -> TInputDeviceType:
"""
checks the cache for a device with 'name' and returns it. creates it if it's not in cache
:param name: the name of the input device to return
:return: the input device
"""
input_device = self._input_device_cache.get(name, None)
if input_device is None:
input_device = self._create_input_device(name)
self._input_device_cache[name] = input_device

return input_device

def delete_input_device_from_cache(self, name: str) -> bool:
"""
deletes a cached input device from cache.
:param name:the device to delete from cache
:return: True if the device existed and deleted, False otherwise
"""
device = self._input_device_cache.pop(name, None)
return device is not None

@abstractmethod
def _create_input_device(self, name: str) -> TInputDeviceType:
"""
creates an input device. should be implemented in child classes
Expand Down Expand Up @@ -239,7 +270,10 @@ class _NullInputDeviceManager(InputDeviceManager):
this is a stub used to create null device
"""

def get_input_device(self, name: str) -> InputDevice:
def __init__(self, **kwargs):
super().__init__(**kwargs)

def _create_input_device(self, name: str) -> InputDevice:
"""
returns a null device
"""
Expand All @@ -252,7 +286,8 @@ class _NullDevice(InputDevice):
"""

def __init__(self):
super(_NullDevice, self).__init__(_NullInputDeviceManager(), '__NULL__')
super().__init__(manager=_NullInputDeviceManager(),
name='__NULL__')

def _read_message(self,
cancellation_token: threading.Event,
Expand Down Expand Up @@ -282,8 +317,8 @@ def __init__(self,
:param device_headers: Additional Headers that may return data from device, or affect its operation.
:param transaction: the transaction returned by the reading device
"""
super(ReadResult, self).__init__(message=message,
device_headers=device_headers)
super().__init__(message=message,
device_headers=device_headers)

self._transaction = transaction

Expand Down
34 changes: 31 additions & 3 deletions messageflux/iodevices/base/output_devices.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from typing import Optional, TypeVar, Generic
from typing import Optional, TypeVar, Generic, Dict

from messageflux.iodevices.base.common import Message, DeviceHeaders, MessageBundle
from messageflux.utils import AggregatedException
Expand Down Expand Up @@ -73,14 +73,18 @@ def close(self):
"""
and optional method that cleans device resources if necessary
"""
pass
self._manager.delete_output_device_from_cache(self.name)


class OutputDeviceManager(Generic[TOutputDeviceType], metaclass=ABCMeta):
"""
this is a base class for output device managers. it is used to create output devices
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._output_device_cache: Dict[str, TOutputDeviceType] = {}

def __enter__(self):
self.connect()
return self
Expand All @@ -100,11 +104,35 @@ def disconnect(self):
"""
pass

@abstractmethod
def delete_output_device_from_cache(self, name: str) -> bool:
"""
deletes a cached output device from cache.
:param name:the device to delete from cache
:return: True if the device existed and deleted, False otherwise
"""
device = self._output_device_cache.pop(name, None)
return device is not None

def get_output_device(self, name: str) -> TOutputDeviceType:
"""
creates an output device. this should be implemented by child classes
:param name: the name of the output device to create
:return: the created output device
"""
output_device = self._output_device_cache.get(name, None)
if output_device is None:
output_device = self._create_output_device(name)
self._output_device_cache[name] = output_device

return output_device

@abstractmethod
def _create_output_device(self, name: str) -> TOutputDeviceType:
"""
creates an output device. this should be implemented by child classes
:param name: the name of the output device to create
:return: the created output device
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def __init__(self,
:param input_devices: the devices that this aggregate device reads from
"""
super(CollectionInputDevice, self).__init__(device_manager, device_name)
super().__init__(manager=device_manager,
name=device_name)

self._input_devices = input_devices
self._logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -84,6 +86,7 @@ def close(self):
"""
closes the connection to device
"""
super().close()
for device in self._input_devices:
try:
device.close()
Expand All @@ -98,13 +101,15 @@ class CollectionInputDeviceManager(InputDeviceManager[CollectionInputDevice]):

def __init__(self,
inner_managers: List[InputDeviceManager],
collection_maker: Callable[[List[InputDevice]], Collection[InputDevice]]):
collection_maker: Callable[[List[InputDevice]], Collection[InputDevice]], **kwargs):
"""
This class is used to create Collection InputDevices
:param inner_managers: the actual InputDeviceManager instances to generate devices from
:param collection_maker: the callable to make the iterable collection from list of devices
"""
super().__init__(**kwargs)

self._inner_managers = inner_managers
self._logger = logging.getLogger(__name__)
self._collection_maker = collection_maker
Expand Down Expand Up @@ -137,7 +142,7 @@ def disconnect(self):
self._logger.warning(
f'Error closing underlying manager {type(manager).__name__}', exc_info=True)

def get_input_device(self, name: str) -> CollectionInputDevice:
def _create_input_device(self, name: str) -> CollectionInputDevice:
"""
Returns an input device by name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def __init__(self,
:param device_name: the name of this device
:param output_devices: the outgoing devices to send to
"""
super(CollectionOutputDevice, self).__init__(device_manager, device_name)
super().__init__(manager=device_manager,
name=device_name)

self._output_devices = output_devices

self._logger = logging.getLogger(__name__)
Expand All @@ -47,6 +49,8 @@ def close(self):
"""
closes the connection to device
"""
super().close()

for device in self._output_devices:
try:
device.close()
Expand All @@ -61,13 +65,14 @@ class CollectionOutputDeviceManager(OutputDeviceManager[CollectionOutputDevice])

def __init__(self,
inner_managers: List[OutputDeviceManager],
collection_maker: Callable[[List[OutputDevice]], Collection[OutputDevice]]):
collection_maker: Callable[[List[OutputDevice]], Collection[OutputDevice]], **kwargs):
"""
This class is used to create Collection OutputDevices
:param inner_managers: the actual OutputDeviceManager instances to generate devices from
:param collection_maker: the callable to make the iterable collection from list of devices
"""
super().__init__(**kwargs)
self._inner_managers = inner_managers
self._logger = logging.getLogger(__name__)
self._collection_maker = collection_maker
Expand Down Expand Up @@ -100,7 +105,7 @@ def disconnect(self):
self._logger.warning(
f'Error closing underlying manager {type(manager).__name__}', exc_info=True)

def get_output_device(self, name: str) -> CollectionOutputDevice:
def _create_output_device(self, name: str) -> CollectionOutputDevice:
"""
Returns an output device by name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
from typing import Optional

from messageflux.iodevices.base import OutputDevice, OutputDeviceException, OutputDeviceManager
from messageflux.iodevices.base.common import MessageBundle
from typing import Optional


class FailoverOutputDevice(OutputDevice['FailoverOutputDeviceManager']):
Expand All @@ -24,7 +24,9 @@ def __init__(self,
"""
if device_name is None:
device_name = inner_device.name
super(FailoverOutputDevice, self).__init__(device_manager, device_name)
super().__init__(manager=device_manager,
name=device_name)

self._inner_device = inner_device
self._failover_device = failover_device
self._logger = logging.getLogger(__name__)
Expand All @@ -49,6 +51,8 @@ def close(self):
"""
closes the underlying devices
"""
super().close()

primary_worked = False
try:
self._inner_device.close()
Expand Down Expand Up @@ -92,11 +96,16 @@ class FailoverOutputDeviceManager(OutputDeviceManager[OutputDevice]):
Output Device Manager that uses a failover if the send fails
"""

def __init__(self, inner_device_manager: OutputDeviceManager, failover_device_manager: OutputDeviceManager):
def __init__(self,
inner_device_manager: OutputDeviceManager,
failover_device_manager: OutputDeviceManager,
**kwargs):
"""
:param inner_device_manager: the device to wrap
:param failover_device_manager: the failover device_manager to send to in case of primary fail
"""
super().__init__(**kwargs)

self._inner_device_manager = inner_device_manager
self._failover_device_manager = failover_device_manager
self._logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -144,7 +153,7 @@ def disconnect(self):
else:
raise

def get_output_device(self, name: str) -> OutputDevice:
def _create_output_device(self, name: str) -> OutputDevice:
"""
Returns an output device by name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ def __init__(self,
queue_dir_name: str = DEFAULT_QUEUES_SUB_DIR,
tmp_dir_name: str = DEFAULT_TMPDIR_SUB_DIR,
bookkeeping_dir_name: str = DEFAULT_BOOKKEEPING_SUB_DIR,
serializer: Optional[FileSystemSerializerBase] = None):
serializer: Optional[FileSystemSerializerBase] = None,
**kwargs):
"""
:param root_folder: the root folder to use for the manager
:param queue_dir_name: the name of the subdirectory under root_folder that holds the queues
:param tmp_dir_name: the name of the subdirectory under root_folder to use for temp files
:param bookkeeping_dir_name: the name of the subdirectory under root_folder that holds the book-keeping data
:param serializer: the serializer to use to write messages to files. None will use the default serializer
"""
super().__init__(**kwargs)

self._root_folder = root_folder
self._queues_folder = os.path.join(root_folder, queue_dir_name)
self._tmp_folder = os.path.join(root_folder, tmp_dir_name)
Expand Down
Loading

0 comments on commit 3fa390d

Please sign in to comment.