Skip to content
This repository was archived by the owner on Nov 14, 2022. It is now read-only.

Commit b0a6ee1

Browse files
author
Dimitar Tasev
authored
Merge pull request #1257 from ISISScientificComputing/1256_manage_db_connection_lifetime
Manage DB connection lifetime during Message processing & show FQDN in logs instead of hostname
2 parents 1b2aae3 + f228dd1 commit b0a6ee1

File tree

5 files changed

+59
-11
lines changed

5 files changed

+59
-11
lines changed

queue_processors/queue_processor/handle_message.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""
1515
import logging
1616
import socket
17+
from contextlib import contextmanager
1718
from typing import Optional
1819
from django.db import transaction
1920
from django.utils import timezone
@@ -42,9 +43,35 @@ def __init__(self, queue_listener):
4243

4344
self._logger = logging.getLogger("handle_queue_message")
4445

46+
self.database = None
47+
self.data_model = None
48+
49+
def connect(self):
50+
"""
51+
Starts a connection to the database
52+
"""
4553
self.database = db_access.start_database()
4654
self.data_model = self.database.data_model
4755

56+
def disconnect(self):
57+
"""
58+
Disconnects from the database
59+
"""
60+
self.database.disconnect()
61+
self.database = None
62+
self.data_model = None
63+
64+
@contextmanager
65+
def connected(self):
66+
"""
67+
Context manager for the connection state to the DB
68+
"""
69+
self.connect()
70+
try:
71+
yield
72+
finally:
73+
self.disconnect()
74+
4875
def data_ready(self, message: Message):
4976
"""
5077
Called when destination queue was data_ready.
@@ -250,7 +277,7 @@ def _common_reduction_run_update(reduction_run, status, message):
250277
reduction_run.finished = timezone.now()
251278
reduction_run.message = message.message
252279
reduction_run.reduction_log = message.reduction_log
253-
message.admin_log += "Running on host: %s" % socket.gethostname()
280+
message.admin_log += "Running on host: %s" % socket.getfqdn()
254281
reduction_run.admin_log = message.admin_log
255282

256283
@staticmethod

queue_processors/queue_processor/queue_listener.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ def on_message(self, headers, message):
7575
# has to submit an acknowledgement for receiving the message
7676
# (otherwise I think that it is not removed from the queue but I am not sure about that)
7777
self.client.ack(headers["message-id"], headers["subscription"])
78-
try:
79-
if destination == '/queue/DataReady':
80-
self.message_handler.data_ready(message)
81-
else:
82-
self.logger.error("Received a message on an unknown topic '%s'", destination)
83-
except Exception as exp: # pylint:disable=broad-except
84-
self.logger.error("Unhandled exception encountered: %s %s\n\n%s",
85-
type(exp).__name__, exp, traceback.format_exc())
78+
with self.message_handler.connected():
79+
try:
80+
if destination == '/queue/DataReady':
81+
self.message_handler.data_ready(message)
82+
else:
83+
self.logger.error("Received a message on an unknown topic '%s'", destination)
84+
except Exception as exp: # pylint:disable=broad-except
85+
self.logger.error("Unhandled exception encountered: %s %s\n\n%s",
86+
type(exp).__name__, exp, traceback.format_exc())
8687

8788

8889
def setup_connection(consumer_name) -> Tuple[QueueClient, QueueListener]:

queue_processors/queue_processor/tests/test_handle_message.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def setUp(self):
7676
})
7777
with patch("logging.getLogger") as patched_logger:
7878
self.handler = HandleMessage(self.mocked_client)
79+
self.handler.connect()
7980
self.mocked_logger = patched_logger.return_value
8081

8182
db_handle = model.database.access.start_database()
@@ -91,6 +92,8 @@ def setUp(self):
9192
self.reduction_run.save()
9293

9394
def tearDown(self) -> None:
95+
if self.handler.database is not None:
96+
self.handler.disconnect()
9497
self.experiment.delete()
9598
self.instrument.delete()
9699
self.reduction_run.delete()
@@ -392,6 +395,21 @@ def test_create_run_records_valid_rb_number(self):
392395
reduction_run, _, _ = self.handler.create_run_records(self.msg)
393396
assert reduction_run.experiment.reference_number == self.msg.rb_number
394397

398+
def test_connected(self):
399+
"""
400+
Test the connected context manager properly starts/clears the DB connection
401+
"""
402+
# Disconnect first to remove state from TestCase.setUp
403+
self.handler.disconnect()
404+
405+
with self.handler.connected():
406+
assert self.handler.database is not None
407+
assert self.handler.data_model is not None
408+
409+
# at the end of the context there should be a disconnect
410+
assert self.handler.database is None
411+
assert self.handler.data_model is None
412+
395413

396414
if __name__ == '__main__':
397415
main()

queue_processors/queue_processor/tests/test_queue_listener.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class TestQueueListener(TestCase):
5555
"""
5656
def setUp(self):
5757
self.mocked_client = mock.Mock(spec=QueueClient)
58-
self.mocked_handler = mock.Mock(spec=HandleMessage)
58+
self.mocked_handler = mock.MagicMock(spec=HandleMessage)
5959
self.headers = self._get_header()
6060

6161
with patch("queue_processors.queue_processor.queue_listener"
@@ -96,6 +96,7 @@ def test_on_message_can_receive_a_prepopulated_message(self):
9696
self.mocked_logger.info.assert_called_once()
9797
self.mocked_client.ack.assert_called_once_with(self.headers["message-id"], self.headers["subscription"])
9898
self.mocked_handler.data_ready.assert_called_once()
99+
self.mocked_handler.connected.assert_called_once()
99100
self.assertIsInstance(self.mocked_handler.data_ready.call_args[0][0], Message)
100101

101102
def test_on_message_sends_acknowledgement(self):
@@ -106,6 +107,7 @@ def test_on_message_sends_acknowledgement(self):
106107
self.mocked_logger.info.assert_called_once()
107108
self.mocked_client.ack.assert_called_once_with(self.headers["message-id"], self.headers["subscription"])
108109
self.mocked_handler.data_ready.assert_called_once()
110+
self.mocked_handler.connected.assert_called_once()
109111
self.assertIsInstance(self.mocked_handler.data_ready.call_args[0][0], Message)
110112

111113
def test_on_message_handler_catches_exceptions(self):

utils/clients/queue_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def subscribe_queues(self, queue_list, consumer_name, listener):
9494
for queue in queue_list:
9595
# prefetchSize limits the processing to 1 message at a time
9696
self._connection.subscribe(destination=queue,
97-
id=socket.gethostname(),
97+
id=socket.getfqdn(),
9898
ack="client-individual",
9999
header={'activemq.prefetchSize': '1'})
100100
self._logger.info("[%s] Subscribing to %s", consumer_name, queue)

0 commit comments

Comments
 (0)