@@ -35,6 +35,7 @@ def close(self) -> None:
3535
3636T = TypeVar ("T" )
3737
38+
3839class EventStream (SupportsRead [Iterator [dict [str , Any ]]], SupportsClose ):
3940 """
4041 Allows you to stream events
@@ -65,17 +66,20 @@ def _enforce_ratelimit(self, *, n: int) -> None:
6566 pass
6667
6768 @abstractmethod
68- def set_var (self , variable : str , value : T ) -> None :
69+ def set_var (self , variable : str , value : T , * , max_retries : int ) -> None :
6970 """
7071 Sets a cloud variable.
7172
7273 Args:
7374 variable (str): The name of the cloud variable that should be set (provided without the cloud emoji)
7475 value (Any): The value the cloud variable should be set to
76+
77+ Kwargs:
78+ max_retries (int) : Maximum number of times to retry setting the var if setting fails before raising an exception
7579 """
7680
7781 @abstractmethod
78- def set_vars (self , var_value_dict : dict [str , T ], * , intelligent_waits : bool = True ):
82+ def set_vars (self , var_value_dict : dict [str , T ], * , intelligent_waits : bool = True , max_retries : int ):
7983 """
8084 Sets multiple cloud variables at once (works for an unlimited amount of variables).
8185
@@ -84,6 +88,7 @@ def set_vars(self, var_value_dict: dict[str, T], *, intelligent_waits: bool = Tr
8488
8589 Kwargs:
8690 intelligent_waits (boolean): When enabled, the method will automatically decide how long to wait before performing this cloud variable set, to make sure no rate limits are triggered
91+ max_retries (int) : Maximum number of times to retry setting the var if setting fails before raising an exception
8792 """
8893
8994 @abstractmethod
@@ -129,10 +134,10 @@ def create_event_stream(self) -> EventStream:
129134 def _enforce_ratelimit (self , * , n : int ) -> None :
130135 pass
131136
132- def set_var (self , variable : str , value : T ) -> None :
137+ def set_var (self , variable : str , value : T , * , max_retries : int ) -> None :
133138 pass
134139
135- def set_vars (self , var_value_dict : dict [str , T ], * , intelligent_waits : bool = True ):
140+ def set_vars (self , var_value_dict : dict [str , T ], * , intelligent_waits : bool = True , max_retries : int ):
136141 pass
137142
138143 def get_var (self , var , * , recorder_initial_values = {}) -> Any :
@@ -229,6 +234,9 @@ class BaseCloud(AnyCloud[Union[str, int]]):
229234
230235 print_connect_messages: Whether to print a message on every connect to the cloud server. Defaults to False.
231236 """
237+
238+ _PACKET_FAILURE_SLEEPDURATIONS = (0.1 , 0.2 , 1.5 )
239+
232240 project_id : Optional [Union [str , int ]]
233241 cloud_host : str
234242 ws_shortterm_ratelimit : float
@@ -278,57 +286,26 @@ def _assert_auth(self):
278286 raise exceptions .Unauthenticated (
279287 "You need to use session.connect_cloud (NOT get_cloud) in order to perform this operation." )
280288
281- def _send_packet (self , packet ):
289+ def _send_recursive (self , data : str , * , current_depth : int , max_depth = 0 ):
282290 try :
283- self .websocket .send (json . dumps ( packet ) + " \n " )
291+ self .websocket .send (data )
284292 except Exception :
285- time .sleep (0.1 )
286- self .connect ()
287- time .sleep (0.1 )
288- try :
289- self .websocket .send (json .dumps (packet ) + "\n " )
290- except Exception :
291- time .sleep (0.2 )
293+ if current_depth < max_depth :
294+ sleep_duration = self ._PACKET_FAILURE_SLEEPDURATIONS [min (current_depth , len (self ._PACKET_FAILURE_SLEEPDURATIONS )- 1 )]
295+ time .sleep (sleep_duration )
292296 self .connect ()
293- time .sleep (0.2 )
294- try :
295- self .websocket .send (json .dumps (packet ) + "\n " )
296- except Exception :
297- time .sleep (1.6 )
298- self .connect ()
299- time .sleep (1.4 )
300- try :
301- self .websocket .send (json .dumps (packet ) + "\n " )
302- except Exception :
303- self .active_connection = False
304- raise exceptions .CloudConnectionError (f"Sending packet failed three times in a row: { packet } " )
305-
306- def _send_packet_list (self , packet_list ):
297+ time .sleep (sleep_duration )
298+ self ._send_recursive (data , current_depth = current_depth + 1 , max_depth = max_depth )
299+ else :
300+ self .active_connection = False
301+ raise exceptions .CloudConnectionError (f"Sending packet failed { max_depth + 1 } tries: { data } " )
302+
303+ def _send_packet (self , packet , * , max_retries = 2 ):
304+ self ._send_recursive (json .dumps (packet ) + "\n " , max_depth = max_retries )
305+
306+ def _send_packet_list (self , packet_list , * , max_retries = 2 ):
307307 packet_string = "" .join ([json .dumps (packet ) + "\n " for packet in packet_list ])
308- try :
309- self .websocket .send (packet_string )
310- except Exception :
311- time .sleep (0.1 )
312- self .connect ()
313- time .sleep (0.1 )
314- try :
315- self .websocket .send (packet_string )
316- except Exception :
317- time .sleep (0.2 )
318- self .connect ()
319- time .sleep (0.2 )
320- try :
321- self .websocket .send (packet_string )
322- except Exception :
323- time .sleep (1.6 )
324- self .connect ()
325- time .sleep (1.4 )
326- try :
327- self .websocket .send (packet_string )
328- except Exception :
329- self .active_connection = False
330- raise exceptions .CloudConnectionError (
331- f"Sending packet list failed four times in a row: { packet_list } " )
308+ self ._send_recursive (packet_string , max_depth = max_retries )
332309
333310 def _handshake (self ):
334311 packet = {"method" : "handshake" , "user" : self .username , "project_id" : self .project_id }
@@ -389,13 +366,16 @@ def _enforce_ratelimit(self, *, n):
389366 if sleep_time > 0 :
390367 time .sleep (sleep_time )
391368
392- def set_var (self , variable , value ):
369+ def set_var (self , variable , value , * , max_retries : int = 2 ):
393370 """
394371 Sets a cloud variable.
395372
396373 Args:
397374 variable (str): The name of the cloud variable that should be set (provided without the cloud emoji)
398375 value (str): The value the cloud variable should be set to
376+
377+ Kwargs:
378+ max_retries (int) : Maximum number of times to retry setting the var if setting fails before raising an exception
399379 """
400380 self ._assert_valid_value (value )
401381 if not isinstance (variable , str ):
@@ -414,10 +394,10 @@ def set_var(self, variable, value):
414394 "user" : self .username ,
415395 "project_id" : self .project_id ,
416396 }
417- self ._send_packet (packet )
397+ self ._send_packet (packet , max_retries = max_retries )
418398 self .last_var_set = time .time ()
419399
420- def set_vars (self , var_value_dict , * , intelligent_waits = True ):
400+ def set_vars (self , var_value_dict , * , intelligent_waits = True , max_retries : int = 2 ):
421401 """
422402 Sets multiple cloud variables at once (works for an unlimited amount of variables).
423403
@@ -426,13 +406,14 @@ def set_vars(self, var_value_dict, *, intelligent_waits=True):
426406
427407 Kwargs:
428408 intelligent_waits (boolean): When enabled, the method will automatically decide how long to wait before performing this cloud variable set, to make sure no rate limits are triggered
409+ max_retries (int) : Maximum number of times to retry setting the var if setting fails before raising an exception
429410 """
430411 if not self .active_connection :
431412 self .connect ()
432413 if intelligent_waits :
433- self ._enforce_ratelimit (n = len (list ( var_value_dict . keys ()) ))
414+ self ._enforce_ratelimit (n = len (var_value_dict ))
434415
435- self .var_sets_since_first += len (list ( var_value_dict . keys ()) )
416+ self .var_sets_since_first += len (var_value_dict )
436417
437418 packet_list = []
438419 for variable in var_value_dict :
@@ -449,7 +430,7 @@ def set_vars(self, var_value_dict, *, intelligent_waits=True):
449430 "project_id" : self .project_id ,
450431 }
451432 packet_list .append (packet )
452- self ._send_packet_list (packet_list )
433+ self ._send_packet_list (packet_list , max_retries = max_retries )
453434 self .last_var_set = time .time ()
454435
455436 def get_var (self , var , * , recorder_initial_values = {}):
0 commit comments