
[Feature] lance support mosaicml streaming #3461

Open · Jay-ju opened this issue Feb 19, 2025 · 13 comments

Comments

@Jay-ju
Contributor

Jay-ju commented Feb 19, 2025

Like this issue: mosaicml/streaming#832
Lance performs random reads very well, and its primary key enables good shuffling. However, MosaicML Streaming can provide a better shuffle algorithm for training.
So I'm raising the possibility of this combination again and would like to discuss ideas with the community.

@changhiskhan
Contributor

Yeah, I think that's a great idea. We've talked about that with the Mosaic folks in the past. Do you want to cross-post there so we can maybe strike up the collaboration conversation again? The integration likely needs to live in the mosaic streaming repo at the end of the day. Would you be interested in helping develop that integration?

@Jay-ju
Contributor Author

Jay-ju commented Feb 19, 2025

Well, I'm interested in participating in this development. I'd like to confirm whether a design document already exists.

@oceanusxiv

FWIW, since I first brought this up, I have actually implemented a version of this within an internal system. Unfortunately, that means I cannot open-source it, but I can describe the rough approach I took.

The integration path I took was to dynamically generate the "fragments" that MosaicML Streaming needs, as just the metadata necessary to make the lance.take calls for the custom Stream implementation. The needed shards get downloaded and immediately written to disk as pyarrow IPC files; the shard reader can then simply read the IPC files for the specific rows it needs (a rough sketch of this flow follows the list of difficulties below).

The main difficulties of this approach are:

  1. Streaming internally creates two threads, both of which are capable of downloading shards (this is managed by a Python lock internally). That is a big no-no for Lance, since Lance calls deal very poorly with Python multithreading. What I ended up having to do was create a multiprocessing "server" to service the shard calls; since the shards are stored on disk immediately after download, it works. However, this dual multiprocessing setup in turn plays poorly with torch multiprocessing data loaders: it effectively doubles the number of workers you have to create, with each data-loading worker needing a corresponding Lance worker. I don't know whether free-threaded Python will resolve this, but for now the workaround is necessary.
  2. Reading IPC files locally has, of course, non-negligible access costs. pyarrow IPC is pretty fast to access, but I can't help feeling there might be a better solution for faster on-disk access; potentially the streaming-native MDS shard format would be faster.
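For illustration, here is a minimal sketch of the materialize-then-read flow described above. The function names, paths, and row-id lists are hypothetical; lance.dataset, LanceDataset.take, and the pyarrow IPC APIs are real.

```python
import lance
import pyarrow as pa
import pyarrow.ipc as ipc


def materialize_shard(dataset_uri: str, row_ids: list[int], shard_path: str) -> None:
    """Fetch the rows belonging to one streaming 'shard' and cache them on disk."""
    ds = lance.dataset(dataset_uri)
    table = ds.take(row_ids)  # random access by row id
    with ipc.new_file(shard_path, table.schema) as writer:
        writer.write_table(table)


def read_shard_rows(shard_path: str, local_indices: list[int]) -> pa.Table:
    """Serve individual samples out of the locally cached shard."""
    with pa.memory_map(shard_path, "r") as source:
        table = ipc.open_file(source).read_all()
    return table.take(local_indices)
```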

@wjones127
Contributor

> That is a big no-no for Lance, since Lance calls deal very poorly with Python multithreading.

I'd be curious to know more about what you mean by this. Our intention was that Lance should work fine in multithreaded environments: everything should be thread-safe, and most heavy operations release the GIL. Are there specific things that don't work in threads?

@oceanusxiv

oceanusxiv commented Feb 21, 2025

@wjones127 Yes, I basically ran into a bunch of GIL stalls when those two threads try to access the Lance dataset (not even at the same time, just at all); Lance calls simply stall.

It's very possible this has to do with PyTorch data loader fork vs. spawn behavior, but I seem to recall that even with spawn, if I didn't additionally spawn an extra worker just for Lance, things still didn't work.
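As a small, self-contained illustration of the fork-vs-spawn knob being referred to (the toy dataset is a placeholder, and whether spawn alone avoids the stalls isn't settled here):

```python
from torch.utils.data import DataLoader, Dataset


class ToyDataset(Dataset):
    """Stand-in for a Lance-backed dataset; just returns the index."""

    def __len__(self):
        return 8

    def __getitem__(self, idx):
        return idx


loader = DataLoader(
    ToyDataset(),
    num_workers=2,
    multiprocessing_context="spawn",  # workers start fresh instead of fork-inheriting
                                      # any Lance/async-runtime state from the parent
)
```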

@chenkovsky
Contributor

> @wjones127 Yes, I basically ran into a bunch of GIL stalls when those two threads try to access the Lance dataset (not even at the same time, just at all); Lance calls simply stall.
>
> It's very possible this has to do with PyTorch data loader fork vs. spawn behavior, but I seem to recall that even with spawn, if I didn't additionally spawn an extra worker just for Lance, things still didn't work.

@oceanusxiv Have you tried pickling and unpickling the dataset? Then, I'd guess, the two threads would share nothing.

@oceanusxiv

@chenkovsky The "dataset" we're referring to is the lance.dataset object in memory; there's no pickling or unpickling that, since it wraps native Rust bindings.

@chenkovsky
Contributor

> @chenkovsky The "dataset" we're referring to is the lance.dataset object in memory; there's no pickling or unpickling that, since it wraps native Rust bindings.

@oceanusxiv

I think lance.dataset.LanceDataset supports pickle. Please see its __setstate__(self, state) implementation.

Ray support actually depends on pickling and unpickling.
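A minimal sketch of the pickle round-trip being suggested, assuming a local dataset at ./data.lance (the path and the worker function are illustrative):

```python
import pickle

import lance

ds = lance.dataset("./data.lance")  # hypothetical local dataset
payload = pickle.dumps(ds)  # LanceDataset implements __getstate__/__setstate__


def worker(payload: bytes, row_ids: list[int]):
    # Each thread or process unpickles its own handle, so nothing is shared.
    local_ds = pickle.loads(payload)
    return local_ds.take(row_ids)
```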

@Jay-ju
Contributor Author

Jay-ju commented Feb 27, 2025

> The integration path I took was to dynamically generate the "fragments" that MosaicML Streaming needs, as just the metadata necessary to make the lance.take calls for the custom Stream implementation. The needed shards get downloaded and immediately written to disk as pyarrow IPC files; the shard reader can then simply read the IPC files for the specific rows it needs.
>
> [...]

@oceanusxiv
Here I actually don't understand: doesn't Lance already support a take interface where only the row ids need to be passed? So why is it necessary to download the data locally first and then go through IPC files? Can't the data be fetched directly by row id during training? Is it because of concerns about insufficient performance?
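For illustration, the direct per-row-id access pattern being asked about might look roughly like this; the remote URI and the loader settings are placeholders, not something from this thread:

```python
import lance
from torch.utils.data import DataLoader, Dataset


class LanceRowDataset(Dataset):
    """Map-style dataset that performs one remote take() per sample."""

    def __init__(self, uri: str):
        self.uri = uri
        self.num_rows = lance.dataset(uri).count_rows()
        self.ds = None  # opened lazily inside each worker

    def __len__(self):
        return self.num_rows

    def __getitem__(self, idx):
        if self.ds is None:
            self.ds = lance.dataset(self.uri)
        return self.ds.take([idx]).to_pylist()[0]  # one network round trip per sample


loader = DataLoader(
    LanceRowDataset("s3://bucket/data.lance"),  # hypothetical remote dataset
    batch_size=32,
    num_workers=4,
    prefetch_factor=4,  # prefetching overlaps, but does not remove, per-sample latency
)
```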

@oceanusxiv

@Jay-ju Yes, it's a performance thing. None of this is necessary if the Lance dataset exists locally, but when the dataset is remote and you must make network calls, it just isn't performant enough to do a row-id pull for every sample. Pulling the remote source into a local cache as shards is a central conceit of the streaming dataset, specifically for performance reasons.

@oceanusxiv

@chenkovsky Oh, I didn't realize that works for the dataset itself; yeah, this might be worth a try instead of spawning a separate process.

@Jay-ju
Contributor Author

Jay-ju commented Feb 28, 2025

> @Jay-ju Yes, it's a performance thing. None of this is necessary if the Lance dataset exists locally, but when the dataset is remote and you must make network calls, it just isn't performant enough to do a row-id pull for every sample. Pulling the remote source into a local cache as shards is a central conceit of the streaming dataset, specifically for performance reasons.

@oceanusxiv
Is torch's own prefetch mechanism still unable to match the performance of locally cached data?

@oceanusxiv

@Jay-ju Correct, it's just not fast enough if you have to make a network call every time you want to retrieve a single sample. It's plenty fast if the Lance dataset is on the local filesystem.
