Skip to content

Commit

Permalink
Make subscribe call and set_current_values call using the kuksa.val.v…
Browse files Browse the repository at this point in the history
…2 interface
  • Loading branch information
lukasmittag committed Sep 5, 2024
1 parent fae987d commit 41726d9
Show file tree
Hide file tree
Showing 13 changed files with 876 additions and 424 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

[submodule "submodules/kuksa-databroker"]
[submodule "kuksa-databroker"]
path = submodules/kuksa-databroker
url = https://github.com/SoftwareDefinedVehicle/kuksa-databroker.git
branch = rel-0.5.0
12 changes: 0 additions & 12 deletions kuksa-client/kuksa/__init__.py

This file was deleted.

12 changes: 0 additions & 12 deletions kuksa-client/kuksa/val/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/README.md

This file was deleted.

12 changes: 0 additions & 12 deletions kuksa-client/kuksa/val/v1/__init__.py

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/types.proto

This file was deleted.

1 change: 0 additions & 1 deletion kuksa-client/kuksa/val/v1/val.proto

This file was deleted.

13 changes: 7 additions & 6 deletions kuksa-client/kuksa_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ def path_completer(self, text, line, begidx, endidx):

def subscribeCallback(self, logPath, resp):
if logPath is None:
self.async_alert(
highlight(
json.dumps(json.loads(resp), indent=2),
lexers.JsonLexer(),
formatters.TerminalFormatter(),
with self.terminal_lock:
self.async_alert(
highlight(
json.dumps(json.loads(resp), indent=2),
lexers.JsonLexer(),
formatters.TerminalFormatter(),
)
)
)
else:
with logPath.open("a", encoding="utf-8") as logFile:
logFile.write(resp + "\n")
Expand Down
62 changes: 41 additions & 21 deletions kuksa-client/kuksa_client/cli_backend/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,18 @@ def __init__(self, config):
self.run = False

self.AttrDict = {
"value": (kuksa_client.grpc.Field.VALUE, kuksa_client.grpc.View.CURRENT_VALUE),
"targetValue": (kuksa_client.grpc.Field.ACTUATOR_TARGET, kuksa_client.grpc.View.TARGET_VALUE),
"metadata": (kuksa_client.grpc.Field.METADATA, kuksa_client.grpc.View.METADATA),
"value": (
kuksa_client.grpc.Field.VALUE,
kuksa_client.grpc.View.CURRENT_VALUE,
),
"targetValue": (
kuksa_client.grpc.Field.ACTUATOR_TARGET,
kuksa_client.grpc.View.TARGET_VALUE,
),
"metadata": (
kuksa_client.grpc.Field.METADATA,
kuksa_client.grpc.View.METADATA,
),
}

def connection_established(self) -> bool:
Expand Down Expand Up @@ -112,8 +121,10 @@ def getValue(self, path: str, attribute="value", timeout=5):
def getValues(self, paths: Iterable[str], attribute="value", timeout=5):
if attribute in self.AttrDict:
field, view = self.AttrDict[attribute]
entries = [kuksa_client.grpc.EntryRequest(
path=path, view=view, fields=(field,)) for path in paths]
entries = [
kuksa_client.grpc.EntryRequest(path=path, view=view, fields=(field,))
for path in paths
]
requestArgs = {'entries': entries}
return self._sendReceiveMsg(("get", requestArgs), timeout)

Expand All @@ -127,28 +138,33 @@ def setValues(self, updates: Dict[str, Any], attribute="value", timeout=5):
if attribute in self.AttrDict:
field, _ = self.AttrDict[attribute]
entry_updates = []
v1 = True
for path, value in updates.items():

if field is kuksa_client.grpc.Field.VALUE:
entry = kuksa_client.grpc.DataEntry(
path=path, value=kuksa_client.grpc.Datapoint(value=value))
path=path,
value=kuksa_client.grpc.Datapoint(value=value),
)
v1 = False
elif field is kuksa_client.grpc.Field.ACTUATOR_TARGET:
entry = kuksa_client.grpc.DataEntry(
path=path, actuator_target=kuksa_client.grpc.Datapoint(
value=value),
path=path,
actuator_target=kuksa_client.grpc.Datapoint(value=value),
)
elif field is kuksa_client.grpc.Field.METADATA:
try:
metadata_dict = json.loads(value)
except json.JSONDecodeError:
return json.dumps({"error": "Metadata value needs to be a valid JSON object"})
entry = kuksa_client.grpc.DataEntry(
path=path, metadata=kuksa_client.grpc.Metadata.from_dict(
metadata_dict),
path=path,
metadata=kuksa_client.grpc.Metadata.from_dict(metadata_dict),
)
entry_updates.append(kuksa_client.grpc.EntryUpdate(
entry=entry, fields=(field,)))
requestArgs = {'updates': entry_updates}
entry_updates.append(
kuksa_client.grpc.EntryUpdate(entry=entry, fields=(field,))
)
requestArgs = {"updates": entry_updates, "v1": v1}
return self._sendReceiveMsg(("set", requestArgs), timeout)
return json.dumps({"error": "Invalid Attribute"})

Expand All @@ -175,11 +191,14 @@ def subscribe(self, path: str, callback, attribute="value", timeout=5):
def subscribeMultiple(self, paths: Iterable[str], callback, attribute="value", timeout=5):
if attribute in self.AttrDict:
field, view = self.AttrDict[attribute]
entries = [kuksa_client.grpc.SubscribeEntry(
path=path, view=view, fields=(field,)) for path in paths]
entries = [
kuksa_client.grpc.SubscribeEntry(path=path, view=view, fields=(field,))
for path in paths
]
requestArgs = {
'entries': entries,
'callback': callback_wrapper(callback),
"entries": entries,
"v1": False,
"callback": callback_wrapper(callback),
}
return self._sendReceiveMsg(("subscribe", requestArgs), timeout)

Expand Down Expand Up @@ -222,8 +241,7 @@ def _sendReceiveMsg(self, req, timeout):
# Async function to handle the gRPC calls
async def _grpcHandler(self, vss_client: kuksa_client.grpc.aio.VSSClient):
self.run = True
subscriber_manager = kuksa_client.grpc.aio.SubscriberManager(
vss_client)
subscriber_manager = kuksa_client.grpc.aio.SubscriberManager(vss_client)
self.grpc_connection_established = True
while self.run:
try:
Expand Down Expand Up @@ -273,7 +291,9 @@ def updateVSSTree(self, jsonStr, timeout=5):
async def mainLoop(self):
if self.insecure:

async with kuksa_client.grpc.aio.VSSClient(self.serverIP, self.serverPort, token=self.token) as vss_client:
async with kuksa_client.grpc.aio.VSSClient(
self.serverIP, self.serverPort, token=self.token
) as vss_client:
logger.info("gRPC channel connected.")
await self._grpcHandler(vss_client)
else:
Expand All @@ -282,7 +302,7 @@ async def mainLoop(self):
self.serverPort,
root_certificates=self.cacertificate,
tls_server_name=self.tls_server_name,
token=self.token
token=self.token,
) as vss_client:
logger.info("Secure gRPC channel connected.")
await self._grpcHandler(vss_client)
Loading

0 comments on commit 41726d9

Please sign in to comment.