Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DynamicallyDefineDataIdentifier service #639

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions src/gallia/commands/primitive/uds/dddi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import sys
from typing import Annotated, cast

from pydantic import BeforeValidator

from gallia.command import UDSScanner
from gallia.command.config import AutoInt, Field, err_int
from gallia.command.uds import UDSScannerConfig
from gallia.log import get_logger
from gallia.services.uds import NegativeResponse
from gallia.services.uds.core.utils import g_repr

logger = get_logger(__name__)


class DDDIPrimitiveConfig(UDSScannerConfig):
properties: bool = Field(
False,
description="Read and store the ECU properties prior and after scan",
cli_group=UDSScannerConfig._cli_group,
config_section=UDSScannerConfig._config_section,
)
session: AutoInt = Field(0x01, description="The session in which the requests are made")


def parse_definitions(value: str | tuple[int, ...], expected_len: int) -> tuple[int, ...]:
if isinstance(value, tuple):
if len(value) != expected_len:
raise ValueError(f"Need exactly {expected_len} values for each definition")

return value

values = value.split(":")

if len(values) != expected_len:
raise ValueError(f"Need exactly {expected_len} values for each definition")

return tuple(err_int(x, 0) for x in values)


def parse_id(value: str | tuple[int, int, int]) -> tuple[int, int, int]:
return cast(tuple[int, int, int], parse_definitions(value, 3))


def parse_mem(value: str | tuple[int, int]) -> tuple[int, int]:
return cast(tuple[int, int], parse_definitions(value, 2))


class DefineByIdentifierDDDIPrimitiveConfig(DDDIPrimitiveConfig):
data_identifier: AutoInt = Field(
description="The new dynamically defined data identifier", positional=True
)
sources: list[Annotated[tuple[int, int, int], BeforeValidator(parse_id)]] = Field(
description="The definitions of the source data to be included in the dynamically defined data. Each of them consists of a data identifier, the start byte of the corresponding data record (1-indexed) and its length in bytes",
metavar="ID:START:LENGTH",
)


class DefineByMemoryAddressDDDIPrimitiveConfig(DDDIPrimitiveConfig):
data_identifier: AutoInt = Field(
description="The new dynamically defined data identifier", positional=True
)
sources: list[Annotated[tuple[int, int], BeforeValidator(parse_mem)]] = Field(
description="The definitions of the source data to be included in the dynamically defined data. each of them consists of a memory address and the length in bytes",
metavar="ADDRESS:LENGTH",
)
address_format: AutoInt | None = Field(
None,
description="The addressAndLengthFormatIdentifier, which can be set manually or deduced automatically if not given explicitly",
)


class ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig(DDDIPrimitiveConfig):
data_identifier: AutoInt | None = Field(
description="The dynamically defined data identifier to be cleared. Omit if all dynamically defined identifiers should be cleared."
)


class DDDIPrimitive(UDSScanner):
"""dynamically define data identifiers"""

CONFIG_TYPE = DDDIPrimitiveConfig
SHORT_HELP = "DynamicallyDefineDataIdentifiers"

def __init__(self, config: DDDIPrimitiveConfig):
super().__init__(config)
self.config: DDDIPrimitiveConfig = config

async def main(self) -> None:
try:
await self.ecu.check_and_set_session(self.config.session)
except Exception as e:
logger.critical(f"Could not change to session: {g_repr(self.config.session)}: {e!r}")
sys.exit(1)


class DefineByIdentifierDDDIPrimitive(DDDIPrimitive):
CONFIG_TYPE = DefineByIdentifierDDDIPrimitiveConfig
SHORT_HELP = "DefineByIdentifier"

def __init__(self, config: DefineByIdentifierDDDIPrimitiveConfig):
super().__init__(config)
self.config: DefineByIdentifierDDDIPrimitiveConfig = config

async def main(self) -> None:
await super().main()

source_identifiers = []
start_positions = []
lengths = []

for identifier, start, length in self.config.sources:
source_identifiers.append(identifier)
start_positions.append(start)
lengths.append(length)

response = await self.ecu.define_by_identifier(
self.config.data_identifier, source_identifiers, start_positions, lengths
)

if isinstance(response, NegativeResponse):
logger.error(response)
else:
# There is not real data returned, only echoes
logger.result("Success")


class DefineByMemoryAddressDDDIPrimitive(DDDIPrimitive):
CONFIG_TYPE = DefineByMemoryAddressDDDIPrimitiveConfig
SHORT_HELP = "DefineByMemoryAddress"

def __init__(self, config: DefineByMemoryAddressDDDIPrimitiveConfig):
super().__init__(config)
self.config: DefineByMemoryAddressDDDIPrimitiveConfig = config

async def main(self) -> None:
await super().main()

addresses = []
lengths = []

for address, length in self.config.sources:
addresses.append(address)
lengths.append(length)

response = await self.ecu.define_by_memory_address(
self.config.data_identifier, addresses, lengths, self.config.address_format
)

if isinstance(response, NegativeResponse):
logger.error(response)
else:
# There is not real data returned, only echoes
logger.result("Success")


class ClearDynamicallyDefinedDataIdentifierDDDIPrimitive(DDDIPrimitive):
CONFIG_TYPE = ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig
SHORT_HELP = "ClearDynamicallyDefinedDataIdentifier"

def __init__(self, config: ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig):
super().__init__(config)
self.config: ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig = config

async def main(self) -> None:
await super().main()

response = await self.ecu.clear_dynamically_defined_data_identifier(
self.config.data_identifier
)

