Skip to content

Commit

Permalink
Merge pull request #84 from Avivsalem/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
Avivsalem authored Aug 10, 2023
2 parents 0bdaaf6 + 821fec3 commit 4f07aa3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.4.0
0.5.0
20 changes: 10 additions & 10 deletions messageflux/message_handling_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ class BatchMessageHandlerBase(metaclass=ABCMeta):
a batch message handler base class. used to handle a batch of messages
"""

def connect(self):
def prepare(self):
"""
called when the service starts.
can be overrided by child class to perform some initialization logic
"""
pass

def disconnect(self):
def shutdown(self):
"""
called when the service stops.
can be overrided by child class to perform some cleanup logic
Expand Down Expand Up @@ -147,10 +147,10 @@ def __init__(self, *,

def _prepare_service(self):
super()._prepare_service()
self._message_handler.connect()
self._message_handler.prepare()

def _finalize_service(self, exception: Optional[Exception] = None):
self._message_handler.disconnect()
self._message_handler.shutdown()
super()._finalize_service(exception)

def _handle_message_batch(self, batch: List[Tuple[InputDevice, ReadResult]]):
Expand All @@ -162,14 +162,14 @@ class MessageHandlerBase(metaclass=ABCMeta):
a message handler base class. used to handle a single message
"""

def connect(self):
def prepare(self):
"""
called when the service starts.
can be overrided by child class to perform some initialization logic
"""
pass

def disconnect(self):
def shutdown(self):
"""
called when the service stops.
can be overrided by child class to perform some cleanup logic
Expand All @@ -196,19 +196,19 @@ class _BatchMessageHandlerAdapter(BatchMessageHandlerBase):
def __init__(self, massage_handler: MessageHandlerBase):
self._message_handler = massage_handler

def connect(self):
def prepare(self):
"""
called when the service starts.
can be overrided by child class to perform some initialization logic
"""
self._message_handler.connect()
self._message_handler.prepare()

def disconnect(self):
def shutdown(self):
"""
called when the service stops.
can be overrided by child class to perform some cleanup logic
"""
self._message_handler.disconnect()
self._message_handler.shutdown()

def handle_message_batch(self, batch: List[Tuple[InputDevice, ReadResult]]):
"""
Expand Down
16 changes: 16 additions & 0 deletions messageflux/pipeline_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ class PipelineHandlerBase(metaclass=ABCMeta):
a pipeline handler base class. used to handle a single message that passes through the pipeline
"""

def prepare(self):
"""
called when the service starts.
can be overrided by child class to perform some initialization logic
"""
pass

def shutdown(self):
"""
called when the service stops.
can be overrided by child class to perform some cleanup logic
"""
pass

@abstractmethod
def handle_message(self,
input_device: InputDevice,
Expand Down Expand Up @@ -97,6 +111,7 @@ def _prepare_service(self):
super()._prepare_service()
if self._output_device_manager is not None:
self._output_device_manager.connect()
self._pipeline_handler.prepare()

def _handle_message_batch(self, batch: List[Tuple[InputDevice, ReadResult]]):
for input_device, read_result in batch:
Expand All @@ -121,3 +136,4 @@ def _finalize_service(self, exception: Optional[Exception] = None):
finally:
if self._output_device_manager is not None:
self._output_device_manager.disconnect()
self._pipeline_handler.shutdown()

0 comments on commit 4f07aa3

Please sign in to comment.