Skip to content

Commit

Permalink
fix(dumpcap): Handle dumpcap invocation for unix transports
Browse files Browse the repository at this point in the history
Currently there is a flaw in the plugin interface. Plugins cannot
specify how dumpcap should be invoked to capture traffic. The default
is ethernet traffic. There are Unix transports available which are
neither CAN traffic nor ethernet traffic. In this case the wrong
method `_eth_cmd()` is called which cannot create a valid CLI for
dumpcap.
  • Loading branch information
rumpelsepp committed Jan 2, 2024
1 parent 05e50cb commit 7b34f3b
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions src/gallia/dumpcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from urllib.parse import urlparse

from gallia.log import get_logger
from gallia.transports import ISOTPTransport, RawCANTransport, TargetURI
from gallia.transports import (
ISOTPTransport,
RawCANTransport,
TargetURI,
UnixLinesTransport,
UnixTransport,
)
from gallia.utils import auto_int, split_host_port

logger = get_logger("gallia.dumpcap")
Expand Down Expand Up @@ -48,25 +54,39 @@ async def start(
artifacts_dir: Path,
) -> Dumpcap | None:
ts = int(datetime.now().timestamp())
if target.scheme in [ISOTPTransport.SCHEME, RawCANTransport.SCHEME]:
outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz")
src_addr = (
auto_int(target.qs["src_addr"][0]) if "src_addr" in target.qs else None
)
dst_addr = (
auto_int(target.qs["dst_addr"][0]) if "dst_addr" in target.qs else None
)
cmd = cls._can_cmd(
target.netloc,
src_addr,
dst_addr,
)
else:
outfile = artifacts_dir.joinpath(f"eth-{ts}.pcap.gz")
cmd = await cls._eth_cmd(target.netloc)

match target.scheme:
case ISOTPTransport.SCHEME | RawCANTransport.SCHEME:
outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz")
src_addr = (
auto_int(target.qs["src_addr"][0])
if "src_addr" in target.qs
else None
)
dst_addr = (
auto_int(target.qs["dst_addr"][0])
if "dst_addr" in target.qs
else None
)
cmd = cls._can_cmd(
target.netloc,
src_addr,
dst_addr,
)
# Unix domain sockets are not supported by dumpcap.
case UnixTransport.SCHEME | UnixLinesTransport.SCHEME:
return None
# There is currently no API for transport plugins to
# register a scheme and a corresponding invocation
# for dumpcap. So this match…case is best effort,
# since it defaults to ethernet.
case _:
outfile = artifacts_dir.joinpath(f"eth-{ts}.pcap.gz")
cmd = await cls._eth_cmd(target.netloc)

if cmd is None:
return None

cmd_str = shlex.join(cmd)
try:
proc = await asyncio.create_subprocess_exec(
Expand Down

0 comments on commit 7b34f3b

Please sign in to comment.