diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index 84c5ede..9ae1adc 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -1070,7 +1070,7 @@ def set_current_values(self, updates: Dict[str, Datapoint], **rpc_kwargs) -> Non EntryUpdate(DataEntry(path, value=dp), (Field.VALUE,)) for path, dp in updates.items() ], - v1=False, + try_v2=True, **rpc_kwargs, ) @@ -1142,7 +1142,7 @@ def subscribe_current_values( SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) for path in paths ), - v1=False, + try_v2=True, **rpc_kwargs, ): yield {update.entry.path: update.entry.value for update in updates} @@ -1220,7 +1220,7 @@ def get(self, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[DataEntry]: @check_connected def set( - self, updates: Collection[EntryUpdate], v1: bool = True, **rpc_kwargs + self, updates: Collection[EntryUpdate], try_v2: bool = True, **rpc_kwargs ) -> None: """ Parameters: @@ -1239,15 +1239,8 @@ def set( paths_with_required_type.update( self.get_value_types(paths_without_type, **rpc_kwargs) ) - if v1: - req = self._prepare_set_request(updates, paths_with_required_type) - try: - resp = self.client_stub_v1.Set(req, **rpc_kwargs) - except RpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc - self._process_set_response(resp) - else: - logger.info("Using v2") + if try_v2: + logger.debug("Trying v2") if len(updates) == 0: raise VSSClientError( error={ @@ -1265,13 +1258,22 @@ def set( resp = self.client_stub_v2.PublishValueRequest(req, **rpc_kwargs) except RpcError as exc: if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available fall back to v1 instead") self.set(updates) else: raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.debug("Trying v1") + req = self._prepare_set_request(updates, paths_with_required_type) + try: + resp = self.client_stub_v1.Set(req, **rpc_kwargs) + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + self._process_set_response(resp) @check_connected def subscribe( - self, entries: Iterable[SubscribeEntry], v1: bool = True, **rpc_kwargs + self, entries: Iterable[SubscribeEntry], try_v2: bool = True, **rpc_kwargs ) -> Iterator[List[EntryUpdate]]: """ Parameters: @@ -1282,17 +1284,8 @@ def subscribe( rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata") ) - if v1: - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) - try: - for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except RpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc - else: - logger.info("Using v2") + if try_v2: + logger.debug("Trying v2") req = self._prepare_subscribev2_request(entries) resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) try: @@ -1304,9 +1297,20 @@ def subscribe( ] except RpcError as exc: if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available fall back to v1 instead") self.subscribe(entries) else: raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.debug("Trying v1") + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) + try: + for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc @check_connected def authorize(self, token: str, **rpc_kwargs) -> str: diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index 0a17af3..2571b2c 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -229,7 +229,7 @@ async def set_current_values( EntryUpdate(DataEntry(path, value=dp), (Field.VALUE,)) for path, dp in updates.items() ], - v1=False, + try_v2=True, **rpc_kwargs, ) @@ -302,7 +302,7 @@ async def subscribe_current_values( SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) for path in paths ), - v1=False, + try_v2=True, **rpc_kwargs, ): yield {update.entry.path: update.entry.value for update in updates} @@ -327,7 +327,7 @@ async def subscribe_target_values( SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)) for path in paths ), - v1=False, + try_v2=True, **rpc_kwargs, ): yield { @@ -383,7 +383,7 @@ async def get( @check_connected_async async def set( - self, updates: Collection[EntryUpdate], v1: bool = True, **rpc_kwargs + self, updates: Collection[EntryUpdate], try_v2: bool = False, **rpc_kwargs ) -> None: """ Parameters: @@ -402,15 +402,8 @@ async def set( paths_with_required_type.update( await self.get_value_types(paths_without_type, **rpc_kwargs) ) - if v1 is True: - req = self._prepare_set_request(updates, paths_with_required_type) - try: - resp = await self.client_stub_v1.Set(req, **rpc_kwargs) - except AioRpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc - self._process_set_response(resp) - else: - logger.info("Using v2") + if try_v2: + logger.debug("Trying v2") if len(updates) == 0: raise VSSClientError( error={ @@ -428,15 +421,24 @@ async def set( resp = await self.client_stub_v2.PublishValue(req, **rpc_kwargs) except AioRpcError as exc: if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available fall back to v1 instead") await self.set(updates) else: raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.debug("Trying v1") + req = self._prepare_set_request(updates, paths_with_required_type) + try: + resp = await self.client_stub_v1.Set(req, **rpc_kwargs) + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + self._process_set_response(resp) @check_connected_async_iter async def subscribe( self, entries: Iterable[SubscribeEntry], - v1: bool = True, + try_v2: bool = False, **rpc_kwargs, ) -> AsyncIterator[List[EntryUpdate]]: """ @@ -447,18 +449,8 @@ async def subscribe( rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata") ) - if v1: - logger.info("Using v1") - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) - try: - async for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except AioRpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc - else: - logger.info("Using v2") + if try_v2: + logger.debug("Trying v2") req = self._prepare_subscribev2_request(entries) resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) try: @@ -470,9 +462,20 @@ async def subscribe( ] except AioRpcError as exc: if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available fall back to v1 instead") await self.subscribe(entries) else: raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.debug("Trying v1") + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) + try: + async for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc @check_connected_async async def authorize(self, token: str, **rpc_kwargs) -> str: