Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions code_review_graph/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,31 @@

# BFS engine: "sql" (SQLite recursive CTE) or "networkx" (Python-side BFS)
BFS_ENGINE = os.environ.get("CRG_BFS_ENGINE", "sql")

# ---------------------------------------------------------------------------
# Impact-radius scoring
# ---------------------------------------------------------------------------
# Change risk does not propagate equally across edge kinds: a direct call is
# a stronger coupling than an import, which is stronger than mere file
# membership. Each traversal hop multiplies the running score by the edge's
# weight and by IMPACT_DEPTH_DECAY; paths whose score falls below
# IMPACT_SCORE_FLOOR are not expanded further. Impacted nodes are ranked
# (and truncated) by their best-path score instead of arbitrary scan order.
#
# These weights model risk propagation for reviews; communities.EDGE_WEIGHTS
# models clustering affinity and intentionally differs (e.g. TESTED_BY is a
# weak clustering signal but a strong "this test is affected" signal).
IMPACT_EDGE_WEIGHTS: dict[str, float] = {
"CALLS": 1.0,
"INHERITS": 0.9,
"OVERRIDES": 0.9,
"IMPLEMENTS": 0.9,
"TESTED_BY": 0.7,
"REFERENCES": 0.6,
"DEPENDS_ON": 0.6,
"IMPORTS_FROM": 0.5,
"CONTAINS": 0.3,
}
IMPACT_DEFAULT_EDGE_WEIGHT = 0.5
IMPACT_DEPTH_DECAY = float(os.environ.get("CRG_IMPACT_DEPTH_DECAY", "0.6"))
IMPACT_SCORE_FLOOR = float(os.environ.get("CRG_IMPACT_SCORE_FLOOR", "0.05"))
170 changes: 136 additions & 34 deletions code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@

import networkx as nx

from .constants import BFS_ENGINE, MAX_IMPACT_DEPTH, MAX_IMPACT_NODES
from .constants import (
BFS_ENGINE,
IMPACT_DEFAULT_EDGE_WEIGHT,
IMPACT_DEPTH_DECAY,
IMPACT_EDGE_WEIGHTS,
IMPACT_SCORE_FLOOR,
MAX_IMPACT_DEPTH,
MAX_IMPACT_NODES,
)
from .migrations import get_schema_version, run_migrations
from .parser import EdgeInfo, NodeInfo

