diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index 3e2c248..240fd4f 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -624,6 +624,18 @@ def _prepare_set_request( req.updates.append(update.to_message()) logger.debug("%s: %s", type(req).__name__, req) return req + + def _prepare_streamed_set_request( + self, updates: Collection[EntryUpdate], paths_with_required_type: Dict[str, DataType], + ) -> val_pb2.StreamedUpdateRequest: + req = val_pb2.StreamedUpdateRequest(updates=[]) + for update in updates: + value_type = paths_with_required_type.get(update.entry.path) + if value_type is not None: + update.entry.value_type = value_type + req.updates.append(update.to_message()) + logger.debug("%s: %s", type(req).__name__, req) + return req def _process_set_response(self, response: val_pb2.SetResponse) -> None: logger.debug("%s: %s", type(response).__name__, response) diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index e51c2fc..6d98284 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -51,6 +51,13 @@ logger = logging.getLogger(__name__) +async def _process_streamed_update_response(stream_resp_iter): + try: + for response in stream_resp_iter: + logger.debug("Response %s", response) + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + class VSSClient(BaseVSSClient): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -72,10 +79,18 @@ def __init__(self, queue): self.queue = queue async def __anext__(self): - logging.info("Waiting on upddate queue") - upd = await self.queue.get() - logging.info("Update %s", upd) - return upd + logging.info("Waiting on update queue") + try: + upd = await self.queue.get() + logging.info("Update %s", upd) + return upd + except asyncio.CancelledError: + logging.info("Async iterator was cancelled") + raise StopAsyncIteration + except Exception as e: + logging.error("An error occurred: %s", e) + raise + def __aiter__(self): return self @@ -102,17 +117,7 @@ async def connect(self, target_host=None): logger.info("ASYNCHEY") if self.streaming is True: logger.info("Will use streaming mode for set") - stream_resp_iter = self.client_stub.StreamedUpdate(self.StreamedUpdateGenerator(self.queue)) - logger.info("rtkrtkrtiotnj") - - try: - async for response in stream_resp_iter: - print("Response %s", response) - except AioRpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc - # if isinstance(response, val_pb2.StreamedUpdateResponse): - # if response.error.code > 0: - # logger.error("Streaming error: %s", response.error.code) + self.client_stub.StreamedUpdate(self.StreamedUpdateGenerator(self.queue)) if self.ensure_startup_connection: logger.debug("Connected to server: %s", await self.get_server_info()) @@ -354,12 +359,13 @@ async def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None: path for path, data_type in paths_with_required_type.items() if data_type is DataType.UNSPECIFIED ] paths_with_required_type.update(await self.get_value_types(paths_without_type, **rpc_kwargs)) - req = self._prepare_set_request(updates, paths_with_required_type) if self.streaming is True: logger.info(" Stream set request") + req = self._prepare_streamed_set_request(updates, paths_with_required_type) await self.queue.put(req) else: + req = self._prepare_set_request(updates, paths_with_required_type) try: resp = await self.client_stub.Set(req, **rpc_kwargs) except AioRpcError as exc: