Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions probe_py/probe_py/analysis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
from .ptypes import ProbeLog
from .ptypes import ProbeLog, HbGraph, OpQuad
from .ops import CloneOp, WaitOp
from .hb_graph import HbGraph, OpNode


def get_max_parallelism_latest(hb_graph: HbGraph, probe_log: ProbeLog) -> int:
Expand All @@ -10,14 +9,14 @@ def get_max_parallelism_latest(hb_graph: HbGraph, probe_log: ProbeLog) -> int:
counter = 1
max_counter = 1
start_node = [node for node in hb_graph.nodes() if hb_graph.in_degree(node) == 0][0]
queue = collections.deque[tuple[OpNode, OpNode | None]]([(start_node, None)]) # (current_node, parent_node)
queue = collections.deque[tuple[OpQuad, OpQuad | None]]([(start_node, None)]) # (current_node, parent_node)
while queue:
node, parent = queue.popleft()
if node in visited:
continue
if parent:
parent_op = probe_log.get_op(*parent.op_quad()).data
node_op = probe_log.get_op(*node.op_quad()).data
parent_op = probe_log.get_op(parent).data
node_op = probe_log.get_op(node).data

visited.add(node)

Expand Down
171 changes: 86 additions & 85 deletions probe_py/probe_py/dataflow_graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import tqdm
import collections
import dataclasses
import enum
Expand All @@ -10,30 +9,14 @@
import warnings
import networkx
from . import graph_utils
from . import hb_graph
from .hb_graph_accesses import hb_graph_to_accesses, Access, AccessMode, Phase
from .hb_graph_accesses import hb_graph_to_accesses
from . import ops
from . import ptypes


_Node = typing.TypeVar("_Node")


@dataclasses.dataclass(frozen=True)
class AccessEpoch[_Node]:
"""An access epoch is a set of nodes, denoted by a segment, in which the node may be accessed."""
mode: AccessMode
bounds: graph_utils.Segment[_Node]
version: int | None = None


@dataclasses.dataclass(frozen=True)
class ExecNode:
"""An exec, denoted by Pid and ExecNo"""
pid: ptypes.Pid
exec_no: ptypes.ExecNo


@dataclasses.dataclass(frozen=True)
class InodeVersionNode:
"""A particular version of the inode"""
Expand All @@ -45,18 +28,16 @@ def __str__(self) -> str:


if typing.TYPE_CHECKING:
DataflowGraph: typing.TypeAlias = networkx.DiGraph[hb_graph.OpNode | InodeVersionNode]
CompressedDataflowGraph: typing.TypeAlias = networkx.DiGraph[hb_graph.OpNode | frozenset[InodeVersionNode]]
EpochGraph: typing.TypeAlias = networkx.DiGraph[AccessEpoch[hb_graph.OpNode]]
DataflowGraph: typing.TypeAlias = networkx.DiGraph[ptypes.OpQuint | InodeVersionNode]
CompressedDataflowGraph: typing.TypeAlias = networkx.DiGraph[ptypes.OpQuint | frozenset[InodeVersionNode]]
else:
DataflowGraph = networkx.DiGraph
CompressedDataflowGraph = networkx.DiGraph
EpochGraph = networkx.DiGraph


def accesses_to_dataflow_graph(
probe_log: ptypes.ProbeLog,
accesses_and_nodes: list[Access | hb_graph.OpNode],
accesses_and_quads: list[ptypes.Access | ptypes.OpQuad],
) -> tuple[DataflowGraph, typing.Mapping[ptypes.Inode, frozenset[pathlib.Path]]]:
"""Turn a list of accesses into a dataflow graph, by assigning a version at every access."""

Expand All @@ -65,84 +46,97 @@ class PidState(enum.IntEnum):
WRITING = enum.auto()