Expand Down Expand Up @@ -623,9 +631,12 @@ def get_impact_radius(

Returns dict with:
- changed_nodes: nodes in changed files
- impacted_nodes: nodes reachable via edges
- impacted_nodes: nodes reachable via edges, ordered by
impact score (best-path score, highest first)
- impacted_files: unique set of affected files
- edges: connecting edges
- impact_scores: qualified_name -> best-path score for each
returned impacted node (edge-kind weight x depth decay per hop)
"""
if BFS_ENGINE == "networkx":
return self._get_impact_radius_networkx(
Expand Down Expand Up @@ -656,6 +667,7 @@ def get_impact_radius_sql(
"edges": [],
"truncated": False,
"total_impacted": 0,
"impact_scores": {},
}

# Seed qualified names
Expand All @@ -673,6 +685,7 @@ def get_impact_radius_sql(
"edges": [],
"truncated": False,
"total_impacted": 0,
"impact_scores": {},
}

# Build recursive CTE — use a temp table for the seed set to
Expand All @@ -692,42 +705,87 @@ def get_impact_radius_sql(
batch,
)

# Edge-kind weights live in a temp table so the recursive CTE can
# join them per hop.
self._conn.execute(
"CREATE TEMP TABLE IF NOT EXISTS _impact_weights "
"(kind TEXT PRIMARY KEY, weight REAL NOT NULL)"
)
self._conn.execute("DELETE FROM _impact_weights")
self._conn.executemany(
"INSERT INTO _impact_weights (kind, weight) VALUES (?, ?)",
list(IMPACT_EDGE_WEIGHTS.items()),
)

# Each hop multiplies the running score by the edge weight and the
# depth decay; paths below the score floor stop expanding. SQLite
# cannot aggregate inside the recursive term, so a node may be
# visited once per distinct (depth, score) path; the outer GROUP BY
# keeps its best score, and ORDER BY makes LIMIT keep the
# highest-signal nodes instead of arbitrary scan order. Seeds score
# 1.0 and every hop multiplies by < 1, so seeds always sort first
# and the LIMIT still admits up to max_nodes non-seed nodes.
# Edge endpoints without a node row ("ghost" qualified names from
# unresolved targets) stay in the recursion as traversal bridges but
# are excluded from the final selection so they cannot consume LIMIT
# slots that _batch_get_nodes would silently drop.
cte_sql = """
WITH RECURSIVE impacted(node_qn, depth) AS (
SELECT qn, 0 FROM _impact_seeds
WITH RECURSIVE impacted(node_qn, depth, score) AS (
SELECT qn, 0, 1.0 FROM _impact_seeds
UNION
SELECT e.target_qualified, i.depth + 1
SELECT e.target_qualified, i.depth + 1,
i.score * COALESCE(w.weight, ?) * ?
FROM impacted i
JOIN edges e ON e.source_qualified = i.node_qn
LEFT JOIN _impact_weights w ON w.kind = e.kind
WHERE i.depth < ?
AND i.score * COALESCE(w.weight, ?) * ? > ?
UNION
SELECT e.source_qualified, i.depth + 1
SELECT e.source_qualified, i.depth + 1,
i.score * COALESCE(w.weight, ?) * ?
FROM impacted i
JOIN edges e ON e.target_qualified = i.node_qn
LEFT JOIN _impact_weights w ON w.kind = e.kind
WHERE i.depth < ?
AND i.score * COALESCE(w.weight, ?) * ? > ?
)
SELECT DISTINCT node_qn, MIN(depth) AS min_depth
SELECT node_qn, MIN(depth) AS min_depth, MAX(score) AS impact_score
FROM impacted
WHERE node_qn IN (SELECT qualified_name FROM nodes)
GROUP BY node_qn
ORDER BY impact_score DESC, node_qn
LIMIT ?
"""
hop = (
IMPACT_DEFAULT_EDGE_WEIGHT, IMPACT_DEPTH_DECAY,
max_depth,
IMPACT_DEFAULT_EDGE_WEIGHT, IMPACT_DEPTH_DECAY,
IMPACT_SCORE_FLOOR,
)
rows = self._conn.execute(
cte_sql, (max_depth, max_depth, max_nodes + len(seeds)),
cte_sql, hop + hop + (max_nodes + len(seeds),),
).fetchall()

# Split into seeds vs impacted
# Split into seeds vs impacted, keeping each node's best-path score.
score_by_qn: dict[str, float] = {}
impacted_qns: set[str] = set()
for r in rows:
qn = r[0]
score_by_qn[qn] = r[2]
if qn not in seeds:
impacted_qns.add(qn)

# Batch-fetch nodes
# Batch-fetch nodes, then restore best-score-first order.
changed_nodes = self._batch_get_nodes(seeds)
impacted_nodes = self._batch_get_nodes(impacted_qns)
impacted_nodes.sort(
key=lambda n: (-score_by_qn.get(n.qualified_name, 0.0), n.qualified_name),
)

total_impacted = len(impacted_nodes)
truncated = total_impacted > max_nodes
# The LIMIT above already capped non-seed rows at max_nodes, so a
# saturated result means more nodes may exist beyond the cutoff.
truncated = total_impacted >= max_nodes
if truncated:
impacted_nodes = impacted_nodes[:max_nodes]

Expand All @@ -745,6 +803,12 @@ def get_impact_radius_sql(
"edges": relevant_edges,
"truncated": truncated,
"total_impacted": total_impacted,
"impact_scores": {
n.qualified_name: round(
score_by_qn.get(n.qualified_name, 0.0), 4,
)
for n in impacted_nodes
},
}

# -- NetworkX BFS version (legacy) ------------------------------------
Expand All @@ -764,37 +828,51 @@ def _get_impact_radius_networkx(
for n in nodes:
seeds.add(n.qualified_name)

visited: set[str] = set()
frontier = seeds.copy()
# Weighted BFS mirroring the SQL engine: each hop multiplies the
# running score by the edge weight and the depth decay, sub-floor
# paths stop expanding, and a node re-enters the frontier whenever a
# better-scoring path reaches it (a deep CALLS chain can outscore a
# shallow CONTAINS hop, so first-visit BFS is not enough).
best: dict[str, float] = dict.fromkeys(seeds, 1.0)
frontier: dict[str, float] = dict(best)
depth = 0
impacted: set[str] = set()

while frontier and depth < max_depth:
visited.update(frontier)
next_frontier: set[str] = set()
for qn in frontier:
if qn in nxg:
for neighbor in nxg.neighbors(qn):
if neighbor not in visited:
next_frontier.add(neighbor)
impacted.add(neighbor)
if qn in nxg:
for pred in nxg.predecessors(qn):
if pred not in visited:
next_frontier.add(pred)
impacted.add(pred)
next_frontier -= visited
if len(visited) + len(next_frontier) > max_nodes:
break
next_frontier: dict[str, float] = {}
for qn, score in frontier.items():
if qn not in nxg:
continue
out_edges = nxg.out_edges(qn, data=True)
in_edges = nxg.in_edges(qn, data=True)
neighbors = [
(target, data) for _, target, data in out_edges
] + [
(source, data) for source, _, data in in_edges
]
for other_qn, data in neighbors:
weight = IMPACT_EDGE_WEIGHTS.get(
data.get("kind", ""), IMPACT_DEFAULT_EDGE_WEIGHT,
)
new_score = score * weight * IMPACT_DEPTH_DECAY
if new_score <= IMPACT_SCORE_FLOOR:
continue
if new_score > best.get(other_qn, 0.0):
best[other_qn] = new_score
next_frontier[other_qn] = new_score
frontier = next_frontier
depth += 1

changed_nodes = self._batch_get_nodes(seeds)
impacted_qns = impacted - seeds
impacted_qns = set(best) - seeds
impacted_nodes = self._batch_get_nodes(impacted_qns)
impacted_nodes.sort(
key=lambda n: (-best.get(n.qualified_name, 0.0), n.qualified_name),
)

total_impacted = len(impacted_nodes)
truncated = total_impacted > max_nodes
# Same saturation semantics as the SQL engine: a full result means
# more nodes may exist beyond the cutoff.
truncated = total_impacted >= max_nodes
if truncated:
impacted_nodes = impacted_nodes[:max_nodes]

Expand All @@ -812,6 +890,10 @@ def _get_impact_radius_networkx(
"edges": relevant_edges,
"truncated": truncated,
"total_impacted": total_impacted,
"impact_scores": {
n.qualified_name: round(best.get(n.qualified_name, 0.0), 4)
for n in impacted_nodes
},
}

def get_subgraph(self, qualified_names: list[str]) -> dict[str, Any]:
Expand Down Expand Up @@ -1282,14 +1364,34 @@ def load_flow_adjacency(self) -> "FlowAdjacency":
# --- Internal helpers ---

def _build_networkx_graph(self) -> nx.DiGraph:
"""Build (or return cached) in-memory NetworkX directed graph from all edges."""
"""Build (or return cached) in-memory NetworkX directed graph from all edges.

DiGraph keeps a single edge per (source, target) pair. When parallel
edges of different kinds exist (e.g. a file both CONTAINS and CALLS a
function), keep the kind with the highest impact weight so weighted
traversals see the same best-path scores as the SQL engine, which
reads every edge row. Consumers that ignore ``kind`` (betweenness
centrality) only depend on topology and are unaffected.
"""
with self._cache_lock:
if self._nxg_cache is not None:
return self._nxg_cache
g: nx.DiGraph = nx.DiGraph()
rows = self._conn.execute("SELECT * FROM edges").fetchall()
for r in rows:
g.add_edge(r["source_qualified"], r["target_qualified"], kind=r["kind"])
source = r["source_qualified"]
target = r["target_qualified"]
kind = r["kind"]
if g.has_edge(source, target):
existing = g[source][target].get("kind", "")
if (
IMPACT_EDGE_WEIGHTS.get(kind, IMPACT_DEFAULT_EDGE_WEIGHT)
<= IMPACT_EDGE_WEIGHTS.get(
existing, IMPACT_DEFAULT_EDGE_WEIGHT,
)
):
continue
g.add_edge(source, target, kind=kind)
self._nxg_cache = g
return g

Expand Down
10 changes: 9 additions & 1 deletion code_review_graph/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,16 @@ def get_impact_radius(
abs_files, max_depth=max_depth, max_nodes=max_results
)

impact_scores = result.get("impact_scores", {})

changed_dicts = [node_to_dict(n) for n in result["changed_nodes"]]
impacted_dicts = [node_to_dict(n) for n in result["impacted_nodes"]]
impacted_dicts = []
for n in result["impacted_nodes"]:
d = node_to_dict(n)
score = impact_scores.get(n.qualified_name)
if score is not None:
d["impact_score"] = score
impacted_dicts.append(d)
edge_dicts = [edge_to_dict(e) for e in result["edges"]]
truncated = result["truncated"]
total_impacted = result["total_impacted"]
Expand Down
Loading