diff --git a/src/gallia/commands/primitive/uds/dddi.py b/src/gallia/commands/primitive/uds/dddi.py new file mode 100644 index 000000000..5de2ab6e7 --- /dev/null +++ b/src/gallia/commands/primitive/uds/dddi.py @@ -0,0 +1,180 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import sys +from typing import Annotated, cast + +from pydantic import BeforeValidator + +from gallia.command import UDSScanner +from gallia.command.config import AutoInt, Field, err_int +from gallia.command.uds import UDSScannerConfig +from gallia.log import get_logger +from gallia.services.uds import NegativeResponse +from gallia.services.uds.core.utils import g_repr + +logger = get_logger(__name__) + + +class DDDIPrimitiveConfig(UDSScannerConfig): + properties: bool = Field( + False, + description="Read and store the ECU properties prior and after scan", + cli_group=UDSScannerConfig._cli_group, + config_section=UDSScannerConfig._config_section, + ) + session: AutoInt = Field(0x01, description="The session in which the requests are made") + + +def parse_definitions(value: str | tuple[int, ...], expected_len: int) -> tuple[int, ...]: + if isinstance(value, tuple): + if len(value) != expected_len: + raise ValueError(f"Need exactly {expected_len} values for each definition") + + return value + + values = value.split(":") + + if len(values) != expected_len: + raise ValueError(f"Need exactly {expected_len} values for each definition") + + return tuple(err_int(x, 0) for x in values) + + +def parse_id(value: str | tuple[int, int, int]) -> tuple[int, int, int]: + return cast(tuple[int, int, int], parse_definitions(value, 3)) + + +def parse_mem(value: str | tuple[int, int]) -> tuple[int, int]: + return cast(tuple[int, int], parse_definitions(value, 2)) + + +class DefineByIdentifierDDDIPrimitiveConfig(DDDIPrimitiveConfig): + data_identifier: AutoInt = Field( + description="The new dynamically defined data identifier", positional=True + ) + sources: list[Annotated[tuple[int, int, int], BeforeValidator(parse_id)]] = Field( + description="The definitions of the source data to be included in the dynamically defined data. Each of them consists of a data identifier, the start byte of the corresponding data record (1-indexed) and its length in bytes", + metavar="ID:START:LENGTH", + ) + + +class DefineByMemoryAddressDDDIPrimitiveConfig(DDDIPrimitiveConfig): + data_identifier: AutoInt = Field( + description="The new dynamically defined data identifier", positional=True + ) + sources: list[Annotated[tuple[int, int], BeforeValidator(parse_mem)]] = Field( + description="The definitions of the source data to be included in the dynamically defined data. each of them consists of a memory address and the length in bytes", + metavar="ADDRESS:LENGTH", + ) + address_format: AutoInt | None = Field( + None, + description="The addressAndLengthFormatIdentifier, which can be set manually or deduced automatically if not given explicitly", + ) + + +class ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig(DDDIPrimitiveConfig): + data_identifier: AutoInt | None = Field( + description="The dynamically defined data identifier to be cleared. Omit if all dynamically defined identifiers should be cleared." + ) + + +class DDDIPrimitive(UDSScanner): + """dynamically define data identifiers""" + + CONFIG_TYPE = DDDIPrimitiveConfig + SHORT_HELP = "DynamicallyDefineDataIdentifiers" + + def __init__(self, config: DDDIPrimitiveConfig): + super().__init__(config) + self.config: DDDIPrimitiveConfig = config + + async def main(self) -> None: + try: + await self.ecu.check_and_set_session(self.config.session) + except Exception as e: + logger.critical(f"Could not change to session: {g_repr(self.config.session)}: {e!r}") + sys.exit(1) + + +class DefineByIdentifierDDDIPrimitive(DDDIPrimitive): + CONFIG_TYPE = DefineByIdentifierDDDIPrimitiveConfig + SHORT_HELP = "DefineByIdentifier" + + def __init__(self, config: DefineByIdentifierDDDIPrimitiveConfig): + super().__init__(config) + self.config: DefineByIdentifierDDDIPrimitiveConfig = config + + async def main(self) -> None: + await super().main() + + source_identifiers = [] + start_positions = [] + lengths = [] + + for identifier, start, length in self.config.sources: + source_identifiers.append(identifier) + start_positions.append(start) + lengths.append(length) + + response = await self.ecu.define_by_identifier( + self.config.data_identifier, source_identifiers, start_positions, lengths + ) + + if isinstance(response, NegativeResponse): + logger.error(response) + else: + # There is not real data returned, only echoes + logger.result("Success") + + +class DefineByMemoryAddressDDDIPrimitive(DDDIPrimitive): + CONFIG_TYPE = DefineByMemoryAddressDDDIPrimitiveConfig + SHORT_HELP = "DefineByMemoryAddress" + + def __init__(self, config: DefineByMemoryAddressDDDIPrimitiveConfig): + super().__init__(config) + self.config: DefineByMemoryAddressDDDIPrimitiveConfig = config + + async def main(self) -> None: + await super().main() + + addresses = [] + lengths = [] + + for address, length in self.config.sources: + addresses.append(address) + lengths.append(length) + + response = await self.ecu.define_by_memory_address( + self.config.data_identifier, addresses, lengths, self.config.address_format + ) + + if isinstance(response, NegativeResponse): + logger.error(response) + else: + # There is not real data returned, only echoes + logger.result("Success") + + +class ClearDynamicallyDefinedDataIdentifierDDDIPrimitive(DDDIPrimitive): + CONFIG_TYPE = ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig + SHORT_HELP = "ClearDynamicallyDefinedDataIdentifier" + + def __init__(self, config: ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig): + super().__init__(config) + self.config: ClearDynamicallyDefinedDataIdentifierDDDIPrimitiveConfig = config + + async def main(self) -> None: + await super().main() + + response = await self.ecu.clear_dynamically_defined_data_identifier( + self.config.data_identifier + ) + + if isinstance(response, NegativeResponse): + logger.error(response) + else: + # There is not real data returned, only echoes + logger.result("Success") diff --git a/src/gallia/plugins/uds.py b/src/gallia/plugins/uds.py index 915622fa2..c5babe54d 100644 --- a/src/gallia/plugins/uds.py +++ b/src/gallia/plugins/uds.py @@ -9,6 +9,11 @@ from gallia.commands import HSFZDiscoverer from gallia.commands.discover.doip import DoIPDiscoverer from gallia.commands.primitive.generic.pdu import GenericPDUPrimitive +from gallia.commands.primitive.uds.dddi import ( + ClearDynamicallyDefinedDataIdentifierDDDIPrimitive, + DefineByIdentifierDDDIPrimitive, + DefineByMemoryAddressDDDIPrimitive, +) from gallia.commands.primitive.uds.dtc import ( ClearDTCPrimitive, ControlDTCPrimitive, @@ -72,6 +77,14 @@ def commands(cls) -> Mapping[str, CommandTree | type[BaseCommand]]: description="Universal Diagnostic Services", subtree={ "rdbi": ReadByIdentifierPrimitive, + "dddi": CommandTree( + description="DynamicallyDefineDataIdentifiers", + subtree={ + "identifier": DefineByIdentifierDDDIPrimitive, + "memory": DefineByMemoryAddressDDDIPrimitive, + "clear": ClearDynamicallyDefinedDataIdentifierDDDIPrimitive, + }, + ), "dtc": CommandTree( description="DiagnosticTroubleCodes", subtree={ diff --git a/src/gallia/services/uds/core/client.py b/src/gallia/services/uds/core/client.py index 3e6ebbe61..90377ed4a 100644 --- a/src/gallia/services/uds/core/client.py +++ b/src/gallia/services/uds/core/client.py @@ -897,6 +897,86 @@ async def request_transfer_exit( config, ) + async def define_by_identifier( + self, + dynamically_defined_data_identifier: int, + source_data_identifiers: int | Sequence[int], + positions_in_source_data_record: int | Sequence[int], + memory_sizes: int | Sequence[int], + suppress_response: bool = False, + ) -> service.NegativeResponse | service.DefineByIdentifierResponse: + """Defines a data identifier which combines data from multiple existing data identifiers on the UDS server. + This is an implementation of the UDS request for the defineByIdentifier sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + + :param dynamically_defined_data_identifier: The new data identifier. + :param source_data_identifiers: The source data identifiers which refer to the data to be included in the new data identifier. + :param positions_in_source_data_record: The start positions for each source data identifier. Note, that the position is 1-indexed. + :param memory_sizes: The number of bytes for each source data identifier, starting from the starting position. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + return await self.request( + service.DefineByIdentifierRequest( + dynamically_defined_data_identifier, + source_data_identifiers, + positions_in_source_data_record, + memory_sizes, + suppress_response, + ) + ) + + async def define_by_memory_address( + self, + dynamically_defined_data_identifier: int, + memory_addresses: int | Sequence[int], + memory_sizes: int | Sequence[int], + address_and_length_format_identifier: int | None = None, + suppress_response: bool = False, + ) -> service.NegativeResponse | service.DefineByMemoryAddressResponse: + """Defines a data identifier which combines data from multiple existing memory regions on the UDS server. + This is an implementation of the UDS request for the defineByMemoryAddress sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + While it exposes each parameter of the corresponding specification, + some parameters can be computed from the remaining ones and can therefore be omitted. + + :param dynamically_defined_data_identifier: The new data identifier. + :param memory_addresses: The memory addresses for each source data. + :param memory_sizes: The number of bytes for each source data, starting from the memory address. + :param address_and_length_format_identifier: The byte lengths of the memory address and + size. If omitted, this parameter is computed + based on the memory_address and memory_size + or data_record parameters. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + return await self.request( + service.DefineByMemoryAddressRequest( + dynamically_defined_data_identifier, + memory_addresses, + memory_sizes, + address_and_length_format_identifier, + suppress_response, + ) + ) + + async def clear_dynamically_defined_data_identifier( + self, dynamically_defined_data_identifier: int | None, suppress_response: bool = False + ) -> service.ClearDynamicallyDefinedDataIdentifierResponse: + """Clears either a specific dynamically defined data identifier or all if no data identifier is given. + This is an implementation of the UDS request for the clearDynamicallyDefinedDataIdentifier sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + + :param dynamically_defined_data_identifier: The dynamically defined data identifier to be cleared, or None if all are to be cleared. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + return await self.request( + service.ClearDynamicallyDefinedDataIdentifierRequest( + dynamically_defined_data_identifier, suppress_response + ) + ) + @overload async def request( self, request: service.RawRequest, config: UDSRequestConfig | None = None @@ -1084,6 +1164,23 @@ async def request( config: UDSRequestConfig | None = None, ) -> service.NegativeResponse | service.RequestTransferExitResponse: ... + @overload + async def request( + self, request: service.DefineByIdentifierRequest, config: UDSRequestConfig | None = None + ) -> service.DefineByIdentifierResponse: ... + + @overload + async def request( + self, request: service.DefineByMemoryAddressRequest, config: UDSRequestConfig | None = None + ) -> service.DefineByMemoryAddressResponse: ... + + @overload + async def request( + self, + request: service.ClearDynamicallyDefinedDataIdentifierRequest, + config: UDSRequestConfig | None = None, + ) -> service.ClearDynamicallyDefinedDataIdentifierResponse: ... + async def request( self, request: service.UDSRequest, config: UDSRequestConfig | None = None ) -> service.UDSResponse: diff --git a/src/gallia/services/uds/core/constants.py b/src/gallia/services/uds/core/constants.py index 775a4072f..d04decc64 100644 --- a/src/gallia/services/uds/core/constants.py +++ b/src/gallia/services/uds/core/constants.py @@ -236,3 +236,10 @@ class DTCFormatIdentifier(IntEnum): @unique class DataIdentifier(IntEnum): ActiveDiagnosticSessionDataIdentifier = 0xF186 + + +@unique +class DynamicallyDefineDataIdentifierSubFuncs(IntEnum): + defineByIdentifier = 0x01 + defineByMemoryAddress = 0x02 + clearDynamicallyDefinedDataIdentifier = 0x03 diff --git a/src/gallia/services/uds/core/service.py b/src/gallia/services/uds/core/service.py index 23d8b121c..c643e83ff 100644 --- a/src/gallia/services/uds/core/service.py +++ b/src/gallia/services/uds/core/service.py @@ -14,6 +14,7 @@ from gallia.log import get_logger from gallia.services.uds.core.constants import ( DTCFormatIdentifier, + DynamicallyDefineDataIdentifierSubFuncs, InputOutputControlParameter, ReadDTCInformationSubFuncs, RoutineControlSubFuncs, @@ -22,6 +23,7 @@ UDSIsoServicesEchoLength, ) from gallia.services.uds.core.utils import ( + address_and_length_fmt, address_and_size_length, any_repr, bytes_repr, @@ -1456,6 +1458,443 @@ class ReadMemoryByAddress(UDSService, service_id=UDSIsoServices.ReadMemoryByAddr # ************************************** +T_DynamicallyDefineDataIdentifierResponse = TypeVar( + "T_DynamicallyDefineDataIdentifierResponse", bound="_DynamicallyDefineDataIdentifierResponse" +) + + +class _DynamicallyDefineDataIdentifierResponse( + SpecializedSubFunctionResponse, + ABC, + minimal_length=2, + maximal_length=4, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=0, +): + def __init__(self, dynamically_defined_data_identifier: int | None = None): + super().__init__() + + if dynamically_defined_data_identifier is not None: + check_data_identifier(dynamically_defined_data_identifier) + + self.dynamically_defined_data_identifier = dynamically_defined_data_identifier + + @property + def pdu(self) -> bytes: + if self.dynamically_defined_data_identifier is None: + return pack("!BB", self.RESPONSE_SERVICE_ID, self.SUB_FUNCTION_ID) + else: + return pack( + "!BBH", + self.RESPONSE_SERVICE_ID, + self.SUB_FUNCTION_ID, + self.dynamically_defined_data_identifier, + ) + + @classmethod + def _from_pdu( + cls: type[T_DynamicallyDefineDataIdentifierResponse], pdu: bytes + ) -> T_DynamicallyDefineDataIdentifierResponse: + dynamically_defined_data_identifier: int | None = None + + if len(pdu) > 2: + dynamically_defined_data_identifier = from_bytes(pdu[2:]) + + return cls(dynamically_defined_data_identifier) + + def matches(self, request: UDSRequest) -> bool: + return ( + isinstance(request, _DynamicallyDefineDataIdentifierRequest) + and self.sub_function == request.sub_function + ) + + +class _DynamicallyDefineDataIdentifierRequest( + SpecializedSubFunctionRequest, + ABC, + minimal_length=2, + maximal_length=None, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=0, + response_type=_DynamicallyDefineDataIdentifierResponse, +): + def __init__( + self, dynamically_defined_data_identifier: int | None, suppress_response: bool = False + ): + super().__init__(suppress_response) + + if dynamically_defined_data_identifier is not None: + check_data_identifier(dynamically_defined_data_identifier) + + self.dynamically_defined_data_identifier = dynamically_defined_data_identifier + + +class DefineByIdentifierResponse( + _DynamicallyDefineDataIdentifierResponse, + minimal_length=4, + maximal_length=4, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByIdentifier, +): + def __init__(self, dynamically_defined_data_identifier: int): + super().__init__(dynamically_defined_data_identifier) + + +class DefineByIdentifierRequest( + _DynamicallyDefineDataIdentifierRequest, + minimal_length=8, + maximal_length=None, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByIdentifier, + response_type=DefineByIdentifierResponse, +): + def __init__( + self, + dynamically_defined_data_identifier: int, + source_data_identifiers: int | Sequence[int], + positions_in_source_data_record: int | Sequence[int], + memory_sizes: int | Sequence[int], + suppress_response: bool = False, + ): + """Defines a data identifier which combines data from multiple existing data identifiers on the UDS server. + This is an implementation of the UDS request for the defineByIdentifier sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + + :param dynamically_defined_data_identifier: The new data identifier. + :param source_data_identifiers: The source data identifiers which refer to the data to be included in the new data identifier. + :param positions_in_source_data_record: The start positions for each source data identifier. Note, that the position is 1-indexed. + :param memory_sizes: The number of bytes for each source data identifier, starting from the starting position. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + super().__init__(dynamically_defined_data_identifier, suppress_response) + + if not isinstance(source_data_identifiers, int): + self.source_data_identifiers = list(source_data_identifiers) + else: + self.source_data_identifiers = [source_data_identifiers] + + if not isinstance(positions_in_source_data_record, int): + self.positions_in_source_data_record = list(positions_in_source_data_record) + else: + self.positions_in_source_data_record = [positions_in_source_data_record] + + if not isinstance(memory_sizes, int): + self.memory_sizes = list(memory_sizes) + else: + self.memory_sizes = [memory_sizes] + + if len(self.source_data_identifiers) != len(self.positions_in_source_data_record): + raise ValueError( + f"The number of source data identifiers does not match the number of " + f"positions in source data records: " + f"{len(self.source_data_identifiers)} != {len(self.positions_in_source_data_record)}" + ) + + if len(self.source_data_identifiers) != len(self.memory_sizes): + raise ValueError( + f"The number of source data identifiers does not match the number of " + f"memory sizes: " + f"{len(self.source_data_identifiers)} != {len(self.memory_sizes)}" + ) + + for identifier in self.source_data_identifiers: + check_data_identifier(identifier) + + @property + def source_data_identifier(self) -> int: + return self.source_data_identifiers[0] + + @source_data_identifier.setter + def source_data_identifier(self, source_data_identifier: int) -> None: + self.source_data_identifiers[0] = source_data_identifier + + @property + def position_in_source_data_record(self) -> int: + return self.positions_in_source_data_record[0] + + @position_in_source_data_record.setter + def position_in_source_data_record(self, position_in_source_data_record: int) -> None: + self.positions_in_source_data_record[0] = position_in_source_data_record + + @property + def memory_size(self) -> int: + return self.memory_sizes[0] + + @memory_size.setter + def memory_size(self, memory_size: int) -> None: + self.memory_sizes[0] = memory_size + + @property + def pdu(self) -> bytes: + pdu = pack( + "!BBH", + self.SERVICE_ID, + self.sub_function_with_suppress_response_bit, + self.dynamically_defined_data_identifier, + ) + + for source_data_identifier, position_in_source_data_record, memory_size in zip( + self.source_data_identifiers, self.positions_in_source_data_record, self.memory_sizes + ): + pdu = ( + pdu + + to_bytes(source_data_identifier, 2) + + to_bytes(position_in_source_data_record, 1) + + to_bytes(memory_size, 1) + ) + + return pdu + + @classmethod + def _from_pdu(cls, pdu: bytes) -> DefineByIdentifierRequest: + dynamically_defined_data_identifier = from_bytes(pdu[2:4]) + source_data_identifiers: list[int] = [] + positions_in_source_data_record: list[int] = [] + memory_sizes: list[int] = [] + + if len(pdu) % 4 != 0: + raise ValueError("The format of the PDU does not comply to the standard") + + for i in range(4, len(pdu), 4): + source_data_identifiers.append(from_bytes(pdu[i : i + 2])) + positions_in_source_data_record.append(pdu[i + 2]) + memory_sizes.append(pdu[i + 3]) + + return DefineByIdentifierRequest( + dynamically_defined_data_identifier, + source_data_identifiers, + positions_in_source_data_record, + memory_sizes, + cls.suppress_response_set(pdu), + ) + + +class DefineByMemoryAddressResponse( + _DynamicallyDefineDataIdentifierResponse, + minimal_length=4, + maximal_length=4, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByMemoryAddress, +): + def __init__(self, dynamically_defined_data_identifier: int): + super().__init__(dynamically_defined_data_identifier) + + +class DefineByMemoryAddressRequest( + _DynamicallyDefineDataIdentifierRequest, + minimal_length=7, + maximal_length=None, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByMemoryAddress, + response_type=DefineByMemoryAddressResponse, +): + def __init__( + self, + dynamically_defined_data_identifier: int, + memory_addresses: int | Sequence[int], + memory_sizes: int | Sequence[int], + address_and_length_format_identifier: int | None = None, + suppress_response: bool = False, + ): + """Defines a data identifier which combines data from multiple existing memory regions on the UDS server. + This is an implementation of the UDS request for the defineByMemoryAddress sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + While it exposes each parameter of the corresponding specification, + some parameters can be computed from the remaining ones and can therefore be omitted. + + :param dynamically_defined_data_identifier: The new data identifier. + :param memory_addresses: The memory addresses for each source data. + :param memory_sizes: The number of bytes for each source data, starting from the memory address. + :param address_and_length_format_identifier: The byte lengths of the memory address and + size. If omitted, this parameter is computed + based on the memory_address and memory_size + or data_record parameters. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + super().__init__(dynamically_defined_data_identifier, suppress_response) + + if not isinstance(memory_addresses, int): + self.memory_addresses = list(memory_addresses) + else: + self.memory_addresses = [memory_addresses] + + if not isinstance(memory_sizes, int): + self.memory_sizes = list(memory_sizes) + else: + self.memory_sizes = [memory_sizes] + + if len(self.memory_addresses) != len(self.memory_sizes): + raise ValueError( + f"The number of memory addresses does not match the number of " + f"memory sizes: " + f"{len(self.memory_addresses)} != {len(self.memory_sizes)}" + ) + + max_computed_address_length = 0 + max_computed_size_length = 0 + + # In case the address_and_length_format_identifier is None, this calculates it based on the longest address and size + # Otherwise it checks for all addresses and size if they can be represented with its length constraints. + for address, size in zip(self.memory_addresses, self.memory_sizes): + computed_address_and_length_format_identifier, _, _ = uds_memory_parameters( + address, size, address_and_length_format_identifier + ) + + computed_address_length, computed_size_length = address_and_size_length( + computed_address_and_length_format_identifier + ) + max_computed_address_length = max(computed_address_length, max_computed_address_length) + max_computed_size_length = max(computed_size_length, max_computed_size_length) + + if address_and_length_format_identifier is None: + address_and_length_format_identifier = address_and_length_fmt( + max_computed_address_length, max_computed_size_length + ) + + self.address_and_length_format_identifier = address_and_length_format_identifier + + @property + def memory_address(self) -> int: + return self.memory_addresses[0] + + @memory_address.setter + def memory_address(self, memory_address: int) -> None: + self.memory_addresses[0] = memory_address + + @property + def memory_size(self) -> int: + return self.memory_sizes[0] + + @memory_size.setter + def memory_size(self, memory_size: int) -> None: + self.memory_sizes[0] = memory_size + + @property + def pdu(self) -> bytes: + pdu = pack( + "!BBHB", + self.SERVICE_ID, + self.sub_function_with_suppress_response_bit, + self.dynamically_defined_data_identifier, + self.address_and_length_format_identifier, + ) + + for memory_address, memory_size in zip(self.memory_addresses, self.memory_sizes): + _, address, size = uds_memory_parameters( + memory_address, memory_size, self.address_and_length_format_identifier + ) + pdu = pdu + address + size + + return pdu + + @classmethod + def _from_pdu(cls, pdu: bytes) -> DefineByMemoryAddressRequest: + dynamically_defined_data_identifier = from_bytes(pdu[2:4]) + address_and_length_format_identifier = pdu[4] + address_length, size_length = address_and_size_length(address_and_length_format_identifier) + memory_addresses: list[int] = [] + memory_sizes: list[int] = [] + + if (len(pdu) - 5) % (address_length + size_length) != 0: + raise ValueError("The format of the PDU does not comply to the standard") + + for i in range(5, len(pdu), address_length + size_length): + memory_addresses.append(from_bytes(pdu[i : i + address_length])) + memory_sizes.append( + from_bytes(pdu[i + address_length : i + address_length + size_length]) + ) + + return DefineByMemoryAddressRequest( + dynamically_defined_data_identifier, + memory_addresses, + memory_sizes, + address_and_length_format_identifier, + cls.suppress_response_set(pdu), + ) + + +class ClearDynamicallyDefinedDataIdentifierResponse( + _DynamicallyDefineDataIdentifierResponse, + minimal_length=2, + maximal_length=4, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.clearDynamicallyDefinedDataIdentifier, +): + pass + + +class ClearDynamicallyDefinedDataIdentifierRequest( + _DynamicallyDefineDataIdentifierRequest, + minimal_length=2, + maximal_length=4, + service_id=UDSIsoServices.DynamicallyDefineDataIdentifier, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.clearDynamicallyDefinedDataIdentifier, + response_type=ClearDynamicallyDefinedDataIdentifierResponse, +): + def __init__( + self, dynamically_defined_data_identifier: int | None, suppress_response: bool = False + ): + """Clears either a specific dynamically defined data identifier or all if no data identifier is given. + This is an implementation of the UDS request for the clearDynamicallyDefinedDataIdentifier sub-function of the + service DynamicallyDefineDataIdentifier (0x2C). + + :param dynamically_defined_data_identifier: The dynamically defined data identifier to be cleared, or None if all are to be cleared. + :param suppress_response: If set to True, the server is advised to not send back a positive + response. + """ + super().__init__(dynamically_defined_data_identifier, suppress_response) + + @property + def pdu(self) -> bytes: + if self.dynamically_defined_data_identifier is None: + return pack( + "!BBH", + self.SERVICE_ID, + self.sub_function_with_suppress_response_bit, + self.dynamically_defined_data_identifier, + ) + else: + return pack( + "!BB", + self.SERVICE_ID, + self.sub_function_with_suppress_response_bit, + ) + + @classmethod + def _from_pdu(cls, pdu: bytes) -> ClearDynamicallyDefinedDataIdentifierRequest: + dynamically_defined_data_identifier: int | None = None + + if len(pdu) > 2: + dynamically_defined_data_identifier = from_bytes(pdu[2:]) + + return cls(dynamically_defined_data_identifier) + + +class DynamicallyDefineDataIdentifier( + SpecializedSubFunctionService, service_id=UDSIsoServices.DynamicallyDefineDataIdentifier +): + class DefineByIdentifier( + SubFunction, sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByIdentifier + ): + Request = DefineByIdentifierRequest + Response = DefineByIdentifierResponse + + class DefineByMemoryAddress( + SubFunction, sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.defineByMemoryAddress + ): + Request = DefineByMemoryAddressRequest + Response = DefineByMemoryAddressResponse + + class ClearDynamicallyDefinedDataIdentifier( + SubFunction, + sub_function_id=DynamicallyDefineDataIdentifierSubFuncs.clearDynamicallyDefinedDataIdentifier, + ): + Request = ClearDynamicallyDefinedDataIdentifierRequest + Response = ClearDynamicallyDefinedDataIdentifierResponse + + # ****************************** # * Write memory by identifier * # ****************************** diff --git a/src/gallia/services/uds/core/utils.py b/src/gallia/services/uds/core/utils.py index 81e48db5a..4481aace1 100644 --- a/src/gallia/services/uds/core/utils.py +++ b/src/gallia/services/uds/core/utils.py @@ -219,6 +219,19 @@ def address_and_size_length(address_and_length_fmt: int) -> tuple[int, int]: return addr_length, size_length +def address_and_length_fmt(address_length: int, size_length: int) -> int: + """Computes the addressAndLengthFormatIdentifier which is used throughout multiple UDS services from its individual lengths. + + :param address_length: The memory address length. + :param size_length: The memory size length. + :return: The addressAndLengthFormatIdentifier combining the individual lengths. + """ + check_range(address_length, "address length", 0, 15) + check_range(size_length, "size length", 0, 15) + + return (size_length << 4) + address_length + + def sub_function_split(sub_function: int) -> tuple[int, bool]: """ Returns the subFunction without suppress bit and if the bit was set.