parent_pid = probe_log.get_parent_pid_map()
pid_to_state = collections.defaultdict[ptypes.Pid, PidState](lambda: PidState.READING)
last_op_in_process = dict[ptypes.Pid, hb_graph.OpNode]()
pid_to_state = dict[ptypes.Pid, PidState]()
last_op_in_process = dict[ptypes.Pid, ptypes.OpQuint]()
inode_to_version = collections.defaultdict[ptypes.Inode, int](lambda: 0)
inode_to_paths = collections.defaultdict[ptypes.Inode, set[pathlib.Path]](set)
dataflow_graph = DataflowGraph()

def add_node(node: hb_graph.OpNode) -> None:
pid_to_state[node.pid] = PidState.READING
if program_order_predecessor := last_op_in_process.get(node.pid):
dataflow_graph.add_edge(program_order_predecessor, node)
def add_quad(quad: ptypes.OpQuad, label: str) -> None:
pid_to_state[quad.pid] = PidState.READING
if program_order_predecessor := last_op_in_process.get(quad.pid):
quint = program_order_predecessor.deduplicate(quad)
dataflow_graph.add_edge(program_order_predecessor, quint, label=label + " (from pred)")
else:
if parent := parent_pid.get(node.pid):
dataflow_graph.add_edge(last_op_in_process[parent], node)
quint = ptypes.OpQuint.from_quad(quad)
if parent := parent_pid.get(quad.pid):
dataflow_graph.add_edge(last_op_in_process[parent], quint, label=label + " (from parent)")
else:
pass
# Found initial node of root proc
last_op_in_process[node.pid] = node
# Found initial quad of root proc
last_op_in_process[quad.pid] = quint

def ensure_state(node: hb_graph.OpNode, desired_state: PidState) -> None:
if desired_state == PidState.WRITING and pid_to_state[node.pid] == PidState.READING:
def ensure_state(quad: ptypes.OpQuad, desired_state: PidState) -> ptypes.OpQuint:
if quad.pid not in pid_to_state:
warnings.warn(ptypes.UnusualProbeLog(
f"Encountered {quad}, but there are no nodes on process {quad.pid}.",
))
add_quad(quad, "init")
if desired_state == PidState.WRITING and pid_to_state[quad.pid] == PidState.READING:
# Reading -> writing for free
pid_to_state[node.pid] = PidState.WRITING
elif desired_state == PidState.READING and pid_to_state[node.pid] == PidState.WRITING:
# Writing -> reading by starting a new node.
add_node(node)
assert pid_to_state[node.pid] == desired_state
pid_to_state[quad.pid] = PidState.WRITING
elif desired_state == PidState.READING and pid_to_state[quad.pid] == PidState.WRITING:
# Writing -> reading by starting a new quad.
add_quad(quad, "r→w")
assert pid_to_state[quad.pid] == desired_state
return last_op_in_process[quad.pid]

