Conversation

@sjawhar sjawhar commented Dec 3, 2025

NOTE: The code below was entirely LLM-written. Will gladly clean up / rewrite if a contribution of this type would be accepted. Please read on for context.

Our org uses DVC for a bunch of stuff. In most of our pipeline repos we have a CI check that verifies that the pipeline has been fully reproduced (dvc repro --dry --allow-missing) and all data has been pushed to the remote (dvc data status --not-in-remote) before merging to main. Recently, this CI check on one of our repos started taking so long that it times out. I thought I knew what the bottleneck was (inefficient remote checking), confirmed it with a profiler, then stuck models on the problem. I've included flamegraphs and cProfile logs from before and after for comparison.

So, is there a version of this change that you'd accept? Note that it also requires a small change to DVC itself.

Cheers!

BEFORE
dvc_flamegraph (flamegraph image)

AFTER
dvc_profile_final (flamegraph image)

dvc_cprofile.zip (attached cProfile logs)

@github-project-automation github-project-automation bot moved this to Backlog in DVC Dec 3, 2025

CLAassistant commented Dec 3, 2025

CLA assistant check
All committers have signed the CLA.

skshetry commented Dec 3, 2025

Definitely interested, and open to contributions. I actually implemented something similar a few months ago in DVC:

That approach, however, broke --no-remote-refresh, and I didn't want to break it in a minor release. The full implementation turned out to be more complex, so I ended up reverting the PR. Removing the --no-remote-refresh flag would make the implementation simpler, but it breaks compatibility.

And I'd like to maintain compatibility.

build_entry() internally makes an fs.info() call, so if we can pass the info to it, it won't call fs.info() a second time (which is what happens in your implementation).
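For illustration, a minimal sketch of that idea, assuming build_entry() grows an optional info parameter; the names and the returned shape below are illustrative, not the actual dvc_data API:

from typing import Any, Optional


def build_entry(path: str, fs: Any, info: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Build an index entry for `path`, reusing pre-fetched info when given.

    If the caller already has `info` (e.g. from a batched fs.info() pass),
    no extra request is made; otherwise fall back to a single fs.info().
    """
    if info is None:
        info = fs.info(path)  # only hit the filesystem when nothing was pre-fetched
    return {
        "path": path,
        "size": info.get("size"),
        "isdir": info.get("type") == "directory",
    }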

Regarding the implementation, for bulk checks, we should leverage fs.info() in batches. This functionality already exists in some form:

https://github.com/treeverse/dvc-objects/blob/0c04cec4c0d97416fad9535e19d0de39f288556a/src/dvc_objects/fs/base.py#L587

It can make batched asyncio calls to fs._info(), or fall back to using fs.info() in a threadpool executor. The only issue is that it currently raises an error if a file is missing, even in batch mode, which is something we'd need to handle, maybe by extending fs.info() with return_exceptions=True|False or some other mechanism.

Utilizing that, I think we can implement a batched remote exists check that would be fast enough for all cases.
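A rough sketch of that direction, assuming an fsspec-style async filesystem that exposes _info() and surfaces a missing file as FileNotFoundError:

import asyncio
from typing import Any


async def bulk_exists(fs: Any, paths: list[str]) -> dict[str, bool]:
    """Check many remote paths with one batch of _info() calls.

    return_exceptions=True keeps a single missing file from aborting the
    whole batch; only FileNotFoundError is treated as "missing", anything
    else (auth, network) is re-raised.
    """
    infos = await asyncio.gather(
        *(fs._info(path) for path in paths),
        return_exceptions=True,
    )
    results: dict[str, bool] = {}
    for path, info in zip(paths, infos):
        if isinstance(info, FileNotFoundError):
            results[path] = False
        elif isinstance(info, BaseException):
            raise info
        else:
            results[path] = True
    return results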

skshetry commented Dec 3, 2025

all data has been pushed to the remote (dvc data status --not-in-remote) before merging to main. Recently, this CI check on one of our repos started timing out because of how long it's taking.

Do you use --not-in-remote with --granular?
If not, how many .dvc files or outputs do you have? Without --granular, dvc makes only a single request per output. For tracked directories, it won't check the files inside, just the .dir file.

DVC pushes the .dir file last, after all the entries tracked by that .dir have been pushed, so the result should be the same under ideal conditions.

- use return_exceptions=True for batch retrieval
- skip unnecessary network calls by accepting cached_info
- do a single fs.info call, then pass that info to build_entry
- we group storage instances by their underlying ODB path to unify
  batches and perform the fs.info call for the entire batch

falko17 commented Dec 7, 2025

Hi @skshetry! Sami asked me to take over his PR for now.

I've significantly rewritten the code¹ to fit your suggestions. To summarize:

  • fs.info now has a return_exceptions parameter, which is used by the bulk_*_exists methods
  • We group storage instances by their underlying ODB path to unify batches, then perform the fs.info call for the entire batch and pass the resulting info to build_entry.
  • This also uses the existing batch functionality from fs.info instead of using another ThreadPool on top of it.
  • Finally, there are some smaller fixes/changes; for example, the progress bar now updates correctly for bulk calls.

Changes are available here (these are links to diffs from the current respective treeverse:main):

I can make a new PR with the other two repos later on, but I first wanted to comment here and see if you'd accept this approach at all or if there are any bigger changes I should implement first.

Footnotes

  1. It's still a bit messy and could be improved, but I wanted to get your opinion on the approach first.

skshetry commented Dec 7, 2025

Contributions are always welcome. Please go ahead and open the pull requests, and we can discuss details during review.

Comment on lines 553 to 555
else:
    for entry in callback.wrap(storage_entries):
        results[entry] = storage.exists(entry, **kwargs)
Collaborator

Please create bulk_exists with this naive implementation on the base class. We can optimize this in the future, and will also clean up the code.
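A minimal sketch of such a naive default on the base class (the class and method names here are illustrative stand-ins, not the real dvc_data classes):

from typing import Any


class Storage:
    """Illustrative stand-in for the storage base class."""

    def exists(self, entry: Any, **kwargs: Any) -> bool:
        raise NotImplementedError

    def bulk_exists(self, entries: list[Any], **kwargs: Any) -> dict[Any, bool]:
        """Naive default: one exists() call per entry.

        Backends with a batched primitive (e.g. async fs.info) can override
        this with a single batched request.
        """
        return {entry: self.exists(entry, **kwargs) for entry in entries}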

I've added a naive bulk_exists to the base class.

Comment on lines +576 to +580
# Maps from path to info
cached_info: dict[str, Any] = {
    p: info if not isinstance(info, Exception) else None
    for p, info in zip(all_paths, batch_info)
}
Collaborator

Why are we caching?

This is so that when we call bulk_exists for the other storage instances that have the same ODB path, we don't have to call fs.info again and instead re-use the info we've already retrieved.

Collaborator

for the other storage instances that have the same ODB path

What is the use case? When would those different instances have the same path?

@falko17 falko17 Dec 7, 2025

Short disclaimer: I have to admit I'm not that familiar with DVC internals here, so it's entirely possible I'm misunderstanding some part of this 😅

But I noticed that it does sometimes happen that separate storage instances have the same remote path. As a concrete example, when I tried out the example repo I didn't actually see a speed-up with the bulk changes here (before implementing the caching part) when running dvc data status --not-in-remote. When I looked into it with the help of pdb, I saw that there were multiple remotes with the same path (in this case due to different outputs):

Output of `p list(by_storage.keys())` in index.py:546
[ObjectStorage(key=('model.pkl',), odb=HashFileDB(fs=<dvc_http.HTTPSFileSystem object at 0x7de6f062ccd0>, path='https://remote.dvc.org/get-started/files/md5', read_only=False), index=<dvc_data.index.index.DataIndex object at 0x7de6f07f3680>, read_only=False), ObjectStorage(key=('eval',), odb=HashFileDB(fs=<dvc_http.HTTPSFileSystem object at 0x7de6f05b63f0>, path='https://remote.dvc.org/get-started/files/md5', read_only=False), index=<dvc_data.index.index.DataIndex object at 0x7de6f060c6b0>, read_only=False), ObjectStorage(key=('data', 'prepared'), odb=HashFileDB(fs=<dvc_http.HTTPSFileSystem object at 0x7de6f0a4a900>, path='https://remote.dvc.org/get-started/files/md5', read_only=False), index=<dvc_data.index.index.DataIndex object at 0x7de6f05b5940>, read_only=False), ObjectStorage(key=('data', 'features'), odb=HashFileDB(fs=<dvc_http.HTTPSFileSystem object at 0x7de6f062c7d0>, path='https://remote.dvc.org/get-started/files/md5', read_only=False), index=<dvc_data.index.index.DataIndex object at 0x7de6f05d96d0>, read_only=False)]
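The grouping idea amounts to something like this rough sketch, assuming each ObjectStorage exposes its ODB's remote path via storage.odb.path, as in the dump above:

from collections import defaultdict
from typing import Any


def group_by_odb_path(storages: list[Any]) -> dict[str, list[Any]]:
    """Group storage instances by the path of their underlying ODB.

    Several ObjectStorage instances (one per tracked output) can point at
    the same remote path, so batching fs.info per group and caching the
    results avoids asking the remote about the same object files twice.
    """
    groups: dict[str, list[Any]] = defaultdict(list)
    for storage in storages:
        groups[storage.odb.path].append(storage)
    return dict(groups)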

value = cast("str", entry.hash_info.value)
key = self.odb._oid_parts(value)

if isinstance(info, Exception) or info is None:
Collaborator

I think we should only handle FileNotFoundError and fail in all other cases.

Agreed, changed it accordingly.
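For reference, the narrowed handling amounts to something like this sketch (hypothetical helper name; it mirrors the return_exceptions handling sketched earlier):

from typing import Any


def info_to_exists(info: Any) -> bool:
    """Map a batched info result to an exists flag.

    Only a missing file counts as "not in remote"; any other exception is
    re-raised so real failures (auth, network) are not silently swallowed.
    """
    if isinstance(info, FileNotFoundError):
        return False
    if isinstance(info, BaseException):
        raise info
    return info is not None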
