Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ef4721c

Browse files
committedJun 26, 2023
client_async: Allow throwing an exception upon socket error during
wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis <[email protected]>
1 parent 7ac6c6e commit ef4721c

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed
 

‎kafka/client_async.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class KafkaClient(object):
154154
sasl mechanism handshake. Default: one of bootstrap servers
155155
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
156156
instance. (See kafka.oauth.abstract). Default: None
157+
raise_upon_socket_err_during_wakeup (bool): If set to True, raise an exception
158+
upon socket error during wakeup(). Default: False
157159
"""
158160

159161
DEFAULT_CONFIG = {
@@ -192,7 +194,8 @@ class KafkaClient(object):
192194
'sasl_plain_password': None,
193195
'sasl_kerberos_service_name': 'kafka',
194196
'sasl_kerberos_domain_name': None,
195-
'sasl_oauth_token_provider': None
197+
'sasl_oauth_token_provider': None,
198+
'raise_upon_socket_err_during_wakeup': False
196199
}
197200

198201
def __init__(self, **configs):
@@ -243,6 +246,8 @@ def __init__(self, **configs):
243246
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
244247
self.config['api_version'] = self.check_version(timeout=check_timeout)
245248

249+
self._raise_upon_socket_err_during_wakeup = self.config['raise_upon_socket_err_during_wakeup']
250+
246251
def _can_bootstrap(self):
247252
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
248253
backoff_factor = 2 ** effective_failures
@@ -933,8 +938,10 @@ def wakeup(self):
933938
except socket.timeout:
934939
log.warning('Timeout to send to wakeup socket!')
935940
raise Errors.KafkaTimeoutError()
936-
except socket.error:
941+
except socket.error as e:
937942
log.warning('Unable to send to wakeup socket!')
943+
if self._raise_upon_socket_err_during_wakeup:
944+
raise e
938945

939946
def _clear_wake_fd(self):
940947
# reading from wake socket should only happen in a single thread

0 commit comments

Comments
 (0)
Please sign in to comment.