for access_or_node in accesses_and_nodes:
match access_or_node:
case Access():
access = access_or_node
for access_or_quad in accesses_and_quads:
match access_or_quad:
case ptypes.Access():
access = access_or_quad
version_num = inode_to_version[access.inode]
inode_to_paths[access.inode].add(access.path)
version = InodeVersionNode(access.inode, version_num)
next_version = InodeVersionNode(access.inode, version_num + 1)
ensure_state(access.op_node, PidState.READING if access.mode.is_side_effect_free() else PidState.WRITING)
if (op_node := last_op_in_process.get(access.op_node.pid)) is None:
warnings.warn(ptypes.UnusualProbeLog(f"Can't find last node from process {access.op_node.pid}"))
continue
match access.mode:
case AccessMode.WRITE:
if access.phase == Phase.BEGIN:
dataflow_graph.add_edge(op_node, next_version)
dataflow_graph.add_edge(version, next_version)
case AccessMode.TRUNCATE_WRITE:
if access.phase == Phase.END:
dataflow_graph.add_edge(op_node, next_version)
case AccessMode.READ_WRITE:
if access.phase == Phase.BEGIN:
dataflow_graph.add_edge(version, op_node)
if access.phase == Phase.END:
dataflow_graph.add_edge(op_node, next_version)
dataflow_graph.add_edge(version, next_version)
case AccessMode.READ | AccessMode.EXEC | AccessMode.DLOPEN:
if access.phase == Phase.BEGIN:
dataflow_graph.add_edge(version, op_node)
case ptypes.AccessMode.WRITE:
if access.phase == ptypes.Phase.BEGIN:
quint = ensure_state(access.op_node, PidState.WRITING)
dataflow_graph.add_edge(quint, next_version, label="mutating write")
dataflow_graph.add_edge(version, next_version, label="mutating write")
inode_to_version[access.inode] += 1
case ptypes.AccessMode.TRUNCATE_WRITE:
if access.phase == ptypes.Phase.END:
quint = ensure_state(access.op_node, PidState.WRITING)
dataflow_graph.add_edge(quint, next_version, label="truncating write")
inode_to_version[access.inode] += 1
case ptypes.AccessMode.READ_WRITE:
if access.phase == ptypes.Phase.BEGIN:
quint = ensure_state(access.op_node, PidState.READING)
dataflow_graph.add_edge(version, quint, label="read & write")
if access.phase == ptypes.Phase.END:
quint = ensure_state(access.op_node, PidState.WRITING)
dataflow_graph.add_edge(quint, next_version, label="read/write")
dataflow_graph.add_edge(version, next_version, label="read/write")
inode_to_version[access.inode] += 1
case ptypes.AccessMode.READ | ptypes.AccessMode.EXEC | ptypes.AccessMode.DLOPEN:
if access.phase == ptypes.Phase.BEGIN:
quint = ensure_state(access.op_node, PidState.READING)
dataflow_graph.add_edge(version, quint, label="read")
case _:
raise TypeError()
case hb_graph.OpNode():
node = access_or_node
op_data = probe_log.get_op(*node.op_quad()).data
case ptypes.OpQuad():
quad = access_or_quad
op_data = probe_log.get_op(quad).data
match op_data:
# us -> our child
# Therefore, we have to be in writing mode
case ops.CloneOp():
if op_data.task_type == ptypes.TaskType.TASK_PID and not (op_data.flags & os.CLONE_THREAD):
ensure_state(node, PidState.WRITING)
ensure_state(quad, PidState.WRITING)
case ops.SpawnOp():
ensure_state(node, PidState.WRITING)
ensure_state(quad, PidState.WRITING)
case ops.InitExecEpochOp():
add_node(node)
add_quad(quad, "init")

inode_to_paths2 = {inode: frozenset(paths) for inode, paths in inode_to_paths.items()}
return dataflow_graph, inode_to_paths2


