Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/litdata/utilities/dataset_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ def load_index_file(input_dir: str) -> dict[str, Any]:
return data
except FileNotFoundError:
raise FileNotFoundError(f"Index file not found at {index_filepath}.")
except json.decoder.JSONDecodeError:
with open(index_filepath) as f:
raw_data = f.read()
raw_data += "}" # close the json content it has been truncated by a character
data = json.loads(raw_data) # load json from string
if "chunks" not in data and "shards" in data:
# load mds shard-based index file and adapt to chunks format
return adapt_mds_shards_to_chunks(data)

return data
Comment on lines +315 to +324
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch @robTheBuildr 🙌

This feels like a reasonable hotfix if the issue is truly just a missing closing brace. That said, I wonder if we can be certain it’s only ever truncated by a single character. My assumption is that this might have happened if the index dump step was interrupted or corrupted mid-write — please correct me if I’m off here.

Since the index files aren’t compressed (just chunks), I assume this could potentially affect both with or without compression enabled.

Given that index writing is done in a sequential step and in a single process, I’m a bit unsure about the exact root cause.

merge_cache._merge_no_wait(node_rank if num_nodes > 1 else None, getattr(self, "existing_index", None))
self._upload_index(output_dir, cache_dir, num_nodes, node_rank)

if node_rank is None:
with open(os.path.join(self._cache_dir, _INDEX_FILENAME), "w") as f:
data = {"chunks": chunks_info, "config": config, "updated_at": str(time())}
json.dump(data, f, sort_keys=True)
else:
with open(os.path.join(self._cache_dir, f"{node_rank}-{_INDEX_FILENAME}"), "w") as f:
json.dump({"chunks": chunks_info, "config": config}, f, sort_keys=True)

Do you think we should consider making the index writing atomic (e.g. write to a temp file and then rename) to avoid partial writes altogether?

Happy to hear your thoughts here.

cc: @tchaton

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would tend to agree with @ bhimrazy. It would be great to understand how this can happen. I have never seen it before. I wonder if this could be related to the new folder type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree too, if we look at the code preceding this error:

# Check if `index.json` file exists in cache path
    if not os.path.exists(cache_index_filepath) and isinstance(input_dir.url, str):
        assert input_dir.url is not None
        if index_path is not None:
            copy_index_to_cache_index_filepath(index_path, cache_index_filepath)
        else:
            # Merge data_connection_id from resolved directory into storage_options for R2 connections
            merged_storage_options = storage_options.copy() if storage_options is not None else {}
            if hasattr(input_dir, "data_connection_id") and input_dir.data_connection_id:
                merged_storage_options["data_connection_id"] = input_dir.data_connection_id

            downloader = get_downloader(input_dir.url, input_dir.path, [], merged_storage_options, session_options)
            downloader.download_file(os.path.join(input_dir.url, _INDEX_FILENAME), cache_index_filepath)

This file is either copied from elsewhere in the filesystem or downloaded from a url.
I have tried redownloading it on error and it has not resolved

That being said, its indicative of a partial write so I will try the fix suggested by @bhimrazy to resolve alternatively.



def adapt_mds_shards_to_chunks(data: dict[str, Any]) -> dict[str, Any]:
Expand Down
Loading