Skip to content

Commit

Permalink
Fix minor things
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Aug 27, 2024
1 parent ed32677 commit 8a0b494
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
12 changes: 12 additions & 0 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,18 @@ def _prepare_set_request(
req.updates.append(update.to_message())
logger.debug("%s: %s", type(req).__name__, req)
return req

def _prepare_streamed_set_request(
self, updates: Collection[EntryUpdate], paths_with_required_type: Dict[str, DataType],
) -> val_pb2.StreamedUpdateRequest:
req = val_pb2.StreamedUpdateRequest(updates=[])
for update in updates:
value_type = paths_with_required_type.get(update.entry.path)
if value_type is not None:
update.entry.value_type = value_type
req.updates.append(update.to_message())
logger.debug("%s: %s", type(req).__name__, req)
return req

def _process_set_response(self, response: val_pb2.SetResponse) -> None:
logger.debug("%s: %s", type(response).__name__, response)
Expand Down
38 changes: 22 additions & 16 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@
logger = logging.getLogger(__name__)


async def _process_streamed_update_response(stream_resp_iter):
try:
for response in stream_resp_iter:
logger.debug("Response %s", response)
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc

class VSSClient(BaseVSSClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -72,10 +79,18 @@ def __init__(self, queue):
self.queue = queue

async def __anext__(self):
logging.info("Waiting on upddate queue")
upd = await self.queue.get()
logging.info("Update %s", upd)
return upd
logging.info("Waiting on update queue")
try:
upd = await self.queue.get()
logging.info("Update %s", upd)
return upd
except asyncio.CancelledError:
logging.info("Async iterator was cancelled")
raise StopAsyncIteration
except Exception as e:
logging.error("An error occurred: %s", e)
raise


def __aiter__(self):
return self
Expand All @@ -102,17 +117,7 @@ async def connect(self, target_host=None):
logger.info("ASYNCHEY")
if self.streaming is True:
logger.info("Will use streaming mode for set")
stream_resp_iter = self.client_stub.StreamedUpdate(self.StreamedUpdateGenerator(self.queue))
logger.info("rtkrtkrtiotnj")

try:
async for response in stream_resp_iter:
print("Response %s", response)
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
# if isinstance(response, val_pb2.StreamedUpdateResponse):
# if response.error.code > 0:
# logger.error("Streaming error: %s", response.error.code)
self.client_stub.StreamedUpdate(self.StreamedUpdateGenerator(self.queue))

if self.ensure_startup_connection:
logger.debug("Connected to server: %s", await self.get_server_info())
Expand Down Expand Up @@ -354,12 +359,13 @@ async def set(self, updates: Collection[EntryUpdate], **rpc_kwargs) -> None:
path for path, data_type in paths_with_required_type.items() if data_type is DataType.UNSPECIFIED
]
paths_with_required_type.update(await self.get_value_types(paths_without_type, **rpc_kwargs))
req = self._prepare_set_request(updates, paths_with_required_type)

if self.streaming is True:
logger.info(" Stream set request")
req = self._prepare_streamed_set_request(updates, paths_with_required_type)
await self.queue.put(req)
else:
req = self._prepare_set_request(updates, paths_with_required_type)
try:
resp = await self.client_stub.Set(req, **rpc_kwargs)
except AioRpcError as exc:
Expand Down

0 comments on commit 8a0b494

Please sign in to comment.