Skip to content

Commit

Permalink
fix: Remove auto-discovery of sessions from multiple scanners
Browse files Browse the repository at this point in the history
Since we have a dedicated scanner for identifying available sessions it
does not make sense to implement a low-effort variant of
sessions-scanning for multiple scanners with totally different tasks.

Moreover, this change allows to run these scanners without requiring the
availability of DiagnosticSessionControl, as long as no --sessions flag
is given.
  • Loading branch information
ferdinandjarisch authored and rumpelsepp committed Oct 27, 2023
1 parent 635d5f8 commit c961e90
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 314 deletions.
217 changes: 110 additions & 107 deletions src/gallia/commands/scan/uds/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def configure_parser(self) -> None:
nargs="?",
const=1,
type=int,
help="Check current session via read DID [for every nth DataIdentifier] and try to recover session",
help="Check current session via read DID [for every nth DataIdentifier] and try to recover session; only takes affect if --sessions is given",
)
self.parser.add_argument(
"--skip",
Expand All @@ -94,6 +94,7 @@ def configure_parser(self) -> None:
- 0x10-0x2f
- 0x01:0xf3,0x10-0x2f
Multiple session specific skips are separated by space.
Only takes affect if --sessions is given.
""",
)
self.parser.add_argument(
Expand All @@ -104,130 +105,132 @@ def configure_parser(self) -> None:

async def main(self, args: Namespace) -> None:
if args.sessions is None:
logger.info("No sessions specified, starting with session scan")
# Only until 0x80 because the eight bit is "SuppressResponse"
sessions = [
s
for s in range(1, 0x80)
if s not in args.skip or args.skip[s] is not None
]
sessions = await self.ecu.find_sessions(sessions)
logger.result(f"Found {len(sessions)} sessions: {g_repr(sessions)}")
logger.notice("Performing scan in current session")
await self.perform_scan(args)

else:
sessions = [
sessions: list[int] = [
s
for s in args.sessions
if s not in args.skip or args.skip[s] is not None
]
logger.info(f"testing sessions {g_repr(sessions)}")

logger.info(f"testing sessions {g_repr(sessions)}")
# TODO: Unified shortened output necessary here
logger.info(f"skipping identifiers {reprlib.repr(args.skip)}")

# TODO: Unified shortened output necessary here
logger.info(f"skipping identifiers {reprlib.repr(args.skip)}")
for session in sessions:
logger.notice(f"Switching to session {g_repr(session)}")
resp: UDSResponse = await self.ecu.set_session(session)
if isinstance(resp, NegativeResponse):
logger.warning(
f"Switching to session {g_repr(session)} failed: {resp}"
)
continue

for session in sessions:
logger.notice(f"Switching to session {g_repr(session)}")
resp: UDSResponse = await self.ecu.set_session(session)
if isinstance(resp, NegativeResponse):
logger.warning(f"Switching to session {g_repr(session)} failed: {resp}")
logger.result(f"Starting scan in session: {g_repr(session)}")

await self.perform_scan(args, session)

logger.result(f"Scan in session {g_repr(session)} is complete!")
logger.info(f"Leaving session {g_repr(session)} via hook")
await self.ecu.leave_session(session)

async def perform_scan(self, args: Namespace, session: None | int = None) -> None:
positive_DIDs = 0
abnormal_DIDs = 0
timeout_DIDs = 0
sub_functions = [0x00]

if args.sid == UDSIsoServices.RoutineControl:
if not args.payload:
logger.warning(
"Scanning RoutineControl with empty payload can successfully execute some "
+ "routines that might have irreversible effects without elevated privileges"
)

# Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults)
sub_functions = list(map(int, RoutineControlSubFuncs))

if args.sid == UDSIsoServices.SecurityAccess and args.end > 0xFF:
logger.warning(
"Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); "
+ f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}"
)
args.end = 0xFF

for DID, sub_function in product(
range(args.start, args.end + 1), sub_functions
):
if session in args.skip and DID in args.skip[session]:
logger.info(f"{g_repr(DID)}: skipped")
continue

logger.result(f"Starting scan in session: {g_repr(session)}")
positive_DIDs = 0
abnormal_DIDs = 0
timeout_DIDs = 0
sub_functions = [0x00]

if args.sid == UDSIsoServices.RoutineControl:
if not args.payload:
logger.warning(
"Scanning RoutineControl with empty payload can successfully execute some "
+ "routines, such as switching from plant mode to field mode, which can only "
+ "be reversed with a valid token!"
if (
session is not None
and args.check_session
and DID % args.check_session == 0
):
# Check session and try to recover from wrong session (max 3 times), else skip session
if not await self.ecu.check_and_set_session(session):
logger.error(
f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}"
)

# Scan all three subfunctions (startRoutine, stopRoutine, requestRoutineResults)
sub_functions = list(map(int, RoutineControlSubFuncs))
break

if args.sid == UDSIsoServices.SecurityAccess:
if args.end > 0xFF:
logger.warning(
"Service 0x27 SecurityAccess only accepts subFunctions (1-byte identifiers); "
+ f"limiting END to {g_repr(0xff)} instead of {g_repr(args.end)}"
if DID & 0b10000000:
logger.info(
"Keep in mind that you set the SuppressResponse Bit (8th bit): "
+ f"{g_repr(DID)} = 0b{DID:b}"
)
args.end = 0xFF
pdu = bytes([args.sid, DID])

elif args.sid == UDSIsoServices.RoutineControl:
pdu = bytes(
[args.sid, sub_function, DID >> 8, DID & 0xFF]
) # Needs extra byte for sub function

# DefaultBehavior, e.g. for ReadDataByIdentifier/WriteDataByIdentifier
else:
pdu = bytes([args.sid, DID >> 8, DID & 0xFF])

if args.payload:
pdu += args.payload

try:
resp = await self.ecu.send_raw(
pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3)
)
except asyncio.TimeoutError:
logger.result(f"{g_repr(DID)}: Retries exceeded")
timeout_DIDs += 1
continue
except IllegalResponse as e:
logger.warning(g_repr(e))
continue

for DID, sub_function in product(
range(args.start, args.end + 1), sub_functions
):
if session in args.skip and DID in args.skip[session]:
logger.info(f"{g_repr(DID)}: skipped")
continue
if isinstance(resp, NegativeResponse):
if suggests_service_not_supported(resp):
logger.info(
f"{g_repr(DID)}: {resp}; does session {g_repr(session)} "
f"support service {service_repr(args.sid)}?"
)

if args.check_session and DID % args.check_session == 0:
# Check session and try to recover from wrong session (max 3 times), else skip session
if not await self.ecu.check_and_set_session(session):
logger.error(
f"Aborting scan on session {g_repr(session)}; current DID was {g_repr(DID)}"
)
if args.skip_not_supported:
break

if args.sid == UDSIsoServices.SecurityAccess:
if DID & 0b10000000:
logger.info(
"Keep in mind that you set the SuppressResponse Bit (8th bit): "
+ f"{g_repr(DID)} = 0b{DID:b}"
)
pdu = bytes([args.sid, DID])

elif args.sid == UDSIsoServices.RoutineControl:
pdu = bytes(
[args.sid, sub_function, DID >> 8, DID & 0xFF]
) # Needs extra byte for sub function
# RequestOutOfRange is a common reply for invalid/unknown DataIdentifiers
elif resp.response_code == UDSErrorCodes.requestOutOfRange:
logger.info(f"{g_repr(DID)}: {resp}")

# DefaultBehaviour, e.g. for ReadDataByIdentifier/WriteDataByIdentifier
else:
pdu = bytes([args.sid, DID >> 8, DID & 0xFF])

if args.payload:
pdu += args.payload

try:
resp = await self.ecu.send_raw(
pdu, config=UDSRequestConfig(tags=["ANALYZE"], max_retry=3)
)
except asyncio.TimeoutError:
logger.result(f"{g_repr(DID)}: Retries exceeded")
timeout_DIDs += 1
continue
except IllegalResponse as e:
logger.warning(g_repr(e))

if isinstance(resp, NegativeResponse):
if suggests_service_not_supported(resp):
logger.info(
f"{g_repr(DID)}: {resp}; does session {g_repr(session)} "
f"support service {service_repr(args.sid)}?"
)

if args.skip_not_supported:
break

# RequestOutOfRange is a common reply for invalid DataIdentifiers
elif resp.response_code == UDSErrorCodes.requestOutOfRange:
logger.info(f"{g_repr(DID)}: {resp}")

else:
logger.result(f"{g_repr(DID)}: {resp}")
abnormal_DIDs += 1
else:
logger.result(f"{g_repr(DID)}: {resp}")
positive_DIDs += 1

logger.result(f"Scan in session {g_repr(session)} is complete!")
logger.result(f"Positive replies: {positive_DIDs}")
logger.result(f"Abnormal replies: {abnormal_DIDs}")
logger.result(f"Timeouts: {timeout_DIDs}")

logger.info(f"Leaving session {g_repr(session)} via hook")
await self.ecu.leave_session(session)
abnormal_DIDs += 1
else:
logger.result(f"{g_repr(DID)}: {resp}")
positive_DIDs += 1

logger.result(f"Positive replies: {positive_DIDs}")
logger.result(f"Abnormal replies: {abnormal_DIDs}")
logger.result(f"Timeouts: {timeout_DIDs}")
Loading

0 comments on commit c961e90

Please sign in to comment.