diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000000..e69de29bb2 diff --git a/404.html b/404.html new file mode 100644 index 0000000000..7e3b51e2e5 --- /dev/null +++ b/404.html @@ -0,0 +1,490 @@ + + + +
+ + + + + + + + + + + + + + +In this example, we implement an ededup transform that +removes duplicate documents across all files. In this tutorial, we will show the following:
+ededup
transform to generate the output table.TransformRuntime
to create supporting Ray objects and supplement
+ transform-specific metadata with the information about this statisticsThe complete task involves the following:
+(Currently, the complete code for the noop transform used for this +tutorial can be found in the +ededup transform directory.
+Finally, we show to use the command line to run the transform in a local ray cluster
+One of the basic components of exact dedup implementation is a cache of hashes. That is why we will start +from implementing this support actor. The implementation is fairly straight forward and can be +found here
+First, let's define the transform class. To do this we extend +the base abstract/interface class +AbstractTableTransform, +which requires definition of the following:
+init()
) that accepts a dictionary of configuration
+ data. For this example, the configuration data will only be defined by
+ command line arguments (defined below).transform()
method itself that takes an input table and produces an output
+ table and any associated metadata for that table transformation.Other methods such as flush()
need not be overridden/redefined for this example.
We start with the simple definition of the class, its initializer and the imports required +by subsequent code:
+from argparse import ArgumentParser, Namespace
+from typing import Any
+
+import pyarrow as pa
+import ray
+from data_processing.data_access import DataAccessFactory
+from data_processing.ray import (
+ DefaultTableTransformConfiguration,
+ DefaultTableTransformRuntime,
+ RayUtils,
+ TransformLauncher,
+)
+from data_processing.transform import AbstractTableTransform
+from data_processing.utils import GB, TransformUtils
+from ray.actor import ActorHandle
+
+
+class EdedupTransform(AbstractTableTransform):
+
+ def __init__(self, config: dict):
+ super().__init__(config)
+ self.doc_column = config.get("doc_column", "")
+ self.hashes = config.get("hashes", [])
+
+The EdedupTransform
class extends the AbstractTableTransform
, which defines the required methods.
For purposes of the tutorial and to simulate a more complex processing
+job, our initializer allows our transform to be configurable
+with document column name and a list of hash actors during the call to transform()
.
+Configuration is provided by the framework in a dictionary provided to the initializer.
+Below we will cover how doc_column
and hashes
arguments are made available to the initializer.
Next we define the transform()
method itself, which includes the addition of some
+metadata.
def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict[str, Any]]:
+ if not TransformUtils.validate_columns(table=table, required=[self.doc_column]):
+ return [], {}
+ # Inner variables
+ hashes = set()
+ unique = []
+ hd = {}
+ # Compute unique hashes for the table
+ for text in table[self.doc_column]:
+ # Compute doc hash
+ h = TransformUtils.str_to_hash(TransformUtils.normalize_string(str(text)))
+ if h not in hashes: # Processing this hash for the first time
+ hashes.add(h) # Remember it locally
+ hd[h] = str(text)
+ if len(hd) >= REQUEST_LEN: # time to check remotely
+ unique = unique + self._process_remote_hashes(hd=hd)
+ hd = {}
+ if len(hd) > 0: # Process remaining hashes
+ unique = unique + self._process_remote_hashes(hd=hd)
+
+ # Remove duplicates
+ unique_set = set(unique)
+ mask = [False] * table.num_rows
+ index = 0
+ for text in table[self.doc_column]:
+ str_text = str(text)
+ if str_text in unique_set:
+ mask[index] = True
+ unique_set.remove(str_text)
+ index += 1
+ # Create output table
+ out_table = table.filter(mask)
+ # report statistics
+ stats = {"source_documents": table.num_rows, "result_documents": out_table.num_rows}
+ return [out_table], stats
+
+The single input to this method is the in-memory pyarrow table to be transformed. +The return of this function is a list of tables and optional metadata. In this +case of simple 1:1 table conversion the list will contain a single table, result of removing +duplicates from input table.
+The metadata is a free-form dictionary of keys with numeric values that will be aggregated +by the framework and reported as aggregated job statistics metadata. +If there is no metadata then simply return an empty dictionary.
+First, let's define the transform runtime class. To do this we extend +the base abstract/interface class +DefaultTableTransformRuntime, +which requires definition of the following:
+init()
) that accepts a dictionary of configuration
+ data. For this example, the configuration data will only be defined by
+ command line arguments (defined below).get_transform_config()
method that takes data_access_factory
, statistics actor
, and
+ list of files to process
and produces a dictionary of parameters used by transform.compute_execution_stats()
method that takes take a dictionary of metadata, enhances it and
+ produces an enhanced metadata dictionary.We start with the simple definition of the class and its initializer
+class EdedupRuntime(DefaultTableTransformRuntime):
+
+ def __init__(self, params: dict[str, Any]):
+ super().__init__(params)
+ self.filters = []
+
+Next we define the get_transform_config()
method, which, in this case, creates supporting Ray Actors and
+adds their handles to the transform parameters
def get_transform_config(
+ self, data_access_factory: DataAccessFactory, statistics: ActorHandle, files: list[str]
+ ) -> dict[str, Any]:
+ self.filters = RayUtils.create_actors(
+ clazz=HashFilter,
+ params={},
+ actor_options={"num_cpus": self.params.get("hash_cpu", 0.5)},
+ n_actors=self.params.get("num_hashes", 1),
+ )
+ return {"hashes": self.filters} | self.params
+
+Inputs to this method includes a set of parameters, that moght not be needed for this transformer, but +rather a superset of all parameters that can be used by different implementations of transform runtime ( +see for example block listing, +fuzzy dedup, etc). +The return of this function is a dictionary information for transformer initialization. In this +implementation we add additional parameters to the input dictionary, but in general, it can be a completely +new dictionary build here
+Finally we define the compute_execution_stats()
method, which which enhances metadata collected by statistics
+class
def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:
+ # Get filters stats
+ sum_hash = 0
+ sum_hash_mem = 0
+ remote_replies = [f.get_hash_size.remote() for f in self.filters]
+ while remote_replies:
+ # Wait for replies
+ ready, not_ready = ray.wait(remote_replies)
+ for r in ready:
+ h_size, h_memory = ray.get(r)
+ sum_hash = sum_hash + h_size
+ sum_hash_mem = sum_hash_mem + h_memory
+ remote_replies = not_ready
+ dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
+ return {"number of hashes": sum_hash, "hash memory, GB": sum_hash_mem, "de duplication %": dedup_prst} | stats
+
+Input to this method is a dictionary of metadata collected by statistics object. It then enhances it by information +collected by hash actors and custom computations based on statistics data.
+The final class we need to implement is EdedupTableTransformConfiguration
class and its initializer that
+define the following:
First we define the class and its initializer,
+class EdedupTableTransformConfiguration(DefaultTableTransformConfiguration):
+ def __init__(self):
+ super().__init__(name="ededup", runtime_class=EdedupRuntime, transform_class=EdedupTransform)
+ self.params = {}
+
+The initializer extends the DefaultTableTransformConfiguration which provides simple
+capture of our configuration data and enables picklability through the network.
+It also adds a params
field that will be used below to hold the transform's
+configuration data (used in EdedupRuntime.init()
above).
Next, we provide two methods that define and capture the command line configuration that
+is specific to the EdedupTransform
, in this case the number of seconds to sleep during transformation.
+First we define the method establishes the command line arguments.
+This method is given a global argument parser to which the EdedupTransform
arguments are added.
+It is good practice to include a common prefix to all transform-specific options (i.e. pii, lang, etc).
+In our case we will use noop_
.
def add_input_params(self, parser: ArgumentParser) -> None:
+ parser.add_argument("--hash_cpu", type=float, default=0.5, help="number of CPUs per hash")
+ parser.add_argument("--num_hashes", type=int, default=0, help="number of hash actors to use")
+ parser.add_argument("--doc_column", type=str, default="contents", help="key for accessing data")
+
+Next we implement a method that is called after the framework has parsed the CLI args
+and which allows us to capture the EdedupTransform
-specific arguments and optionally validate them.
def apply_input_params(self, args: Namespace) -> bool:
+ if args.num_hashes <= 0:
+ print(f"Number of hashes should be greater then zero, provided {args.num_hashes}")
+ return False
+ self.params["doc_column"] = args.doc_column
+ self.params["hash_cpu"] = args.hash_cpu
+ self.params["num_hashes"] = args.num_hashes
+ print(f"exact dedup params are {self.params}")
+ return True
+
+Next, we show how to launch the framework with the EdedupTransform
using the
+framework's TransformLauncher
class.
if __name__ == "__main__":
+ launcher = TransformLauncher(transform_runtime_config=EdedupTransformConfiguration())
+ launcher.launch()
+
+The launcher requires only an instance of DefaultTableTransformConfiguration
+(our EdedupTransformConfiguration
class).
+A single method launch()
is then invoked to run the transform in a Ray cluster.
Assuming the above main()
is placed in ededup_transform.py
we can run the transform on local data as follows:
python ededup_transform.py --ededup_hash_cpu 0.5 --ededup_num_hashes 2 --ededup_doc_column "contents" \
+ --run_locally True \
+ --data_local_config="{'input_folder': '<project location>/transforms/universal/ededup/test-data/input', 'output_folder': '<project location>/transforms/universal/ededup/output'}"
+
+This is a minimal set of options to run locally. +See the launcher options for a complete list of +transform-independent command line options.
+ + + + + + + + + + + + + +In this section we cover the high-level architecture, some of the core components.
+Transform implementation and examples are provided in the tutorial.
+The architecture is a "standard" implementation of Embarrassingly parallel to +process many input files in parallel using a distribute network of RayWorkers.
+ +The architecture includes the following core components:
+RayLauncher accepts and validates + CLI parameters to establish the Ray Orchestrator with the proper configuration. +It uses the following components, all of which can/do define CLI configuration parameters.:
+After all parameters are validated, the ray cluster is started and the DataAccessFactory, TransformOrchestratorConfiguraiton
+and TransformConfiguration are given to the Ray Orchestrator, via Ray remote() method invocation.
+The Launcher waits for the Ray Orchestrator to complete.
+* Ray Orchestrator is responsible for overall management of
+ the data processing job. It creates the actors, determines the set of input data and distributes the
+ references to the data files to be processed by the workers. More specifically, it performs the following:
+ 1. Uses the DataAccess instance created by the DataAccessFactory to determine the set of the files
+ to be processed.
+ 2. uses the TransformConfiguration to create the TransformRuntime instance
+ 3. Uses the TransformRuntime to optionally apply additional configuration (ray object storage, etc) for the configuration
+ and operation of the Transform.
+ 3. uses the TransformOrchestratorConfiguration to determine the set of RayWorkers to create
+ to execute transformers in parallel, providing the following to each worker:
+ * Ray worker configuration
+ * DataAccessFactory
+ * Transform class and its TransformConfiguration containing the CLI parameters and any TransformRuntime additions.
+ 4. in a load-balanced, round-robin fashion, distributes the names of the input files to the workers for them to transform/process.
Additionally, to provide monitoring of long-running transforms, the orchestrator is instrumented with
+ custom metrics, that are exported to localhost:8080 (this is the endpoint that
+ Prometheus would be configured to scrape).
+ Once all data is processed, the orchestrator will collect execution statistics (from the statistics actor)
+ and build and save it in the form of execution metadata (metadata.json
). Finally, it will return the execution
+ result to the Launcher.
+* Ray worker is responsible for
+reading files (as PyArrow Tables)
+assigned by the orchestrator, applying the transform to the input table and writing out the
+resulting table(s). Metadata produced by each table transformation is aggregated into
+Transform Statistics (below).
+* Transform Statistics is a general
+purpose data collector actor aggregating the numeric metadata from different places of
+the framework (especially metadata produced by the transform).
+These statistics are reported as metadata (metadata.json
) by the orchestrator upon completion.
Some of the core components used by the architecture are definfed here:
+A brief discussion of the Transform components are provided here. +For a more complete discussion, see the tutorials.
+transform()
and flush()
- and provides the bulk of any transform implementation
+convert one Table to 0 or more new Tables. In general, this is not tied to the above Ray infrastructure
+and so can usually be used independent of Ray. get_transform_config()
is used to enable these extensions.