Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion rustchain_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,17 @@ def db_exports(options: ExportOptions) -> dict[str, list[dict[str, Any]]]:
}


def _sanitize_csv_cell(value: Any) -> Any:
"""Neutralize spreadsheet formula injection in CSV output cells.

Values beginning with =, +, -, @, tab, carriage return, or newline are
prefixed with a single quote so spreadsheet applications treat them as text.
"""
if isinstance(value, str) and value and value[0] in "=+-\t\r\n@":
return "'" + value
return value


def write_csv(path: Path, rows: list[dict[str, Any]], default_headers: list[str] | None = None) -> None:
if rows:
fieldnames = sorted({key for row in rows for key in row.keys()})
Expand All @@ -306,7 +317,9 @@ def write_csv(path: Path, rows: list[dict[str, Any]], default_headers: list[str]
writer = csv.DictWriter(handle, fieldnames=fieldnames)
if fieldnames:
writer.writeheader()
writer.writerows(rows)
writer.writerows(
{k: _sanitize_csv_cell(v) for k, v in row.items()} for row in rows
)


def write_json(path: Path, rows: list[dict[str, Any]]) -> None:
Expand Down
41 changes: 41 additions & 0 deletions tests/test_rustchain_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,47 @@ def test_balance_amount_normalizes_micro_columns_by_source(self):
self.assertEqual(exporter.balance_amount_rtc({"balance_urtc": 999_999}), 0.999999)
self.assertEqual(exporter.balance_amount_rtc({"balance_rtc": 0.5}), 0.5)

def test_csv_sanitize_neutralizes_formula_injection(self):
dangerous = [
"=SUM(A1:A10)",
"+1+2",
"-1+2",
"@SUM(A1)",
"\t=cmd",
"\r=cmd",
"\n=cmd",
"normal",
"",
"123",
]
sanitized = [exporter._sanitize_csv_cell(v) for v in dangerous]
self.assertEqual(sanitized[0], "'=SUM(A1:A10)")
self.assertEqual(sanitized[1], "'+1+2")
self.assertEqual(sanitized[2], "'-1+2")
self.assertEqual(sanitized[3], "'@SUM(A1)")
self.assertEqual(sanitized[4], "'\t=cmd")
self.assertEqual(sanitized[5], "'\r=cmd")
self.assertEqual(sanitized[6], "'\n=cmd")
self.assertEqual(sanitized[7], "normal")
self.assertEqual(sanitized[8], "")
self.assertEqual(sanitized[9], "123")

def test_csv_write_sanitizes_malicious_miner_id(self):
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "malicious.csv"
rows = [
{"miner_id": "=cmd|'/c calc'!A0", "device_arch": "x86"},
{"miner_id": "safeRTC", "device_arch": "G4"},
]
exporter.write_csv(path, rows)
with path.open(newline="", encoding="utf-8") as handle:
reader = csv.DictReader(handle)
result = list(reader)
# CSV reader preserves the leading single-quote sanitizer prefix
# when the value itself contains a quote character
self.assertEqual(result[0]["miner_id"], "'=cmd|'/c calc'!A0")
self.assertEqual(result[1]["miner_id"], "safeRTC")


if __name__ == "__main__":
unittest.main()
Loading