Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix maintaining pipeline when using AMQP #2533

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
### Configuration

### Core
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).
- AMQP: Fix maintaining pipeline connection when during interrupted connections (PR#2533 by Kamil Mankowski).


### Development

Expand Down
20 changes: 14 additions & 6 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu
if pika is None:
raise ValueError("To use AMQP you must install the 'pika' library.")
self.properties = pika.BasicProperties(delivery_mode=2) # message persistence
self.heartbeat = 10

def load_configurations(self, queues_type):
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1")
Expand All @@ -533,9 +534,9 @@ def load_configurations(self, queues_type):
self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH))
pika_version = tuple(int(x) for x in pika.__version__.split('.'))
if pika_version < (0, 11):
self.kwargs['heartbeat_interval'] = 10
self.kwargs['heartbeat_interval'] = self.heartbeat
else:
self.kwargs['heartbeat'] = 10
self.kwargs['heartbeat'] = self.heartbeat
if pika_version < (1, ):
# https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q
self.publish_raises_nack = False
Expand Down Expand Up @@ -607,7 +608,10 @@ def _send(self, destination_queue, message, reconnect=True):
mandatory=True,
)
except Exception as exc: # UnroutableError, NackError in 1.0.0
if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed):
if reconnect and (
isinstance(exc, pika.exceptions.ConnectionClosed) or
isinstance(exc, pika.exceptions.StreamLostError)
):
Comment on lines +611 to +614
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if reconnect and (
isinstance(exc, pika.exceptions.ConnectionClosed) or
isinstance(exc, pika.exceptions.StreamLostError)
):
if reconnect and isinstance(exc, (pika.exceptions.ConnectionClosed, pika.exceptions.StreamLostError)):

self.logger.debug('Error sending the message. '
'Will re-connect and re-send.',
exc_info=True)
Expand Down Expand Up @@ -645,9 +649,13 @@ def _receive(self) -> bytes:
if self.source_queue is None:
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
try:
method, header, body = next(self.channel.consume(self.source_queue))
if method:
self.delivery_tag = method.delivery_tag
method, body = None, None
while not (method or body):
method, _, body = next(
self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2)
)
Comment on lines +654 to +656
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add an inline comment on why there's a need for a loop and why you chose heartbeat/2? That will make our lives easier when reading the code later on.

if method:
self.delivery_tag = method.delivery_tag
except Exception as exc:
raise exceptions.PipelineError(exc)
else:
Expand Down
Loading