3 changes: 3 additions & 0 deletions .github/codespell_ignore_words.txt
@@ -0,0 +1,3 @@
# This file lists words to ignore in the codespell check (pre-commit)

crate
13 changes: 13 additions & 0 deletions .gitignore
@@ -120,3 +120,16 @@ status.json

# use the below name for your optimize dataset directory for examples
example_optimize_dataset


# --- rust .gitignore ---
# Generated by Cargo
# will have compiled files and executables
debug/
target/

# These are backup files generated by rustfmt
**/*.rs.bk

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -44,6 +44,7 @@ repos:
hooks:
- id: codespell
additional_dependencies: [tomli]
args: ["--ignore-words=.github/codespell_ignore_words.txt"]
exclude: >
(?x)^(
.*\.ipynb
171 changes: 171 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 22 additions & 0 deletions Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "litdata"
version = "0.2.52"
edition = "2021"
authors = ["Lightning AI et al. <[email protected]>"]
license = "Apache-2.0"
description = "Data processing and streaming library for fast AI model training."
documentation = "https://github.com/Lightning-AI/litdata/"
homepage = "https://github.com/Lightning-AI/litdata/"
repository = "https://github.com/Lightning-AI/litdata/"
keywords = ["deep learning", "pytorch", "AI", "streaming", "cloud", "data processing"]
readme = "README.md"

[lib]
name = "_core"
# "cdylib" is necessary to produce a shared library for Python to import from.
crate-type = ["cdylib"]

[dependencies]
# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so)
# "abi3-py39" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.9
pyo3 = { version = "0.22.4", features = ["extension-module", "abi3-py39"] }
1 change: 1 addition & 0 deletions Makefile
@@ -8,6 +8,7 @@ export SPHINX_MOCK_REQUIREMENTS=0
setup: install-dependencies install-pre-commit
@echo "==================== Setup Finished ===================="
@echo "All set! Ready to go!"
uv pip install -U pyopenssl

test: clean
uv pip install -q -r requirements.txt
8 changes: 8 additions & 0 deletions pyproject.toml
@@ -13,11 +13,19 @@
# limitations under the License.

[build-system]
build-backend = "maturin"

requires = [
"maturin>=1,<2",
"setuptools",
"wheel",
]

[tool.maturin]
module-name = "litdata._core"
python-packages = [ "litdata" ]
python-source = "src"

[tool.ruff]
target-version = "py39"
line-length = 120
1 change: 1 addition & 0 deletions requirements.txt
@@ -7,3 +7,4 @@ boto3
requests
tifffile
obstore
maturin
21 changes: 21 additions & 0 deletions src/lib.rs
@@ -0,0 +1,21 @@
// pip install --upgrade pyOpenSSL cryptography

use pyo3::prelude::*;
pub mod litdata_core;

#[pyfunction]
fn hello_from_bin() -> String {
"RUST: Hello from LitData!".to_string()
}

/// A Python module implemented in Rust. The name of this function (`_core`) must match
/// the `lib.name` setting in `Cargo.toml`; otherwise Python will not be able to
/// import the module.
#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(hello_from_bin, m)?)?;
m.add_class::<litdata_core::LitDataLoaderCore>()?;
// m.add_class::<rust_impl::fs::s3::S3Storage>()?;
// m.add_class::<rust_impl::streaming_data_provider::StreamingDataProvider>()?;
Ok(())
}
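
A minimal usage sketch (not part of the diff): once the extension has been built into the active environment, for example with `maturin develop`, the compiled module is importable as `litdata._core`, matching the `module-name` in pyproject.toml and the `#[pymodule] fn _core` above.

# Sketch only: assumes the extension was built locally (e.g. with `maturin develop`).
from litdata import _core

print(_core.hello_from_bin())  # "RUST: Hello from LitData!"
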
2 changes: 2 additions & 0 deletions src/litdata/__init__.py
@@ -13,6 +13,7 @@
import warnings

from litdata.__about__ import * # noqa: F403
from litdata._core import hello_from_bin
from litdata.constants import _LIGHTNING_SDK_AVAILABLE
from litdata.processing.functions import map, merge_datasets, optimize, walk
from litdata.raw.dataset import StreamingRawDataset
@@ -47,6 +48,7 @@
"index_parquet_dataset",
"index_hf_dataset",
"breakpoint",
"hello_from_bin",
]

if _LIGHTNING_SDK_AVAILABLE:
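
The `__init__.py` change above re-exports the Rust function at the package top level; a short sketch of what that enables:

# Sketch only: hello_from_bin is re-exported by litdata/__init__.py above.
from litdata import hello_from_bin

print(hello_from_bin())  # "RUST: Hello from LitData!"
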
59 changes: 59 additions & 0 deletions src/litdata/_core.pyi
@@ -0,0 +1,59 @@
from typing import Any

def hello_from_bin() -> str: ...

# StreamingDataProvider
# -> on start, download the next x upcoming items in advance
# -> get_next_k_item() => fetch the next k upcoming items
#
# ------ how it works ------
# 1. ChunksConfig has a property `self.streaming_data_provider`, an instance of StreamingDataProvider.
# 2. When the dataset's __iter__() is called (dataset.py), it determines the chunk order, and __next__()
#    determines the sample item order.
# 3. The chunk order and sample item order are stored via `set_chunk` and `set_sample_index`.
# 4. We fetch the chunk and sample order not only for the current epoch but also for the next epoch, to be better prepared.
# 5. For the dataset's first epoch, on_start() is called to download the offset arrays for all chunk indexes in parallel.
# 6. Downloaded items returned by on_start(), and later by get_next_k_item(),
#    are deserialized and then stored in `config.index_to_sample_data`.
# 7. When an item read is requested, get_next_k_item() is called to fetch the next k items.
# 8. For every subsequent epoch (2, 3, ...), the chunk and sample order for the next epoch is computed and
#    `set_chunk` and `set_sample_index` are called to update it.
# (An illustrative usage sketch follows after these stubs.)
class StreamingDataProvider:
def __init__(
self,
epoch: int,
remote_dir: str,
chunks: list[dict[str, str]],
on_start_pre_item_download_count: int,
get_next_k_item_count: int,
) -> None: ...
def on_start(self) -> list[tuple[int, int, int, bytes]]: ...
def get_next_k_item(self) -> list[tuple[int, int, int, bytes]]: ...
def set_epoch(self, epoch: int) -> None: ...
def set_chunk_and_sample_index(self, epoch: int, chunk_index: list[int], sample_index: list[list[int]]) -> None: ...
def set_chunk(
self, epoch: int, chunk_index: list[int], chunk_index_begin: list[tuple[int, int, int, int]]
) -> None: ...
def set_sample_index(self, epoch: int, sample_index: list[list[int]]) -> None: ...

# S3Storage
class S3Storage:
def __init__(self, remote_dir: str) -> None: ...
def byte_range_download(self, remote_path: str, local_path: str, num_workers: int) -> None: ...

class LitDataLoaderCore:
index: int
worker_chunks: list[int]
worker_intervals: list[tuple[int, int]]
batch_size: int
pre_download: int
prefetch_workers: int
prefetch_factor: int

def __init__(
self,
worker_chunks: list[int],
worker_intervals: list[tuple[int, int]],
batch_size: int,
pre_download: int,
prefetch_workers: int,
prefetch_factor: int,
) -> None: ...
def __iter__(self) -> Any: ...
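
An illustrative sketch (hypothetical values, not part of the diff) of driving LitDataLoaderCore through the interface stubbed above; StreamingDataProvider and S3Storage are typed here but still commented out in the lib.rs module registration, so only their intended flow is noted in comments.

# Illustrative only: the argument values below are hypothetical.
from litdata._core import LitDataLoaderCore

loader = LitDataLoaderCore(
    worker_chunks=[0, 1],                     # chunk indexes assigned to this worker
    worker_intervals=[(0, 100), (100, 200)],  # sample interval per chunk
    batch_size=32,
    pre_download=2,
    prefetch_workers=4,
    prefetch_factor=2,
)
it = iter(loader)  # __iter__ is declared in the stub; what it yields is not specified here

# StreamingDataProvider flow (per the comments above, once the class is registered):
#   provider.on_start()                      -> prefetch offset data for epoch 1
#   provider.get_next_k_item()               -> fetch the next k items as reads are requested
#   provider.set_chunk_and_sample_index(...) -> install the chunk/sample order for the next epoch
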