
Conversation

robTheBuildr (Collaborator)

Before submitting
  • Was this discussed/agreed via a GitHub issue? (no need for typos and docs improvements)
  • Did you read the contributor guideline, Pull Request section?
  • Did you make sure to update the docs?
  • Did you write any new necessary tests?

What does this PR do?

Fixes an issue, occurring roughly once every 20-30 streaming dataset runs, where the index.json file being read is truncated and missing its closing brace }.

I attempted both re-downloading the file and checking the cached copy's location, and neither fixed the issue; in the failing cases, the closing brace is consistently missing.
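
For illustration, the failure mode can be reproduced in isolation (a minimal sketch with a made-up truncated payload, not the actual litdata code path): parsing the truncated content raises json.decoder.JSONDecodeError, and re-adding the single missing closing brace makes it parse again.

import json

# Hypothetical index.json content truncated by exactly one character: the final "}" is missing.
truncated = '{"chunks": [{"chunk_size": 2, "filename": "chunk-0-0.bin"}], "config": {"compression": "zstd"}'

try:
    json.loads(truncated)
except json.decoder.JSONDecodeError as err:
    print(f"parse failed: {err}")
    repaired = truncated + "}"   # append the missing closing brace
    data = json.loads(repaired)  # parses successfully now
    print(sorted(data))          # ['chunks', 'config']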

This is highly reproducible:

Main.py

from litdata import StreamingDataset, StreamingDataLoader, optimize
import time


def should_keep(data):
    if data % 2 == 0:
        yield data


if __name__ == "__main__":
    output_dir = (
        "/teamspace/lightning_storage/cloud-experiments/litdata_optimize_bug/test_11"
    )
    optimize(
        fn=should_keep,
        inputs=list(range(5000)),
        output_dir=output_dir,
        chunk_bytes="64MB",
        num_workers=4,
        compression="zstd",  # Without this, no error.
        mode="overwrite",
    )
    print("done optimizing dataset, now waiting before reading")
    # time.sleep(10)  # The error occurs with or without this sleep
    dataset = StreamingDataset(output_dir)
    print(f"length of datset is {len(dataset)}")

    dataloader = StreamingDataLoader(dataset, batch_size=32, num_workers=4)
    for _ in dataloader:
        pass

    print("Successfully iterated through the data")

Iterate with this bash script:

#!/bin/bash
count=0
while python main.py; do
  count=$((count + 1))
  echo "Run #$count succeeded."
  if [ "$count" -ge 100 ]; then
    echo "main.py succeeded 100 times, stopping."
    exit 0
  fi
done

echo "main.py failed on run #$((count + 1)), stopping."

Without this fix, the failure occurs between iterations 20 and 30 every time.

PR review

Anyone in the community is free to review the PR once the tests have passed.
If we didn't discuss your PR in GitHub issues there's a high chance it will not be merged.

Did you have fun?

Make sure you had fun coding 🙃

@robTheBuildr robTheBuildr changed the title Fix: Repairing slightyly malformed index files Fix(litData): Repairing slightyly malformed index files Sep 23, 2025
@robTheBuildr robTheBuildr changed the title Fix(litData): Repairing slightyly malformed index files Fix(litData): Repairing slightly malformed index files Sep 23, 2025
@robTheBuildr robTheBuildr changed the title Fix(litData): Repairing slightly malformed index files Fix(litData): Handle truncated index.json files without failure Sep 23, 2025

codecov bot commented Sep 23, 2025

Codecov Report

❌ Patch coverage is 0% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 84%. Comparing base (df92bf8) to head (614e6a4).

❌ Your patch status has failed because the patch coverage (0%) is below the target coverage (50%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@         Coverage Diff         @@
##           main   #720   +/-   ##
===================================
- Coverage    84%    84%   -0%     
===================================
  Files        52     52           
  Lines      7278   7286    +8     
===================================
- Hits       6126   6125    -1     
- Misses     1152   1161    +9     

Comment on lines +315 to +324
except json.decoder.JSONDecodeError:
    with open(index_filepath) as f:
        raw_data = f.read()
    raw_data += "}"  # close the json content, it has been truncated by a character
    data = json.loads(raw_data)  # load json from string

if "chunks" not in data and "shards" in data:
    # load mds shard-based index file and adapt to chunks format
    return adapt_mds_shards_to_chunks(data)

return data
Collaborator

Nice catch @robTheBuildr 🙌

This feels like a reasonable hotfix if the issue is truly just a missing closing brace. That said, I wonder if we can be certain it’s only ever truncated by a single character. My assumption is that this might have happened if the index dump step was interrupted or corrupted mid-write — please correct me if I’m off here.

Since the index files aren't compressed (just the chunks are), I assume this could potentially affect runs both with and without compression enabled.

Given that index writing is done in a sequential step and in a single process, I’m a bit unsure about the exact root cause.

merge_cache._merge_no_wait(node_rank if num_nodes > 1 else None, getattr(self, "existing_index", None))
self._upload_index(output_dir, cache_dir, num_nodes, node_rank)

if node_rank is None:
    with open(os.path.join(self._cache_dir, _INDEX_FILENAME), "w") as f:
        data = {"chunks": chunks_info, "config": config, "updated_at": str(time())}
        json.dump(data, f, sort_keys=True)
else:
    with open(os.path.join(self._cache_dir, f"{node_rank}-{_INDEX_FILENAME}"), "w") as f:
        json.dump({"chunks": chunks_info, "config": config}, f, sort_keys=True)

Do you think we should consider making the index writing atomic (e.g. write to a temp file and then rename) to avoid partial writes altogether?
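
For example, something along these lines (a minimal sketch of the temp-file-then-rename idea; the helper name atomic_json_dump is made up for illustration and not part of litdata):

import json
import os
import tempfile


def atomic_json_dump(data, destination):
    # Write to a temporary file in the destination's directory, then atomically
    # swap it into place, so readers never observe a partially written index.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(destination) or ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, sort_keys=True)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, destination)  # atomic when source and target are on the same filesystem
    except BaseException:
        os.remove(tmp_path)
        raise

The temp file is created next to the destination (rather than in the system temp dir) so that os.replace stays on one filesystem and remains atomic.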

Happy to hear your thoughts here.

cc: @tchaton

Collaborator

I would tend to agree with @bhimrazy. It would be great to understand how this can happen. I have never seen it before. I wonder if this could be related to the new folder type.

Collaborator Author

Yes, I agree too. If we look at the code preceding this error:

# Check if `index.json` file exists in cache path
if not os.path.exists(cache_index_filepath) and isinstance(input_dir.url, str):
    assert input_dir.url is not None
    if index_path is not None:
        copy_index_to_cache_index_filepath(index_path, cache_index_filepath)
    else:
        # Merge data_connection_id from resolved directory into storage_options for R2 connections
        merged_storage_options = storage_options.copy() if storage_options is not None else {}
        if hasattr(input_dir, "data_connection_id") and input_dir.data_connection_id:
            merged_storage_options["data_connection_id"] = input_dir.data_connection_id

        downloader = get_downloader(input_dir.url, input_dir.path, [], merged_storage_options, session_options)
        downloader.download_file(os.path.join(input_dir.url, _INDEX_FILENAME), cache_index_filepath)

This file is either copied from elsewhere on the filesystem or downloaded from a URL.
I have tried re-downloading it on error, and that did not resolve the issue.

That being said, it is indicative of a partial write, so I will try the fix suggested by @bhimrazy as an alternative resolution.

@deependujha (Collaborator) left a comment

[Screenshot: 2025-09-23 at 12:59 PM]

Interesting, I couldn't reproduce the issue.
