22# Copyright (c) Microsoft Corporation. All rights reserved.
33# Licensed under the MIT License. See License.txt in the project root for license information.
44# --------------------------------------------------------------------------------------------
5- from typing import Any , Union , TYPE_CHECKING
5+ from typing import Any , Union , Optional , TYPE_CHECKING
66import logging
77from weakref import WeakSet
88
1515)
1616from ._servicebus_sender import ServiceBusSender
1717from ._servicebus_receiver import ServiceBusReceiver
18+ from ._common .auto_lock_renewer import AutoLockRenewer
1819from ._common ._configuration import Configuration
1920from ._common .utils import (
2021 create_authentication ,
2122 generate_dead_letter_entity_name ,
2223 strip_protocol_from_uri ,
2324)
24- from ._common .constants import ServiceBusSubQueue
25+ from ._common .constants import (
26+ ServiceBusSubQueue ,
27+ ServiceBusReceiveMode ,
28+ )
2529
2630if TYPE_CHECKING :
27- from azure .core .credentials import TokenCredential , AzureSasCredential , AzureNamedKeyCredential
31+ from azure .core .credentials import (
32+ TokenCredential ,
33+ AzureSasCredential ,
34+ AzureNamedKeyCredential ,
35+ )
36+
2837
2938_LOGGER = logging .getLogger (__name__ )
3039
@@ -75,15 +84,32 @@ class ServiceBusClient(object):
7584
7685 """
7786
78- def __init__ (self , fully_qualified_namespace , credential , ** kwargs ):
79- # type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None
87+ def __init__ (
88+ self ,
89+ fully_qualified_namespace : str ,
90+ credential : Union [
91+ "TokenCredential" , "AzureSasCredential" , "AzureNamedKeyCredential"
92+ ],
93+ * ,
94+ retry_total : int = 3 ,
95+ retry_backoff_factor : float = 0.8 ,
96+ retry_backoff_max : int = 120 ,
97+ retry_mode : str = "exponential" ,
98+ ** kwargs : Any
99+ ) -> None :
80100 # If the user provided http:// or sb://, let's be polite and strip that.
81101 self .fully_qualified_namespace = strip_protocol_from_uri (
82102 fully_qualified_namespace .strip ()
83103 )
84104
85105 self ._credential = credential
86- self ._config = Configuration (** kwargs )
106+ self ._config = Configuration (
107+ retry_total = retry_total ,
108+ retry_backoff_factor = retry_backoff_factor ,
109+ retry_backoff_max = retry_backoff_max ,
110+ retry_mode = retry_mode ,
111+ ** kwargs
112+ )
87113 self ._connection = None
88114 # Optional entity name, can be the name of Queue or Topic. Intentionally not advertised, typically be needed.
89115 self ._entity_name = kwargs .get ("entity_name" )
@@ -134,8 +160,16 @@ def close(self):
134160 self ._connection .destroy ()
135161
136162 @classmethod
137- def from_connection_string (cls , conn_str , ** kwargs ):
138- # type: (str, Any) -> ServiceBusClient
163+ def from_connection_string (
164+ cls ,
165+ conn_str : str ,
166+ * ,
167+ retry_total : int = 3 ,
168+ retry_backoff_factor : float = 0.8 ,
169+ retry_backoff_max : int = 120 ,
170+ retry_mode : str = "exponential" ,
171+ ** kwargs : Any
172+ ) -> "ServiceBusClient" :
139173 """
140174 Create a ServiceBusClient from a connection string.
141175
@@ -181,6 +215,10 @@ def from_connection_string(cls, conn_str, **kwargs):
181215 fully_qualified_namespace = host ,
182216 entity_name = entity_in_conn_str or kwargs .pop ("entity_name" , None ),
183217 credential = credential , # type: ignore
218+ retry_total = retry_total ,
219+ retry_backoff_factor = retry_backoff_factor ,
220+ retry_backoff_max = retry_backoff_max ,
221+ retry_mode = retry_mode ,
184222 ** kwargs
185223 )
186224
@@ -227,8 +265,20 @@ def get_queue_sender(self, queue_name, **kwargs):
227265 self ._handlers .add (handler )
228266 return handler
229267
230- def get_queue_receiver (self , queue_name , ** kwargs ):
231- # type: (str, Any) -> ServiceBusReceiver
268+ def get_queue_receiver (
269+ self ,
270+ queue_name : str ,
271+ * ,
272+ session_id : Optional [str ] = None ,
273+ sub_queue : Optional [Union [ServiceBusSubQueue , str ]] = None ,
274+ receive_mode : Union [
275+ ServiceBusReceiveMode , str
276+ ] = ServiceBusReceiveMode .PEEK_LOCK ,
277+ max_wait_time : Optional [float ] = None ,
278+ auto_lock_renewer : Optional [AutoLockRenewer ] = None ,
279+ prefetch_count : int = 0 ,
280+ ** kwargs : Any
281+ ) -> ServiceBusReceiver :
232282 """Get ServiceBusReceiver for the specific queue.
233283
234284 :param str queue_name: The path of specific Service Bus Queue the client connects to.
@@ -280,8 +330,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
280330 "the connection string used to construct the ServiceBusClient."
281331 )
282332
283- sub_queue = kwargs .get ("sub_queue" , None )
284- if sub_queue and kwargs .get ("session_id" ):
333+ if sub_queue and session_id :
285334 raise ValueError (
286335 "session_id and sub_queue can not be specified simultaneously. "
287336 "To connect to the sub queue of a sessionful queue, "
@@ -314,6 +363,12 @@ def get_queue_receiver(self, queue_name, **kwargs):
314363 retry_total = self ._config .retry_total ,
315364 retry_backoff_factor = self ._config .retry_backoff_factor ,
316365 retry_backoff_max = self ._config .retry_backoff_max ,
366+ session_id = session_id ,
367+ sub_queue = sub_queue ,
368+ receive_mode = receive_mode ,
369+ max_wait_time = max_wait_time ,
370+ auto_lock_renewer = auto_lock_renewer ,
371+ prefetch_count = prefetch_count ,
317372 ** kwargs
318373 )
319374 self ._handlers .add (handler )
@@ -361,8 +416,21 @@ def get_topic_sender(self, topic_name, **kwargs):
361416 self ._handlers .add (handler )
362417 return handler
363418
364- def get_subscription_receiver (self , topic_name , subscription_name , ** kwargs ):
365- # type: (str, str, Any) -> ServiceBusReceiver
419+ def get_subscription_receiver (
420+ self ,
421+ topic_name : str ,
422+ subscription_name : str ,
423+ * ,
424+ session_id : Optional [str ] = None ,
425+ sub_queue : Optional [Union [ServiceBusSubQueue , str ]] = None ,
426+ receive_mode : Union [
427+ ServiceBusReceiveMode , str
428+ ] = ServiceBusReceiveMode .PEEK_LOCK ,
429+ max_wait_time : Optional [float ] = None ,
430+ auto_lock_renewer : Optional [AutoLockRenewer ] = None ,
431+ prefetch_count : int = 0 ,
432+ ** kwargs : Any
433+ ) -> ServiceBusReceiver :
366434 """Get ServiceBusReceiver for the specific subscription under the topic.
367435
368436 :param str topic_name: The name of specific Service Bus Topic the client connects to.
@@ -417,8 +485,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
417485 "the connection string used to construct the ServiceBusClient."
418486 )
419487
420- sub_queue = kwargs .get ("sub_queue" , None )
421- if sub_queue and kwargs .get ("session_id" ):
488+ if sub_queue and session_id :
422489 raise ValueError (
423490 "session_id and sub_queue can not be specified simultaneously. "
424491 "To connect to the sub queue of a sessionful subscription, "
@@ -446,6 +513,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
446513 retry_total = self ._config .retry_total ,
447514 retry_backoff_factor = self ._config .retry_backoff_factor ,
448515 retry_backoff_max = self ._config .retry_backoff_max ,
516+ session_id = session_id ,
517+ sub_queue = sub_queue ,
518+ receive_mode = receive_mode ,
519+ max_wait_time = max_wait_time ,
520+ auto_lock_renewer = auto_lock_renewer ,
521+ prefetch_count = prefetch_count ,
449522 ** kwargs
450523 )
451524 except ValueError :
@@ -467,6 +540,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
467540 retry_total = self ._config .retry_total ,
468541 retry_backoff_factor = self ._config .retry_backoff_factor ,
469542 retry_backoff_max = self ._config .retry_backoff_max ,
543+ session_id = session_id ,
544+ sub_queue = sub_queue ,
545+ receive_mode = receive_mode ,
546+ max_wait_time = max_wait_time ,
547+ auto_lock_renewer = auto_lock_renewer ,
548+ prefetch_count = prefetch_count ,
470549 ** kwargs
471550 )
472551 self ._handlers .add (handler )
0 commit comments