forked from google/space

Unified storage framework for the entire machine learning lifecycle


huan233usc/space

 
 


Space: Unified Storage for Machine Learning



Unify data in your entire machine learning lifecycle with Space, a comprehensive storage solution that seamlessly handles data from ingestion to training.

Key Features:

  • Ground Truth Database
    • Store and manage multimodal data in open source file formats, row-oriented or columnar, locally or in the cloud.
    • Ingest from various sources, including ML datasets, files, and labeling tools.
    • Support data manipulation (append, insert, update, delete) and version control.
  • OLAP Database and Lakehouse
  • Distributed Data Processing Pipelines
    • Integrate with processing frameworks like Ray for efficient data transformation.
    • Store processed results as Materialized Views (MVs); incrementally update MVs when the source changes.
  • Seamless Training Framework Integration
    • Access Space datasets and MVs directly via random access interfaces.
    • Convert to popular ML dataset formats (e.g., TFDS, HuggingFace, Ray).

Onboarding Examples

Space 101

  • Space uses Arrow in the API surface, e.g., schema, filter, data IO.
  • All file paths in Space are relative; datasets are immediately usable after downloading or moving.
  • Space stores either the data itself or a reference to the data in Parquet files. The reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see space.core.schema.types.files).
  • space.TfFeatures is a built-in field type providing serializers for nested dicts of numpy arrays, based on TFDS FeaturesDict.
  • Please find more information in the design and performance docs.
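
The relative-path and reference bullets above can be pictured with a small standard-library sketch. It is not Space's implementation: `RecordRef`, `resolve`, and the file layout are hypothetical names chosen for illustration. The point is only that a reference stored relative to the dataset root stays valid wherever the dataset is moved.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class RecordRef:
    """Hypothetical reference kept in an index row instead of the payload."""
    file: str    # path of the record file, relative to the dataset root
    row_id: int  # position of the record inside that file


def resolve(dataset_root: str, ref: RecordRef) -> PurePosixPath:
    """Join the stored relative path with wherever the dataset lives now."""
    return PurePosixPath(dataset_root) / ref.file


ref = RecordRef(file="data/records/part-0.array_record", row_id=7)

# The same stored reference works before and after moving the dataset.
print(resolve("/mnt/bucket/example_ds", ref))
print(resolve("/home/me/copy_of_example_ds", ref))
```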

Quick Start

Install

Install from PyPI:

pip install space-datasets

Or install from source:

cd python
pip install ".[dev]"

Cluster Setup and Performance Tuning

See the setup and performance doc.

Create and Load Datasets

Create a Space dataset with two index fields (id, image_name), stored in Parquet, and a record field (feature), stored in ArrayRecord.

This example uses the plain binary type for the record field. Space supports a type space.TfFeatures that integrates with the TFDS feature serializer. See more details in a TFDS example.

import pyarrow as pa
from space import Dataset

schema = pa.schema([
  ("id", pa.int64()),
  ("image_name", pa.string()),
  ("feature", pa.binary())])

ds = Dataset.create(
  "/path/to/<mybucket>/example_ds",
  schema,
  primary_keys=["id"],
  record_fields=["feature"])  # Store this field in ArrayRecord files

# Load the dataset from files later:
ds = Dataset.load("/path/to/<mybucket>/example_ds")

Optionally, you can use catalogs to manage datasets by names instead of locations:

from space import DirCatalog

# DirCatalog manages datasets in a directory.
catalog = DirCatalog("/path/to/<mybucket>")

# Same as the creation above.
ds = catalog.create_dataset("example_ds", schema,
  primary_keys=["id"], record_fields=["feature"])

# Same as the load above.
ds = catalog.dataset("example_ds")

# List all datasets and materialized views.
print(catalog.datasets())

Write and Read

Append and delete some data. Each mutation generates a new version of the data, identified by an increasing integer ID. Versions can be tagged; Iceberg-style branches are expected to follow for better version management.

import pyarrow.compute as pc
from space import RayOptions

# Create a local runner:
runner = ds.local()

# Or create a Ray runner:
runner = ds.ray(ray_options=RayOptions(max_parallelism=8))

# To avoid https://github.com/ray-project/ray/issues/41333, wrap the runner 
# with @ray.remote when running in a remote Ray cluster.
#
# @ray.remote
# def run():
#   return runner.read_all()
#

# Appending data generates a new dataset version `snapshot_id=1`
# Write methods:
# - append(...): no primary key check.
# - insert(...): fail if primary key exists.
# - upsert(...): overwrite if primary key exists.
ids = range(100)
runner.append({
  "id": ids,
  "image_name": [f"{i}.jpg" for i in ids],
  "feature": [f"somedata{i}".encode("utf-8") for i in ids]
})
ds.add_tag("after_append")  # Version management: add tag to snapshot

# Deletion generates a new version `snapshot_id=2`
runner.delete(pc.field("id") == 1)
ds.add_tag("after_delete")

# Show all versions
ds.versions().to_pandas()
# >>>
#    snapshot_id               create_time tag_or_branch
# 0            2 2024-01-12 20:23:57+00:00  after_delete
# 1            1 2024-01-12 20:23:38+00:00  after_append
# 2            0 2024-01-12 20:22:51+00:00          None

# Read options:
# - filter_: optional, apply a filter (push down to reader).
# - fields: optional, field selection.
# - version: optional, snapshot_id or tag, time travel back to an old version.
# - batch_size: optional, output size.
runner.read_all(
  filter_=pc.field("image_name") == "2.jpg",
  fields=["feature"],
  version="after_append"  # or snapshot ID `1`
)

# Read the changes between version 0 and 2.
for change_type, data in runner.diff(0, "after_delete"):
  print(change_type)
  print(data)
  print("===============")
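
The write and versioning behavior described in this section can be modeled with a toy in-memory table. This is a sketch for intuition only, not how Space stores data: `ToyTable` and its methods are hypothetical, it keeps a full copy of the rows per snapshot, and it leaves out `append` because a dict keyed by primary key cannot hold duplicate keys.

```python
class ToyTable:
    """In-memory toy table keyed by primary key, with snapshot history."""

    def __init__(self):
        self._snapshots = [{}]  # index is the snapshot ID; snapshot 0 is empty
        self._tags = {}         # tag name -> snapshot ID

    def _commit(self, rows):
        """Every mutation stores a new snapshot and returns its ID."""
        self._snapshots.append(rows)
        return len(self._snapshots) - 1

    def insert(self, new_rows):
        """Fail if any primary key already exists."""
        current = self._snapshots[-1]
        duplicates = [key for key in new_rows if key in current]
        if duplicates:
            raise KeyError(f"primary keys already exist: {duplicates}")
        return self._commit({**current, **new_rows})

    def upsert(self, new_rows):
        """Overwrite rows whose primary key already exists."""
        return self._commit({**self._snapshots[-1], **new_rows})

    def delete(self, predicate):
        """Drop every row for which predicate(key, row) is true."""
        current = self._snapshots[-1]
        return self._commit(
            {k: v for k, v in current.items() if not predicate(k, v)})

    def add_tag(self, tag):
        """Attach a tag to the latest snapshot."""
        self._tags[tag] = len(self._snapshots) - 1

    def read(self, version=None):
        """Read the latest snapshot, or time travel by tag or snapshot ID."""
        if version is None:
            snapshot_id = len(self._snapshots) - 1
        elif isinstance(version, str):
            snapshot_id = self._tags[version]
        else:
            snapshot_id = version
        return dict(self._snapshots[snapshot_id])


table = ToyTable()
table.insert({i: f"{i}.jpg" for i in range(3)})  # snapshot 1
table.add_tag("after_insert")
table.delete(lambda key, row: key == 1)          # snapshot 2
table.upsert({2: "2_v2.jpg"})                    # snapshot 3

print(table.read("after_insert"))  # time travel by tag
print(table.read())                # latest snapshot
```

Old snapshots stay readable because a mutation never edits an existing snapshot in place; it only adds a new one.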

Transform and Materialized Views

Space supports transforming a dataset to a view, and materializing the view to files. The transforms include:

  • Mapping batches using a user defined function (UDF).
  • Filtering using a UDF.
  • Joining two views/datasets.

When the source dataset is modified, refreshing the materialized view incrementally synchronizes only the changes, which saves compute and IO cost. See more details in a Segment Anything example. Reading or refreshing views requires the Ray runner, because both are implemented on top of Ray transforms.

A materialized view mv can be used as a view (mv.view) or as a dataset (mv.dataset). The former always reads data from the source dataset's files and processes all of it on the fly. The latter reads the already processed data directly from the MV's files and skips processing.
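
To make the cost difference concrete, here is a toy model of incremental refresh in plain Python. It is a sketch under simplifying assumptions, not Space's algorithm: `ToyMaterializedView`, `expensive_transform`, and the `(change_type, key, value)` tuples are all hypothetical. A call counter shows how much transform work a refresh does compared with reading as a view.

```python
def expensive_transform(value):
    """Stand-in for a UDF; counts calls so the saved work is visible."""
    expensive_transform.calls += 1
    return value.upper()


expensive_transform.calls = 0


class ToyMaterializedView:
    def __init__(self):
        self.stored = {}  # primary key -> processed value, like the MV's files

    def refresh(self, changes):
        """Apply only the changes made to the source since the last refresh."""
        for change_type, key, value in changes:
            if change_type == "add":
                self.stored[key] = expensive_transform(value)
            elif change_type == "delete":
                self.stored.pop(key, None)


source = {1: "a", 2: "b", 3: "c"}
toy_mv = ToyMaterializedView()
toy_mv.refresh([("add", k, v) for k, v in source.items()])  # 3 transform calls

# The source changes: one row is deleted and one is added.
del source[2]
source[4] = "d"
toy_mv.refresh([("delete", 2, None), ("add", 4, "d")])      # 1 more call

calls_after_refresh = expensive_transform.calls

# Reading as a view reprocesses every source row on the fly.
as_view = {k: expensive_transform(v) for k, v in source.items()}
calls_for_view_read = expensive_transform.calls - calls_after_refresh

print(toy_mv.stored == as_view, calls_after_refresh, calls_for_view_read)
```

Both paths return the same rows; the refresh only paid for the one new row, while the view read paid for all three.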

Example of map_batches

# A sample transform UDF.
# Input is {"field_name": [values, ...], ...}
def modify_feature_udf(batch):
  batch["feature"] = [d + b"123" for d in batch["feature"]]
  return batch

# Create a view and materialize it.
view = ds.map_batches(
  fn=modify_feature_udf,
  output_schema=ds.schema,
  output_record_fields=["feature"]
)

view_runner = view.ray()
# Reading a view will read the source dataset and apply transforms on it.
# It processes all data using `modify_feature_udf` on the fly.
for d in view_runner.read():
  print(d)

mv = view.materialize("/path/to/<mybucket>/example_mv")
# Or use a catalog:
# mv = catalog.materialize("example_mv", view)

mv_runner = mv.ray()
# Refresh the MV up to version tag `after_append` of the source.
mv_runner.refresh("after_append", batch_size=64)  # Reading batch size
# Or call mv_runner.refresh() to refresh to the latest version.

# Use the MV runner instead of the view runner to read directly from the
# materialized view files, without any further data processing.
mv_runner.read_all()
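
Before running a UDF through Ray, it can help to call it on a small hand-built batch. The sketch below needs neither Space nor Ray. It assumes batches arrive as a dict of lists, as the comment on the UDF above states, and the two checks are suggestions that follow from reusing `ds.schema` as the output schema rather than documented requirements.

```python
def modify_feature_udf(batch):
    batch["feature"] = [d + b"123" for d in batch["feature"]]
    return batch


# A hand-built batch in the {"field_name": [values, ...]} layout.
sample_batch = {
    "id": [0, 1],
    "image_name": ["0.jpg", "1.jpg"],
    "feature": [b"somedata0", b"somedata1"],
}

# Pass a shallow copy so the sample stays available for comparison.
result = modify_feature_udf(dict(sample_batch))

# The output should still carry every field of the declared schema...
assert set(result) == set(sample_batch)
# ...and all columns should have the same number of rows.
assert len({len(column) for column in result.values()}) == 1

print(result["feature"])
```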

Example of join

See a full example in the Segment Anything example. Creating a materialized view of a join result is not supported yet.

# If the input is a materialized view, use `mv.dataset` instead of `mv.view`.
# Only one join key is supported; it must be the primary key of both sides.
joined_view = mv_left.dataset.join(mv_right.dataset, keys=["id"])
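
One way to see why the join key has to be the primary key on both sides: each key then matches at most one row per side, so the join pairs rows one to one. The sketch below illustrates that idea in plain Python. `join_on_key` is a hypothetical helper, and the inner-join behavior is an assumption, since the text above does not say how unmatched keys are handled.

```python
def join_on_key(left, right):
    """Join two {primary_key: row_dict} tables on their shared primary key.

    Assumes inner-join semantics: keys present on only one side are dropped.
    """
    shared_keys = left.keys() & right.keys()
    return {key: {**left[key], **right[key]} for key in shared_keys}


left = {1: {"image_name": "1.jpg"}, 2: {"image_name": "2.jpg"}}
right = {2: {"mask": b"m2"}, 3: {"mask": b"m3"}}

joined = join_on_key(left, right)
print(joined)
```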

ML Frameworks Integration

There are several ways to integrate Space storage with ML frameworks. Space provides a random access data source for reading data in ArrayRecord files:

from space import RandomAccessDataSource

datasource = RandomAccessDataSource(
  # <field-name>: <storage-location>, for reading data from ArrayRecord files.
  {
    "feature": "/path/to/<mybucket>/example_mv",
  },
  # Don't auto deserialize data, because we store them as plain bytes.
  deserialize=False)

len(datasource)
datasource[2]
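
The two calls above, `len(datasource)` and `datasource[2]`, are the whole contract a map-style data source needs to offer. A minimal stand-in with the same shape can be handy in unit tests. The class below is a hypothetical sketch built on the standard library; it reads from an in-memory list, not ArrayRecord files, and supports integer indexes only.

```python
from collections.abc import Sequence


class ToyRandomAccessSource(Sequence):
    """Map-style source: supports len() and integer indexing."""

    def __init__(self, records, deserializer=None):
        self._records = records
        self._deserializer = deserializer  # None returns the raw bytes

    def __len__(self):
        return len(self._records)

    def __getitem__(self, index):
        raw = self._records[index]
        return self._deserializer(raw) if self._deserializer else raw


records = [f"somedata{i}".encode("utf-8") for i in range(5)]

raw_source = ToyRandomAccessSource(records)
text_source = ToyRandomAccessSource(records, deserializer=bytes.decode)

print(len(raw_source), raw_source[2], text_source[2])
```

Leaving the deserializer out mirrors `deserialize=False` above: the caller gets the stored bytes back unchanged.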

A dataset or view can also be read as a Ray dataset:

ray_ds = ds.ray_dataset()
ray_ds.take(2)

Data in Parquet files can be read as a HuggingFace dataset:

from datasets import load_dataset

huggingface_ds = load_dataset("parquet", data_files={"train": ds.index_files()})

Inspect Metadata

List the file paths of all index (Parquet) files:

ds.index_files()
# Or show more detailed statistics for the Parquet files.
ds.storage.index_manifest()  # Accept filter and snapshot_id

Show statistics for all ArrayRecord files:

ds.storage.record_manifest()  # Accept filter and snapshot_id

Status

Space is a new project under active development.

🚧 Ongoing tasks:

  • Iceberg style version branches.
  • Performance benchmark and improvement.

Disclaimer

This is not an officially supported Google product.
