Fix corrupted data before processing + auto recovery#861

Open
aliel wants to merge 15 commits into main from aliel-fix-corrupted-data

Conversation

Member

@aliel aliel commented Oct 7, 2025

Prevent storage corruption from crashes and detect corrupted stored content

Self-proofreading checklist

  • Is my code clear enough and well documented?
  • Are my files well typed?
  • Are there enough tests?

@aliel aliel requested review from nesitor and odesenfans October 7, 2025 11:15
@aliel aliel force-pushed the aliel-fix-corrupted-data branch from 217a671 to 9a253bc Compare October 7, 2025 11:38
Comment thread src/aleph/storage.py Outdated
@aliel aliel force-pushed the aliel-fix-corrupted-data branch from f282a81 to c373113 Compare October 8, 2025 08:59
@aliel aliel requested a review from odesenfans October 14, 2025 07:10
Collaborator

@odesenfans odesenfans left a comment

One minor comment

Comment thread src/aleph/storage.py Outdated
@foxpatch-aleph foxpatch-aleph left a comment

The two changes are sound in isolation: atomic-write-via-rename is the correct POSIX approach for preventing torn files, and consolidating the duplicate except clauses plus adding a delete-and-retry for corrupted cached content is a reasonable recovery strategy. However, the new retry path in get_message_content skips the check_for_u0000 guard that is applied to the first fetch, and the PR ships with no tests for the new retry behavior (acknowledged in the checklist). These are worth addressing before merge.

src/aleph/storage.py (line 109): check_for_u0000 (line 83) is called on the original item_content before the JSON parse, but after the retry it is never called on the freshly fetched content. Even though _verify_content_hash ensures the bytes match the declared hash (making a null-byte injection from the network very unlikely), the validation is part of the contract for get_message_content and should be applied consistently. Add check_for_u0000(item_content) before the inner aleph_json.loads call, or refactor the retry into a small helper that runs the full validation pipeline.
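
A minimal sketch of the suggested helper, reusing the check_for_u0000 and aleph_json names already present in this module (the helper name _parse_and_validate is illustrative, not part of the PR):

def _parse_and_validate(item_content: bytes):
    # Apply the same guards to the first fetch and to the retry so the
    # validation contract of get_message_content stays consistent.
    check_for_u0000(item_content)
    return aleph_json.loads(item_content)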

src/aleph/storage.py (line 100): use_network=True, use_ipfs=True are already the defaults of get_hash_content, so spelling them out here is redundant. Either keep them for explicitness (add a brief comment like # explicitly bypass cache) or drop them to stay DRY. As-is it reads as if the initial call might not use the network, which is misleading.

src/aleph/services/storage/fileystem_engine.py (line 26): Two concurrent calls to write(filename, …) share the same {filename}.tmp path and will race: the second write_bytes can overwrite the first's temp file before either replace runs, silently dropping one write. In practice this is benign for content-addressed storage (same hash ⟹ same bytes), but it is worth a brief comment explaining why the collision is safe, or use a unique temp name (e.g., f"{filename}.{os.getpid()}.tmp") to be robust regardless of the storage semantics.
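
A stdlib-only sketch of the unique-temp-name variant (the helper name write_atomically and the uuid4 suffix are illustrative choices, not the PR's implementation):

import os
import uuid
from pathlib import Path

def write_atomically(path: Path, content: bytes) -> None:
    # A per-call temp name means two concurrent writers can never clobber
    # each other's temp file before one of them reaches os.replace.
    temp_path = path.with_name(f"{path.name}.{uuid.uuid4().hex}.tmp")
    temp_path.write_bytes(content)
    os.replace(temp_path, path)  # atomic on POSIX, overwrites any existing file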

src/aleph/services/storage/fileystem_engine.py (line 26): Orphaned .tmp files accumulate on repeated crashes before replace. They are harmless to reads (the read method checks for the exact filename) and are overwritten on the next successful write, but a startup cleanup pass for stale *.tmp files would prevent disk leakage on very crash-prone nodes.
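
A possible startup cleanup pass, again stdlib-only and purely illustrative (the function name and the assumption that no legitimate entry ever ends in ".tmp" are mine):

from pathlib import Path

def cleanup_stale_temp_files(storage_dir: Path) -> int:
    # Remove leftovers from writes that crashed between write_bytes and
    # os.replace; returns how many stale files were removed.
    removed = 0
    for tmp in storage_dir.glob("*.tmp"):
        tmp.unlink(missing_ok=True)
        removed += 1
    return removed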

tests/storage/test_get_content.py (line 257): There are no tests for the new retry logic in get_message_content. At minimum, please add: (1) a test where MockStorageEngine returns corrupted bytes and a mocked network returns valid JSON — asserting that the corrupted entry is deleted and the parsed content is returned; (2) a test where both the cache and the network return corrupted bytes — asserting InvalidContent is raised; (3) a test for FileSystemStorageEngine.write verifying that no partial file is left when write_bytes throws. The checklist already flags this, but given that the retry path is the core contribution of the PR, it really should be covered.
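
For point (3), a pytest sketch against a stdlib atomic-write helper like the write_atomically sketch above (not the real FileSystemStorageEngine, whose signature may differ), showing the shape of the "no partial file on failure" assertion:

import pytest
from pathlib import Path

def test_no_partial_file_when_write_fails(tmp_path, monkeypatch):
    target = tmp_path / "content"

    def failing_write_bytes(self, data):
        raise OSError("disk full")

    # Make the temp-file write fail before anything reaches the target path.
    monkeypatch.setattr(Path, "write_bytes", failing_write_bytes)
    with pytest.raises(OSError):
        write_atomically(target, b"payload")

    assert not target.exists()
    assert list(tmp_path.glob("*.tmp")) == []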

@foxpatch-aleph foxpatch-aleph left a comment

Both changes are well-reasoned and address real problems. The temp-file-then-rename pattern in the filesystem engine is a standard approach for crash-safe writes, and the retry-on-corruption logic in get_message_content is conceptually correct. There are two meaningful gaps: check_for_u0000 is not applied to the retried content, and there are no tests covering the new auto-recovery path. A minor design concern exists around leftover .tmp files after a crash, and the retry call passes redundant default arguments.

src/aleph/storage.py (line 106): After fetching fresh content from the network (hash_content.value), check_for_u0000(item_content) is never called before the retry parse at line 110. The first call at line 83 guards against null-byte content from local storage, but the retried content from the network is exempt. A message with \u0000 fetched from a peer would silently bypass this validation. Consider calling check_for_u0000(item_content) here before the inner try.

src/aleph/storage.py (line 100): use_network=True and use_ipfs=True are already the default values for get_hash_content (see signature at line 212–213). Passing them explicitly here is harmless but misleading — it suggests the first call (line 70–72) does not use the network, when in fact it does. The real reason the retry works is that the corrupted file was just deleted, so storage_engine.read returns None and execution falls through to the network path. A comment explaining this would make the intent clearer.

src/aleph/services/storage/fileystem_engine.py (line 26): If the process crashes between write_bytes (line 30) and replace (line 34), a {filename}.tmp file is left on disk. There is currently no startup cleanup for these orphaned files. On a busy node that crashes frequently, they will accumulate. This doesn't cause correctness issues (the read method ignores .tmp files), but it is worth noting in a comment or addressing in a startup routine. The existing comment at line 33 describes this case but does not mention the lack of cleanup.

src/aleph/storage.py (line 91): The new auto-recovery path (corrupted cached content → delete → retry from network) has no test coverage. The existing test_get_content.py has a MockStorageEngine and tests for get_message_content, making it straightforward to add a test: seed the mock storage with malformed bytes for a known hash, mock get_hash_content to return valid content on the second call, and assert that the corrupted file is deleted and the valid content is returned. The PR checklist marks tests as [-], so this is a known gap.

@foxpatch-aleph foxpatch-aleph left a comment

This PR correctly addresses two distinct problems: atomic writes via a temp-file+rename pattern for the filesystem engine, and auto-recovery from corrupted local cache by deleting and re-fetching from the network. Both changes are logically sound. The atomic write is correctly implemented for POSIX, and the retry path properly goes through get_hash_content's _verify_content_hash before re-caching, preventing re-storage of bad data. A few minor issues are worth noting: the check_for_u0000 validation is skipped on the retried content, the item_type guard in the retry condition is redundant given the control flow, orphaned .tmp files from crashes have no cleanup path, and there is no test coverage for either new behavior.

src/aleph/storage.py (line 91): The item_type in (ItemType.ipfs, ItemType.storage) guard is redundant. The only way source == ContentSource.DB can be true is when the code took the item_type in (ipfs, storage) branch at line 69 (the inline path sets source = ContentSource.INLINE). Consider removing it to reduce noise, or add a comment explaining the defensive intent.

src/aleph/storage.py (line 106): check_for_u0000(item_content) is called on the original fetch (line 83) but not on the retried content before the second aleph_json.loads (line 110). In practice a \u0000 byte would fail JSON parsing anyway, but the error message and exception path would be different from the original. Consider calling check_for_u0000 on the retried item_content for consistency.

src/aleph/services/storage/fileystem_engine.py (line 26): Concurrent writes to the same filename will both target filename.tmp, and while this is safe in a single-threaded asyncio context (no await between write_bytes and replace), it's fragile. If the engine is ever used from a thread pool (run_in_executor), there will be a race. Consider using a unique temp name per call (e.g. f"{filename}.{os.getpid()}.tmp" or a uuid4 suffix) to make this robust regardless of execution context.

src/aleph/services/storage/fileystem_engine.py (line 26): Orphaned *.tmp files left behind by a crash (written but never renamed) are invisible to read() and exists() but accumulate on disk indefinitely. Consider adding a startup cleanup pass or at least documenting that operators may need to prune *.tmp files manually.

src/aleph/storage.py (line 87): No tests cover the corrupted-cache recovery path. A unit test that mocks storage_engine.read to return invalid JSON, verifies storage_engine.delete is called, and confirms the retry is attempted (and either succeeds or raises InvalidContent) would be valuable. Similarly, FileSystemStorageEngine.write has no test for the atomic-write behavior (e.g. verifying no partial file is visible if an exception occurs mid-write).

foxpatch-aleph previously approved these changes Apr 21, 2026

@foxpatch-aleph foxpatch-aleph left a comment

This PR correctly implements atomic durable writes using the temp-file + fsync + rename pattern, and adds SHA-256 verification for cached storage content with automatic recovery. The implementation handles edge cases properly (corruption detection, retry loops prevented via _from_corruption_recovery flag, IPFS cache skip for performance). Tests comprehensively cover the new functionality including atomic write invariants, corruption detection, and recovery paths. The code is well-documented with clear docstrings explaining the POSIX durability guarantees.

src/aleph/services/storage/fileystem_engine.py (line 67): Minor nit: The comment says "O_DIRECTORY is POSIX-only" but the actual exception handling catches both AttributeError (when os.O_DIRECTORY doesn't exist on Windows) and OSError. Consider updating the comment to clarify both cases are handled.

tests/storage/test_get_content.py (line 32): Minor nit: The MockStorageEngine.delete() raises KeyError if the file doesn't exist, while FileSystemStorageEngine.delete() uses missing_ok=True. For better test realism, consider using self.files.pop(filename, None) instead.

tests/storage/test_get_content.py (line 379): The test test_get_message_content_retry_does_not_loop verifies the flag is passed but doesn't explicitly verify that SHA-256 verification is skipped on retry. Consider adding an assertion that _verify_content_hash is not called during the recovery path.

foxpatch-aleph previously approved these changes Apr 22, 2026

@foxpatch-aleph foxpatch-aleph left a comment

This is a well-designed PR that adds crash-safe atomic durable writes and SHA-256 integrity verification with automatic recovery from corrupted cache. The durable write implementation correctly uses temp-file-then-fsync-then-rename ordering with proper exception handling and cleanup. The SHA-256 verification is correctly scoped to storage-only items from local cache (IPFS is intentionally skipped due to daemon round-trip cost). The JSON parse recovery path is well-guarded with a single-retry guarantee. Tests are comprehensive and correctly validate the self-healing behavior, no-loop guarantees, and the distinction between storage and IPFS handling. The code is well-documented with clear docstrings explaining the atomic write protocol and design rationale.

src/aleph/services/storage/fileystem_engine.py (line 30): Minor: os.replace is synchronous and very fast (just a syscall). Wrapping the entire _write_durably in asyncio.to_thread means the thread sits idle during the rename. This is fine (keeps the implementation simple and the whole method in one thread for consistency), but worth noting that only the I/O-heavy parts (open/write/fsync) truly need the thread.
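
For reference, a stdlib-only sketch of the pattern under discussion (names are illustrative; it omits the directory fsync and the partial-write loop the PR performs):

import asyncio
import os
from pathlib import Path

def _write_durably(path: Path, content: bytes) -> None:
    # Blocking syscalls only; intended to run in a worker thread.
    temp_path = path.parent / (path.name + ".tmp")
    fd = os.open(temp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
    try:
        os.write(fd, content)
        os.fsync(fd)  # data is on disk before the rename publishes it
    finally:
        os.close(fd)
    os.replace(temp_path, path)  # atomic publish; very fast syscall

async def write(path: Path, content: bytes) -> None:
    # Keeping the whole sequence in one asyncio.to_thread call is simple and
    # keeps the event loop free; only open/write/fsync strictly need the thread.
    await asyncio.to_thread(_write_durably, path, content)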

src/aleph/services/storage/fileystem_engine.py (line 68): Consider using os.O_RDONLY in the os.open call for the directory fd (line 68). Currently it uses only os.O_DIRECTORY which is fine, but adding O_RDONLY makes the intent explicit that we only need to fsync the directory, not write to it. This is a very minor nit.
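
A small illustration of the suggested flag combination (the function name is mine; the getattr fallback only keeps the sketch importable on platforms without O_DIRECTORY):

import os
from pathlib import Path

def fsync_directory(directory: Path) -> None:
    # Make the rename itself durable by flushing the directory entry.
    # O_RDONLY makes explicit that the fd is opened only for fsync.
    flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
    dir_fd = os.open(directory, flags)
    try:
        os.fsync(dir_fd)
    finally:
        os.close(dir_fd)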

src/aleph/storage.py (line 116): The condition source == ContentSource.DB and item_type in (ItemType.ipfs, ItemType.storage) correctly limits JSON-parse recovery to cached content only, preventing unnecessary network calls for inline or already-network-fetched content. Good guard.

tests/storage/test_get_content.py (line 330): Good test that the entire path raises InvalidContent when both cache and network are corrupt. The assertion on _verify_content_hash call count (2) confirms no retry loop occurs. One suggestion: consider adding an assertion that the storage engine's files dict was updated (corrupted cache was deleted), though this is implied by the test structure.

tests/storage/test_get_content.py (line 386): The spy on _verify_content_hash with expected call count of 2 is an excellent way to verify no-loop behavior. The traced get_hash_content with call_log is also well-designed. This test effectively validates the control flow.

src/aleph/services/storage/fileystem_engine.py (line 1): Note: the filename fileystem_engine.py contains a typo ('fileystem' instead of 'filesystem'). This exists in the original codebase, so not introducing a breaking rename here is the right call. Worth a follow-up refactor.

foxpatch-aleph previously approved these changes Apr 22, 2026

@foxpatch-aleph foxpatch-aleph left a comment

The PR correctly implements durable atomic writes via temp-file + fsync + POSIX rename, and adds a two-tier corruption recovery path (SHA-256 verification for storage items, JSON-parse recovery for both storage and IPFS). The logic is sound, the recovery paths are properly guarded against infinite loops (source changes from DB to P2P/IPFS after retry), and the test coverage is thorough — covering happy paths, mismatch recovery, double-failure scenarios, and call-order invariants.

src/aleph/services/storage/fileystem_engine.py (line 78): The outer except Exception also catches exceptions from the directory-fsync section (e.g. os.close(dir_fd) in its finally). At that point the rename has already succeeded, so temp_path.unlink(missing_ok=True) is a no-op — this is correct, but the comment in the docstring could mention that the cleanup block is best-effort for all exceptions, not just write-phase ones.

src/aleph/storage.py (line 108): check_for_u0000 runs before the JSON-parse recovery block. Content containing \u0000 will raise InvalidContent without the recovery path. This is likely intentional (it's a schema violation, not corruption), but a one-line comment explaining why would help future readers.

src/aleph/storage.py (line 116): The JSON-parse recovery condition checks source == ContentSource.DB and item_type in (ItemType.ipfs, ItemType.storage). After the retry, source is updated to P2P/IPFS (line 131), so the recovery path won't re-trigger — this correctly prevents infinite loops. Well done.

tests/services/test_fileystem_engine.py (line 56): The test_write_fsyncs_before_rename test correctly verifies file-fd fsync precedes os.replace. Note that call_order will also contain the directory-fsync entry (after replace), but the test only asserts file-fsync precedes replace, which is the right invariant.

tests/storage/test_get_content.py (line 386): The test_get_message_content_json_retry_does_not_loop test is well-constructed: it uses a spy on _verify_content_hash and a traced get_hash_content to confirm exactly 2 verification calls (SHA-256 block + network fetch) and exactly 2 get_hash_content calls. This is an excellent regression test.

foxpatch-aleph previously approved these changes Apr 22, 2026

@foxpatch-aleph foxpatch-aleph left a comment

The PR correctly implements durable atomic writes for the filesystem storage engine and adds SHA-256 integrity verification with automatic JSON corruption recovery for cached storage items. The logic is sound: SHA-256 self-heal runs first on DB-sourced storage content, and JSON-parse recovery is a separate path that won't loop because the source changes to P2P after any refetch. Tests are comprehensive and cover the happy path, corruption scenarios, and the no-loop guarantee. The durable write implementation follows a well-documented, correct pattern (write to temp, fsync, atomic rename, directory fsync).

src/aleph/services/storage/fileystem_engine.py (line 68): Minor: os.O_DIRECTORY is accessed directly in the expression os.open(str(file_path.parent), os.O_DIRECTORY). On CPython, referencing an undefined constant on the os module raises AttributeError, which is correctly caught. However, some Python implementations may raise NameError instead. Consider catching (AttributeError, NameError, OSError) for maximum portability, though this is unlikely to matter in practice.

src/aleph/storage.py (line 136): The return type tuple[Any, RawContent] for _recover_cached_content uses Any for the parsed JSON content. Consider a more specific union type like dict | list | str | int | float | bool | None for better type safety, though Any is acceptable given how aleph_json.loads returns arbitrary JSON structures.

src/aleph/storage.py (line 108): Note: check_for_u0000 runs once after the SHA-256 block (which may have refetched content), but NOT after the JSON recovery block. This is correct in practice because the recovered content passes the same SHA-256 check as the refetched content in the SHA-256 block, so if check_for_u0000 passed, the recovered content will too. Worth a brief comment if this invariant is important.

tests/storage/test_get_content.py (line 406): The test test_get_message_content_json_retry_does_not_loop uses a spy on _verify_content_hash and asserts call_count == 2. This is a good regression test. One minor observation: the test depends on the internal flow that _fetch_content_from_network calls _verify_content_hash. If that internal call pattern ever changes, the assertion would break. Consider adding a comment explaining this dependency.

@foxpatch-aleph foxpatch-aleph left a comment

The PR correctly implements crash-resistant durable writes and two corruption recovery paths (SHA-256 mismatch for storage items, JSON-parse recovery for IPFS/storage). The logic is sound: neither path can loop because the source transitions from DB to P2P after recovery, gating the SHA-256 block. Tests are thorough and correctly trace call counts. The only notable nit is a minor over-engineering in the write loop, but it is not a bug.

src/aleph/services/storage/fileystem_engine.py (line 54): The memoryview slicing loop (lines 54-57) is unnecessary for typical content sizes. os.write(fd, content) will write the entire bytes object in a single call for files of this scale. If you wanted to handle partial writes for very large content, the loop should use view[written:] consistently — but since content is already a bytes object, os.write(fd, content) is simpler and sufficient. Consider simplifying to: os.write(fd, content); os.fsync(fd).

src/aleph/storage.py (line 86): The LOGGER.warning at line 86 duplicates the message logged by _recover_cached_content at line 151 when the subsequent JSON-parse recovery path is taken. This is not a bug — the SHA-256 block and JSON block are mutually exclusive in practice — but the dual warning could confuse log readers. Consider consolidating or making the messages distinct.

src/aleph/storage.py (line 115): Good design: the JSON-parse recovery is gated on source == ContentSource.DB, ensuring we only attempt self-heal for corrupted local cache, not for freshly fetched network content (which is already verified in _fetch_content_from_network). This prevents unnecessary network calls.

tests/storage/test_get_content.py (line 406): The spy on _verify_content_hash (line 406) and the call_log on get_hash_content (line 409) together provide excellent coverage of the no-looping invariant. The expected verify_spy.call_count == 2 is correct: one from the initial SHA-256 block (passes) and one from _fetch_content_from_network during recovery (passes). Well-written test.

tests/services/test_fileystem_engine.py (line 71): The test_write_fsyncs_before_rename test correctly verifies that file-fsync occurs before the atomic rename. However, it does not distinguish between the file-fsync and the directory-fsync in the call order — both append 'fsync' to the same list. If you want stricter verification, you could track which fd type each fsync call targets (regular file vs directory), but this is optional.

foxpatch-aleph previously approved these changes Apr 23, 2026

@foxpatch-aleph foxpatch-aleph left a comment

This PR adds durable atomic writes to the filesystem storage engine and SHA-256 integrity verification with auto-recovery for cached storage content. The changes are well-structured, well-tested, and handle edge cases appropriately. The atomic write pattern (write to temp, fsync, rename) is correct. The recovery logic is properly scoped to DB-sourced storage items and avoids infinite loops. The tests are comprehensive, covering happy paths, failure modes, and regression guards.

src/aleph/services/storage/fileystem_engine.py (line 48): Consider that filenames containing dots will produce temp files like 'foo.bar.tmp'. This is fine — the temp suffix is appended after the full filename. Just noting that if a filename itself ends in '.tmp', the resulting temp file would be 'foo.tmp.tmp'. This is unlikely to be a practical issue but worth being aware of.

src/aleph/services/storage/fileystem_engine.py (line 78): The partial-write loop (while written < len(view)) is correct for handling EAGAIN/EINTR, but on Unix os.write on a regular file will typically write the entire buffer. This is still good defensive coding.

src/aleph/services/storage/fileystem_engine.py (line 83): Using os.replace (not os.rename) is the right choice here — it atomically overwrites the destination on both POSIX and Windows, unlike os.rename which fails on Windows if the destination exists.

src/aleph/services/storage/fileystem_engine.py (line 99): The exception handler cleans up the temp file but does NOT close dir_fd if an exception occurs between lines 88-97 and the directory fsync hasn't completed yet. In practice this is very unlikely (the directory fsync only runs after a successful rename), but if os.replace itself raised, dir_fd would already be closed by the finally block at line 97. This is fine — the only risk is if an exception in the directory-fsync section (lines 92-97) leaves dir_fd open, but the finally at line 97 always closes it. No bug here.

src/aleph/storage.py (line 85): The SHA-256 verification is correctly gated on both item_type == ItemType.storage and source == ContentSource.DB. This avoids the expensive IPFS hash computation on the hot path while still protecting storage items from cache corruption.

src/aleph/storage.py (line 113): The comment on line 113 is helpful. The placement of check_for_u0000 before JSON parsing is correct — it's a schema violation that should fail regardless of recovery attempts.

src/aleph/storage.py (line 121): The JSON retry block correctly limits recovery to ContentSource.DB items and only for ipfs/storage types. The retry is single-shot (one retry, then raise), preventing infinite loops.

src/aleph/storage.py (line 149): The _recover_cached_content method is well-named and does exactly one thing: delete the corrupt cache and refetch. Its simplicity is a strength.

tests/services/test_fileystem_engine.py (line 63): The tracking_fsync spy in test_write_fsyncs_before_rename doesn't distinguish between file-fd fsync and directory-fd fsync. It correctly asserts that at least one fsync occurs before replace, but doesn't verify which fsync is which. This is sufficient for the test's purpose.

tests/storage/test_get_content.py (line 343): The test test_get_message_content_sha256_mismatch_network_also_bad_raises is important — it verifies the system doesn't loop when both cache and network return bad data. However, this test relies on _verify_content_hash being called during _fetch_content_from_network. If that verification were ever removed, this test would hang or raise a different error. Consider adding a comment documenting this dependency.

@foxpatch-aleph foxpatch-aleph left a comment

I've already provided a thorough review of PR #861. The changes look solid:

  • Durable atomic writes in fileystem_engine.py correctly implement the temp-file → fsync → rename pattern with proper cleanup on failure
  • SHA-256 verification + recovery in storage.py is properly bounded with no risk of infinite loops
  • Tests are comprehensive and well-structured

The only minor nit is the pre-existing typo fileystem_engine.py (should be filesystem_engine.py), but that's not something to fix in this PR.

Verdict: APPROVE — The code is correct, well-tested, and the recovery logic is sound.

@foxpatch-aleph foxpatch-aleph left a comment

The PR implements a solid approach to preventing storage corruption: durable atomic writes with fsync in the filesystem engine, SHA-256 verification for cached storage items, and a two-tier recovery system (SHA-256 mismatch for storage items, JSON-parse failure for both IPFS and storage). The control flow is correct — no infinite loops, proper source tracking prevents re-triggering recovery blocks, and network-fetched content is verified before being stored. The tests are well-written and cover the key invariants. There are two minor test coverage gaps and one comment typo.

tests/storage/test_get_content.py (line 369): Missing test for IPFS JSON-parse recovery triggering a network refetch. The test_get_message_content_ipfs_cache_no_sha256 test only covers the happy path where cached IPFS content is valid JSON. Consider adding a test where cached IPFS content is invalid JSON, verifying that the recovery path refetches from the network and returns the correct content.

src/aleph/storage.py (line 113): Typo: 'it is a schema violation' should be "it's a schema violation".

tests/storage/test_get_content.py (line 404): Consider adding a test for inline content with invalid item_content to confirm it raises InvalidContent without triggering any recovery path. This serves as a regression guard since inline content should never go through the DB recovery branches.

@aliel aliel requested review from odesenfans and removed request for nesitor April 23, 2026 14:54
foxpatch-aleph previously approved these changes Apr 23, 2026

@foxpatch-aleph foxpatch-aleph left a comment

The PR correctly implements durable atomic writes and SHA-256 integrity verification with JSON-parse recovery. The logic is sound, there are no infinite loops in recovery paths, and the tests comprehensively cover the new behavior. A few minor comments below.

src/aleph/services/storage/fileystem_engine.py (line 97): Minor: if os.close(dir_fd) raises after the rename has already happened, the outer except block tries to unlink temp_path which no longer exists. This is handled by missing_ok=True, but a short comment at line 100 noting that temp_path may already be gone would make this explicit.

src/aleph/storage.py (line 85): Nit: the variable name 'source' is shadowed at lines 103-104 inside the SHA-256 mismatch block. This works correctly (the updated source is used by the JSON-parse block), but using a different local variable name like 'recovery_source' for the refetched content would make the flow easier to follow.

src/aleph/storage.py (line 113): The one-line comment on line 113 uses a backslash comment style. Consider moving it to a regular line above the call for consistency with the rest of the file's comment style.

src/aleph/storage.py (line 121): The JSON-parse recovery correctly checks source == ContentSource.DB so it won't re-try after a SHA-256 refetch (which updates source to P2P). This is correct behavior — the network-fetched content was already verified, so a second recovery attempt would be futile. No changes needed, just confirming the logic is sound.

foxpatch-aleph previously approved these changes Apr 23, 2026

@foxpatch-aleph foxpatch-aleph left a comment

Well-implemented crash-safe writes and corruption recovery. The durable write pattern in fileystem_engine.py follows best practices, and the two-layer recovery in storage.py (SHA-256 for storage items, JSON-parse fallback for IPFS+storage) is sound with no infinite-loop risk. Test coverage is thorough and correctly validates edge cases including bad network responses. No bugs or security issues found.

src/aleph/services/storage/fileystem_engine.py (line 50): Consider adding a brief comment noting why asyncio.to_thread is used here — it makes the blocking os.open/fsync/os.replace path explicit to future readers, since these are synchronous syscalls running in a method named 'write'.

src/aleph/storage.py (line 96): The storage_engine.delete call on line 96 duplicates the deletion logic from _recover_cached_content (line 161). Consider extracting the SHA-256 mismatch path to also use _recover_cached_content for DRY, though the current approach is fine since the SHA-256 path has an extra LOGGER.warning call.

tests/storage/test_get_content.py (line 421): The comment on line 421 says 'sha256 block + _fetch_content_from_network' but the scenario is actually: SHA-256 passes, JSON parse fails, network returns same bad bytes, network verification fails. The call count assertion (2) is correct, but the comment could be slightly more precise about where the second _verify_content_hash call originates.

foxpatch-aleph previously approved these changes Apr 27, 2026

@foxpatch-aleph foxpatch-aleph left a comment

This PR correctly addresses storage corruption with durable atomic writes and SHA-256 integrity verification with auto-recovery. The code is well-structured, the exception handling is sound, and there's no risk of infinite loops in the recovery paths. Tests are comprehensive and cover the key scenarios. Two minor nits: redundant str() calls around Path objects in os.replace, and a potentially confusing comment in one test's docstring.

src/aleph/services/storage/fileystem_engine.py (line 84): os.replace accepts PathLike objects directly since Python 3.6. The str() conversions are unnecessary — consider os.replace(temp_path, file_path) for cleaner code.

tests/storage/test_get_content.py (line 424): The docstring comment '(passes, same bytes)' may confuse future readers into thinking the assertion is wrong. Consider rephrasing to '(bytes match their own hash)' for clarity.

foxpatch-aleph previously approved these changes Apr 27, 2026

@foxpatch-aleph foxpatch-aleph left a comment

This PR correctly adds durable atomic writes to the filesystem storage engine and implements a two-tier corruption recovery system (SHA-256 verification for storage items + JSON-parse recovery for cached IPFS/storage content). The implementation is correct, well-documented, and the tests provide thorough coverage of both happy paths and failure modes. The exception handling, cleanup logic, and recovery flow are all sound with no looping or resource leaks.

src/aleph/services/storage/fileystem_engine.py (line 79): Minor: view[written:] allocates a new view slice each loop iteration. Consider os.write(fd, content[written:]) or tracking an index to avoid the slice allocation. Negligible perf impact in practice.

@foxpatch-aleph foxpatch-aleph left a comment

The core logic for durable writes and SHA-256 self-healing is well-designed and well-documented. However, the test file contains a real bug: test_get_message_content_inline_invalid_json_raises_no_recovery references an undefined p2p_mock variable, which will raise NameError at runtime. There is also a minor concern about the SHA-256 verification gap for storage items fetched via the JSON recovery path (network verification happens, but the result is written to cache without a post-write check).

tests/storage/test_get_content.py (line 501): BUG: p2p_mock is not defined in this test function. This will raise NameError at runtime. The test should either define a mock and assert it was not called, or remove the assertion entirely since the test's purpose is only to verify that parse_message raises InvalidMessageFormat.

src/aleph/storage.py (line 104): Minor: After JSON recovery for storage items, the source becomes P2P and the SHA-256 self-heal block is skipped. The content was already verified by _fetch_content_from_network before being written to cache, so this is safe in practice. However, if _fetch_content_from_network were ever modified to skip verification, this gap would become a real issue. Consider adding a comment or a post-write verification assertion to document this trust boundary.

src/aleph/storage.py (line 85): Minor style: Consider adding a blank line before if item_type == ItemType.storage for visual separation from the preceding block, per PEP 8 top-level function statement grouping conventions.

tests/storage/test_get_content.py (line 446): Consider adding an assertion that _verify_content_hash was called (or not called) for the IPFS corrupt JSON test, to make the test's assumptions explicit. Currently it mocks _verify_content_hash entirely, so the test passes regardless of whether IPFS verification behavior changes in the future.

@aliel aliel force-pushed the aliel-fix-corrupted-data branch 2 times, most recently from a7f22d0 to e94be2d Compare April 27, 2026 17:53
@foxpatch-aleph foxpatch-aleph left a comment

This PR correctly addresses storage corruption prevention with durable atomic writes (temp file + fsync + os.replace) and a two-tier recovery system (SHA-256 integrity check for storage items, JSON-parse recovery for both storage and IPFS). The tests are comprehensive, covering happy paths, failure modes, and edge cases. There are no bugs or security issues. One minor code duplication nit: the SHA-256 mismatch recovery block inlines the same delete+refetch logic that _recover_cached_content provides.

src/aleph/storage.py (line 96): The SHA-256 mismatch recovery block (lines 96-105) duplicates the logic of _recover_cached_content (lines 150-168) — both delete the cache entry and refetch via get_hash_content. Consider refactoring the SHA-256 block to call _recover_cached_content instead, which would also centralize the warning log message. This is a minor code-quality nit and not required for correctness.

src/aleph/services/storage/fileystem_engine.py (line 79): The memoryview slicing view[written:] creates a new slice on each iteration. Since memoryview supports absolute positioning, os.write(fd, view[written:len(view)]) or simply os.write(fd, view) (which writes the entire buffer) would be slightly more efficient. The current code is correct but does unnecessary slice allocation. Minor nit.

src/aleph/storage.py (line 122): For storage items where SHA-256 passed but JSON parsing fails (a hash-collision edge case), the JSON recovery path will still attempt a network refetch. This is correct behavior (defensive) but will result in an unnecessary P2P round-trip in an astronomically unlikely scenario. No change needed — the behavior is sound.

@foxpatch-aleph foxpatch-aleph left a comment

The PR makes a well-structured addition: durable atomic writes to prevent corruption on crashes, SHA-256 self-heal for cached storage items, and JSON-parse recovery for cached IPFS/storage items. The logic is sound, the tests are comprehensive, and the recovery paths are bounded (no loops). There are a few minor notes below.

src/aleph/services/storage/fileystem_engine.py (line 98): The finally: os.close(dir_fd) at line 98 will execute even if os.open at line 90 raised. However, the except (AttributeError, OSError) at line 91 returns early before reaching the try block containing os.fsync(dir_fd). The structure is: outer try (line 74) wraps everything including the inner try (line 89). If os.open fails, we jump to the outer except (line 100), which cleans up the temp file and re-raises. The finally at line 97 only runs if we reach it, which requires os.open to succeed. So the code is correct, but the nesting is slightly confusing. Consider moving the directory fsync into its own function for clarity.

src/aleph/services/storage/fileystem_engine.py (line 78): The write loop written += os.write(fd, view[written:]) is correct but note that os.write can write fewer bytes than requested even without an error (partial write). The loop handles this correctly. However, if os.write raises (e.g., EINTR), the fd is closed in the inner finally but the outer except will clean up the temp file and re-raise. This is the correct behavior.

src/aleph/storage.py (line 85): The SHA-256 verification block only triggers for ItemType.storage with source == ContentSource.DB. This is well-motivated (IPFS hash computation is expensive). The comment at line 83-84 explains why IPFS is skipped. Good.

src/aleph/storage.py (line 122): The JSON recovery block checks source == ContentSource.DB. After the SHA-256 self-heal path (lines 90-105), source is updated to recovery_content.source (ContentSource.P2P), so if the refetched bytes are still not valid JSON the recovery block will not re-trigger and the parse failure raises InvalidContent without another network retry. This is the correct behavior: the self-heal path fetches via get_hash_content(use_network=True), which verifies the bytes through _verify_content_hash, so content that passes SHA-256 but fails to parse is genuinely not JSON rather than corrupted, and another refetch would return the same bytes. Noted only to record the reasoning; no change needed.

tests/storage/test_get_content.py (line 344): Good test that the network returns bad bytes and the flow raises InvalidContent without looping. The test comment correctly notes it depends on _fetch_content_from_network calling _verify_content_hash.

tests/storage/test_get_content.py (line 406): The test_get_message_content_json_retry_does_not_loop test has a subtle assumption: it uses bad_json_bytes = b'definitely-not-json' and sets content_hash = _sha256_hex(bad_json_bytes). This means the cached content's hash matches the content itself. When the SHA-256 block runs, it will pass (the bytes match their hash). Then the JSON parse fails, triggering recovery. During recovery, _fetch_content_from_network calls p2p_http_request_hash which returns the same bad_json_bytes, and _verify_content_hash passes because the bytes match their hash. The second JSON parse also fails, raising InvalidContent.

The test verifies verify_spy.call_count == 2 (once from the SHA-256 block, once from _fetch_content_from_network). This is correct and well-documented.

tests/services/test_fileystem_engine.py (line 56): The test_write_fsyncs_before_rename test tracks call order for fsync and replace. Note that os.fsync is called twice (once on the file fd, once on the directory fd), but the test only asserts that at least one fsync call precedes the replace, which is correct.

@aliel aliel requested a review from amalcaraz April 27, 2026 19:05
aliel added 15 commits April 28, 2026 14:22
…very guard

  - Rewrite FileSystemStorageEngine.write() with fsync-before-rename for
    full POSIX durability (guards against kernel crashes, not just process crashes)
  - Verify SHA-256 on cache read for ItemType.storage; corrupt entries are
    deleted and refetched from the network automatically
  - Add _from_corruption_recovery guard to get_hash_content to prevent
    any recursion in the self-healing path
  - Add tests for atomic write invariants and all corruption recovery paths
…ify asyncio.to_thread usage and verify_spy comment
@aliel aliel force-pushed the aliel-fix-corrupted-data branch from e94be2d to 2786c95 Compare April 28, 2026 12:22
@foxpatch-aleph foxpatch-aleph left a comment

The PR implements two solid improvements: (1) durable atomic writes in the filesystem engine using temp-file + fsync + atomic rename + directory fsync, and (2) SHA-256 integrity verification on cache reads for storage-type items with automatic recovery. The recovery flow correctly avoids infinite loops by only re-triggering for ContentSource.DB. The test suite is thorough, covering happy paths, mismatch scenarios, loop prevention, and the IPFS-skip behavior. The code is well-documented with clear docstrings and comments explaining design choices. No genuine bugs found.

src/aleph/services/storage/fileystem_engine.py (line 70): If os.open() for the temp file raises (e.g. permission denied), the exception propagates without cleanup. In theory a partial temp file could remain from a failed os.open. In practice this is extremely unlikely (would require os.open to create the file then fail before returning), and the O_TRUNC flag on the next attempt handles it. Not a bug worth fixing.

tests/services/test_filesystem_engine.py (line 81): Nice test — the call-order tracking for fsync-before-rename is clean. Minor note: the os.open() for the directory fsync would also call os.fsync, so call_order would have two "fsync" entries before "replace". The assertion checks that at least one fsync precedes replace, which is correct.

tests/storage/test_get_content.py (line 293): Good test — the SHA-256 mismatch triggers delete + refetch, and the refetched content is verified by _fetch_content_from_network before being cached. One scenario not explicitly tested: what if get_hash_content raises ContentCurrentlyUnavailable during recovery (no network available)? The exception would bubble up to the caller. That seems like the correct behavior but could be worth a test.

src/aleph/storage.py (line 97): The recovery path calls get_hash_content(use_network=True, use_ipfs=True). If both network and IPFS are unavailable, get_hash_content raises ContentCurrentlyUnavailable. This exception is not caught in the SHA-256 recovery block (lines 90-105) and will propagate to the caller. This seems intentional and correct — better to fail than serve corrupt data.

Collaborator

@odesenfans odesenfans left a comment

The hash check on every access to messages is too big of a performance hit. We should do this in the repair ops performed at boot, with a command-line flag to trigger it (and not check every file on every boot, that's overkill as well).

The atomic write part is good, just 1-2 comments on the implementation.
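
A stdlib-only sketch of what such a boot-time repair pass could look like (the function name, the operator-triggered flag, and the assumption that storage-type entries are stored under their SHA-256 hex digest are all illustrative; IPFS CIDs would need separate handling):

import hashlib
from pathlib import Path

def verify_stored_files(storage_dir: Path, delete_corrupted: bool = False) -> list[str]:
    # One-shot integrity pass, meant to be triggered explicitly by an
    # operator flag rather than run on every boot or on every read.
    corrupted = []
    for path in storage_dir.iterdir():
        if not path.is_file() or path.suffix == ".tmp":
            continue
        if hashlib.sha256(path.read_bytes()).hexdigest() != path.name:
            corrupted.append(path.name)
            if delete_corrupted:
                path.unlink()
    return corrupted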

Comment thread src/aleph/storage.py
Comment on lines +85 to +105
if item_type == ItemType.storage and source == ContentSource.DB:
    try:
        await self._verify_content_hash(
            item_content, ItemType.storage, item_hash
        )
    except InvalidContent:
        LOGGER.warning(
            "Cached content for '%s' failed SHA-256 verification; "
            "deleting and refetching.",
            item_hash,
        )
        await self.storage_engine.delete(filename=item_hash)
        recovery_content = await self.get_hash_content(
            item_hash,
            engine=ItemType(item_type),
            use_network=True,
            use_ipfs=True,
        )
        # Refetched bytes were verified by _fetch_content_from_network.
        item_content = recovery_content.value
        source = recovery_content.source
Collaborator

This really can't be done on every access to the storage; the performance impact would be far too great. I'd rather have it be part of the repair options, to have it checked once on boot, but on every file access it's a big no.

Comment thread src/aleph/storage.py
  LOGGER.warning(error_msg)
  raise InvalidContent(error_msg)
- except json.decoder.JSONDecodeError as e:
+ except (aleph_json.DecodeError, json.decoder.JSONDecodeError) as e:
Collaborator

Why add json.decoder.JSONDecodeError? This seems useless: aleph_json.DecodeError is already defined as the error returned by the JSON module we use (orjson IIRC; it can be configured to use the regular json module too).

Comment thread src/aleph/services/storage/fileystem_engine.py
Comment on lines +100 to +107
except Exception:
    try:
        # temp_path may already be gone if os.replace succeeded before
        # the exception (e.g. an error in the dir-fsync section).
        temp_path.unlink(missing_ok=True)
    except OSError:
        pass
    raise
Collaborator

Cleanup of the temporary file shouldn't be split across layers; either declare the temporary file in this function, or perform the cleanup in the caller.
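
For the "declare the temporary file in this function" option, a stdlib sketch of full ownership inside the write function (the name write_atomic is illustrative; tempfile.mkstemp also gives a unique temp name per call):

import os
import tempfile
from pathlib import Path

def write_atomic(path: Path, content: bytes) -> None:
    # The temp file is created, published, and cleaned up entirely here,
    # so no caller ever has to reason about leftover .tmp files.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(content)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)
        raise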


  async def delete(self, filename: str):
-     del self.files[filename]
+     self.files.pop(filename, None)
Collaborator

What was the problem with del?
