Skip to content

Commit

Permalink
Rename version flag
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Oct 4, 2024
1 parent 3563fc3 commit 0232a52
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 50 deletions.
52 changes: 28 additions & 24 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ def set_current_values(self, updates: Dict[str, Datapoint], **rpc_kwargs) -> Non
EntryUpdate(DataEntry(path, value=dp), (Field.VALUE,))
for path, dp in updates.items()
],
v1=False,
try_v2=True,
**rpc_kwargs,
)

Expand Down Expand Up @@ -1142,7 +1142,7 @@ def subscribe_current_values(
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
for path in paths
),
v1=False,
try_v2=True,
**rpc_kwargs,
):
yield {update.entry.path: update.entry.value for update in updates}
Expand Down Expand Up @@ -1220,7 +1220,7 @@ def get(self, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[DataEntry]:

@check_connected
def set(
self, updates: Collection[EntryUpdate], v1: bool = True, **rpc_kwargs
self, updates: Collection[EntryUpdate], try_v2: bool = True, **rpc_kwargs
) -> None:
"""
Parameters:
Expand All @@ -1239,15 +1239,8 @@ def set(
paths_with_required_type.update(
self.get_value_types(paths_without_type, **rpc_kwargs)
)
if v1:
req = self._prepare_set_request(updates, paths_with_required_type)
try:
resp = self.client_stub_v1.Set(req, **rpc_kwargs)
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
self._process_set_response(resp)
else:
logger.info("Using v2")
if try_v2:
logger.debug("Trying v2")
if len(updates) == 0:
raise VSSClientError(
error={
Expand All @@ -1265,13 +1258,22 @@ def set(
resp = self.client_stub_v2.PublishValueRequest(req, **rpc_kwargs)
except RpcError as exc:
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
logger.debug("v2 not available fall back to v1 instead")
self.set(updates)
else:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.debug("Trying v1")
req = self._prepare_set_request(updates, paths_with_required_type)
try:
resp = self.client_stub_v1.Set(req, **rpc_kwargs)
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
self._process_set_response(resp)

@check_connected
def subscribe(
self, entries: Iterable[SubscribeEntry], v1: bool = True, **rpc_kwargs
self, entries: Iterable[SubscribeEntry], try_v2: bool = True, **rpc_kwargs
) -> Iterator[List[EntryUpdate]]:
"""
Parameters:
Expand All @@ -1282,17 +1284,8 @@ def subscribe(
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata")
)
if v1:
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.info("Using v2")
if try_v2:
logger.debug("Trying v2")
req = self._prepare_subscribev2_request(entries)
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
try:
Expand All @@ -1304,9 +1297,20 @@ def subscribe(
]
except RpcError as exc:
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
logger.debug("v2 not available fall back to v1 instead")
self.subscribe(entries)
else:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.debug("Trying v1")
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc

@check_connected
def authorize(self, token: str, **rpc_kwargs) -> str:
Expand Down
55 changes: 29 additions & 26 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def set_current_values(
EntryUpdate(DataEntry(path, value=dp), (Field.VALUE,))
for path, dp in updates.items()
],
v1=False,
try_v2=True,
**rpc_kwargs,
)

Expand Down Expand Up @@ -302,7 +302,7 @@ async def subscribe_current_values(
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
for path in paths
),
v1=False,
try_v2=True,
**rpc_kwargs,
):
yield {update.entry.path: update.entry.value for update in updates}
Expand All @@ -327,7 +327,7 @@ async def subscribe_target_values(
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
for path in paths
),
v1=False,
try_v2=True,
**rpc_kwargs,
):
yield {
Expand Down Expand Up @@ -383,7 +383,7 @@ async def get(

@check_connected_async
async def set(
self, updates: Collection[EntryUpdate], v1: bool = True, **rpc_kwargs
self, updates: Collection[EntryUpdate], try_v2: bool = False, **rpc_kwargs
) -> None:
"""
Parameters:
Expand All @@ -402,15 +402,8 @@ async def set(
paths_with_required_type.update(
await self.get_value_types(paths_without_type, **rpc_kwargs)
)
if v1 is True:
req = self._prepare_set_request(updates, paths_with_required_type)
try:
resp = await self.client_stub_v1.Set(req, **rpc_kwargs)
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
self._process_set_response(resp)
else:
logger.info("Using v2")
if try_v2:
logger.debug("Trying v2")
if len(updates) == 0:
raise VSSClientError(
error={
Expand All @@ -428,15 +421,24 @@ async def set(
resp = await self.client_stub_v2.PublishValue(req, **rpc_kwargs)
except AioRpcError as exc:
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
logger.debug("v2 not available fall back to v1 instead")
await self.set(updates)
else:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.debug("Trying v1")
req = self._prepare_set_request(updates, paths_with_required_type)
try:
resp = await self.client_stub_v1.Set(req, **rpc_kwargs)
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
self._process_set_response(resp)

@check_connected_async_iter
async def subscribe(
self,
entries: Iterable[SubscribeEntry],
v1: bool = True,
try_v2: bool = False,
**rpc_kwargs,
) -> AsyncIterator[List[EntryUpdate]]:
"""
Expand All @@ -447,18 +449,8 @@ async def subscribe(
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata")
)
if v1:
logger.info("Using v1")
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
async for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.info("Using v2")
if try_v2:
logger.debug("Trying v2")
req = self._prepare_subscribev2_request(entries)
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
try:
Expand All @@ -470,9 +462,20 @@ async def subscribe(
]
except AioRpcError as exc:
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
logger.debug("v2 not available fall back to v1 instead")
await self.subscribe(entries)
else:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.debug("Trying v1")
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
async for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except AioRpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc

@check_connected_async
async def authorize(self, token: str, **rpc_kwargs) -> str:
Expand Down

0 comments on commit 0232a52

Please sign in to comment.