Skip to content

Commit 2adc8e1

Browse files
authored
[EventHubs] fix message type (Azure#28236)
* fix message type * update EventData.message to hold uamqp Message for uamqp transport * update dev reqs * lint
1 parent 557a2ce commit 2adc8e1

File tree

6 files changed

+37
-11
lines changed

6 files changed

+37
-11
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_common.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@
5454
AmqpMessageProperties,
5555
)
5656
from ._pyamqp._message_backcompat import LegacyMessage, LegacyBatchMessage
57-
from ._pyamqp.message import Message
57+
from ._pyamqp.message import Message as pyamqp_Message
5858
from ._transport._pyamqp_transport import PyamqpTransport
5959

6060
if TYPE_CHECKING:
6161
try:
6262
from uamqp import ( # pylint: disable=unused-import
63-
Message as uamqp_Message,
63+
Message, # not importing as uamqp_Message, b/c type is exposed to user
6464
BatchMessage,
6565
)
6666
except ImportError:
67-
uamqp_Message = None
67+
Message = None
6868
BatchMessage = None
6969
from ._transport._base import AmqpTransport
7070

@@ -134,8 +134,8 @@ def __init__(
134134
self._raw_amqp_message = AmqpAnnotatedMessage( # type: ignore
135135
data_body=body, annotations={}, application_properties={}
136136
)
137-
self._uamqp_message: Optional[Union[LegacyMessage, uamqp_Message]] = None
138-
self._message: Message = None # type: ignore
137+
self._uamqp_message: Optional[Union[LegacyMessage, "Message"]] = None
138+
self._message: Union["Message", pyamqp_Message] = None # type: ignore
139139
self._raw_amqp_message.header = AmqpMessageHeader()
140140
self._raw_amqp_message.properties = AmqpMessageProperties()
141141
self.message_id = None
@@ -226,7 +226,7 @@ def from_message_content( # pylint: disable=unused-argument
226226
@classmethod
227227
def _from_message(
228228
cls,
229-
message: Union[uamqp_Message, Message],
229+
message: Union["Message", pyamqp_Message],
230230
raw_amqp_message: Optional[AmqpAnnotatedMessage] = None,
231231
) -> EventData:
232232
# pylint:disable=protected-access
@@ -260,7 +260,7 @@ def _decode_non_data_body_as_str(self, encoding: str = "UTF-8") -> str:
260260
return str(decode_with_recurse(seq_list, encoding))
261261

262262
@property
263-
def message(self) -> LegacyMessage:
263+
def message(self) -> Union["Message", LegacyMessage]:
264264
"""DEPRECATED: Get the underlying LegacyMessage.
265265
This is deprecated and will be removed in a later release.
266266
@@ -278,7 +278,7 @@ def message(self) -> LegacyMessage:
278278
return self._uamqp_message
279279

280280
@message.setter
281-
def message(self, value: "uamqp_Message") -> None:
281+
def message(self, value: "Message") -> None:
282282
"""DEPRECATED: Set the underlying Message.
283283
This is deprecated and will be removed in a later release.
284284
"""
@@ -632,7 +632,7 @@ def message(self) -> Union["BatchMessage", LegacyBatchMessage]:
632632
DeprecationWarning,
633633
)
634634
if not self._uamqp_message:
635-
message = AmqpAnnotatedMessage(message=Message(*self._message))
635+
message = AmqpAnnotatedMessage(message=pyamqp_Message(*self._message))
636636
self._uamqp_message = LegacyBatchMessage(
637637
message,
638638
to_outgoing_amqp_message=PyamqpTransport().to_outgoing_amqp_message,

sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ._common import EventData
1414
from ._client_base import ConsumerProducerMixin
1515
from ._utils import create_properties, event_position_selector
16+
from ._transport._pyamqp_transport import PyamqpTransport
1617
from ._constants import (
1718
EPOCH_SYMBOL,
1819
TIMEOUT_SYMBOL,
@@ -182,6 +183,8 @@ def _next_message_in_buffer(self):
182183
# pylint:disable=protected-access
183184
message = self._message_buffer.popleft()
184185
event_data = EventData._from_message(message)
186+
if self._amqp_transport != PyamqpTransport:
187+
event_data._uamqp_message == message # pylint: disable=pointless-statement
185188
self._last_received_event = event_data
186189
return event_data
187190

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,12 @@ def _create_handler(
177177
async def _open_with_retry(self) -> None:
178178
await self._do_retryable_operation(self._open, operation_need_param=False)
179179

180+
# only used by _uamqp_transport_async
180181
def _next_message_in_buffer(self):
181182
# pylint:disable=protected-access
182183
message = self._message_buffer.popleft()
183184
event_data = EventData._from_message(message)
185+
event_data._uamqp_message = message
184186
self._last_received_event = event_data
185187
return event_data
186188

sdk/eventhub/azure-eventhub/dev_requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
-e ../../identity/azure-identity
44
-e ../azure-mgmt-eventhub
55
azure-mgmt-resource==20.0.0
6-
aiohttp>=3.0
7-
websocket-client
6+
aiohttp>=3.0,<4.0
7+
websocket-client==1.4.2
88
-e ../../../tools/azure-devtools
99
uamqp>=1.6.3,<2.0.0

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_receive_async.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,15 @@
88
import pytest
99
import time
1010

11+
try:
12+
import uamqp
13+
except (ModuleNotFoundError, ImportError):
14+
uamqp = None
15+
1116
from azure.eventhub import EventData, TransportType
1217
from azure.eventhub.exceptions import EventHubError
1318
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
19+
from azure.eventhub._pyamqp._message_backcompat import LegacyMessage
1420

1521

1622
@pytest.mark.liveTest
@@ -27,6 +33,10 @@ async def on_event(partition_context, event):
2733
assert ", sequence_number: " in event_str
2834
assert ", enqueued_time: " in event_str
2935
assert ", partition_key: 0" in event_str
36+
if uamqp_transport:
37+
assert isinstance(event.message, uamqp.Message)
38+
else:
39+
assert isinstance(event.message, LegacyMessage)
3040

3141
on_event.called = False
3242
connection_str, senders = connstr_senders

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_receive.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,14 @@
88
import pytest
99
import time
1010

11+
try:
12+
import uamqp
13+
except (ModuleNotFoundError, ImportError):
14+
uamqp = None
15+
1116
from azure.eventhub import EventData, TransportType, EventHubConsumerClient
1217
from azure.eventhub.exceptions import EventHubError
18+
from azure.eventhub._pyamqp._message_backcompat import LegacyMessage
1319

1420

1521
@pytest.mark.liveTest
@@ -25,6 +31,11 @@ def on_event(partition_context, event):
2531
assert ", sequence_number: " in event_str
2632
assert ", enqueued_time: " in event_str
2733
assert ", partition_key: 0" in event_str
34+
if uamqp_transport:
35+
assert isinstance(event.message, uamqp.Message)
36+
else:
37+
assert isinstance(event.message, LegacyMessage)
38+
2839
on_event.called = False
2940
connection_str, senders = connstr_senders
3041
client = EventHubConsumerClient.from_connection_string(

0 commit comments

Comments
 (0)