Skip to content

Commit

Permalink
Added dynamic subscription mgmt to NotificationReceiver
Browse files Browse the repository at this point in the history
Details:

* In the NotificationReceiver class, added support for managing subscriptions
  for topics dynamically with new methods 'subscribe()', 'unsubscribe()',
  'is_subscribed()' and 'get_subscription()'.

Signed-off-by: Andreas Maier <[email protected]>
  • Loading branch information
andy-maier committed Jul 14, 2023
1 parent 31cc4e1 commit 73dba59
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 10 deletions.
4 changes: 4 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ Released: not yet
classes, described the optimized lookup behavior for auto-updated managers
and optimized access via the name-to-URI cache.

* In the NotificationReceiver class, added support for managing subscriptions
for topics dynamically with new methods 'subscribe()', 'unsubscribe()',
'is_subscribed()' and 'get_subscription()'.

**Cleanup:**

**Known issues:**
Expand Down
146 changes: 146 additions & 0 deletions tests/unit/zhmcclient/test_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import threading
from mock import patch
import six
import pytest

from zhmcclient._notification import NotificationReceiver
from zhmcclient._exceptions import SubscriptionNotFound


class MockedStompConnection(object):
Expand Down Expand Up @@ -74,6 +76,14 @@ def subscribe(self, destination, id, ack):
assert self._state_connected
self._subscriptions.append((destination, id, ack))

def unsubscribe(self, id):
# pylint: disable=redefined-builtin
"""Mocks the same-named method of stomp.Connection."""
assert self._state_connected
for _dest, _id, _ack in self._subscriptions:
if _id == id:
self._subscriptions.remove((_dest, _id, _ack))

def disconnect(self):
"""Mocks the same-named method of stomp.Connection."""
assert self._state_connected
Expand Down Expand Up @@ -105,6 +115,14 @@ def mock_sender_run(self):
self._listener.on_message(headers, message_str)
self._listener.on_disconnected()

def mock_get_subscription(self, topic):
"""Find the subscription with the specified topic name and return it"""
for _dest, _id, _ack in self._subscriptions:
dest = '/topic/' + topic
if _dest == dest:
return (_dest, _id, _ack)
return None


def receiver_run(receiver, msg_items):
"""
Expand Down Expand Up @@ -244,3 +262,131 @@ def test_one_message(self):
msg0 = msg_items[0]
assert msg0[0] == self.std_headers
assert msg0[1] == message_obj


class TestNotificationSubscriptionMgmt(object):
"""
Test class for subscription management.
"""

def setup_method(self):
"""
Setup that is called by pytest before each test method.
"""
# pylint: disable=attribute-defined-outside-init

self.topics = ('fake-topic1', 'fake-topic2')
self.hmc = 'fake-hmc'
self.userid = 'fake-userid'
self.password = 'fake-password'
self.std_headers = {
'notification-type': 'fake-type'
}

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_is_subscribed(self):
"""Test function for is_subscribed() method."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic1')
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')

result = receiver.is_subscribed('fake-topic1')
assert result is True

result = receiver.is_subscribed('foo')
assert result is False

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_get_subscription(self):
"""Test function for get_subscription() method."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

id_value1 = receiver.get_subscription('fake-topic1')
id_value2 = receiver.get_subscription('fake-topic2')
assert id_value1 != id_value2

# Check that the subscriptions are still in place
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic1')
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_get_subscription_nonexisting(self):
"""Test function for get_subscription() method for a non-existing
subscription.."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

with pytest.raises(SubscriptionNotFound):
receiver.get_subscription('bla')

# Check that the subscriptions are still in place
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic1')
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_subscribe(self):
"""Test function for subscribe() method."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

receiver.subscribe('foo')

# pylint: disable=no-member
assert conn.mock_get_subscription('foo')

# Check that the subscriptions are still in place
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic1')
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_unsubscribe(self):
"""Test function for unsubscribe() method."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

receiver.unsubscribe('fake-topic1')

# pylint: disable=no-member
assert not conn.mock_get_subscription('fake-topic1')

# Check that the subscriptions are still in place
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')

@patch(target='stomp.Connection', new=MockedStompConnection)
def test_unsubscribe_nonexisting(self):
"""Test function for unsubscribe() for a non-existing subscription."""

receiver = NotificationReceiver(self.topics, self.hmc, self.userid,
self.password)
conn = receiver._conn # pylint: disable=protected-access

with pytest.raises(SubscriptionNotFound):
receiver.unsubscribe('bla')

# Check that the subscriptions are still in place
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic1')
# pylint: disable=no-member
assert conn.mock_get_subscription('fake-topic2')
36 changes: 34 additions & 2 deletions zhmcclient/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
'ServerAuthError', 'ParseError', 'VersionError', 'HTTPError',
'OperationTimeout', 'StatusTimeout', 'NoUniqueMatch', 'NotFound',
'MetricsResourceNotFound', 'NotificationError',
'NotificationJMSError', 'NotificationParseError', 'ConsistencyError',
'CeasedExistence']
'NotificationJMSError', 'NotificationParseError',
'SubscriptionNotFound', 'ConsistencyError', 'CeasedExistence']


