From 3fa390d8c0032d84d773eeb4e758f3ce095c0ef6 Mon Sep 17 00:00:00 2001 From: avivs Date: Thu, 9 Nov 2023 09:56:31 +0200 Subject: [PATCH] added cache to device managers. devices are now created once, and cached (until close is called) --- docs/tutorial/devices.md | 7 +-- messageflux/iodevices/base/input_devices.py | 51 ++++++++++++++++--- messageflux/iodevices/base/output_devices.py | 34 +++++++++++-- .../collection_input_device.py | 11 ++-- .../collection_output_device.py | 11 ++-- .../failover_output_device.py | 17 +++++-- .../file_system_device_manager_base.py | 5 +- .../file_system/file_system_input_device.py | 14 +++-- .../file_system/file_system_output_device.py | 20 +++++--- .../in_memory_device_manager.py | 19 ++++--- .../message_store_input_device.py | 12 +++-- .../message_store_output_device.py | 7 ++- .../rabbitmq/rabbitmq_device_manager_base.py | 12 +++-- .../rabbitmq/rabbitmq_input_device.py | 16 +++--- .../rabbitmq/rabbitmq_output_device.py | 31 ++++++----- .../rabbitmq_poison_counting_input_device.py | 8 +-- .../roundrobin_io_device_manager.py | 14 ++--- .../short_circuit_input_device.py | 8 ++- .../short_circuit_output_device.py | 8 ++- messageflux/iodevices/sqs/sqs_input_device.py | 13 +++-- messageflux/iodevices/sqs/sqs_manager_base.py | 4 +- .../iodevices/sqs/sqs_output_device.py | 11 ++-- .../transformer_input_device.py | 10 ++-- .../transformer_output_device.py | 10 ++-- .../logging/bulk_rotating_device_handler.py | 12 ++--- .../logging/bulk_rotating_file_handler.py | 10 ++-- .../logging/bulk_rotating_handler_base.py | 4 +- messageflux/message_handling_service.py | 5 +- .../multiprocessing/multiprocessrunner.py | 2 +- messageflux/pipeline_service.py | 2 +- messageflux/utils/__init__.py | 5 +- tests/devices/mocks.py | 8 ++- tests/devices/roundrobin_test.py | 6 ++- tests/devices/short_circuit_device_test.py | 9 ++-- tests/message_handling_service_test.py | 5 +- 35 files changed, 288 insertions(+), 133 deletions(-) diff --git a/docs/tutorial/devices.md b/docs/tutorial/devices.md index 302b39f..b39aab9 100644 --- a/docs/tutorial/devices.md +++ b/docs/tutorial/devices.md @@ -38,7 +38,8 @@ rabbitmq_device_manager = RabbitMQOutputDeviceManager(hosts=['my_rabbit_host'], user='USERNAME', password='PASSWORD') -rabbitmq_queue = rabbitmq_device_manager.get_output_device('MY_QUEUE_NAME') # get the output device (rabbitmq queue) +rabbitmq_queue = rabbitmq_device_manager.get_output_device( + 'MY_QUEUE_NAME') # get the output device (rabbitmq queue) message = Message(b'data to send') rabbitmq_queue.send_message(message) # sends the message to the queue ``` @@ -73,8 +74,8 @@ rabbitmq_failover = RabbitMQOutputDeviceManager(hosts=['my_rabbit_failover_host' device_manager = FailoverOutputDeviceManager(inner_device_manager=rabbitmq_primary, failover_device_manager=rabbitmq_failover) - rabbitmq_queue = device_manager.get_output_device('MY_QUEUE_NAME') # this is now a FailoverOutputDevice message = Message(b'data to send') -rabbitmq_queue.send_message(message) # sends the message to the primary queue. if there's an error, then send to the failover queue +rabbitmq_queue.send_message( + message) # sends the message to the primary queue. if there's an error, then send to the failover queue ``` diff --git a/messageflux/iodevices/base/input_devices.py b/messageflux/iodevices/base/input_devices.py index 0b63a6f..f64138b 100644 --- a/messageflux/iodevices/base/input_devices.py +++ b/messageflux/iodevices/base/input_devices.py @@ -2,7 +2,7 @@ import threading from abc import ABCMeta, abstractmethod from time import perf_counter -from typing import Optional, List, TypeVar, Generic +from typing import Optional, List, TypeVar, Generic, Dict from messageflux.iodevices.base.common import MessageBundle, Message, DeviceHeaders from messageflux.iodevices.base.input_transaction import InputTransaction, NULLTransaction @@ -105,7 +105,7 @@ def close(self): """ and optional method that cleans device resources if necessary """ - pass + self._manager.delete_input_device_from_cache(self.name) class AggregatedInputDevice(InputDevice[TManagerType]): @@ -120,7 +120,9 @@ def __init__(self, manager: TManagerType, inner_devices: List[InputDevice]): :param manager: the input device manager that created this device :param inner_devices: the list of input devices to read from """ - super().__init__(manager=manager, name="AggregateInputDevice") + super().__init__(manager=manager, + name="AggregateInputDevice") + self._inner_devices_iterator: StatefulListIterator[InputDevice] = StatefulListIterator(inner_devices) self._last_read_device: Optional[InputDevice] = None self._logger = logging.getLogger(__name__) @@ -179,6 +181,7 @@ def close(self): """ tries to close underlying devices """ + super().close() for inner_device in self._inner_devices_iterator: try: inner_device.close() @@ -191,6 +194,10 @@ class InputDeviceManager(Generic[TInputDeviceType], metaclass=ABCMeta): this is the base class for input device managers. this class is used to create input devices. """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._input_device_cache: Dict[str, TInputDeviceType] = {} + def __enter__(self): self.connect() return self @@ -210,8 +217,32 @@ def disconnect(self): """ pass - @abstractmethod def get_input_device(self, name: str) -> TInputDeviceType: + """ + checks the cache for a device with 'name' and returns it. creates it if it's not in cache + + :param name: the name of the input device to return + :return: the input device + """ + input_device = self._input_device_cache.get(name, None) + if input_device is None: + input_device = self._create_input_device(name) + self._input_device_cache[name] = input_device + + return input_device + + def delete_input_device_from_cache(self, name: str) -> bool: + """ + deletes a cached input device from cache. + + :param name:the device to delete from cache + :return: True if the device existed and deleted, False otherwise + """ + device = self._input_device_cache.pop(name, None) + return device is not None + + @abstractmethod + def _create_input_device(self, name: str) -> TInputDeviceType: """ creates an input device. should be implemented in child classes @@ -239,7 +270,10 @@ class _NullInputDeviceManager(InputDeviceManager): this is a stub used to create null device """ - def get_input_device(self, name: str) -> InputDevice: + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _create_input_device(self, name: str) -> InputDevice: """ returns a null device """ @@ -252,7 +286,8 @@ class _NullDevice(InputDevice): """ def __init__(self): - super(_NullDevice, self).__init__(_NullInputDeviceManager(), '__NULL__') + super().__init__(manager=_NullInputDeviceManager(), + name='__NULL__') def _read_message(self, cancellation_token: threading.Event, @@ -282,8 +317,8 @@ def __init__(self, :param device_headers: Additional Headers that may return data from device, or affect its operation. :param transaction: the transaction returned by the reading device """ - super(ReadResult, self).__init__(message=message, - device_headers=device_headers) + super().__init__(message=message, + device_headers=device_headers) self._transaction = transaction diff --git a/messageflux/iodevices/base/output_devices.py b/messageflux/iodevices/base/output_devices.py index 8383254..603fbd1 100644 --- a/messageflux/iodevices/base/output_devices.py +++ b/messageflux/iodevices/base/output_devices.py @@ -1,5 +1,5 @@ from abc import ABCMeta, abstractmethod -from typing import Optional, TypeVar, Generic +from typing import Optional, TypeVar, Generic, Dict from messageflux.iodevices.base.common import Message, DeviceHeaders, MessageBundle from messageflux.utils import AggregatedException @@ -73,7 +73,7 @@ def close(self): """ and optional method that cleans device resources if necessary """ - pass + self._manager.delete_output_device_from_cache(self.name) class OutputDeviceManager(Generic[TOutputDeviceType], metaclass=ABCMeta): @@ -81,6 +81,10 @@ class OutputDeviceManager(Generic[TOutputDeviceType], metaclass=ABCMeta): this is a base class for output device managers. it is used to create output devices """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._output_device_cache: Dict[str, TOutputDeviceType] = {} + def __enter__(self): self.connect() return self @@ -100,11 +104,35 @@ def disconnect(self): """ pass - @abstractmethod + def delete_output_device_from_cache(self, name: str) -> bool: + """ + deletes a cached output device from cache. + + :param name:the device to delete from cache + :return: True if the device existed and deleted, False otherwise + """ + device = self._output_device_cache.pop(name, None) + return device is not None + def get_output_device(self, name: str) -> TOutputDeviceType: """ creates an output device. this should be implemented by child classes + :param name: the name of the output device to create + :return: the created output device + """ + output_device = self._output_device_cache.get(name, None) + if output_device is None: + output_device = self._create_output_device(name) + self._output_device_cache[name] = output_device + + return output_device + + @abstractmethod + def _create_output_device(self, name: str) -> TOutputDeviceType: + """ + creates an output device. this should be implemented by child classes + :param name: the name of the output device to create :return: the created output device """ diff --git a/messageflux/iodevices/collection_device_wrapper/collection_input_device.py b/messageflux/iodevices/collection_device_wrapper/collection_input_device.py index 8fc2c70..d4832f7 100644 --- a/messageflux/iodevices/collection_device_wrapper/collection_input_device.py +++ b/messageflux/iodevices/collection_device_wrapper/collection_input_device.py @@ -26,7 +26,9 @@ def __init__(self, :param input_devices: the devices that this aggregate device reads from """ - super(CollectionInputDevice, self).__init__(device_manager, device_name) + super().__init__(manager=device_manager, + name=device_name) + self._input_devices = input_devices self._logger = logging.getLogger(__name__) @@ -84,6 +86,7 @@ def close(self): """ closes the connection to device """ + super().close() for device in self._input_devices: try: device.close() @@ -98,13 +101,15 @@ class CollectionInputDeviceManager(InputDeviceManager[CollectionInputDevice]): def __init__(self, inner_managers: List[InputDeviceManager], - collection_maker: Callable[[List[InputDevice]], Collection[InputDevice]]): + collection_maker: Callable[[List[InputDevice]], Collection[InputDevice]], **kwargs): """ This class is used to create Collection InputDevices :param inner_managers: the actual InputDeviceManager instances to generate devices from :param collection_maker: the callable to make the iterable collection from list of devices """ + super().__init__(**kwargs) + self._inner_managers = inner_managers self._logger = logging.getLogger(__name__) self._collection_maker = collection_maker @@ -137,7 +142,7 @@ def disconnect(self): self._logger.warning( f'Error closing underlying manager {type(manager).__name__}', exc_info=True) - def get_input_device(self, name: str) -> CollectionInputDevice: + def _create_input_device(self, name: str) -> CollectionInputDevice: """ Returns an input device by name diff --git a/messageflux/iodevices/collection_device_wrapper/collection_output_device.py b/messageflux/iodevices/collection_device_wrapper/collection_output_device.py index d79e0cc..8ba9445 100644 --- a/messageflux/iodevices/collection_device_wrapper/collection_output_device.py +++ b/messageflux/iodevices/collection_device_wrapper/collection_output_device.py @@ -20,7 +20,9 @@ def __init__(self, :param device_name: the name of this device :param output_devices: the outgoing devices to send to """ - super(CollectionOutputDevice, self).__init__(device_manager, device_name) + super().__init__(manager=device_manager, + name=device_name) + self._output_devices = output_devices self._logger = logging.getLogger(__name__) @@ -47,6 +49,8 @@ def close(self): """ closes the connection to device """ + super().close() + for device in self._output_devices: try: device.close() @@ -61,13 +65,14 @@ class CollectionOutputDeviceManager(OutputDeviceManager[CollectionOutputDevice]) def __init__(self, inner_managers: List[OutputDeviceManager], - collection_maker: Callable[[List[OutputDevice]], Collection[OutputDevice]]): + collection_maker: Callable[[List[OutputDevice]], Collection[OutputDevice]], **kwargs): """ This class is used to create Collection OutputDevices :param inner_managers: the actual OutputDeviceManager instances to generate devices from :param collection_maker: the callable to make the iterable collection from list of devices """ + super().__init__(**kwargs) self._inner_managers = inner_managers self._logger = logging.getLogger(__name__) self._collection_maker = collection_maker @@ -100,7 +105,7 @@ def disconnect(self): self._logger.warning( f'Error closing underlying manager {type(manager).__name__}', exc_info=True) - def get_output_device(self, name: str) -> CollectionOutputDevice: + def _create_output_device(self, name: str) -> CollectionOutputDevice: """ Returns an output device by name diff --git a/messageflux/iodevices/failover_output_device_wrapper/failover_output_device.py b/messageflux/iodevices/failover_output_device_wrapper/failover_output_device.py index c7ea796..b5bfed2 100644 --- a/messageflux/iodevices/failover_output_device_wrapper/failover_output_device.py +++ b/messageflux/iodevices/failover_output_device_wrapper/failover_output_device.py @@ -1,8 +1,8 @@ import logging +from typing import Optional from messageflux.iodevices.base import OutputDevice, OutputDeviceException, OutputDeviceManager from messageflux.iodevices.base.common import MessageBundle -from typing import Optional class FailoverOutputDevice(OutputDevice['FailoverOutputDeviceManager']): @@ -24,7 +24,9 @@ def __init__(self, """ if device_name is None: device_name = inner_device.name - super(FailoverOutputDevice, self).__init__(device_manager, device_name) + super().__init__(manager=device_manager, + name=device_name) + self._inner_device = inner_device self._failover_device = failover_device self._logger = logging.getLogger(__name__) @@ -49,6 +51,8 @@ def close(self): """ closes the underlying devices """ + super().close() + primary_worked = False try: self._inner_device.close() @@ -92,11 +96,16 @@ class FailoverOutputDeviceManager(OutputDeviceManager[OutputDevice]): Output Device Manager that uses a failover if the send fails """ - def __init__(self, inner_device_manager: OutputDeviceManager, failover_device_manager: OutputDeviceManager): + def __init__(self, + inner_device_manager: OutputDeviceManager, + failover_device_manager: OutputDeviceManager, + **kwargs): """ :param inner_device_manager: the device to wrap :param failover_device_manager: the failover device_manager to send to in case of primary fail """ + super().__init__(**kwargs) + self._inner_device_manager = inner_device_manager self._failover_device_manager = failover_device_manager self._logger = logging.getLogger(__name__) @@ -144,7 +153,7 @@ def disconnect(self): else: raise - def get_output_device(self, name: str) -> OutputDevice: + def _create_output_device(self, name: str) -> OutputDevice: """ Returns an output device by name diff --git a/messageflux/iodevices/file_system/file_system_device_manager_base.py b/messageflux/iodevices/file_system/file_system_device_manager_base.py index 2cc70ec..f865447 100644 --- a/messageflux/iodevices/file_system/file_system_device_manager_base.py +++ b/messageflux/iodevices/file_system/file_system_device_manager_base.py @@ -21,7 +21,8 @@ def __init__(self, queue_dir_name: str = DEFAULT_QUEUES_SUB_DIR, tmp_dir_name: str = DEFAULT_TMPDIR_SUB_DIR, bookkeeping_dir_name: str = DEFAULT_BOOKKEEPING_SUB_DIR, - serializer: Optional[FileSystemSerializerBase] = None): + serializer: Optional[FileSystemSerializerBase] = None, + **kwargs): """ :param root_folder: the root folder to use for the manager :param queue_dir_name: the name of the subdirectory under root_folder that holds the queues @@ -29,6 +30,8 @@ def __init__(self, :param bookkeeping_dir_name: the name of the subdirectory under root_folder that holds the book-keeping data :param serializer: the serializer to use to write messages to files. None will use the default serializer """ + super().__init__(**kwargs) + self._root_folder = root_folder self._queues_folder = os.path.join(root_folder, queue_dir_name) self._tmp_folder = os.path.join(root_folder, tmp_dir_name) diff --git a/messageflux/iodevices/file_system/file_system_input_device.py b/messageflux/iodevices/file_system/file_system_input_device.py index c30f5ac..b1710ee 100644 --- a/messageflux/iodevices/file_system/file_system_input_device.py +++ b/messageflux/iodevices/file_system/file_system_input_device.py @@ -33,7 +33,7 @@ def __init__(self, device: 'FileSystemInputDevice', org_path: str, tmp_path: str :param org_path: the original path of the file we read :param tmp_path: the temp path of the file we read """ - super(FileSystemInputTransaction, self).__init__(device=device) + super().__init__(device=device) self._org_path = org_path self._tmp_path = tmp_path self._device_manager = device.manager @@ -283,7 +283,9 @@ def __init__(self, :param min_file_age: the minimum time in seconds since last modification to file, before we to try to read it... :param serializer: the serializer to use for reading messages from files """ - super(FileSystemInputDevice, self).__init__(manager, name) + super().__init__(manager=manager, + name=name) + self.min_file_age = min_file_age self._sorted = fifo self._tmp_folder = tmp_folder @@ -489,7 +491,8 @@ def __init__(self, serializer: Optional[FileSystemSerializerBase] = None, fifo: bool = True, min_input_file_age: int = 0, - transaction_log_save_interval: int = 10): + transaction_log_save_interval: int = 10, + **kwargs): """ :param root_folder: the root folder to use for the manager :param queue_dir_name: the name of the subdirectory under root_folder that holds the queues @@ -504,7 +507,8 @@ def __init__(self, queue_dir_name=queue_dir_name, tmp_dir_name=tmp_dir_name, bookkeeping_dir_name=bookkeeping_dir_name, - serializer=serializer) + serializer=serializer, **kwargs) + self._fifo = fifo self._min_input_file_age = min_input_file_age transaction_log_filename = os.path.join(self.bookkeeping_folder, f'{self._unique_manager_id}.transactionlog') @@ -559,7 +563,7 @@ def transaction_log(self) -> TransactionLog: """ return self._transaction_log - def get_input_device(self, name: str) -> FileSystemInputDevice: + def _create_input_device(self, name: str) -> FileSystemInputDevice: """ Returns an input device by name diff --git a/messageflux/iodevices/file_system/file_system_output_device.py b/messageflux/iodevices/file_system/file_system_output_device.py index 68f5136..37e1c07 100644 --- a/messageflux/iodevices/file_system/file_system_output_device.py +++ b/messageflux/iodevices/file_system/file_system_output_device.py @@ -32,7 +32,9 @@ def __init__(self, :param format_filename: the filename format to save the product, "{filename}-{item_id}" :param serializer: the serializer to use """ - super(FileSystemOutputDevice, self).__init__(manager, name) + super().__init__(manager=manager, + name=name) + self._tmp_folder = tmp_folder self._output_folder = os.path.join(queues_folder, name) try: @@ -99,7 +101,8 @@ def __init__(self, tmp_dir_name: str = FileSystemDeviceManagerBase.DEFAULT_TMPDIR_SUB_DIR, bookkeeping_dir_name: str = FileSystemDeviceManagerBase.DEFAULT_BOOKKEEPING_SUB_DIR, serializer: Optional[FileSystemSerializerBase] = None, - output_filename_format: Optional[str] = None): + output_filename_format: Optional[str] = None, + **kwargs): """ :param root_folder: the root folder to use for the manager :param queue_dir_name: the name of the subdirectory under root_folder that holds the queues @@ -108,11 +111,12 @@ def __init__(self, :param serializer: the serializer to use to write messages to files. None will use the default serializer :param output_filename_format: the filename format to save a product, "{filename}-{item_id}" """ - super(FileSystemOutputDeviceManager, self).__init__(root_folder=root_folder, - queue_dir_name=queue_dir_name, - tmp_dir_name=tmp_dir_name, - bookkeeping_dir_name=bookkeeping_dir_name, - serializer=serializer) + super().__init__(root_folder=root_folder, + queue_dir_name=queue_dir_name, + tmp_dir_name=tmp_dir_name, + bookkeeping_dir_name=bookkeeping_dir_name, + serializer=serializer, + **kwargs) self._output_filename_format = output_filename_format def connect(self): @@ -124,7 +128,7 @@ def connect(self): except Exception as ex: raise OutputDeviceException('Error connection to Device Manager') from ex - def get_output_device(self, name: str) -> FileSystemOutputDevice: + def _create_output_device(self, name: str) -> FileSystemOutputDevice: """ Returns an outgoing device by name diff --git a/messageflux/iodevices/in_memory_device/in_memory_device_manager.py b/messageflux/iodevices/in_memory_device/in_memory_device_manager.py index dca935c..dfa9b1d 100644 --- a/messageflux/iodevices/in_memory_device/in_memory_device_manager.py +++ b/messageflux/iodevices/in_memory_device/in_memory_device_manager.py @@ -50,7 +50,7 @@ class InMemoryTransaction(InputTransaction): """ def __init__(self, device: 'InMemoryInputDevice', message: _QueueMessage): - super().__init__(device) + super().__init__(device=device) self._message = message self._device: InMemoryInputDevice = device @@ -67,7 +67,10 @@ def __init__(self, manager: 'InMemoryDeviceManager', name: str, queue: List[_QueueMessage], queue_not_empty_condition: Condition): - super().__init__(manager, name) + + super().__init__(manager=manager, + name=name) + self._queue = queue self._queue_not_empty = queue_not_empty_condition @@ -106,7 +109,10 @@ def __init__(self, manager: 'InMemoryDeviceManager', name: str, queue: List[_QueueMessage], queue_not_empty_condition: Condition): - super().__init__(manager, name) + + super().__init__(manager=manager, + name=name) + self._queue = queue self._queue_not_empty = queue_not_empty_condition @@ -127,7 +133,8 @@ class InMemoryDeviceManager(InputDeviceManager[InMemoryInputDevice], OutputDevic notice that the messages are shared only within the same manager! """ - def __init__(self): + def __init__(self, **kwargs): + super().__init__(**kwargs) self._queues: Dict[str, Tuple[List[_QueueMessage], Condition]] = {} def _get_queue_tuple(self, name: str) -> Tuple[List[_QueueMessage], Condition]: @@ -141,7 +148,7 @@ def _get_queue_tuple(self, name: str) -> Tuple[List[_QueueMessage], Condition]: return queue, condition - def get_input_device(self, name: str) -> InMemoryInputDevice: + def _create_input_device(self, name: str) -> InMemoryInputDevice: """ creates an input device. @@ -152,7 +159,7 @@ def get_input_device(self, name: str) -> InMemoryInputDevice: return InMemoryInputDevice(self, name, queue, condition) - def get_output_device(self, name: str) -> InMemoryOutputDevice: + def _create_output_device(self, name: str) -> InMemoryOutputDevice: """ creates an output device. diff --git a/messageflux/iodevices/message_store_device_wrapper/message_store_input_device.py b/messageflux/iodevices/message_store_device_wrapper/message_store_input_device.py index a01453e..4841b50 100644 --- a/messageflux/iodevices/message_store_device_wrapper/message_store_input_device.py +++ b/messageflux/iodevices/message_store_device_wrapper/message_store_input_device.py @@ -31,7 +31,7 @@ def __init__(self, :param delete_on_commit: Whether or not we should delete the stored buffer after committing a message from the relevant input device """ - super(MessageStoreInputTransactionWrapper, self).__init__(device) + super().__init__(device=device) self._inner_transaction = inner_transaction self._delete_on_commit = delete_on_commit self._message_store = message_store @@ -67,7 +67,7 @@ def __init__(self, :param message_store: the message store to use :param delete_on_commit: should we the delete the message from the message store on transaction commit? """ - super(MessageStoreInputTransformer, self).__init__(message_store=message_store) + super().__init__(message_store=message_store) self._delete_on_commit = delete_on_commit def transform_incoming_message(self, input_device: TransformerInputDevice, read_result: ReadResult) -> ReadResult: @@ -113,7 +113,7 @@ class MessageStoreInputDeviceManagerWrapper(TransformerInputDeviceManager): def __init__(self, inner_manager: InputDeviceManager, message_store: MessageStoreBase, - delete_on_commit: bool = True): + delete_on_commit: bool = True, **kwargs): """ This class is used to wrap IODeviceManager with message store functionality @@ -124,6 +124,10 @@ def __init__(self, to leave the stored message in order to avoid conflicts between systems that read duplications of the same message. """ + transformer = MessageStoreInputTransformer(message_store=message_store, delete_on_commit=delete_on_commit) - super(MessageStoreInputDeviceManagerWrapper, self).__init__(inner_manager, transformer) + + super().__init__(inner_device_manager=inner_manager, + transformer=transformer, + **kwargs) diff --git a/messageflux/iodevices/message_store_device_wrapper/message_store_output_device.py b/messageflux/iodevices/message_store_device_wrapper/message_store_output_device.py index 07fec05..f3ea1a8 100644 --- a/messageflux/iodevices/message_store_device_wrapper/message_store_output_device.py +++ b/messageflux/iodevices/message_store_device_wrapper/message_store_output_device.py @@ -89,7 +89,8 @@ class MessageStoreOutputDeviceManagerWrapper(TransformerOutputDeviceManager): def __init__(self, inner_manager: OutputDeviceManager, message_store: MessageStoreBase, - size_threshold: int = -1): + size_threshold: int = -1, + **kwargs): """ This class is used to wrap IODeviceManager with message store functionality @@ -102,5 +103,7 @@ def __init__(self, transformer = MessageStoreOutputTransformer(message_store=message_store, size_threshold=size_threshold) - super(MessageStoreOutputDeviceManagerWrapper, self).__init__(inner_manager, transformer) + super().__init__(inner_device_manager=inner_manager, + transformer=transformer, + **kwargs) self.size_threshold = size_threshold diff --git a/messageflux/iodevices/rabbitmq/rabbitmq_device_manager_base.py b/messageflux/iodevices/rabbitmq/rabbitmq_device_manager_base.py index 8e61c7d..30078ce 100644 --- a/messageflux/iodevices/rabbitmq/rabbitmq_device_manager_base.py +++ b/messageflux/iodevices/rabbitmq/rabbitmq_device_manager_base.py @@ -1,11 +1,10 @@ import logging import socket import ssl +import sys from random import shuffle from typing import TYPE_CHECKING, List, Optional, Dict, Any, Union, Type -import sys - from messageflux.utils import ThreadLocalMember, KwargsException DEFAULT_CLIENT_ARGS = {'hostname': socket.gethostname()} @@ -34,9 +33,9 @@ class RabbitMQDeviceManagerBase: since it causes trouble when using this device in multiprocess. """ _connection: Union[ThreadLocalMember[Optional['BlockingConnection']], - Optional['BlockingConnection']] = ThreadLocalMember(init_value=None) + Optional['BlockingConnection']] = ThreadLocalMember(init_value=None) _maintenance_channel: Union[ThreadLocalMember[Optional['BlockingChannel']], - Optional['BlockingChannel']] = ThreadLocalMember(init_value=None) + Optional['BlockingChannel']] = ThreadLocalMember(init_value=None) def __init__(self, hosts: Union[List[str], str], @@ -49,7 +48,8 @@ def __init__(self, connection_type: str = "None", heartbeat: int = 300, connection_attempts: int = 5, - blocked_connection_timeout: Optional[float] = None): + blocked_connection_timeout: Optional[float] = None, + **kwargs): """ This manager used to create RabbitMQ devices (direct queues) @@ -71,6 +71,8 @@ def __init__(self, passing `ConnectionBlockedTimeout` exception to on_close_callback in asynchronous adapters or raising it in `BlockingConnection`. """ + super().__init__(**kwargs) + try: import pika except ImportError as ex: diff --git a/messageflux/iodevices/rabbitmq/rabbitmq_input_device.py b/messageflux/iodevices/rabbitmq/rabbitmq_input_device.py index 5fec1fa..0e141ef 100644 --- a/messageflux/iodevices/rabbitmq/rabbitmq_input_device.py +++ b/messageflux/iodevices/rabbitmq/rabbitmq_input_device.py @@ -38,7 +38,7 @@ def __init__(self, :param channel: the BlockingChannel that the item was read from :param delivery_tag: the delivery tag for this item """ - super(RabbitMQInputTransaction, self).__init__(device=device) + super().__init__(device=device) self._cancellation_token = cancellation_token self._channel = channel self._delivery_tag = delivery_tag @@ -124,7 +124,9 @@ def __init__(self, only relevent if "use_consumer" is True :param bool use_consumer: True to use the 'consume' method, False to use 'basic_get' """ - super().__init__(device_manager, queue_name) + super().__init__(manager=device_manager, + name=queue_name) + self._device_manager = device_manager self._queue_name = queue_name self._logger = logging.getLogger(__name__) @@ -318,6 +320,7 @@ def close(self): """ closes the connection to device """ + super().close() try: if self._channel is not None and self._channel.is_open: if self._use_consumer: @@ -347,8 +350,8 @@ def __init__(self, prefetch_count: int = 1, use_consumer: bool = True, blocked_connection_timeout: Optional[float] = None, - default_direct_exchange: Optional[str] = None - ): + default_direct_exchange: Optional[str] = None, + **kwargs): """ This manager used to create RabbitMQ devices (direct queues) @@ -385,7 +388,8 @@ def __init__(self, connection_type="Input", heartbeat=heartbeat, connection_attempts=connection_attempts, - blocked_connection_timeout=blocked_connection_timeout) + blocked_connection_timeout=blocked_connection_timeout, + **kwargs) self._prefetch_count = prefetch_count self._use_consumer = use_consumer @@ -399,7 +403,7 @@ def _device_factory(self, device_name: str) -> RabbitMQInputDevice: prefetch_count=self._prefetch_count, use_consumer=self._use_consumer) - def get_input_device(self, name: str) -> RabbitMQInputDevice: + def _create_input_device(self, name: str) -> RabbitMQInputDevice: """ Returns an incoming device by name diff --git a/messageflux/iodevices/rabbitmq/rabbitmq_output_device.py b/messageflux/iodevices/rabbitmq/rabbitmq_output_device.py index da28dcb..df09ded 100644 --- a/messageflux/iodevices/rabbitmq/rabbitmq_output_device.py +++ b/messageflux/iodevices/rabbitmq/rabbitmq_output_device.py @@ -27,7 +27,9 @@ def __init__(self, device_manager: 'RabbitMQOutputDeviceManager', routing_key: s :param routing_key: the routing key for this queue :param exchange: the exchange name in RabbitMQ for this output device """ - super(RabbitMQOutputDevice, self).__init__(device_manager, routing_key) + super().__init__(manager=device_manager, + name=routing_key) + self._routing_key = routing_key self._exchange = exchange self._logger = logging.getLogger(__name__) @@ -86,7 +88,7 @@ def __init__(self, max_header_name_length: int = 1024, default_rabbit_headers: Optional[Dict[str, Any]] = None, blocked_connection_timeout: Optional[float] = None, - ): + **kwargs): """ This manager used to create RabbitMQ devices (direct queues) @@ -114,17 +116,18 @@ def __init__(self, passing `ConnectionBlockedTimeout` exception to on_close_callback in asynchronous adapters or raising it in `BlockingConnection`. """ - super(RabbitMQOutputDeviceManager, self).__init__(hosts=hosts, - user=user, - password=password, - port=port, - ssl_context=ssl_context, - virtual_host=virtual_host, - client_args=client_args, - connection_type="Output", - heartbeat=heartbeat, - connection_attempts=connection_attempts, - blocked_connection_timeout=blocked_connection_timeout) + super().__init__(hosts=hosts, + user=user, + password=password, + port=port, + ssl_context=ssl_context, + virtual_host=virtual_host, + client_args=client_args, + connection_type="Output", + heartbeat=heartbeat, + connection_attempts=connection_attempts, + blocked_connection_timeout=blocked_connection_timeout, + **kwargs) self._default_output_exchange = default_output_exchange self._publish_confirm = publish_confirm @@ -309,7 +312,7 @@ def get_outgoing_channel(self) -> 'BlockingChannel': except Exception as ex: raise OutputDeviceException('Could not connect to rabbitmq.') from ex - def get_output_device(self, name: str, exchange: Optional[str] = None) -> RabbitMQOutputDevice: + def _create_output_device(self, name: str, exchange: Optional[str] = None) -> RabbitMQOutputDevice: """ Returns and outgoing device by name diff --git a/messageflux/iodevices/rabbitmq/rabbitmq_poison_counting_input_device.py b/messageflux/iodevices/rabbitmq/rabbitmq_poison_counting_input_device.py index d092128..a192d13 100644 --- a/messageflux/iodevices/rabbitmq/rabbitmq_poison_counting_input_device.py +++ b/messageflux/iodevices/rabbitmq/rabbitmq_poison_counting_input_device.py @@ -61,7 +61,7 @@ def __init__(self, :param poison_counter: the poison counter :param message_id: the message id in this transaction """ - super().__init__(inner_transaction.device) + super().__init__(device=inner_transaction.device) self._inner_transaction = inner_transaction self._poison_counter = poison_counter self._message_id = message_id @@ -104,6 +104,7 @@ def __init__(self, consumer_args=consumer_args, prefetch_count=prefetch_count, use_consumer=use_consumer) + self._max_poison_count = max_poison_count self._poison_counter = poison_counter @@ -207,7 +208,7 @@ def __init__(self, use_consumer: bool = True, blocked_connection_timeout: Optional[float] = None, default_direct_exchange: Optional[str] = None, - ): + **kwargs): """ This manager used to create RabbitMQ devices (direct queues) @@ -247,7 +248,8 @@ def __init__(self, prefetch_count=prefetch_count, use_consumer=use_consumer, blocked_connection_timeout=blocked_connection_timeout, - default_direct_exchange=default_direct_exchange) + default_direct_exchange=default_direct_exchange, + **kwargs) self._max_poison_count = max_poison_count self._poison_counter = poison_counter diff --git a/messageflux/iodevices/round_robin_device_wrapper/roundrobin_io_device_manager.py b/messageflux/iodevices/round_robin_device_wrapper/roundrobin_io_device_manager.py index 53181b2..c368214 100644 --- a/messageflux/iodevices/round_robin_device_wrapper/roundrobin_io_device_manager.py +++ b/messageflux/iodevices/round_robin_device_wrapper/roundrobin_io_device_manager.py @@ -16,15 +16,16 @@ class RoundRobinInputDeviceManager(CollectionInputDeviceManager): This class is used to create RoundRobin InputDevices """ - def __init__(self, inner_managers: List[InputDeviceManager]): + def __init__(self, inner_managers: List[InputDeviceManager], **kwargs): """ This class is used to create RoundRobin IODevices :param inner_managers: the actual InputDeviceManager instances to generate devices from """ - super(RoundRobinInputDeviceManager, self).__init__(inner_managers=inner_managers, - collection_maker=_create_round_robin_collection) + super().__init__(inner_managers=inner_managers, + collection_maker=_create_round_robin_collection, + **kwargs) class RoundRobinOutputDeviceManager(CollectionOutputDeviceManager): @@ -32,11 +33,12 @@ class RoundRobinOutputDeviceManager(CollectionOutputDeviceManager): This class is used to create RoundRobin IODevices """ - def __init__(self, inner_managers: List[OutputDeviceManager]): + def __init__(self, inner_managers: List[OutputDeviceManager], **kwargs): """ This class is used to create RoundRobin IODevices :param inner_managers: the actual OutputDeviceManager instances to generate devices from """ - super(RoundRobinOutputDeviceManager, self).__init__(inner_managers=inner_managers, - collection_maker=_create_round_robin_collection) + super().__init__(inner_managers=inner_managers, + collection_maker=_create_round_robin_collection, + **kwargs) diff --git a/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_input_device.py b/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_input_device.py index 50375bd..d36188f 100644 --- a/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_input_device.py +++ b/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_input_device.py @@ -34,6 +34,7 @@ def close(self): """ closes the inner device """ + super().close() self._inner_device.close() @@ -45,7 +46,8 @@ class ShortCircuitInputDeviceManager(InputDeviceManager[ShortCircuitInputDevice] def __init__(self, inner_device_manager: InputDeviceManager, short_circuit_fail_count: int, - short_circuit_time: int): + short_circuit_time: int, + **kwargs): """ :param inner_device_manager: the inner device manager @@ -53,6 +55,8 @@ def __init__(self, the device will be short circuited :param short_circuit_time: the time that the device will remain short circuited """ + super().__init__(**kwargs) + self._inner_device_manager = inner_device_manager self._short_circuit_fail_count = short_circuit_fail_count self._short_circuit_fail_time = short_circuit_time @@ -69,7 +73,7 @@ def disconnect(self): """ self._inner_device_manager.disconnect() - def get_input_device(self, name: str) -> ShortCircuitInputDevice: + def _create_input_device(self, name: str) -> ShortCircuitInputDevice: """ returns a wrapped input device diff --git a/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_output_device.py b/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_output_device.py index 8aae124..b8666c8 100644 --- a/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_output_device.py +++ b/messageflux/iodevices/short_circuit_device_wrapper/short_circuit_output_device.py @@ -27,6 +27,8 @@ def close(self): """ closes the inner device """ + super().close() + self._inner_device.close() @@ -38,7 +40,9 @@ class ShortCircuitOutputDeviceManager(OutputDeviceManager[ShortCircuitOutputDevi def __init__(self, inner_device_manager: OutputDeviceManager, short_circuit_fail_count: int, - short_circuit_time: int): + short_circuit_time: int, + **kwargs): + super().__init__(**kwargs) self._inner_device_manager = inner_device_manager self._short_circuit_fail_count = short_circuit_fail_count self._short_circuit_fail_time = short_circuit_time @@ -55,7 +59,7 @@ def disconnect(self): """ self._inner_device_manager.disconnect() - def get_output_device(self, name: str) -> ShortCircuitOutputDevice: + def _create_output_device(self, name: str) -> ShortCircuitOutputDevice: """ returns a wrapped output device :param name: the name of the output device to get diff --git a/messageflux/iodevices/sqs/sqs_input_device.py b/messageflux/iodevices/sqs/sqs_input_device.py index d9e180a..0f14d89 100644 --- a/messageflux/iodevices/sqs/sqs_input_device.py +++ b/messageflux/iodevices/sqs/sqs_input_device.py @@ -32,7 +32,7 @@ def __init__(self, device: "SQSInputDevice", message: 'SQSMessage'): :param device: the device that returned this transaction :param message: the received message """ - super(SQSInputTransaction, self).__init__(device=device) + super().__init__(device=device) self._message = message self._logger = logging.getLogger(__name__) @@ -70,7 +70,8 @@ def __init__( :param included_message_attributes: list of message attributes to get for the message. defaults to ALL """ - super().__init__(device_manager, queue_name) + super().__init__(manager=device_manager, + name=queue_name) if included_message_attributes is None: included_message_attributes = ["All"] @@ -142,18 +143,20 @@ class SQSInputDeviceManager(SQSManagerBase, InputDeviceManager[SQSInputDevice]): def __init__(self, *, sqs_resource: Optional['SQSServiceResource'] = None, max_messages_per_request: int = 1, - included_message_attributes: Optional[Union[str, List[str]]] = None, ) -> None: + included_message_attributes: Optional[Union[str, List[str]]] = None, **kwargs) -> None: """ :param sqs_resource: the boto sqs service resource. Defaults to creating from env vars :param max_messages_per_request: maximum messages to retrieve from the queue (max 10) :param included_message_attributes: list of message attributes to get for the message. defaults to ALL """ - super().__init__(sqs_resource=sqs_resource) + super().__init__(sqs_resource=sqs_resource, + **kwargs) + self._device_cache: Dict[str, SQSInputDevice] = {} self._max_messages_per_request = max_messages_per_request self._included_message_attributes = included_message_attributes - def get_input_device(self, name: str) -> SQSInputDevice: + def _create_input_device(self, name: str) -> SQSInputDevice: """ Returns an incoming device by name diff --git a/messageflux/iodevices/sqs/sqs_manager_base.py b/messageflux/iodevices/sqs/sqs_manager_base.py index 823985b..3793514 100644 --- a/messageflux/iodevices/sqs/sqs_manager_base.py +++ b/messageflux/iodevices/sqs/sqs_manager_base.py @@ -17,10 +17,12 @@ class SQSManagerBase: """ def __init__(self, *, - sqs_resource: Optional['SQSServiceResource'] = None) -> None: + sqs_resource: Optional['SQSServiceResource'] = None, **kwargs) -> None: """ :param sqs_resource: the boto sqs service resource. Defaults to creating from env vars """ + super().__init__(**kwargs) + if sqs_resource is None: sqs_resource = boto3.resource('sqs') diff --git a/messageflux/iodevices/sqs/sqs_output_device.py b/messageflux/iodevices/sqs/sqs_output_device.py index 4a74d66..ef51f4f 100644 --- a/messageflux/iodevices/sqs/sqs_output_device.py +++ b/messageflux/iodevices/sqs/sqs_output_device.py @@ -27,7 +27,9 @@ def __init__(self, device_manager: "SQSOutputDeviceManager", queue_name: str): :param device_manager: the SQS device Manager that holds this device :param queue_name: the name of the queue """ - super(SQSOutputDevice, self).__init__(device_manager, queue_name) + super().__init__(manager=device_manager, + name=queue_name) + self._sqs_queue = self.manager.get_queue(queue_name) self._message_group_id = get_random_id() @@ -57,14 +59,15 @@ class SQSOutputDeviceManager(SQSManagerBase, OutputDeviceManager[SQSOutputDevice this manager is used to create SQS devices """ - def __init__(self, sqs_resource: Optional['SQSServiceResource'] = None) -> None: + def __init__(self, sqs_resource: Optional['SQSServiceResource'] = None, **kwargs) -> None: """ :param sqs_resource: the boto sqs service resource. Defaults to creating from env vars """ - super().__init__(sqs_resource=sqs_resource) + super().__init__(sqs_resource=sqs_resource, + **kwargs) self._device_cache: Dict[str, SQSOutputDevice] = {} - def get_output_device(self, name: str) -> SQSOutputDevice: + def _create_output_device(self, name: str) -> SQSOutputDevice: """ Returns and outgoing device by name diff --git a/messageflux/iodevices/transformer_device_wrapper/transformer_input_device.py b/messageflux/iodevices/transformer_device_wrapper/transformer_input_device.py index f5306fb..deed9ea 100644 --- a/messageflux/iodevices/transformer_device_wrapper/transformer_input_device.py +++ b/messageflux/iodevices/transformer_device_wrapper/transformer_input_device.py @@ -53,7 +53,9 @@ def __init__(self, :param inner_device: the inner device that it wraps :param transformer: the transformer to use to transform the incoming messages """ - super(TransformerInputDevice, self).__init__(manager, name) + super().__init__(manager=manager, + name=name) + self._transformer = transformer self._inner_device = inner_device @@ -73,6 +75,7 @@ def close(self): """ closes the inner device """ + super().close() self._inner_device.close() @@ -81,12 +84,13 @@ class TransformerInputDeviceManager(InputDeviceManager[TransformerInputDevice]): a wrapper input device manager, that wraps the devices in transformer input devices """ - def __init__(self, inner_device_manager: InputDeviceManager, transformer: InputTransformerBase): + def __init__(self, inner_device_manager: InputDeviceManager, transformer: InputTransformerBase, **kwargs): """ :param inner_device_manager: the inner device manager :param transformer: the input transformer to use """ + super().__init__(**kwargs) self._inner_device_manager = inner_device_manager self._transformer = transformer @@ -104,7 +108,7 @@ def disconnect(self): self._inner_device_manager.disconnect() self._transformer.disconnect() - def get_input_device(self, name: str) -> TransformerInputDevice: + def _create_input_device(self, name: str) -> TransformerInputDevice: """ returns a wrapped input device diff --git a/messageflux/iodevices/transformer_device_wrapper/transformer_output_device.py b/messageflux/iodevices/transformer_device_wrapper/transformer_output_device.py index 736ab6b..f48211c 100644 --- a/messageflux/iodevices/transformer_device_wrapper/transformer_output_device.py +++ b/messageflux/iodevices/transformer_device_wrapper/transformer_output_device.py @@ -52,7 +52,7 @@ def __init__(self, :param inner_device: the inner device that this device wraps :param transformer: the output transformer to use """ - super().__init__(manager, name) + super().__init__(manager=manager, name=name) self._transformer = transformer self._inner_device = inner_device @@ -64,6 +64,8 @@ def close(self): """ closes the inner device """ + super().close() + self._inner_device.close() @@ -72,12 +74,14 @@ class TransformerOutputDeviceManager(OutputDeviceManager[TransformerOutputDevice a wrapper output device manager, that wraps the devices in transformer output devices """ - def __init__(self, inner_device_manager: OutputDeviceManager, transformer: OutputTransformerBase): + def __init__(self, inner_device_manager: OutputDeviceManager, transformer: OutputTransformerBase, **kwargs): """ :param inner_device_manager: the inner device manager :param transformer: the transformer to use """ + super().__init__(**kwargs) + self._inner_device_manager = inner_device_manager self._transformer = transformer @@ -95,7 +99,7 @@ def disconnect(self): self._inner_device_manager.disconnect() self._transformer.disconnect() - def get_output_device(self, name: str) -> TransformerOutputDevice: + def _create_output_device(self, name: str) -> TransformerOutputDevice: """ returns a wrapped output device :param name: the name of the device to get diff --git a/messageflux/logging/bulk_rotating_device_handler.py b/messageflux/logging/bulk_rotating_device_handler.py index efbfedd..e8fc804 100644 --- a/messageflux/logging/bulk_rotating_device_handler.py +++ b/messageflux/logging/bulk_rotating_device_handler.py @@ -50,11 +50,11 @@ def __init__(self, self._wait_on_queue_timeout = max(wait_on_queue_timeout, 0.1) self._send_to_device_thread: Optional[Thread] = None - super(BulkRotatingDeviceHandler, self).__init__(live_log_path=live_log_path, - bkp_log_path=bkp_log_path, - max_records=max_records, - max_time=max_time, - live_log_prefix=live_log_prefix) + super().__init__(live_log_path=live_log_path, + bkp_log_path=bkp_log_path, + max_records=max_records, + max_time=max_time, + live_log_prefix=live_log_prefix) def _do_send_to_device_thread(self): if self._queue is None: @@ -89,6 +89,6 @@ def close(self): closes the handler (disconnects from the output device) """ try: - super(BulkRotatingDeviceHandler, self).close() + super().close() finally: self._output_device_manager.disconnect() diff --git a/messageflux/logging/bulk_rotating_file_handler.py b/messageflux/logging/bulk_rotating_file_handler.py index 762ca24..6a43605 100644 --- a/messageflux/logging/bulk_rotating_file_handler.py +++ b/messageflux/logging/bulk_rotating_file_handler.py @@ -32,11 +32,11 @@ def __init__(self, self._rotated_log_path = os.path.abspath(rotated_log_path) os.makedirs(self._rotated_log_path, exist_ok=True) - super(BulkRotatingFileHandler, self).__init__(live_log_path=live_log_path, - bkp_log_path=bkp_log_path, - max_records=max_records, - max_time=max_time, - live_log_prefix=live_log_prefix) + super().__init__(live_log_path=live_log_path, + bkp_log_path=bkp_log_path, + max_records=max_records, + max_time=max_time, + live_log_prefix=live_log_prefix) def _move_log_to_destination(self, src_file: str): """ diff --git a/messageflux/logging/bulk_rotating_handler_base.py b/messageflux/logging/bulk_rotating_handler_base.py index f76ff53..5e53966 100644 --- a/messageflux/logging/bulk_rotating_handler_base.py +++ b/messageflux/logging/bulk_rotating_handler_base.py @@ -157,7 +157,7 @@ def _ensure_logger_rotate_thread(self): def emit(self, record): self._ensure_logger_rotate_thread() - super(BulkRotatingHandlerBase, self).emit(record) + super().emit(record) self._record_count += 1 def _move_log_file_to_bkp_dir(self) -> str: @@ -212,6 +212,6 @@ def close(self): """ stops the logger thread and flushes """ - super(BulkRotatingHandlerBase, self).close() + super().close() self._run = False self.doRollover() diff --git a/messageflux/message_handling_service.py b/messageflux/message_handling_service.py index 05fe810..49448be 100644 --- a/messageflux/message_handling_service.py +++ b/messageflux/message_handling_service.py @@ -151,7 +151,7 @@ def _prepare_service(self): def _finalize_service(self, exception: Optional[Exception] = None): self._message_handler.shutdown() - super()._finalize_service(exception) + super()._finalize_service(exception=exception) def _handle_message_batch(self, batch: List[Tuple[InputDevice, ReadResult]]): self._message_handler.handle_message_batch(batch) @@ -223,4 +223,5 @@ def __init__(self, *, message_handler: MessageHandlerBase, **kwargs): """ :param **kwargs: passed to parent as is """ - super().__init__(batch_handler=self._BatchMessageHandlerAdapter(message_handler), **kwargs) + super().__init__(batch_handler=self._BatchMessageHandlerAdapter(message_handler), + **kwargs) diff --git a/messageflux/multiprocessing/multiprocessrunner.py b/messageflux/multiprocessing/multiprocessrunner.py index 7b2e5bb..a692910 100644 --- a/messageflux/multiprocessing/multiprocessrunner.py +++ b/messageflux/multiprocessing/multiprocessrunner.py @@ -82,7 +82,7 @@ def _on_handler_exit(self, handler: SingleProcessHandler): self._run_service_instance(handler.instance_index) def _finalize_service(self, exception: Optional[Exception] = None): - super()._finalize_service(exception) + super()._finalize_service(exception=exception) for handler in self._process_handlers: handler.stop() time.sleep(self._shutdown_timeout) diff --git a/messageflux/pipeline_service.py b/messageflux/pipeline_service.py index b594b15..fa9274a 100644 --- a/messageflux/pipeline_service.py +++ b/messageflux/pipeline_service.py @@ -132,7 +132,7 @@ def _handle_message_batch(self, batch: List[Tuple[InputDevice, ReadResult]]): def _finalize_service(self, exception: Optional[Exception] = None): try: - super()._finalize_service(exception) + super()._finalize_service(exception=exception) finally: if self._output_device_manager is not None: self._output_device_manager.disconnect() diff --git a/messageflux/utils/__init__.py b/messageflux/utils/__init__.py index 7e87bdd..8739b8f 100644 --- a/messageflux/utils/__init__.py +++ b/messageflux/utils/__init__.py @@ -1,10 +1,9 @@ import datetime import os import threading -from typing import Collection, TypeVar, List, Iterator, Generic, Callable, Optional, Any, Union - from itertools import cycle, islice from time import perf_counter +from typing import Collection, TypeVar, List, Iterator, Generic, Callable, Optional, Any, Union EllipsisType = type(...) @@ -26,7 +25,7 @@ class AggregatedException(KwargsException): """ def __init__(self, *args, inner_exceptions: Optional[List[Exception]] = None, **kwargs): - super(AggregatedException, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) self.inner_exceptions = inner_exceptions diff --git a/tests/devices/mocks.py b/tests/devices/mocks.py index b5d6425..df7f227 100644 --- a/tests/devices/mocks.py +++ b/tests/devices/mocks.py @@ -42,8 +42,12 @@ def _send_message(self, message_bundle: MessageBundle): class MockErrorDeviceManager(InputDeviceManager, OutputDeviceManager): - def get_input_device(self, name): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _create_input_device(self, name): return MockErrorInputDevice(name) - def get_output_device(self, name): + def _create_output_device(self, name): return MockErrorOutputDevice(name) diff --git a/tests/devices/roundrobin_test.py b/tests/devices/roundrobin_test.py index 456b222..cdcbc61 100644 --- a/tests/devices/roundrobin_test.py +++ b/tests/devices/roundrobin_test.py @@ -31,11 +31,13 @@ def _read_message(self, class ErrorIODeviceManager(InputDeviceManager, OutputDeviceManager): + def __init__(self, **kwargs): + super().__init__(**kwargs) - def get_input_device(self, name): + def _create_input_device(self, name): return ErrorIODevice(None, name) - def get_output_device(self, name): + def _create_output_device(self, name): return ErrorIODevice(None, name) diff --git a/tests/devices/short_circuit_device_test.py b/tests/devices/short_circuit_device_test.py index 8bf22c5..112c3c7 100644 --- a/tests/devices/short_circuit_device_test.py +++ b/tests/devices/short_circuit_device_test.py @@ -1,8 +1,8 @@ import threading +from time import sleep from typing import Optional import pytest -from time import sleep from messageflux.iodevices.base import (InputDeviceManager, OutputDeviceManager, @@ -46,10 +46,13 @@ def _send_message(self, message_bundle: MessageBundle): class ErrorDeviceManager(InputDeviceManager, OutputDeviceManager): - def get_input_device(self, name: str) -> InputDevice: + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _create_input_device(self, name: str) -> InputDevice: return ErrorInputDevice() - def get_output_device(self, name: str) -> OutputDevice: + def _create_output_device(self, name: str) -> OutputDevice: return ErrorOutputDevice() diff --git a/tests/message_handling_service_test.py b/tests/message_handling_service_test.py index a3f30cb..231f91b 100644 --- a/tests/message_handling_service_test.py +++ b/tests/message_handling_service_test.py @@ -37,10 +37,11 @@ def _read_message(self, cancellation_token: threading.Event, class MockInputDeviceManager(InputDeviceManager[MockInputDevice]): - def __init__(self, input_list): + def __init__(self, input_list, **kwargs): + super().__init__(**kwargs) self.input_list = input_list - def get_input_device(self, name): + def _create_input_device(self, name): return MockInputDevice(self, name, self.input_list)