if isinstance(response, NegativeResponse):
logger.error(response)
else:
# There is not real data returned, only echoes
logger.result("Success")
13 changes: 13 additions & 0 deletions src/gallia/plugins/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from gallia.commands import HSFZDiscoverer
from gallia.commands.discover.doip import DoIPDiscoverer
from gallia.commands.primitive.generic.pdu import GenericPDUPrimitive
from gallia.commands.primitive.uds.dddi import (
ClearDynamicallyDefinedDataIdentifierDDDIPrimitive,
DefineByIdentifierDDDIPrimitive,
DefineByMemoryAddressDDDIPrimitive,
)
from gallia.commands.primitive.uds.dtc import (
ClearDTCPrimitive,
ControlDTCPrimitive,
Expand Down Expand Up @@ -72,6 +77,14 @@ def commands(cls) -> Mapping[str, CommandTree | type[BaseCommand]]:
description="Universal Diagnostic Services",
subtree={
"rdbi": ReadByIdentifierPrimitive,
"dddi": CommandTree(
description="DynamicallyDefineDataIdentifiers",
subtree={
"identifier": DefineByIdentifierDDDIPrimitive,
"memory": DefineByMemoryAddressDDDIPrimitive,
"clear": ClearDynamicallyDefinedDataIdentifierDDDIPrimitive,
},
),
"dtc": CommandTree(
description="DiagnosticTroubleCodes",
subtree={
Expand Down
97 changes: 97 additions & 0 deletions src/gallia/services/uds/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,86 @@ async def request_transfer_exit(
config,
)

async def define_by_identifier(
self,
dynamically_defined_data_identifier: int,
source_data_identifiers: int | Sequence[int],
positions_in_source_data_record: int | Sequence[int],
memory_sizes: int | Sequence[int],
suppress_response: bool = False,
) -> service.NegativeResponse | service.DefineByIdentifierResponse:
"""Defines a data identifier which combines data from multiple existing data identifiers on the UDS server.
This is an implementation of the UDS request for the defineByIdentifier sub-function of the
service DynamicallyDefineDataIdentifier (0x2C).

:param dynamically_defined_data_identifier: The new data identifier.
:param source_data_identifiers: The source data identifiers which refer to the data to be included in the new data identifier.
:param positions_in_source_data_record: The start positions for each source data identifier. Note, that the position is 1-indexed.
:param memory_sizes: The number of bytes for each source data identifier, starting from the starting position.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
return await self.request(
service.DefineByIdentifierRequest(
dynamically_defined_data_identifier,
source_data_identifiers,
positions_in_source_data_record,
memory_sizes,
suppress_response,
)
)

async def define_by_memory_address(
self,
dynamically_defined_data_identifier: int,
memory_addresses: int | Sequence[int],
memory_sizes: int | Sequence[int],
address_and_length_format_identifier: int | None = None,
suppress_response: bool = False,
) -> service.NegativeResponse | service.DefineByMemoryAddressResponse:
"""Defines a data identifier which combines data from multiple existing memory regions on the UDS server.
This is an implementation of the UDS request for the defineByMemoryAddress sub-function of the
service DynamicallyDefineDataIdentifier (0x2C).
While it exposes each parameter of the corresponding specification,
some parameters can be computed from the remaining ones and can therefore be omitted.

:param dynamically_defined_data_identifier: The new data identifier.
:param memory_addresses: The memory addresses for each source data.
:param memory_sizes: The number of bytes for each source data, starting from the memory address.
:param address_and_length_format_identifier: The byte lengths of the memory address and
size. If omitted, this parameter is computed
based on the memory_address and memory_size
or data_record parameters.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
return await self.request(
service.DefineByMemoryAddressRequest(
dynamically_defined_data_identifier,
memory_addresses,
memory_sizes,
address_and_length_format_identifier,
suppress_response,
)
)

async def clear_dynamically_defined_data_identifier(
self, dynamically_defined_data_identifier: int | None, suppress_response: bool = False
) -> service.ClearDynamicallyDefinedDataIdentifierResponse:
"""Clears either a specific dynamically defined data identifier or all if no data identifier is given.
This is an implementation of the UDS request for the clearDynamicallyDefinedDataIdentifier sub-function of the
service DynamicallyDefineDataIdentifier (0x2C).

:param dynamically_defined_data_identifier: The dynamically defined data identifier to be cleared, or None if all are to be cleared.
:param suppress_response: If set to True, the server is advised to not send back a positive
response.
"""
return await self.request(
service.ClearDynamicallyDefinedDataIdentifierRequest(
dynamically_defined_data_identifier, suppress_response
)
)

@overload
async def request(
self, request: service.RawRequest, config: UDSRequestConfig | None = None
Expand Down Expand Up @@ -1084,6 +1164,23 @@ async def request(
config: UDSRequestConfig | None = None,
) -> service.NegativeResponse | service.RequestTransferExitResponse: ...

@overload
async def request(
self, request: service.DefineByIdentifierRequest, config: UDSRequestConfig | None = None
) -> service.DefineByIdentifierResponse: ...

@overload
async def request(
self, request: service.DefineByMemoryAddressRequest, config: UDSRequestConfig | None = None
) -> service.DefineByMemoryAddressResponse: ...

@overload
async def request(
self,
request: service.ClearDynamicallyDefinedDataIdentifierRequest,
config: UDSRequestConfig | None = None,
) -> service.ClearDynamicallyDefinedDataIdentifierResponse: ...

async def request(
self, request: service.UDSRequest, config: UDSRequestConfig | None = None
) -> service.UDSResponse:
Expand Down
7 changes: 7 additions & 0 deletions src/gallia/services/uds/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,10 @@ class DTCFormatIdentifier(IntEnum):
@unique
class DataIdentifier(IntEnum):
ActiveDiagnosticSessionDataIdentifier = 0xF186


@unique
class DynamicallyDefineDataIdentifierSubFuncs(IntEnum):
defineByIdentifier = 0x01
defineByMemoryAddress = 0x02
clearDynamicallyDefinedDataIdentifier = 0x03
Loading
Loading