class Error(Exception):
Expand Down Expand Up @@ -1372,6 +1372,38 @@ def str_def(self):
format(self.__class__.__name__, self.args[0])


class SubscriptionNotFound(NotificationError):
"""
This exception indicates that a subscripton for a topic was not found.
Derived from :exc:`~zhmcclient.NotificationError`.
"""

def __init__(self, msg):
"""
Parameters:
msg (:term:`string`):
A human readable message describing the problem.
``args[0]`` will be set to the ``msg`` parameter.
"""
# pylint: disable=useless-super-delegation
super(SubscriptionNotFound, self).__init__(msg)

def str_def(self):
"""
:term:`string`: The exception as a string in a Python definition-style
format, e.g. for parsing by scripts:
.. code-block:: text
classname={}; message={}
"""
return "classname={!r}; message={!r};". \
format(self.__class__.__name__, self.args[0])


class ConsistencyError(Error):
# pylint: disable=abstract-method
"""
Expand Down
114 changes: 106 additions & 8 deletions zhmcclient/_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@
$ zhmc partition start {cpc-name} {partition-name}
"""

import os
import threading
import json

from ._logging import logged_api_call
from ._constants import DEFAULT_STOMP_PORT
from ._exceptions import NotificationJMSError, NotificationParseError
from ._exceptions import NotificationJMSError, NotificationParseError, \
SubscriptionNotFound

__all__ = ['NotificationReceiver']

Expand Down Expand Up @@ -131,14 +133,23 @@ def __init__(self, topic_names, host, userid, password,
self._userid = userid
self._password = password

# Subscription ID numbers that are in use.
# Each subscription for a topic gets its own unique ID.
# - key: topic name
# - value: Subscription ID number
self._sub_ids = {}

# Next subscription ID number to be used.
# After allocating a subscription ID number, this number is increased.
# It is never decreased again.
self._next_sub_id = 1

# Process PID, used to ensure uniqueness of subscription ID
self._process_pid = os.getpid()

# Wait timeout to honor keyboard interrupts after this time:
self._wait_timeout = 10.0 # seconds

# Subscription ID. We use some value that allows to identify on the
# HMC that this is the zhmcclient, but otherwise we are not using
# this value ourselves.
self._sub_id = 'zhmcclient.%s' % id(self)

# Sync variables for thread-safe handover between listener thread and
# receiver thread:
self._handover_dict = {}
Expand All @@ -157,8 +168,95 @@ def __init__(self, topic_names, host, userid, password,
self._conn.connect(self._userid, self._password, wait=True)

for topic_name in self._topic_names:
dest = "/topic/" + topic_name
self._conn.subscribe(destination=dest, id=self._sub_id, ack='auto')
self.subscribe(topic_name)

def _id_value(self, sub_id):
"""
Create the subscription ID from the subscription ID number.
"""
id_value = ('zhmcclient.{pid}.{conn_id}.{sub_id}'.
format(pid=self._process_pid, conn_id=id(self),
sub_id=sub_id))
return id_value

@logged_api_call
def subscribe(self, topic_name):
"""
Subscribe this notification receiver for a topic.
Parameters:
topic_name (:term:`string`): Name of the HMC notification topic.
Must not be `None`.
Returns:
string: Subscription ID
"""
dest = "/topic/" + topic_name
sub_id = self._next_sub_id
self._next_sub_id += 1
self._sub_ids[topic_name] = sub_id
id_value = self._id_value(sub_id)
self._conn.subscribe(destination=dest, id=id_value, ack='auto')
return id_value

@logged_api_call
def unsubscribe(self, topic_name):
"""
Unsubscribe this notification receiver from a topic.
If the topic is not currently subscribed for by this receiver,
SubscriptionNotFound is raised.
Parameters:
topic_name (:term:`string`): Name of the HMC notification topic.
Must not be `None`.
Raises:
SubscriptionNotFound: Topic is not currently subscribed for.
"""
try:
sub_id = self._sub_ids[topic_name]
except KeyError:
raise SubscriptionNotFound(
"Subscription topic {!r} is not currently subscribed for")
id_value = self._id_value(sub_id)
self._conn.unsubscribe(id=id_value)

@logged_api_call
def is_subscribed(self, topic_name):
"""
Return whether this notification receiver is currently subscribed for a
topic.
Parameters:
topic_name (:term:`string`): Name of the HMC notification topic.
Must not be `None`.
"""
return topic_name in self._sub_ids

@logged_api_call
def get_subscription(self, topic_name):
"""
Return the subscription ID for a topic this notification receiver is
subscribed for.
If the topic is not currently subscribed for by this receiver,
SubscriptionNotFound is raised.
Parameters:
topic_name (:term:`string`): Name of the HMC notification topic.
Must not be `None`.
"""
try:
sub_id = self._sub_ids[topic_name]
except KeyError:
raise SubscriptionNotFound(
"Subscription topic {!r} is not currently subscribed for")
return self._id_value(sub_id)

@logged_api_call
def notifications(self):
Expand Down

0 comments on commit 73dba59

Please sign in to comment.