def hb_graph_to_dataflow_graph2(
probe_log: ptypes.ProbeLog,
hbg: hb_graph.HbGraph,
hbg: ptypes.HbGraph,
check: bool = False,
) -> tuple[DataflowGraph, typing.Mapping[ptypes.Inode, frozenset[pathlib.Path]]]:
accesses = list(hb_graph_to_accesses(probe_log, hbg))
Expand All @@ -160,8 +154,8 @@ def combine_indistinguishable_inodes(
else:
warnings.warn(ptypes.UnusualProbeLog("Dataflow graph is cyclic"))
def same_neighbors(
node0: hb_graph.OpNode | InodeVersionNode,
node1: hb_graph.OpNode | InodeVersionNode,
node0: ptypes.OpQuad | InodeVersionNode,
node1: ptypes.OpQuad | InodeVersionNode,
) -> bool:
return (
isinstance(node0, InodeVersionNode)
Expand All @@ -172,10 +166,10 @@ def same_neighbors(
and
frozenset(dataflow_graph.successors(node0)) == frozenset(dataflow_graph.successors(node1))
)
def node_mapper(node_set: frozenset[hb_graph.OpNode | InodeVersionNode]) -> hb_graph.OpNode | frozenset[InodeVersionNode]:
def node_mapper(node_set: frozenset[ptypes.OpQuint | InodeVersionNode]) -> ptypes.OpQuint | frozenset[InodeVersionNode]:
first_node = next(iter(node_set))
if isinstance(first_node, hb_graph.OpNode):
assert all(isinstance(node, hb_graph.OpNode) for node in node_set)
if isinstance(first_node, ptypes.OpQuint):
assert all(isinstance(node, ptypes.OpQuint) for node in node_set)
return first_node
else:
assert all(isinstance(node, InodeVersionNode) for node in node_set)
Expand Down Expand Up @@ -241,16 +235,21 @@ def label_nodes(
) -> None:
count = dict[tuple[ptypes.Pid, ptypes.ExecNo], int]()
root_pid = probe_log.get_root_pid()
for node in tqdm.tqdm(
networkx.topological_sort(dataflow_graph),
total=len(dataflow_graph),
desc="Labelling DFG nodes",
):
if networkx.is_directed_acyclic_graph(dataflow_graph):
nodes = list(networkx.topological_sort(dataflow_graph))
cycle = []
else:
nodes = list(dataflow_graph.nodes())
cycle = list(networkx.find_cycle(dataflow_graph))
warnings.warn(ptypes.UnusualProbeLog(
"Dataflow graph contains a cycle (marked in red).",
))
for node in nodes:
data = dataflow_graph.nodes(data=True)[node]
match node:
case hb_graph.OpNode():
case ptypes.OpQuad():
data["shape"] = "oval"
op = probe_log.get_op(node.pid, node.exec_no, node.pid.main_thread(), 0)
op = probe_log.get_op(node)
if node.op_no == 0:
count[(node.pid, node.exec_no)] = 1
if node.exec_no != 0:
Expand All @@ -273,7 +272,7 @@ def label_nodes(
data["label"] = ""
if (node.pid, node.exec_no) not in count:
warnings.warn(ptypes.UnusualProbeLog(
f"{node.pid, node.exec_no} never counted before"
f"{node.pid, node.exec_no} never counted before",
))
count[(node.pid, node.exec_no)] = 99
count[(node.pid, node.exec_no)] += 1
Expand All @@ -291,7 +290,7 @@ def shorten_path(input: pathlib.Path) -> str:
inode_labels = []
for inode_version in inode_versions[:max_inodes_per_set]:
inode_label = []
inode_label.append(f"{inode_version.inode.number} v{inode_version.version}")
inode_label.append(f"{inode_version.inode} v{inode_version.version}")
paths = inodes_to_path.get(inode_version.inode, frozenset[pathlib.Path]())
for path in sorted(paths, key=lambda path: len(str(path)))[:max_paths_per_inode]:
inode_label.append(shorten_path(path))
Expand All @@ -301,3 +300,5 @@ def shorten_path(input: pathlib.Path) -> str:
data["label"] = "\n".join(inode_labels)
data["shape"] = "rectangle"
data["id"] = str(hash(node))
for a, b in cycle:
dataflow_graph.edges[a, b]["color"] = "red" # type: ignore
5 changes: 4 additions & 1 deletion probe_py/probe_py/file_closure.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import warnings
import pathlib
import typing
from . import ptypes
from .ptypes import ProbeLog, initial_exec_no, InodeVersion, Pid
from .ops import Path, ChdirOp, OpenOp, CloseOp, InitExecEpochOp, ExecOp, Op
from .consts import AT_FDCWD
Expand Down Expand Up @@ -230,7 +231,9 @@ def copy_file_closure(
console.print(f"Skipping {resolved_path}")
elif resolved_path.exists():
if ino_ver is not None and InodeVersion.from_local_path(resolved_path) != ino_ver:
warnings.warn(f"{resolved_path} changed in between the time of `probe record` and now.")
warnings.warn(ptypes.UnusualProbeLog(
f"{resolved_path} changed in between the time of `probe record` and now.",
))
if resolved_path.is_dir():
destination_path.mkdir(exist_ok=True, parents=True)
elif copy:
Expand Down
Loading