diff --git a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_tmva/_batchgenerator.py b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_tmva/_batchgenerator.py index 6cf046e90a97a..bb6c0047e96d4 100644 --- a/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_tmva/_batchgenerator.py +++ b/bindings/pyroot/pythonizations/python/ROOT/_pythonization/_tmva/_batchgenerator.py @@ -2,9 +2,10 @@ # Author: Kristupas Pranckietis, Vilnius University 05/2024 # Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 # Author: Vincenzo Eduardo Padulano, CERN 10/2024 +# Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 ################################################################################ -# Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. # +# Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. # # All rights reserved. # # # # For the licensing terms see $ROOTSYS/LICENSE. # @@ -20,12 +21,13 @@ import numpy as np import tensorflow as tf import torch + import ROOT class BaseGenerator: def get_template( self, - x_rdf: RNode, + x_rdf: ROOT.RDF.RNode, columns: list[str] = list(), max_vec_sizes: dict[str, int] = dict(), ) -> Tuple[str, list[int]]: @@ -80,9 +82,10 @@ def get_template( def __init__( self, - rdataframe: RNode, + rdataframe: ROOT.RDF.RNode, batch_size: int, chunk_size: int, + block_size: int, columns: list[str] = list(), max_vec_sizes: dict[str, int] = dict(), vec_padding: int = 0, @@ -92,6 +95,7 @@ def __init__( max_chunks: int = 0, shuffle: bool = True, drop_remainder: bool = True, + set_seed: int = 0, ): """Wrapper around the Cpp RBatchGenerator @@ -126,6 +130,10 @@ def __init__( drop_remainder (bool): Drop the remainder of data that is too small to compose full batch. Defaults to True. + set_seed (int): + For reproducibility: Set the seed for the random number generator used + to split the dataset into training and validation and shuffling of the chunks + Defaults to 0 which means that the seed is set to the random device. 
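+
+        Example:
+            A minimal sketch (the wrapper is normally created through the factory
+            functions defined below in this module; the file name and numeric values
+            here are only illustrative):
+
+                df = ROOT.RDataFrame("mytree", "myfile.root")
+                gen_train, gen_validation = ROOT.TMVA.Experimental.CreateNumPyGenerators(
+                    df, batch_size=128, chunk_size=5000, block_size=500,
+                    target="b2", validation_split=0.3, shuffle=True, set_seed=42)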
""" import ROOT @@ -154,11 +162,6 @@ def __init__( self.noded_rdf = RDF.AsRNode(rdataframe) - if ROOT.Internal.RDF.GetDataSourceLabel(self.noded_rdf) != "TTreeDS": - raise ValueError( - "RNode object must be created out of TTrees or files of TTree" - ) - if isinstance(target, str): target = [target] @@ -221,15 +224,16 @@ def __init__( self.generator = TMVA.Experimental.Internal.RBatchGenerator(template)( self.noded_rdf, chunk_size, + block_size, batch_size, self.given_columns, - self.num_columns, max_vec_sizes_list, vec_padding, validation_split, max_chunks, shuffle, drop_remainder, + set_seed, ) atexit.register(self.DeActivate) @@ -238,6 +242,9 @@ def __init__( def is_active(self): return self.generator.IsActive() + def is_training_active(self): + return self.generator.TrainingIsActive() + def Activate(self): """Initialize the generator to be used for a loop""" self.generator.Activate() @@ -246,6 +253,30 @@ def DeActivate(self): """Deactivate the generator""" self.generator.DeActivate() + def ActivateTrainingEpoch(self): + """Activate the generator""" + self.generator.ActivateTrainingEpoch() + + def ActivateValidationEpoch(self): + """Activate the generator""" + self.generator.ActivateValidationEpoch() + + def DeActivateTrainingEpoch(self): + """Deactivate the generator""" + self.generator.DeActivateTrainingEpoch() + + def DeActivateValidationEpoch(self): + """Deactivate the generator""" + self.generator.DeActivateValidationEpoch() + + def CreateTrainBatches(self): + """Deactivate the generator""" + self.generator.CreateTrainBatches() + + def CreateValidationBatches(self): + """Deactivate the generator""" + self.generator.CreateValidationBatches() + def GetSample(self): """ Return a sample of data that has the same size and types as the actual @@ -445,12 +476,14 @@ def GetValidationBatch(self) -> Any: class LoadingThreadContext: def __init__(self, base_generator: BaseGenerator): self.base_generator = base_generator - + # create training batches from the first chunk + self.base_generator.CreateTrainBatches(); + def __enter__(self): - self.base_generator.Activate() + self.base_generator.ActivateTrainingEpoch() def __exit__(self, type, value, traceback): - self.base_generator.DeActivate() + self.base_generator.DeActivateTrainingEpoch() return True @@ -469,6 +502,7 @@ def __init__(self, base_generator: BaseGenerator, conversion_function: Callable) self.base_generator = base_generator self.conversion_function = conversion_function + def Activate(self): """Start the loading of training batches""" self.base_generator.Activate() @@ -503,6 +537,7 @@ def last_batch_no_of_rows(self) -> int: return self.base_generator.generator.TrainRemainderRows() def __iter__(self): + self._callable = self.__call__() return self @@ -522,16 +557,28 @@ def __call__(self) -> Any: Union[np.NDArray, torch.Tensor]: A batch of data """ - with LoadingThreadContext(self.base_generator): + with LoadingThreadContext(self.base_generator): while True: batch = self.base_generator.GetTrainBatch() - if batch is None: break - yield self.conversion_function(batch) + + return None + +class LoadingThreadContextVal: + def __init__(self, base_generator: BaseGenerator): + self.base_generator = base_generator + # create validation batches from the first chunk + self.base_generator.CreateValidationBatches() - return None + def __enter__(self): + self.base_generator.ActivateValidationEpoch() + + def __exit__(self, type, value, traceback): + self.base_generator.DeActivateValidationEpoch() + return True + class ValidationRBatchGenerator: @@ 
-588,27 +635,27 @@ def __next__(self): return batch def __call__(self) -> Any: - """Loop through the validation batches + """Start the loading of batches and yield the results Yields: Union[np.NDArray, torch.Tensor]: A batch of data """ - if self.base_generator.is_active: - self.base_generator.DeActivate() - - while True: - batch = self.base_generator.GetValidationBatch() - - if not batch: - break - - yield self.conversion_function(batch) - - + + with LoadingThreadContextVal(self.base_generator): + while True: + batch = self.base_generator.GetValidationBatch() + if batch is None: + self.base_generator.DeActivateValidationEpoch() + break + yield self.conversion_function(batch) + + return None + def CreateNumPyGenerators( - rdataframe: RNode, + rdataframe: ROOT.RDF.RNode, batch_size: int, chunk_size: int, + block_size: int, columns: list[str] = list(), max_vec_sizes: dict[str, int] = dict(), vec_padding: int = 0, @@ -618,6 +665,7 @@ def CreateNumPyGenerators( max_chunks: int = 0, shuffle: bool = True, drop_remainder=True, + set_seed: int = 0, ) -> Tuple[TrainRBatchGenerator, ValidationRBatchGenerator]: """ Return two batch generators based on the given ROOT file and tree or RDataFrame @@ -676,6 +724,7 @@ def CreateNumPyGenerators( rdataframe, batch_size, chunk_size, + block_size, columns, max_vec_sizes, vec_padding, @@ -685,6 +734,7 @@ def CreateNumPyGenerators( max_chunks, shuffle, drop_remainder, + set_seed, ) train_generator = TrainRBatchGenerator( @@ -702,9 +752,10 @@ def CreateNumPyGenerators( def CreateTFDatasets( - rdataframe: RNode, + rdataframe: ROOT.RDF.RNode, batch_size: int, chunk_size: int, + block_size: int, columns: list[str] = list(), max_vec_sizes: dict[str, int] = dict(), vec_padding: int = 0, @@ -714,6 +765,7 @@ def CreateTFDatasets( max_chunks: int = 0, shuffle: bool = True, drop_remainder=True, + set_seed: int = 0, ) -> Tuple[tf.data.Dataset, tf.data.Dataset]: """ Return two Tensorflow Datasets based on the given ROOT file and tree or RDataFrame @@ -771,6 +823,7 @@ def CreateTFDatasets( rdataframe, batch_size, chunk_size, + block_size, columns, max_vec_sizes, vec_padding, @@ -780,6 +833,7 @@ def CreateTFDatasets( max_chunks, shuffle, drop_remainder, + set_seed, ) train_generator = TrainRBatchGenerator( @@ -847,9 +901,10 @@ def CreateTFDatasets( def CreatePyTorchGenerators( - rdataframe: RNode, + rdataframe: ROOT.RDF.RNode, batch_size: int, chunk_size: int, + block_size: int, columns: list[str] = list(), max_vec_sizes: dict[str, int] = dict(), vec_padding: int = 0, @@ -859,6 +914,7 @@ def CreatePyTorchGenerators( max_chunks: int = 0, shuffle: bool = True, drop_remainder=True, + set_seed: int = 0, ) -> Tuple[TrainRBatchGenerator, ValidationRBatchGenerator]: """ Return two Tensorflow Datasets based on the given ROOT file and tree or RDataFrame @@ -914,6 +970,7 @@ def CreatePyTorchGenerators( rdataframe, batch_size, chunk_size, + block_size, columns, max_vec_sizes, vec_padding, @@ -923,6 +980,7 @@ def CreatePyTorchGenerators( max_chunks, shuffle, drop_remainder, + set_seed, ) train_generator = TrainRBatchGenerator( diff --git a/bindings/pyroot/pythonizations/test/rbatchgenerator_completeness.py b/bindings/pyroot/pythonizations/test/rbatchgenerator_completeness.py index 0d35e0ac01406..43453702c553c 100644 --- a/bindings/pyroot/pythonizations/test/rbatchgenerator_completeness.py +++ b/bindings/pyroot/pythonizations/test/rbatchgenerator_completeness.py @@ -1,14 +1,15 @@ import unittest import os import ROOT +from ROOT import RVec import numpy as np from random import randrange 
- class RBatchGeneratorMultipleFiles(unittest.TestCase): file_name1 = "first_half.root" file_name2 = "second_half.root" + file_name3 = "vector_columns.root" tree_name = "mytree" # default constants @@ -34,6 +35,13 @@ def create_5_entries_file(self): .Define("b2", "(double) b1 * b1")\ .Snapshot(self.tree_name, self.file_name2) + def create_vector_file(self, num_of_entries=10): + df3 = ROOT.RDataFrame(10)\ + .Define("b1", "(int) rdfentry_")\ + .Define("v1", "ROOT::VecOps::RVec{ b1, b1 * 10}")\ + .Define("v2", "ROOT::VecOps::RVec{ b1 * 100, b1 * 1000}")\ + .Snapshot(self.tree_name, self.file_name3) + def teardown_file(self, file): os.remove(file) @@ -47,16 +55,17 @@ def test01_each_element_is_generated_unshuffled(self): df, batch_size=3, chunk_size=5, + block_size=2, target="b2", validation_split=0.4, shuffle=False, drop_remainder=False ) - results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0] - results_x_val = [3.0, 4.0, 8.0, 9.0] - results_y_train = [0.0, 1.0, 4.0, 25.0, 36.0, 49.0] - results_y_val = [9.0, 16.0, 64.0, 81.0] + results_x_train = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + results_x_val = [6.0, 7.0, 8.0, 9.0] + results_y_train = [0.0, 1.0, 4.0, 9.0, 16.0, 25.0] + results_y_val = [36.0, 49.0, 64.0, 81.0] collected_x_train = [] collected_x_val = [] @@ -64,21 +73,21 @@ def test01_each_element_is_generated_unshuffled(self): collected_y_val = [] train_iter = iter(gen_train) - val_iter = iter(gen_validation) - - for _ in range(self.n_train_batch): - x, y = next(train_iter) - self.assertTrue(x.shape == (3, 1)) - self.assertTrue(y.shape == (3, 1)) - collected_x_train.append(x.tolist()) - collected_y_train.append(y.tolist()) - + val_iter = iter(gen_validation) + for _ in range(self.n_val_batch): x, y = next(val_iter) self.assertTrue(x.shape == (3, 1)) self.assertTrue(y.shape == (3, 1)) collected_x_val.append(x.tolist()) collected_y_val.append(y.tolist()) + + for _ in range(self.n_train_batch): + x, y = next(train_iter) + self.assertTrue(x.shape == (3, 1)) + self.assertTrue(y.shape == (3, 1)) + collected_x_train.append(x.tolist()) + collected_y_train.append(y.tolist()) x, y = next(val_iter) self.assertTrue(x.shape == (self.val_remainder, 1)) @@ -114,6 +123,7 @@ def test02_each_element_is_generated_shuffled(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=True, @@ -178,6 +188,7 @@ def test03_chunk_input_smaller_than_batch_size(self): df, batch_size=3, chunk_size=3, + block_size=2, target="b2", validation_split=0.4, shuffle=False, @@ -202,6 +213,7 @@ def test04_dropping_remainder(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=False, @@ -244,17 +256,18 @@ def test05_more_than_one_file(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=False, drop_remainder=False ) - results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0, 10.0, 11.0, 12.0] - results_x_val = [3.0, 4.0, 8.0, 9.0, 13.0, 14.0] + results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0, 3.0, 4.0, 8.0] + results_x_val = [9.0, 10.0, 11.0, 12.0, 13.0, 14.0] results_y_train = [0.0, 1.0, 4.0, 25.0, - 36.0, 49.0, 100.0, 121.0, 144.0] - results_y_val = [9.0, 16.0, 64.0, 81.0, 169.0, 196.0] + 36.0, 49.0, 9.0, 16.0, 64.0] + results_y_val = [81.0, 100.0, 121.0, 144.0, 169.0, 196.0] collected_x_train = [] collected_x_val = [] @@ -309,6 +322,7 @@ def test06_multiple_target_columns(self): df, batch_size=3, chunk_size=5, + block_size=1, target=["b2", "b4"], weights="b3", validation_split=0.4, @@ -316,14 +330,14 @@ def 
test06_multiple_target_columns(self): drop_remainder=False ) - results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0] - results_x_val = [3.0, 4.0, 8.0, 9.0] + results_x_train = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + results_x_val = [6.0, 7.0, 8.0, 9.0] results_y_train = [0.0, 0.0, 1.0, 100.0, 4.0, - 200.0, 25.0, 500.0, 36.0, 600.0, 49.0, 700.0] - results_y_val = [9.0, 300.0, 16.0, 400.0, 64.0, 800.0, 81.0, 900.0] - results_z_train = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0] - results_z_val = [30.0, 40.0, 80.0, 90.0] - + 200.0, 9.0, 300.0, 16.0, 400.0, 25.0, 500.0] + results_y_val = [36.0, 600.0, 49.0, 700.0, 64.0, 800.0, 81.0, 900.0] + results_z_train = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0] + results_z_val = [60.0, 70.0, 80.0, 90.0] + collected_x_train = [] collected_x_val = [] collected_y_train = [] @@ -399,6 +413,7 @@ def test07_multiple_input_columns(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=False, @@ -406,10 +421,10 @@ def test07_multiple_input_columns(self): ) results_x_train = [0.0, 0.0, 1.0, 10.0, 2.0, - 20.0, 5.0, 50.0, 6.0, 60.0, 7.0, 70.0] - results_x_val = [3.0, 30.0, 4.0, 40.0, 8.0, 80.0, 9.0, 90.0] - results_y_train = [0.0, 1.0, 4.0, 25.0, 36.0, 49.] - results_y_val = [9.0, 16.0, 64.0, 81.0] + 20.0, 3.0, 30.0, 4.0, 40.0, 5.0, 50.0] + results_x_val = [6.0, 60.0, 7.0, 70.0, 8.0, 80.0, 9.0, 90.0] + results_y_train = [0.0, 1.0, 4.0, 9.0, 16.0, 25.0] + results_y_val = [36.0, 49.0, 64.0, 81.0] collected_x_train = [] collected_x_val = [] @@ -469,6 +484,7 @@ def test08_filtered(self): dff, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=False, @@ -536,6 +552,7 @@ def test09_filtered_last_chunk(self): dff, batch_size=3, chunk_size=9, + block_size=1, target="b2", validation_split=0, shuffle=False, @@ -589,9 +606,10 @@ def test10_two_epochs_shuffled(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, - shuffle=True, + shuffle=False, drop_remainder=False ) @@ -661,6 +679,7 @@ def test11_number_of_training_and_validation_batches_remainder(self): df, batch_size=3, chunk_size=5, + block_size=1, target="b2", validation_split=0.4, shuffle=False, @@ -708,6 +727,7 @@ def test12_PyTorch(self): df, batch_size=3, chunk_size=5, + block_size=1, target=["b2", "b4"], weights="b3", validation_split=0.4, @@ -715,13 +735,13 @@ def test12_PyTorch(self): drop_remainder=False ) - results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0] - results_x_val = [3.0, 4.0, 8.0, 9.0] + results_x_train = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + results_x_val = [6.0, 7.0, 8.0, 9.0] results_y_train = [0.0, 0.0, 1.0, 100.0, 4.0, - 200.0, 25.0, 500.0, 36.0, 600.0, 49.0, 700.0] - results_y_val = [9.0, 300.0, 16.0, 400.0, 64.0, 800.0, 81.0, 900.0] - results_z_train = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0] - results_z_val = [30.0, 40.0, 80.0, 90.0] + 200.0, 9.0, 300.0, 16.0, 400.0, 25.0, 500.0] + results_y_val = [36.0, 600.0, 49.0, 700.0, 64.0, 800.0, 81.0, 900.0] + results_z_train = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0] + results_z_val = [60.0, 70.0, 80.0, 90.0] collected_x_train = [] collected_x_val = [] @@ -801,6 +821,7 @@ def test13_TensorFlow(self): df, batch_size=3, chunk_size=5, + block_size=1, target=["b2", "b4"], weights="b3", validation_split=0.4, @@ -808,15 +829,14 @@ def test13_TensorFlow(self): drop_remainder=False ) - results_x_train = [0.0, 1.0, 2.0, 5.0, 6.0, 7.0] - results_x_val = [3.0, 4.0, 8.0, 9.0, 0.0, 0.0] + results_x_train = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + results_x_val = [6.0, 7.0, 8.0, 9.0, 0.0, 0.0] 
results_y_train = [0.0, 0.0, 1.0, 100.0, 4.0, - 200.0, 25.0, 500.0, 36.0, 600.0, 49.0, 700.0] - results_y_val = [9.0, 300.0, 16.0, 400.0, 64.0, - 800.0, 81.0, 900.0, 0.0, 0.0, 0.0, 0.0] - results_z_train = [0.0, 10.0, 20.0, 50.0, 60.0, 70.0] - results_z_val = [30.0, 40.0, 80.0, 90.0, 0.0, 0.0] - + 200.0, 9.0, 300.0, 16.0, 400.0, 25.0, 500.0] + results_y_val = [36.0, 600.0, 49.0, 700.0, 64.0, 800.0, 81.0, 900.0, 0.0, 0.0, 0.0, 0.0] + results_z_train = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0] + results_z_val = [60.0, 70.0, 80.0, 90.0, 0.0, 0.0] + collected_x_train = [] collected_x_val = [] collected_y_train = [] @@ -906,6 +926,7 @@ def test(size_of_batch, size_of_chunk, num_of_entries): df, batch_size=size_of_batch, chunk_size=size_of_chunk, + block_size=1, target=["b3", "b5"], weights="b2", validation_split=0.3, @@ -996,5 +1017,159 @@ def test(size_of_batch, size_of_chunk, num_of_entries): test(batch_size, chunk_size, entries_in_rdf) + def test15_two_runs_set_seed(self): + self.create_file() + + try: + + both_runs_collected_x_val = [] + both_runs_collected_y_val = [] + + df = ROOT.RDataFrame(self.tree_name, self.file_name1) + for _ in range(2): + + gen_train, gen_validation = ROOT.TMVA.Experimental.CreateNumPyGenerators( + df, + batch_size=3, + chunk_size=5, + block_size=2, + target="b2", + validation_split=0.4, + shuffle=True, + drop_remainder=False, + set_seed = 42 + ) + + collected_x_train = [] + collected_x_val = [] + collected_y_train = [] + collected_y_val = [] + + iter_train = iter(gen_train) + iter_val = iter(gen_validation) + + for _ in range(self.n_train_batch): + x, y = next(iter_train) + self.assertTrue(x.shape == (3, 1)) + self.assertTrue(y.shape == (3, 1)) + collected_x_train.append(x.tolist()) + collected_y_train.append(y.tolist()) + + for _ in range(self.n_val_batch): + x, y = next(iter_val) + self.assertTrue(x.shape == (3, 1)) + self.assertTrue(y.shape == (3, 1)) + collected_x_val.append(x.tolist()) + collected_y_val.append(y.tolist()) + + x, y = next(iter_val) + self.assertTrue(x.shape == (self.val_remainder, 1)) + self.assertTrue(y.shape == (self.val_remainder, 1)) + collected_x_val.append(x.tolist()) + collected_y_val.append(y.tolist()) + + flat_x_train = { + x for xl in collected_x_train for xs in xl for x in xs} + flat_x_val = { + x for xl in collected_x_val for xs in xl for x in xs} + flat_y_train = { + y for yl in collected_y_train for ys in yl for y in ys} + flat_y_val = { + y for yl in collected_y_val for ys in yl for y in ys} + + self.assertEqual(len(flat_x_train), 6) + self.assertEqual(len(flat_x_val), 4) + self.assertEqual(len(flat_y_train), 6) + self.assertEqual(len(flat_y_val), 4) + + both_runs_collected_x_val.append(collected_x_val) + both_runs_collected_y_val.append(collected_y_val) + self.assertEqual( + both_runs_collected_x_val[0], both_runs_collected_x_val[1]) + self.assertEqual( + both_runs_collected_y_val[0], both_runs_collected_y_val[1]) + finally: + self.teardown_file(self.file_name1) + + def test16_vector_padding(self): + self.create_vector_file() + + try: + df = ROOT.RDataFrame(self.tree_name, self.file_name3) + max_vec_sizes = {"v1": 3, "v2": 2} + + gen_train, gen_validation = ROOT.TMVA.Experimental.CreateNumPyGenerators( + df, + batch_size=3, + chunk_size=5, + block_size=2, + target="b1", + validation_split=0.4, + max_vec_sizes=max_vec_sizes, + shuffle=False, + drop_remainder=False, + ) + + + results_x_train = [0.0, 0.0, 0.0, 0.0, 0.0, + 1.0, 10.0, 0, 100.0, 1000.0, + 2.0, 20.0, 0, 200.0, 2000.0, + 3.0, 30.0, 0, 300.0, 3000.0, + 4.0, 40.0, 0, 400.0, 
4000.0, + 5.0, 50.0, 0, 500.0, 5000.0] + results_y_train = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] + results_x_val = [6.0, 60.0, 0.0, 600.0, 6000.0, + 7.0, 70.0, 0.0, 700.0, 7000.0, + 8.0, 80.0, 0.0, 800.0, 8000.0, + 9.0, 90.0, 0.0, 900.0, 9000.0] + results_y_val = [6.0, 7.0, 8.0, 9.0] + + collected_x_train = [] + collected_x_val = [] + collected_y_train = [] + collected_y_val = [] + + train_iter = iter(gen_train) + val_iter = iter(gen_validation) + + for _ in range(self.n_val_batch): + x, y = next(val_iter) + self.assertTrue(x.shape == (3, 5)) + self.assertTrue(y.shape == (3, 1)) + collected_x_val.append(x.tolist()) + collected_y_val.append(y.tolist()) + + for _ in range(self.n_train_batch): + x, y = next(train_iter) + self.assertTrue(x.shape == (3, 5)) + self.assertTrue(y.shape == (3, 1)) + collected_x_train.append(x.tolist()) + collected_y_train.append(y.tolist()) + + x, y = next(val_iter) + self.assertTrue(x.shape == (self.val_remainder, 5)) + self.assertTrue(y.shape == (self.val_remainder, 1)) + collected_x_val.append(x.tolist()) + collected_y_val.append(y.tolist()) + + flat_x_train = [ + x for xl in collected_x_train for xs in xl for x in xs] + flat_x_val = [x for xl in collected_x_val for xs in xl for x in xs] + flat_y_train = [ + y for yl in collected_y_train for ys in yl for y in ys] + flat_y_val = [y for yl in collected_y_val for ys in yl for y in ys] + + self.assertEqual(results_x_train, flat_x_train) + self.assertEqual(results_x_val, flat_x_val) + self.assertEqual(results_y_train, flat_y_train) + self.assertEqual(results_y_val, flat_y_val) + + self.teardown_file(self.file_name3) + + except: + self.teardown_file(self.file_name3) + raise + + if __name__ == '__main__': unittest.main() diff --git a/tmva/tmva/CMakeLists.txt b/tmva/tmva/CMakeLists.txt index c15e2dc980b9d..c67a261b5b176 100644 --- a/tmva/tmva/CMakeLists.txt +++ b/tmva/tmva/CMakeLists.txt @@ -446,6 +446,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(TMVAUtils TMVA/BatchGenerator/RBatchGenerator.hxx TMVA/BatchGenerator/RBatchLoader.hxx TMVA/BatchGenerator/RChunkLoader.hxx + TMVA/BatchGenerator/RChunkConstructor.hxx SOURCES diff --git a/tmva/tmva/inc/TMVA/BatchGenerator/RBatchGenerator.hxx b/tmva/tmva/inc/TMVA/BatchGenerator/RBatchGenerator.hxx index d56419f5cf8d4..0b0e017169872 100644 --- a/tmva/tmva/inc/TMVA/BatchGenerator/RBatchGenerator.hxx +++ b/tmva/tmva/inc/TMVA/BatchGenerator/RBatchGenerator.hxx @@ -2,9 +2,10 @@ // Author: Kristupas Pranckietis, Vilnius University 05/2024 // Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 // Author: Vincenzo Eduardo Padulano, CERN 10/2024 +// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 /************************************************************************* - * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. * + * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * @@ -32,27 +33,40 @@ namespace TMVA { namespace Experimental { namespace Internal { +// clang-format off +/** +\class ROOT::TMVA::Experimental::Internal::RBatchGenerator +\ingroup tmva +\brief + +In this class, the processes of loading chunks (see RChunkLoader) and creating batches from those chunks (see RBatchLoader) are combined, allowing batches from the training and validation sets to be loaded directly from a dataset in an RDataFrame. 
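+
+A rough sketch of how the Python pythonization added in this patch drives one training
+epoch through this class (illustrative only; `base_generator` stands for the Python
+`BaseGenerator` wrapper, and the conversion of the returned batch is omitted):
+
+~~~{.py}
+base_generator.CreateTrainBatches()         # load the first chunk and cut it into batches
+base_generator.ActivateTrainingEpoch()
+while True:
+    batch = base_generator.GetTrainBatch()  # refills the queue from the next chunk if needed
+    if batch is None:                       # the epoch is exhausted
+        break
+    # ... hand the batch to the training loop ...
+base_generator.DeActivateTrainingEpoch()
+~~~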
+*/ + template class RBatchGenerator { private: - std::mt19937 fRng; - std::mt19937 fFixedRng; - std::random_device::result_type fFixedSeed; - + std::vector fCols; + // clang-format on std::size_t fChunkSize; std::size_t fMaxChunks; std::size_t fBatchSize; + std::size_t fBlockSize; + std::size_t fNumColumns; + std::size_t fNumChunkCols; std::size_t fNumEntries; + std::size_t fSetSeed; + std::size_t fSumVecSizes; + ROOT::RDF::RResultPtr> fEntries; float fValidationSplit; - std::variant>, std::shared_ptr>> fChunkLoader; - + std::unique_ptr> fChunkLoader; std::unique_ptr fBatchLoader; std::unique_ptr fLoadingThread; - std::unique_ptr> fChunkTensor; + std::size_t fTrainingChunkNum; + std::size_t fValidationChunkNum; ROOT::RDF::RNode &f_rdf; @@ -64,54 +78,110 @@ private: bool fNotFiltered; bool fUseWholeFile; + bool fEpochActive{false}; + bool fTrainingEpochActive{false}; + bool fValidationEpochActive{false}; + + std::size_t fNumTrainingEntries; + std::size_t fNumValidationEntries; + + std::size_t fNumTrainingChunks; + std::size_t fNumValidationChunks; + + std::size_t fLeftoverTrainingBatchSize; + std::size_t fLeftoverValidationBatchSize; + + std::size_t fNumFullTrainingBatches; + std::size_t fNumFullValidationBatches; + + std::size_t fNumLeftoverTrainingBatches; + std::size_t fNumLeftoverValidationBatches; + + std::size_t fNumTrainingBatches; + std::size_t fNumValidationBatches; + + TMVA::Experimental::RTensor fTrainTensor; + TMVA::Experimental::RTensor fTrainChunkTensor; + + TMVA::Experimental::RTensor fValidationTensor; + TMVA::Experimental::RTensor fValidationChunkTensor; + public: - RBatchGenerator(ROOT::RDF::RNode &rdf, const std::size_t chunkSize, const std::size_t batchSize, - const std::vector &cols, const std::size_t numColumns, + RBatchGenerator(ROOT::RDF::RNode &rdf, const std::size_t chunkSize, const std::size_t blockSize, + const std::size_t batchSize, const std::vector &cols, const std::vector &vecSizes = {}, const float vecPadding = 0.0, const float validationSplit = 0.0, const std::size_t maxChunks = 0, bool shuffle = true, - bool dropRemainder = true) - : fRng(std::random_device{}()), - fFixedSeed(std::uniform_int_distribution{}(fRng)), - f_rdf(rdf), + bool dropRemainder = true, const std::size_t setSeed = 0) + + : f_rdf(rdf), + fCols(cols), fChunkSize(chunkSize), + fBlockSize(blockSize), fBatchSize(batchSize), fValidationSplit(validationSplit), fMaxChunks(maxChunks), fDropRemainder(dropRemainder), + fSetSeed(setSeed), fShuffle(shuffle), fNotFiltered(f_rdf.GetFilterNames().empty()), - fUseWholeFile(maxChunks == 0) + fUseWholeFile(maxChunks == 0), + fNumColumns(cols.size()), + fTrainTensor({0, 0}), + fTrainChunkTensor({0, 0}), + fValidationTensor({0, 0}), + fValidationChunkTensor({0, 0}) { - // Create tensor to load the chunk into - fChunkTensor = - std::make_unique>(std::vector{fChunkSize, numColumns}); + fNumEntries = f_rdf.Count().GetValue(); + fEntries = f_rdf.Take("rdfentry_"); + + fSumVecSizes = std::accumulate(vecSizes.begin(), vecSizes.end(), 0); + fNumChunkCols = fNumColumns + fSumVecSizes - vecSizes.size(); + + // add the last element in entries to not go out of range when filling chunks + fEntries->push_back((*fEntries)[fNumEntries - 1] + 1); + + fChunkLoader = + std::make_unique>(f_rdf, fNumEntries, fEntries, fChunkSize, fBlockSize, fValidationSplit, + fCols, vecSizes, vecPadding, fShuffle, fSetSeed); + fBatchLoader = std::make_unique(fChunkSize, fBatchSize, fNumChunkCols); + + // split the dataset into training and validation sets + fChunkLoader->SplitDataset(); + 
+ // number of training and validation entries after the split + fNumValidationEntries = static_cast(fValidationSplit * fNumEntries); + fNumTrainingEntries = fNumEntries - fNumValidationEntries; + + fLeftoverTrainingBatchSize = fNumTrainingEntries % fBatchSize; + fLeftoverValidationBatchSize = fNumValidationEntries % fBatchSize; + + fNumFullTrainingBatches = fNumTrainingEntries / fBatchSize; + fNumFullValidationBatches = fNumValidationEntries / fBatchSize; - if (fNotFiltered) { - fNumEntries = f_rdf.Count().GetValue(); + fNumLeftoverTrainingBatches = fLeftoverTrainingBatchSize == 0 ? 0 : 1; + fNumLeftoverValidationBatches = fLeftoverValidationBatchSize == 0 ? 0 : 1; - fChunkLoader = std::make_unique>( - f_rdf, *fChunkTensor, fChunkSize, cols, vecSizes, vecPadding); - } else { - auto report = f_rdf.Report(); - fNumEntries = f_rdf.Count().GetValue(); - std::size_t numAllEntries = report.begin()->GetAll(); + if (dropRemainder) { + fNumTrainingBatches = fNumFullTrainingBatches; + fNumValidationBatches = fNumFullValidationBatches; + } - fChunkLoader = std::make_unique>( - f_rdf, *fChunkTensor, fChunkSize, cols, fNumEntries, numAllEntries, vecSizes, vecPadding); + else { + fNumTrainingBatches = fNumFullTrainingBatches + fNumLeftoverTrainingBatches; + fNumValidationBatches = fNumFullValidationBatches + fNumLeftoverValidationBatches; } - std::size_t maxBatches = ceil((fChunkSize / fBatchSize) * (1 - fValidationSplit)); + // number of training and validation chunks, calculated in RChunkConstructor + fNumTrainingChunks = fChunkLoader->GetNumTrainingChunks(); + fNumValidationChunks = fChunkLoader->GetNumValidationChunks(); - // limits the number of batches that can be contained in the batchqueue based on the chunksize - fBatchLoader = std::make_unique(*fChunkTensor, fBatchSize, numColumns, - maxBatches); + fTrainingChunkNum = 0; + fValidationChunkNum = 0; } ~RBatchGenerator() { DeActivate(); } - /// \brief De-activate the loading process by deactivating the batchgenerator - /// and joining the loading thread void DeActivate() { { @@ -140,193 +210,104 @@ public: fIsActive = true; } - fFixedRng.seed(fFixedSeed); fBatchLoader->Activate(); // fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunks, this); - if (fNotFiltered) { - fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunksNoFilters, this); - } else { - fLoadingThread = std::make_unique(&RBatchGenerator::LoadChunksFilters, this); - } } - /// \brief Returns the next batch of training data if available. - /// Returns empty RTensor otherwise. - /// \return - const TMVA::Experimental::RTensor &GetTrainBatch() - { - // Get next batch if available - return fBatchLoader->GetTrainBatch(); - } + void ActivateEpoch() { fEpochActive = true; } - /// \brief Returns the next batch of validation data if available. - /// Returns empty RTensor otherwise. 
- /// \return - const TMVA::Experimental::RTensor &GetValidationBatch() - { - // Get next batch if available - return fBatchLoader->GetValidationBatch(); - } + void DeActivateEpoch() { fEpochActive = false; } - std::size_t NumberOfTrainingBatches() - { - std::size_t entriesForTraining = - (fNumEntries / fChunkSize) * (fChunkSize - floor(fChunkSize * fValidationSplit)) + fNumEntries % fChunkSize - - floor(fValidationSplit * (fNumEntries % fChunkSize)); + void ActivateTrainingEpoch() { fTrainingEpochActive = true; } - if (fDropRemainder || !(entriesForTraining % fBatchSize)) { - return entriesForTraining / fBatchSize; - } + void DeActivateTrainingEpoch() { fTrainingEpochActive = false; } - return entriesForTraining / fBatchSize + 1; - } - - /// @brief Return number of training remainder rows - /// @return - std::size_t TrainRemainderRows() - { - std::size_t entriesForTraining = - (fNumEntries / fChunkSize) * (fChunkSize - floor(fChunkSize * fValidationSplit)) + fNumEntries % fChunkSize - - floor(fValidationSplit * (fNumEntries % fChunkSize)); + void ActivateValidationEpoch() { fValidationEpochActive = true; } - if (fDropRemainder || !(entriesForTraining % fBatchSize)) { - return 0; - } + void DeActivateValidationEpoch() { fValidationEpochActive = false; } - return entriesForTraining % fBatchSize; - } - - /// @brief Calculate number of validation batches and return it - /// @return - std::size_t NumberOfValidationBatches() + /// \brief Create training batches by first loading a chunk (see RChunkLoader) and split it into batches (see RBatchLoader) + void CreateTrainBatches() { - std::size_t entriesForValidation = (fNumEntries / fChunkSize) * floor(fChunkSize * fValidationSplit) + - floor((fNumEntries % fChunkSize) * fValidationSplit); - - if (fDropRemainder || !(entriesForValidation % fBatchSize)) { - - return entriesForValidation / fBatchSize; - } - return entriesForValidation / fBatchSize + 1; + fChunkLoader->CreateTrainingChunksIntervals(); + fTrainingEpochActive = true; + fTrainingChunkNum = 0; + fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, fTrainingChunkNum); + std::size_t lastTrainingBatch = fNumTrainingChunks - fTrainingChunkNum; + fBatchLoader->CreateTrainingBatches(fTrainChunkTensor, lastTrainingBatch, fLeftoverTrainingBatchSize, + fDropRemainder); + fTrainingChunkNum++; } - /// @brief Return number of validation remainder rows - /// @return - std::size_t ValidationRemainderRows() + /// \brief Creates validation batches by first loading a chunk (see RChunkLoader), and then split it into batches (see RBatchLoader) + void CreateValidationBatches() { - std::size_t entriesForValidation = (fNumEntries / fChunkSize) * floor(fChunkSize * fValidationSplit) + - floor((fNumEntries % fChunkSize) * fValidationSplit); - if (fDropRemainder || !(entriesForValidation % fBatchSize)) { - - return 0; - } - - return entriesForValidation % fBatchSize; + fChunkLoader->CreateValidationChunksIntervals(); + fValidationEpochActive = true; + fValidationChunkNum = 0; + fChunkLoader->LoadValidationChunk(fValidationChunkTensor, fValidationChunkNum); + std::size_t lastValidationBatch = fNumValidationChunks - fValidationChunkNum; + fBatchLoader->CreateValidationBatches(fValidationChunkTensor, lastValidationBatch, fLeftoverValidationBatchSize, + fDropRemainder); + fValidationChunkNum++; } - /// @brief Load chunks when no filters are applied on rdataframe - void LoadChunksNoFilters() + /// \brief Loads a training batch from the queue + TMVA::Experimental::RTensor GetTrainBatch() { - for (std::size_t currentChunk 
= 0, currentEntry = 0; - ((currentChunk < fMaxChunks) || fUseWholeFile) && currentEntry < fNumEntries; currentChunk++) { - - // stop the loop when the loading is not active anymore - { - std::lock_guard lock(fIsActiveMutex); - if (!fIsActive) - return; - } - - // A pair that consists the proccessed, and passed events while loading the chunk - std::size_t report = std::get>>(fChunkLoader)->LoadChunk(currentEntry); - currentEntry += report; - - CreateBatches(report); + auto batchQueue = fBatchLoader->GetNumTrainingBatchQueue(); + + // load the next chunk if the queue is empty + if (batchQueue < 1 && fTrainingChunkNum < fNumTrainingChunks) { + fChunkLoader->LoadTrainingChunk(fTrainChunkTensor, fTrainingChunkNum); + std::size_t lastTrainingBatch = fNumTrainingChunks - fTrainingChunkNum; + fBatchLoader->CreateTrainingBatches(fTrainChunkTensor, lastTrainingBatch, fLeftoverTrainingBatchSize, + fDropRemainder); + fTrainingChunkNum++; } - if (!fDropRemainder) { - fBatchLoader->LastBatches(); + else { + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, 0, fNumEntries); } - fBatchLoader->DeActivate(); + // Get next batch if available + return fBatchLoader->GetTrainBatch(); } - void LoadChunksFilters() + /// \brief Loads a validation batch from the queue + TMVA::Experimental::RTensor GetValidationBatch() { - std::size_t currentChunk = 0; - for (std::size_t processedEvents = 0, currentRow = 0; - ((currentChunk < fMaxChunks) || fUseWholeFile) && processedEvents < fNumEntries; currentChunk++) { - - // stop the loop when the loading is not active anymore - { - std::lock_guard lock(fIsActiveMutex); - if (!fIsActive) - return; - } - - // A pair that consists the proccessed, and passed events while loading the chunk - std::pair report = - std::get>>(fChunkLoader)->LoadChunk(currentRow); - - currentRow += report.first; - processedEvents += report.second; - - CreateBatches(report.second); + auto batchQueue = fBatchLoader->GetNumValidationBatchQueue(); + + // load the next chunk if the queue is empty + if (batchQueue < 1 && fValidationChunkNum < fNumValidationChunks) { + fChunkLoader->LoadValidationChunk(fValidationChunkTensor, fValidationChunkNum); + std::size_t lastValidationBatch = fNumValidationChunks - fValidationChunkNum; + fBatchLoader->CreateValidationBatches(fValidationChunkTensor, lastValidationBatch, + fLeftoverValidationBatchSize, fDropRemainder); + fValidationChunkNum++; } - if (currentChunk < fMaxChunks || fUseWholeFile) { - CreateBatches(std::get>>(fChunkLoader)->LastChunk()); + else { + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, 0, fNumEntries); } - if (!fDropRemainder) { - fBatchLoader->LastBatches(); - } - - fBatchLoader->DeActivate(); - } - - /// \brief Create batches - /// \param processedEvents - void CreateBatches(std::size_t processedEvents) - { - auto &&[trainingIndices, validationIndices] = createIndices(processedEvents); - - fBatchLoader->CreateTrainingBatches(trainingIndices); - fBatchLoader->CreateValidationBatches(validationIndices); + // Get next batch if available + return fBatchLoader->GetValidationBatch(); } - /// \brief split the events of the current chunk into training and validation events, shuffle if needed - /// \param events - std::pair, std::vector> createIndices(std::size_t events) - { - // Create a vector of number 1..events - std::vector row_order = std::vector(events); - std::iota(row_order.begin(), row_order.end(), 0); - - if (fShuffle) { - // Shuffle the entry indices at every new epoch - std::shuffle(row_order.begin(), row_order.end(), fFixedRng); - } 
- - // calculate the number of events used for validation - std::size_t num_validation = floor(events * fValidationSplit); + std::size_t NumberOfTrainingBatches() { return fNumTrainingBatches; } + std::size_t NumberOfValidationBatches() { return fNumValidationBatches; } - // Devide the vector into training and validation and return - std::vector trainingIndices = - std::vector({row_order.begin(), row_order.end() - num_validation}); - std::vector validationIndices = - std::vector({row_order.end() - num_validation, row_order.end()}); - - if (fShuffle) { - std::shuffle(trainingIndices.begin(), trainingIndices.end(), fRng); - } - - return std::make_pair(trainingIndices, validationIndices); - } + std::size_t TrainRemainderRows() { return fLeftoverTrainingBatchSize; } + std::size_t ValidationRemainderRows() { return fLeftoverValidationBatchSize; } bool IsActive() { return fIsActive; } + bool TrainingIsActive() { return fTrainingEpochActive; } + /// \brief Returns the next batch of validation data if available. + /// Returns empty RTensor otherwise. }; } // namespace Internal diff --git a/tmva/tmva/inc/TMVA/BatchGenerator/RBatchLoader.hxx b/tmva/tmva/inc/TMVA/BatchGenerator/RBatchLoader.hxx index 0da8eb4f38010..012473ec712b0 100644 --- a/tmva/tmva/inc/TMVA/BatchGenerator/RBatchLoader.hxx +++ b/tmva/tmva/inc/TMVA/BatchGenerator/RBatchLoader.hxx @@ -2,9 +2,10 @@ // Author: Kristupas Pranckietis, Vilnius University 05/2024 // Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 // Author: Vincenzo Eduardo Padulano, CERN 10/2024 +// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 /************************************************************************* - * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. * + * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * @@ -30,9 +31,19 @@ namespace TMVA { namespace Experimental { namespace Internal { +// clang-format off +/** +\class ROOT::TMVA::Experimental::Internal::RBatchLoader +\ingroup tmva +\brief Building and loading the batches from loaded chunks in RChunkLoader + +In this class the chunks that are loaded into memory (see RChunkLoader) are split into batches used in the ML training which are loaded into a queue. This is done for both the training and validation chunks separatly. 
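+
+The row bookkeeping is roughly the following (an illustrative NumPy sketch that assumes a
+single carry buffer per dataset split; the real implementation works on RTensor buffers and
+keeps separate primary and secondary leftover tensors):
+
+~~~{.py}
+import numpy as np
+
+def cut_batches(chunk, batch_size, carry):
+    """chunk and carry are 2D arrays with the same number of columns."""
+    n_full = chunk.shape[0] // batch_size
+    batches = [chunk[i * batch_size:(i + 1) * batch_size] for i in range(n_full)]
+    # rows that do not fill a batch are carried over and completed by later chunks
+    carry = np.concatenate([carry, chunk[n_full * batch_size:]])
+    if carry.shape[0] >= batch_size:
+        batches.append(carry[:batch_size])
+        carry = carry[batch_size:]
+    # after the last chunk, a non-empty carry becomes the remainder batch
+    # unless dropRemainder is set
+    return batches, carry
+~~~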
+*/ + class RBatchLoader { private: - const TMVA::Experimental::RTensor &fChunkTensor; + // clang-format on + std::size_t fChunkSize; std::size_t fBatchSize; std::size_t fNumColumns; std::size_t fMaxBatches; @@ -44,71 +55,48 @@ private: std::mutex fBatchLock; std::condition_variable fBatchCondition; + // queuse of tensors of the training and validation batches std::queue>> fTrainingBatchQueue; std::queue>> fValidationBatchQueue; - std::unique_ptr> fCurrentBatch; - std::unique_ptr> fTrainingRemainder; - std::unique_ptr> fValidationRemainder; + // number of training and validation batches in the queue + std::size_t fNumTrainingBatchQueue; + std::size_t fNumValidationBatchQueue; -public: - RBatchLoader(const TMVA::Experimental::RTensor &chunkTensor, const std::size_t batchSize, - const std::size_t numColumns, const std::size_t maxBatches) - : fChunkTensor(chunkTensor), fBatchSize(batchSize), fNumColumns(numColumns), fMaxBatches(maxBatches) - { - // Create remainders tensors - fTrainingRemainder = - std::make_unique>(std::vector{fBatchSize - 1, fNumColumns}); - fValidationRemainder = - std::make_unique>(std::vector{fBatchSize - 1, fNumColumns}); - } + // current batch that is loaded into memeory + std::unique_ptr> fCurrentBatch; + + // primary and secondary batches used to create batches from a chunk + std::unique_ptr> fPrimaryLeftoverTrainingBatch; + std::unique_ptr> fSecondaryLeftoverTrainingBatch; - ~RBatchLoader() { DeActivate(); } + std::unique_ptr> fPrimaryLeftoverValidationBatch; + std::unique_ptr> fSecondaryLeftoverValidationBatch; public: - /// \brief Return a batch of data as a unique pointer. - /// After the batch has been processed, it should be destroyed. - /// \return Training batch - const TMVA::Experimental::RTensor &GetTrainBatch() + RBatchLoader(std::size_t chunkSize, std::size_t batchSize, std::size_t numColumns) + : fChunkSize(chunkSize), fBatchSize(batchSize), fNumColumns(numColumns) { - std::unique_lock lock(fBatchLock); - fBatchCondition.wait(lock, [this]() { return !fTrainingBatchQueue.empty() || !fIsActive; }); - if (fTrainingBatchQueue.empty()) { - fCurrentBatch = std::make_unique>(std::vector({0})); - return *fCurrentBatch; - } + fPrimaryLeftoverTrainingBatch = + std::make_unique>(std::vector{0, fNumColumns}); + fSecondaryLeftoverTrainingBatch = + std::make_unique>(std::vector{0, fNumColumns}); - fCurrentBatch = std::move(fTrainingBatchQueue.front()); - fTrainingBatchQueue.pop(); + fPrimaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); - fBatchCondition.notify_all(); - - return *fCurrentBatch; + fNumTrainingBatchQueue = fTrainingBatchQueue.size(); + fNumValidationBatchQueue = fValidationBatchQueue.size(); } - /// \brief Returns a batch of data for validation - /// The owner of this batch has to be with the RBatchLoader. - /// This is because the same validation batches should be used in all epochs. 
- /// \return Validation batch - const TMVA::Experimental::RTensor &GetValidationBatch() - { - if (fValidationBatchQueue.empty()) { - fCurrentBatch = std::make_unique>(std::vector({0})); - return *fCurrentBatch; - } - - fCurrentBatch = std::move(fValidationBatchQueue.front()); - fValidationBatchQueue.pop(); - - return *fCurrentBatch; - } - - /// \brief Activate the batchloader so it will accept chunks to batch +public: void Activate() { - fTrainingRemainderRow = 0; - fValidationRemainderRow = 0; + // fTrainingRemainderRow = 0; + // fValidationRemainderRow = 0; { std::lock_guard lock(fBatchLock); @@ -128,158 +116,247 @@ public: fBatchCondition.notify_all(); } + /// \brief Return a batch of data as a unique pointer. + /// After the batch has been processed, it should be destroyed. + /// \param[in] chunkTensor RTensor with the data from the chunk + /// \param[in] idxs Index of batch in the chunk + /// \return Training batch std::unique_ptr> - CreateBatch(const TMVA::Experimental::RTensor &chunkTensor, std::span idxs, - std::size_t batchSize) + CreateBatch(TMVA::Experimental::RTensor &chunkTensor, std::size_t idxs) { auto batch = - std::make_unique>(std::vector({batchSize, fNumColumns})); - - for (std::size_t i = 0; i < batchSize; i++) { - std::copy(chunkTensor.GetData() + (idxs[i] * fNumColumns), - chunkTensor.GetData() + ((idxs[i] + 1) * fNumColumns), batch->GetData() + i * fNumColumns); - } + std::make_unique>(std::vector({fBatchSize, fNumColumns})); + std::copy(chunkTensor.GetData() + (idxs * fBatchSize * fNumColumns), + chunkTensor.GetData() + ((idxs + 1) * fBatchSize * fNumColumns), batch->GetData()); return batch; } - std::unique_ptr> - CreateFirstBatch(const TMVA::Experimental::RTensor &remainderTensor, std::size_t remainderTensorRow, - std::span eventIndices) + + /// \brief Loading the training batch from the queue + /// \return Training batch + TMVA::Experimental::RTensor GetTrainBatch() { - auto batch = - std::make_unique>(std::vector({fBatchSize, fNumColumns})); - for (size_t i = 0; i < remainderTensorRow; i++) { - std::copy(remainderTensor.GetData() + i * fNumColumns, remainderTensor.GetData() + (i + 1) * fNumColumns, - batch->GetData() + i * fNumColumns); + if (fTrainingBatchQueue.empty()) { + fCurrentBatch = std::make_unique>(std::vector({0})); + return *fCurrentBatch; } - for (std::size_t i = 0; i < (fBatchSize - remainderTensorRow); i++) { - std::copy(fChunkTensor.GetData() + eventIndices[i] * fNumColumns, - fChunkTensor.GetData() + (eventIndices[i] + 1) * fNumColumns, - batch->GetData() + (i + remainderTensorRow) * fNumColumns); - } + fCurrentBatch = std::move(fTrainingBatchQueue.front()); + fTrainingBatchQueue.pop(); - return batch; + return *fCurrentBatch; } - /// @brief save to remaining data when the whole chunk has to be saved - /// @param chunkTensor - /// @param remainderTensor - /// @param remainderTensorRow - /// @param eventIndices - void SaveRemainingData(TMVA::Experimental::RTensor &remainderTensor, const std::size_t remainderTensorRow, - const std::vector eventIndices, const std::size_t start = 0) + /// \brief Loading the validation batch from the queue + /// \return Training batch + TMVA::Experimental::RTensor GetValidationBatch() { - for (std::size_t i = start; i < eventIndices.size(); i++) { - std::copy(fChunkTensor.GetData() + eventIndices[i] * fNumColumns, - fChunkTensor.GetData() + (eventIndices[i] + 1) * fNumColumns, - remainderTensor.GetData() + (i - start + remainderTensorRow) * fNumColumns); + + if (fValidationBatchQueue.empty()) { + fCurrentBatch = 
std::make_unique>(std::vector({0})); + return *fCurrentBatch; } + + fCurrentBatch = std::move(fValidationBatchQueue.front()); + fValidationBatchQueue.pop(); + + return *fCurrentBatch; } - /// \brief Create training batches from the given chunk of data based on the given event indices - /// Batches are added to the training queue of batches - /// \param chunkTensor - /// \param eventIndices - void CreateTrainingBatches(const std::vector &eventIndices) + /// \brief Creating the training batches from a chunk and add them to the queue. + /// \param[in] chunkTensor RTensor with the data from the chunk + /// \param[in] lastbatch Check if the batch in the chunk is the last one + /// \param[in] leftoverBatchSize Size of the leftover batch in the training dataset + /// \param[in] dromRemainder Bool to drop the remainder batch or not + void CreateTrainingBatches(TMVA::Experimental::RTensor &chunkTensor, int lastbatch, + std::size_t leftoverBatchSize, bool dropRemainder) { - // Wait until less than a full chunk of batches are in the queue before splitting the next chunk into - // batches - { - std::unique_lock lock(fBatchLock); - fBatchCondition.wait(lock, [this]() { return (fTrainingBatchQueue.size() < fMaxBatches) || !fIsActive; }); - if (!fIsActive) - return; - } + std::size_t ChunkSize = chunkTensor.GetShape()[0]; + std::size_t Batches = ChunkSize / fBatchSize; + std::size_t LeftoverBatchSize = ChunkSize % fBatchSize; + // create a vector of batches std::vector>> batches; - if (eventIndices.size() + fTrainingRemainderRow >= fBatchSize) { - batches.emplace_back(CreateFirstBatch(*fTrainingRemainder, fTrainingRemainderRow, eventIndices)); - } else { - SaveRemainingData(*fTrainingRemainder, fTrainingRemainderRow, eventIndices); - fTrainingRemainderRow += eventIndices.size(); - return; - } - - // Create tasks of fBatchSize until all idx are used - std::size_t start = fBatchSize - fTrainingRemainderRow; - for (; (start + fBatchSize) <= eventIndices.size(); start += fBatchSize) { - // Grab the first fBatchSize indices - std::span idxs{eventIndices.data() + start, eventIndices.data() + start + fBatchSize}; - + // fill the full batches from the chunk into a vector + for (std::size_t i = 0; i < Batches; i++) { // Fill a batch - batches.emplace_back(CreateBatch(fChunkTensor, idxs, fBatchSize)); + batches.emplace_back(CreateBatch(chunkTensor, i)); } - { - std::unique_lock lock(fBatchLock); - for (std::size_t i = 0; i < batches.size(); i++) { - fTrainingBatchQueue.push(std::move(batches[i])); + // copy the remaining entries from the chunk into a leftover batch + TMVA::Experimental::RTensor LeftoverBatch({LeftoverBatchSize, fNumColumns}); + std::copy(chunkTensor.GetData() + (Batches * fBatchSize * fNumColumns), + chunkTensor.GetData() + (Batches * fBatchSize * fNumColumns + LeftoverBatchSize * fNumColumns), + LeftoverBatch.GetData()); + + // calculate how many empty slots are left in fPrimaryLeftoverTrainingBatch + std::size_t PrimaryLeftoverSize = (*fPrimaryLeftoverTrainingBatch).GetShape()[0]; + std::size_t emptySlots = fBatchSize - PrimaryLeftoverSize; + + // copy LeftoverBatch to end of fPrimaryLeftoverTrainingBatch + if (emptySlots >= LeftoverBatchSize) { + (*fPrimaryLeftoverTrainingBatch) = + (*fPrimaryLeftoverTrainingBatch).Resize({PrimaryLeftoverSize + LeftoverBatchSize, fNumColumns}); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (LeftoverBatchSize * fNumColumns), + fPrimaryLeftoverTrainingBatch->GetData() + (PrimaryLeftoverSize * fNumColumns)); + + // copy LeftoverBatch to end of 
fPrimaryLeftoverTrainingBatch and add it to the batch vector + if (emptySlots == LeftoverBatchSize) { + auto copy = + std::make_unique>(std::vector{fBatchSize, fNumColumns}); + std::copy(fPrimaryLeftoverTrainingBatch->GetData(), + fPrimaryLeftoverTrainingBatch->GetData() + (fBatchSize * fNumColumns), copy->GetData()); + batches.emplace_back(std::move(copy)); + + // reset fPrimaryLeftoverTrainingBatch and fSecondaryLeftoverTrainingBatch + *fPrimaryLeftoverTrainingBatch = *fSecondaryLeftoverTrainingBatch; + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); } } - fBatchCondition.notify_all(); - - fTrainingRemainderRow = eventIndices.size() - start; - SaveRemainingData(*fTrainingRemainder, 0, eventIndices, start); - } - - /// \brief Create validation batches from the given chunk based on the given event indices - /// Batches are added to the vector of validation batches - /// \param chunkTensor - /// \param eventIndices - void CreateValidationBatches(const std::vector &eventIndices) - { - if (eventIndices.size() + fValidationRemainderRow >= fBatchSize) { - fValidationBatchQueue.push(CreateFirstBatch(*fValidationRemainder, fValidationRemainderRow, eventIndices)); - } else { - SaveRemainingData(*fValidationRemainder, fValidationRemainderRow, eventIndices); - fValidationRemainderRow += eventIndices.size(); - return; + // copy LeftoverBatch to both fPrimaryLeftoverTrainingBatch and fSecondaryLeftoverTrainingBatch + else if (emptySlots < LeftoverBatchSize) { + // copy the first part of LeftoverBatch to end of fPrimaryLeftoverTrainingBatch + (*fPrimaryLeftoverTrainingBatch) = (*fPrimaryLeftoverTrainingBatch).Resize({fBatchSize, fNumColumns}); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * fNumColumns), + fPrimaryLeftoverTrainingBatch->GetData() + (PrimaryLeftoverSize * fNumColumns)); + + // copy the last part of LeftoverBatch to the end of fSecondaryLeftoverTrainingBatch + (*fSecondaryLeftoverTrainingBatch) = + (*fSecondaryLeftoverTrainingBatch).Resize({LeftoverBatchSize - emptySlots, fNumColumns}); + std::copy(LeftoverBatch.GetData() + (emptySlots * fNumColumns), + LeftoverBatch.GetData() + (LeftoverBatchSize * fNumColumns), + fSecondaryLeftoverTrainingBatch->GetData()); + + // add fPrimaryLeftoverTrainingBatch to the batch vector + auto copy = + std::make_unique>(std::vector{fBatchSize, fNumColumns}); + std::copy(fPrimaryLeftoverTrainingBatch->GetData(), + fPrimaryLeftoverTrainingBatch->GetData() + (fBatchSize * fNumColumns), copy->GetData()); + batches.emplace_back(std::move(copy)); + + // exchange fPrimaryLeftoverTrainingBatch and fSecondaryLeftoverValidationBatch + *fPrimaryLeftoverTrainingBatch = *fSecondaryLeftoverTrainingBatch; + + // restet fSecondaryLeftoverValidationBatch + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); } - // Create tasks of fBatchSize untill all idx are used - std::size_t start = fBatchSize - fValidationRemainderRow; - for (; (start + fBatchSize) <= eventIndices.size(); start += fBatchSize) { - - std::vector idx; + // copy the content of fPrimaryLeftoverTrainingBatch to the leftover batch from the chunk + if (lastbatch == 1) { - for (std::size_t i = start; i < (start + fBatchSize); i++) { - idx.push_back(eventIndices[i]); + if (dropRemainder == false && leftoverBatchSize > 0) { + auto copy = std::make_unique>( + std::vector{leftoverBatchSize, fNumColumns}); + std::copy((*fPrimaryLeftoverTrainingBatch).GetData(), + (*fPrimaryLeftoverTrainingBatch).GetData() + 
(leftoverBatchSize * fNumColumns), copy->GetData()); + batches.emplace_back(std::move(copy)); } - fValidationBatchQueue.push(CreateBatch(fChunkTensor, idx, fBatchSize)); + fPrimaryLeftoverTrainingBatch = + std::make_unique>(std::vector{0, fNumColumns}); + fSecondaryLeftoverTrainingBatch = + std::make_unique>(std::vector{0, fNumColumns}); } - fValidationRemainderRow = eventIndices.size() - start; - SaveRemainingData(*fValidationRemainder, 0, eventIndices, start); + // append the batches from the batch vector from the chunk to the training batch queue + for (std::size_t i = 0; i < batches.size(); i++) { + fTrainingBatchQueue.push(std::move(batches[i])); + } } - - void LastBatches() + + /// \brief Creating the validation batches from a chunk and adding them to the queue + /// \param[in] chunkTensor RTensor with the data from the chunk + /// \param[in] lastbatch Check if the batch in the chunk is the last one + /// \param[in] leftoverBatchSize Size of the leftover batch in the validation dataset + /// \param[in] dromRemainder Bool to drop the remainder batch or not + void CreateValidationBatches(TMVA::Experimental::RTensor &chunkTensor, std::size_t lastbatch, + std::size_t leftoverBatchSize, bool dropRemainder) { - { - if (fTrainingRemainderRow) { - std::vector idx = std::vector(fTrainingRemainderRow); - std::iota(idx.begin(), idx.end(), 0); + std::size_t ChunkSize = chunkTensor.GetShape()[0]; + std::size_t NumCols = chunkTensor.GetShape()[1]; + std::size_t Batches = ChunkSize / fBatchSize; + std::size_t LeftoverBatchSize = ChunkSize % fBatchSize; + + std::vector>> batches; - std::unique_ptr> batch = - CreateBatch(*fTrainingRemainder, idx, fTrainingRemainderRow); + for (std::size_t i = 0; i < Batches; i++) { + // Fill a batch + batches.emplace_back(CreateBatch(chunkTensor, i)); + } - std::unique_lock lock(fBatchLock); - fTrainingBatchQueue.push(std::move(batch)); + TMVA::Experimental::RTensor LeftoverBatch({LeftoverBatchSize, NumCols}); + std::copy(chunkTensor.GetData() + (Batches * fBatchSize * NumCols), + chunkTensor.GetData() + (Batches * fBatchSize * NumCols + LeftoverBatchSize * NumCols), + LeftoverBatch.GetData()); + + std::size_t PrimaryLeftoverSize = (*fPrimaryLeftoverValidationBatch).GetShape()[0]; + std::size_t emptySlots = fBatchSize - PrimaryLeftoverSize; + + if (emptySlots >= LeftoverBatchSize) { + (*fPrimaryLeftoverValidationBatch) = + (*fPrimaryLeftoverValidationBatch).Resize({PrimaryLeftoverSize + LeftoverBatchSize, NumCols}); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (LeftoverBatchSize * NumCols), + fPrimaryLeftoverValidationBatch->GetData() + (PrimaryLeftoverSize * NumCols)); + + if (emptySlots == LeftoverBatchSize) { + auto copy = + std::make_unique>(std::vector{fBatchSize, fNumColumns}); + std::copy(fPrimaryLeftoverValidationBatch->GetData(), + fPrimaryLeftoverValidationBatch->GetData() + (fBatchSize * fNumColumns), copy->GetData()); + batches.emplace_back(std::move(copy)); + *fPrimaryLeftoverValidationBatch = *fSecondaryLeftoverValidationBatch; + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); } } - if (fValidationRemainderRow) { - std::vector idx = std::vector(fValidationRemainderRow); - std::iota(idx.begin(), idx.end(), 0); + else if (emptySlots < LeftoverBatchSize) { + (*fPrimaryLeftoverValidationBatch) = (*fPrimaryLeftoverValidationBatch).Resize({fBatchSize, NumCols}); + std::copy(LeftoverBatch.GetData(), LeftoverBatch.GetData() + (emptySlots * NumCols), + fPrimaryLeftoverValidationBatch->GetData() + 
(PrimaryLeftoverSize * NumCols)); + (*fSecondaryLeftoverValidationBatch) = + (*fSecondaryLeftoverValidationBatch).Resize({LeftoverBatchSize - emptySlots, NumCols}); + std::copy(LeftoverBatch.GetData() + (emptySlots * NumCols), + LeftoverBatch.GetData() + (LeftoverBatchSize * NumCols), + fSecondaryLeftoverValidationBatch->GetData()); + auto copy = + std::make_unique>(std::vector{fBatchSize, fNumColumns}); + std::copy(fPrimaryLeftoverValidationBatch->GetData(), + fPrimaryLeftoverValidationBatch->GetData() + (fBatchSize * fNumColumns), copy->GetData()); + batches.emplace_back(std::move(copy)); + *fPrimaryLeftoverValidationBatch = *fSecondaryLeftoverValidationBatch; + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); + } + + if (lastbatch == 1) { + + if (dropRemainder == false && leftoverBatchSize > 0) { + auto copy = std::make_unique>( + std::vector{leftoverBatchSize, fNumColumns}); + std::copy((*fPrimaryLeftoverValidationBatch).GetData(), + (*fPrimaryLeftoverValidationBatch).GetData() + (leftoverBatchSize * fNumColumns), + copy->GetData()); + batches.emplace_back(std::move(copy)); + } + fPrimaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); + fSecondaryLeftoverValidationBatch = + std::make_unique>(std::vector{0, fNumColumns}); + } - fValidationBatchQueue.push(CreateBatch(*fValidationRemainder, idx, fValidationRemainderRow)); + for (std::size_t i = 0; i < batches.size(); i++) { + fValidationBatchQueue.push(std::move(batches[i])); } } + std::size_t GetNumTrainingBatchQueue() { return fTrainingBatchQueue.size(); } + std::size_t GetNumValidationBatchQueue() { return fValidationBatchQueue.size(); } }; } // namespace Internal diff --git a/tmva/tmva/inc/TMVA/BatchGenerator/RChunkConstructor.hxx b/tmva/tmva/inc/TMVA/BatchGenerator/RChunkConstructor.hxx new file mode 100644 index 0000000000000..a585503830217 --- /dev/null +++ b/tmva/tmva/inc/TMVA/BatchGenerator/RChunkConstructor.hxx @@ -0,0 +1,245 @@ +// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 + +/************************************************************************* + * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#ifndef TMVA_RCHUNKCONSTRUCTOR +#define TMVA_RCHUNKCONSTRUCTOR + +#include + +#include "TMVA/RTensor.hxx" +#include "ROOT/RDataFrame.hxx" +#include "ROOT/RDF/Utils.hxx" +#include "ROOT/RVec.hxx" + +#include "ROOT/RLogger.hxx" + +namespace TMVA { +namespace Experimental { +namespace Internal { + +// clang-format off +/** +\class ROOT::TMVA::Experimental::Internal::RChunkConstructor +\ingroup tmva +\brief The logic for constructing chunks from a dataset. + +This struct handles the logic for splitting a dataset into smaller subsets +known as chunks, which are constructed from blocks. + +A chunk is the largest portion of the dataset loaded into memory at once, +and each chunk is further divided into batches for machine learning training. + +The dataset is split into disjoint chunks based on a user-defined chunk size. +There are two types of chunks: + - Full chunks: contain exactly the number of entries specified by the chunk size. + - Leftover chunk: contains any remaining entries that don't make up a full chunk. + +Each chunk is constructed from blocks based on a user-defined block size. 
+There are two types of blocks: + - Full blocks: contain exactly the number of entries specified by the block size. + - Leftover block: contains any remaining entries that don't make up a full block. + +The blocks are defined by their start and end entries, which correspond to positions within the dataset’s total number of entries. +*/ + +struct RChunkConstructor { + // clang-format on + std::size_t fNumEntries{}; + std::size_t fChunkSize{}; + std::size_t fBlockSize{}; + + // size of full and leftover chunks + std::size_t SizeOfFullChunk; + std::size_t SizeOfLeftoverChunk; + + // size of full and leftover blocks in a full and leftover chunk + std::size_t SizeOfFullBlockInFullChunk; + std::size_t SizeOfLeftoverBlockInFullChunk; + std::size_t SizeOfFullBlockInLeftoverChunk; + std::size_t SizeOfLeftoverBlockInLeftoverChunk; + + // number of full, leftover and total chunks + std::size_t FullChunks; + std::size_t LeftoverChunks; + std::size_t Chunks; + + // number of full, leftover and total blocks in a full chunk + std::size_t FullBlocksPerFullChunk; + std::size_t LeftoverBlocksPerFullChunk; + std::size_t BlockPerFullChunk; + + // number of full, leftover and total blocks in the leftover chunk + std::size_t FullBlocksPerLeftoverChunk; + std::size_t LeftoverBlocksPerLeftoverChunk; + std::size_t BlockPerLeftoverChunk; + + // total number of full and leftover blocks in the full chunks + std::size_t FullBlocksInFullChunks; + std::size_t LeftoverBlocksInFullChunks; + + // total number of full and leftover blocks in the leftover chunks + std::size_t FullBlocksInLeftoverChunks; + std::size_t LeftoverBlocksInLeftoverChunks; + + // vector of the different block sizes + std::vector SizeOfBlocks; + + // vector with the number of the different block + std::vector NumberOfDifferentBlocks; + + // total number of blocks + std::size_t NumberOfBlocks; + + // pair of start and end entries in the different block types + std::vector> BlockIntervals; + + std::vector> FullBlockIntervalsInFullChunks; + std::vector> LeftoverBlockIntervalsInFullChunks; + + std::vector> FullBlockIntervalsInLeftoverChunks; + std::vector> LeftoverBlockIntervalsInLeftoverChunks; + + std::vector>> ChunksIntervals; + + std::vector ChunksSizes; + + RChunkConstructor(const std::size_t numEntries, const std::size_t chunkSize, const std::size_t blockSize) + : fNumEntries(numEntries), fChunkSize(chunkSize), fBlockSize(blockSize) + { + // size of full and leftover chunks + SizeOfFullChunk = chunkSize; + SizeOfLeftoverChunk = fNumEntries % SizeOfFullChunk; + + // size of full and leftover blocks in a full and leftover chunk + SizeOfFullBlockInFullChunk = blockSize; + SizeOfLeftoverBlockInFullChunk = SizeOfFullChunk % blockSize; + SizeOfFullBlockInLeftoverChunk = blockSize; + SizeOfLeftoverBlockInLeftoverChunk = SizeOfLeftoverChunk % blockSize; + + // number of full, leftover and total chunks + FullChunks = numEntries / SizeOfFullChunk; + LeftoverChunks = SizeOfLeftoverChunk == 0 ? 0 : 1; + Chunks = FullChunks + LeftoverChunks; + + // number of full, leftover and total blocks in a full chunk + FullBlocksPerFullChunk = SizeOfFullChunk / blockSize; + LeftoverBlocksPerFullChunk = SizeOfLeftoverBlockInFullChunk == 0 ? 0 : 1; + BlockPerFullChunk = FullBlocksPerFullChunk + LeftoverBlocksPerFullChunk; + + // number of full, leftover and total blocks in the leftover chunk + FullBlocksPerLeftoverChunk = SizeOfLeftoverChunk / blockSize; + LeftoverBlocksPerLeftoverChunk = SizeOfLeftoverBlockInLeftoverChunk == 0 ? 
0 : 1; + BlockPerLeftoverChunk = FullBlocksPerLeftoverChunk + LeftoverBlocksPerLeftoverChunk; + + // total number of full and leftover blocks in the full chunks + FullBlocksInFullChunks = FullBlocksPerFullChunk * FullChunks; + LeftoverBlocksInFullChunks = LeftoverBlocksPerFullChunk * FullChunks; + + // total number of full and leftover blocks in the leftover chunks + FullBlocksInLeftoverChunks = FullBlocksPerLeftoverChunk * LeftoverChunks; + LeftoverBlocksInLeftoverChunks = LeftoverBlocksPerLeftoverChunk * LeftoverChunks; + + // vector of the different block sizes + SizeOfBlocks = {SizeOfFullBlockInFullChunk, SizeOfLeftoverBlockInFullChunk, SizeOfFullBlockInLeftoverChunk, + SizeOfLeftoverBlockInLeftoverChunk}; + + // vector with the number of the different block + NumberOfDifferentBlocks = {FullBlocksInFullChunks, LeftoverBlocksInFullChunks, FullBlocksInLeftoverChunks, + LeftoverBlocksInLeftoverChunks}; + + // total number of blocks + NumberOfBlocks = std::accumulate(NumberOfDifferentBlocks.begin(), NumberOfDifferentBlocks.end(), 0); + }; + + ////////////////////////////////////////////////////////////////////////// + /// \brief Group the blocks based on the block type (full or leftover) based on the size of the block. + void DistributeBlockIntervals() + { + + std::vector> *> TypesOfBlockIntervals = { + &FullBlockIntervalsInFullChunks, &LeftoverBlockIntervalsInFullChunks, &FullBlockIntervalsInLeftoverChunks, + &LeftoverBlockIntervalsInLeftoverChunks}; + + std::vector IndexOfDifferentBlocks(NumberOfDifferentBlocks.size()); + std::partial_sum(NumberOfDifferentBlocks.begin(), NumberOfDifferentBlocks.end(), IndexOfDifferentBlocks.begin()); + IndexOfDifferentBlocks.insert(IndexOfDifferentBlocks.begin(), 0); + + for (size_t i = 0; i < TypesOfBlockIntervals.size(); ++i) { + size_t start = IndexOfDifferentBlocks[i]; + size_t end = IndexOfDifferentBlocks[i + 1]; + + TypesOfBlockIntervals[i]->insert(TypesOfBlockIntervals[i]->begin(), BlockIntervals.begin() + start, + BlockIntervals.begin() + end); + } + } + + ////////////////////////////////////////////////////////////////////////// + /// \brief Creates chunks from the dataset consisting of blocks with the begin and end entry. 
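Conceptually, the function documented below hands each chunk its share of full blocks plus its leftover block, and the leftover chunk takes whatever block intervals remain. A simplified Python sketch of that assembly, using hypothetical intervals and leaving shuffling out:

    # Hypothetical (start, end) block intervals, grouped by type, for 23 entries
    # split into chunks of 10 built from blocks of 4.
    full_blocks_in_full_chunks = [(0, 4), (4, 8), (8, 12), (12, 16)]   # 2 per full chunk
    leftover_blocks_in_full_chunks = [(16, 18), (18, 20)]              # 1 per full chunk
    leftover_blocks_in_leftover_chunk = [(20, 23)]                     # the remaining 3 entries

    full_per_chunk, leftover_per_chunk, n_full_chunks = 2, 1, 2
    chunks = []
    for i in range(n_full_chunks):
        chunk = full_blocks_in_full_chunks[i * full_per_chunk:(i + 1) * full_per_chunk]
        chunk += leftover_blocks_in_full_chunks[i * leftover_per_chunk:(i + 1) * leftover_per_chunk]
        chunks.append(chunk)
    chunks.append(leftover_blocks_in_leftover_chunk)   # the leftover chunk

    print(chunks)   # [[(0, 4), (4, 8), (16, 18)], [(8, 12), (12, 16), (18, 20)], [(20, 23)]]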
+ void CreateChunksIntervals() + { + + ChunksIntervals.resize(Chunks); + for (size_t i = 0; i < FullChunks; i++) { + + size_t start_FullBlock = FullBlocksPerFullChunk * i; + size_t end_FullBlock = FullBlocksPerFullChunk * (i + 1); + + size_t start_LeftoverBlock = LeftoverBlocksPerFullChunk * i; + size_t end_LeftoverBlock = LeftoverBlocksPerFullChunk * (i + 1); + + ChunksIntervals[i].insert(ChunksIntervals[i].end(), FullBlockIntervalsInFullChunks.begin() + start_FullBlock, + FullBlockIntervalsInFullChunks.begin() + end_FullBlock); + ChunksIntervals[i].insert(ChunksIntervals[i].end(), + LeftoverBlockIntervalsInFullChunks.begin() + start_LeftoverBlock, + LeftoverBlockIntervalsInFullChunks.begin() + end_LeftoverBlock); + } + + for (size_t i = 0; i < LeftoverChunks; i++) { + + size_t j = i + FullChunks; + size_t start_FullBlock = FullBlocksPerLeftoverChunk * i; + size_t end_FullBlock = FullBlocksPerLeftoverChunk * (i + 1); + + size_t start_LeftoverBlock = LeftoverBlocksPerLeftoverChunk * i; + size_t end_LeftoverBlock = LeftoverBlocksPerLeftoverChunk * (i + 1); + + ChunksIntervals[j].insert(ChunksIntervals[j].end(), + FullBlockIntervalsInLeftoverChunks.begin() + start_FullBlock, + FullBlockIntervalsInLeftoverChunks.begin() + end_FullBlock); + ChunksIntervals[j].insert(ChunksIntervals[j].end(), + LeftoverBlockIntervalsInLeftoverChunks.begin() + start_LeftoverBlock, + LeftoverBlockIntervalsInLeftoverChunks.begin() + end_LeftoverBlock); + } + } + + ////////////////////////////////////////////////////////////////////////// + /// \brief Fills a vector with the size of every chunk from the dataset + void SizeOfChunks() + { + + for (size_t i = 0; i < Chunks; i++) { + std::size_t chunkSize = 0; + for (size_t j = 0; j < ChunksIntervals[i].size(); j++) { + std::size_t start = ChunksIntervals[i][j].first; + std::size_t end = ChunksIntervals[i][j].second; + + std::size_t intervalSize = end - start; + chunkSize += intervalSize; + } + + ChunksSizes.insert(ChunksSizes.end(), chunkSize); + } + } +}; +} // namespace Internal +} // namespace Experimental +} // namespace TMVA + +#endif // TMVA_RCHUNKCONSTRUCTOR diff --git a/tmva/tmva/inc/TMVA/BatchGenerator/RChunkLoader.hxx b/tmva/tmva/inc/TMVA/BatchGenerator/RChunkLoader.hxx index d06003bc7afc5..6d081719fa2b1 100644 --- a/tmva/tmva/inc/TMVA/BatchGenerator/RChunkLoader.hxx +++ b/tmva/tmva/inc/TMVA/BatchGenerator/RChunkLoader.hxx @@ -2,9 +2,10 @@ // Author: Kristupas Pranckietis, Vilnius University 05/2024 // Author: Nopphakorn Subsa-Ard, King Mongkut's University of Technology Thonburi (KMUTT) (TH) 08/2024 // Author: Vincenzo Eduardo Padulano, CERN 10/2024 +// Author: Martin Føll, University of Oslo (UiO) & CERN 05/2025 /************************************************************************* - * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. * + * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * * All rights reserved. * * * * For the licensing terms see $ROOTSYS/LICENSE. * @@ -15,8 +16,10 @@ #define TMVA_RCHUNKLOADER #include +#include #include "TMVA/RTensor.hxx" +#include "TMVA/BatchGenerator/RChunkConstructor.hxx" #include "ROOT/RDataFrame.hxx" #include "ROOT/RDF/Utils.hxx" #include "ROOT/RVec.hxx" @@ -27,268 +30,503 @@ namespace TMVA { namespace Experimental { namespace Internal { -// RChunkLoader class used to load content of a RDataFrame onto a RTensor. +// clang-format off +/** +\class ROOT::TMVA::Experimental::Internal::RChunkLoaderFunctor +\ingroup tmva +\brief Loading chunks made in RChunkLoader into tensors from data from RDataFrame. 
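As a rough mental model of what this functor does (a NumPy sketch with made-up values; the actual loading happens through an RDataFrame Foreach writing into an RTensor), each processed entry fills one row of the chunk tensor, and vector columns are padded or truncated to their configured maximum size:

    import numpy as np

    # One scalar column plus one vector column capped at max_vec_size; values are made up.
    max_vec_size, vec_padding = 3, 0.0
    num_chunk_cols = 1 + max_vec_size
    chunk = np.zeros((2, num_chunk_cols), dtype=np.float32)   # a tiny chunk of 2 entries

    def fill_row(row, scalar, vec):
        # pad short vectors with vec_padding, truncate long ones, then write the row
        padded = list(vec[:max_vec_size]) + [vec_padding] * max(0, max_vec_size - len(vec))
        chunk[row, :] = [scalar] + padded

    fill_row(0, 1.5, [7.0, 8.0])             # too short: padded to [7.0, 8.0, 0.0]
    fill_row(1, 2.5, [1.0, 2.0, 3.0, 4.0])   # too long: truncated to [1.0, 2.0, 3.0]
    print(chunk)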
+*/ + template class RChunkLoaderFunctor { + // clang-format on std::size_t fOffset{}; std::size_t fVecSizeIdx{}; float fVecPadding{}; std::vector fMaxVecSizes{}; - TMVA::Experimental::RTensor &fChunkTensor; - template ::value, int> = 0> - void AssignToTensor(const T &vec) - { - const auto &max_vec_size = fMaxVecSizes[fVecSizeIdx++]; - const auto &vec_size = vec.size(); - if (vec_size < max_vec_size) // Padding vector column to max_vec_size with fVecPadding - { - std::copy(vec.cbegin(), vec.cend(), &fChunkTensor.GetData()[fOffset]); - std::fill(&fChunkTensor.GetData()[fOffset + vec_size], &fChunkTensor.GetData()[fOffset + max_vec_size], - fVecPadding); - } else // Copy only max_vec_size length from vector column - { - std::copy(vec.cbegin(), vec.cbegin() + max_vec_size, &fChunkTensor.GetData()[fOffset]); - } - fOffset += max_vec_size; - } + std::size_t fNumChunkCols; - template ::value, int> = 0> - void AssignToTensor(const T &val) - { - fChunkTensor.GetData()[fOffset++] = val; - } - -public: - RChunkLoaderFunctor(TMVA::Experimental::RTensor &chunkTensor, const std::vector &maxVecSizes, - float vecPadding) - : fChunkTensor(chunkTensor), fMaxVecSizes(maxVecSizes), fVecPadding(vecPadding) - { - } - - void operator()(const ColTypes &...cols) - { - fVecSizeIdx = 0; - (AssignToTensor(cols), ...); - } -}; - -template -class RChunkLoaderFunctorFilters { - -private: - std::size_t fOffset{}; - std::size_t fVecSizeIdx{}; - std::size_t fEntries{}; - std::size_t fChunkSize{}; - float fVecPadding{}; - std::vector fMaxVecSizes{}; - - TMVA::Experimental::RTensor &fChunkTensor; - TMVA::Experimental::RTensor &fRemainderTensor; + int fI; + int fNumColumns; + ////////////////////////////////////////////////////////////////////////// + /// \brief Copy the content of a column into RTensor when the column consits of vectors template ::value, int> = 0> - void AssignToTensor(const T &vec) + void AssignToTensor(const T &vec, int i, int numColumns) { std::size_t max_vec_size = fMaxVecSizes[fVecSizeIdx++]; std::size_t vec_size = vec.size(); if (vec_size < max_vec_size) // Padding vector column to max_vec_size with fVecPadding { - std::copy(vec.begin(), vec.end(), &fChunkTensor.GetData()[fOffset]); - std::fill(&fChunkTensor.GetData()[fOffset + vec_size], &fChunkTensor.GetData()[fOffset + max_vec_size], - fVecPadding); + std::copy(vec.begin(), vec.end(), &fChunkTensor.GetData()[fOffset + numColumns * i]); + std::fill(&fChunkTensor.GetData()[fOffset + numColumns * i + vec_size], + &fChunkTensor.GetData()[fOffset + numColumns * i + max_vec_size], fVecPadding); } else // Copy only max_vec_size length from vector column { - std::copy(vec.begin(), vec.begin() + max_vec_size, &fChunkTensor.GetData()[fOffset]); + std::copy(vec.begin(), vec.begin() + max_vec_size, &fChunkTensor.GetData()[fOffset + numColumns * i]); } fOffset += max_vec_size; - fEntries++; } + ////////////////////////////////////////////////////////////////////////// + /// \brief Copy the content of a column into RTensor when the column consits of single values template ::value, int> = 0> - void AssignToTensor(const T &val) + void AssignToTensor(const T &val, int i, int numColumns) { - fChunkTensor.GetData()[fOffset++] = val; - fEntries++; + fChunkTensor.GetData()[fOffset + numColumns * i] = val; + fOffset++; + // fChunkTensor.GetData()[numColumns * i] = val; } public: - RChunkLoaderFunctorFilters(TMVA::Experimental::RTensor &chunkTensor, - TMVA::Experimental::RTensor &remainderTensor, std::size_t entries, - std::size_t chunkSize, std::size_t &&offset, - const 
std::vector &maxVecSizes = std::vector(), - const float vecPadding = 0.0) - : fChunkTensor(chunkTensor), - fRemainderTensor(remainderTensor), - fEntries(entries), - fChunkSize(chunkSize), - fOffset(offset), - fMaxVecSizes(maxVecSizes), - fVecPadding(vecPadding) + RChunkLoaderFunctor(TMVA::Experimental::RTensor &chunkTensor, std::size_t numColumns, + const std::vector &maxVecSizes, float vecPadding, int i) + : fChunkTensor(chunkTensor), fMaxVecSizes(maxVecSizes), fVecPadding(vecPadding), fI(i), fNumColumns(numColumns) { } void operator()(const ColTypes &...cols) { fVecSizeIdx = 0; - if (fEntries == fChunkSize) { - fChunkTensor = fRemainderTensor; - fOffset = 0; - } - (AssignToTensor(cols), ...); + (AssignToTensor(cols, fI, fNumColumns), ...); } - - std::size_t &SetEntries() { return fEntries; } - std::size_t &SetOffset() { return fOffset; } }; +// clang-format off +/** +\class ROOT::TMVA::Experimental::Internal::RChunkLoader +\ingroup tmva +\brief Building and loading the chunks from the blocks and chunks constructed in RChunkConstructor + +In this class the blocks are stitched together to form chunks that are loaded into memory. The blocks used to create each chunk come from different parts of the dataset. This is achieved by shuffling the blocks before distributing them into chunks. The purpose of this process is to reduce bias during machine learning training by ensuring that the data is well mixed. The dataset is also split into training and validation sets with the user-defined validation split fraction. +*/ + template class RChunkLoader { - private: + // clang-format on + std::size_t fNumEntries; std::size_t fChunkSize; - - std::vector fCols; + std::size_t fBlockSize; + float fValidationSplit; std::vector fVecSizes; + std::size_t fSumVecSizes; std::size_t fVecPadding; + std::size_t fNumChunkCols; + + std::size_t fNumTrainEntries; + std::size_t fNumValidationEntries; ROOT::RDF::RNode &f_rdf; - TMVA::Experimental::RTensor &fChunkTensor; + std::vector fCols; + std::size_t fNumCols; + std::size_t fSetSeed; + + bool fNotFiltered; + bool fShuffle; + + ROOT::RDF::RResultPtr> fEntries; + + std::unique_ptr fTraining; + std::unique_ptr fValidation; public: - /// \brief Constructor for the RChunkLoader - /// \param rdf - /// \param chunkSize - /// \param cols - /// \param vecSizes - /// \param vecPadding - RChunkLoader(ROOT::RDF::RNode &rdf, TMVA::Experimental::RTensor &chunkTensor, const std::size_t chunkSize, - const std::vector &cols, const std::vector &vecSizes = {}, - const float vecPadding = 0.0) + RChunkLoader(ROOT::RDF::RNode &rdf, std::size_t numEntries, + ROOT::RDF::RResultPtr> rdf_entries, const std::size_t chunkSize, + const std::size_t blockSize, const float validationSplit, const std::vector &cols, + const std::vector &vecSizes = {}, const float vecPadding = 0.0, bool shuffle = true, + const std::size_t setSeed = 0) : f_rdf(rdf), - fChunkTensor(chunkTensor), - fChunkSize(chunkSize), + fNumEntries(numEntries), + fEntries(rdf_entries), fCols(cols), fVecSizes(vecSizes), - fVecPadding(vecPadding) + fVecPadding(vecPadding), + fChunkSize(chunkSize), + fBlockSize(blockSize), + fValidationSplit(validationSplit), + fNotFiltered(f_rdf.GetFilterNames().empty()), + fShuffle(shuffle), + fSetSeed(setSeed) { + fNumCols = fCols.size(); + fSumVecSizes = std::accumulate(fVecSizes.begin(), fVecSizes.end(), 0); + + fNumChunkCols = fNumCols + fSumVecSizes - fVecSizes.size(); + + // number of training and validation entries after the split + fNumValidationEntries = static_cast(fValidationSplit * 
fNumEntries); + fNumTrainEntries = fNumEntries - fNumValidationEntries; + + fTraining = std::make_unique(fNumTrainEntries, fChunkSize, fBlockSize); + fValidation = std::make_unique(fNumValidationEntries, fChunkSize, fBlockSize); } - /// \brief Load a chunk of data using the RChunkLoaderFunctor - /// \param chunkTensor - /// \param currentRow - /// \return Number of processed events - std::size_t LoadChunk(const std::size_t currentRow) + ////////////////////////////////////////////////////////////////////////// + /// \brief Distribute the blocks into training and validation datasets + void SplitDataset() { - RChunkLoaderFunctor func(fChunkTensor, fVecSizes, fVecPadding); + std::random_device rd; + std::mt19937 g; - ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, currentRow, currentRow + fChunkSize); - auto myCount = f_rdf.Count(); + if (fSetSeed == 0) { + g.seed(rd()); + } else { + g.seed(fSetSeed); + } - // load data - f_rdf.Foreach(func, fCols); + std::vector BlockSizes = {}; - // get loading info - return myCount.GetValue(); - } -}; + // fill the training and validation block sizes + for (size_t i = 0; i < fTraining->NumberOfDifferentBlocks.size(); i++) { + BlockSizes.insert(BlockSizes.end(), fTraining->NumberOfDifferentBlocks[i], fTraining->SizeOfBlocks[i]); + } -template -class RChunkLoaderFilters { + for (size_t i = 0; i < fValidation->NumberOfDifferentBlocks.size(); i++) { + BlockSizes.insert(BlockSizes.end(), fValidation->NumberOfDifferentBlocks[i], fValidation->SizeOfBlocks[i]); + } -private: - ROOT::RDF::RNode &f_rdf; - TMVA::Experimental::RTensor &fChunkTensor; + // make an identity permutation map + std::vector indices(BlockSizes.size()); - std::size_t fChunkSize; - std::vector fCols; - const std::size_t fNumEntries; - std::size_t fNumAllEntries; - std::vector fVecSizes; - std::size_t fVecPadding; - std::size_t fNumColumns; + for (int i = 0; i < indices.size(); ++i) { + indices[i] = i; + } - const std::size_t fPartOfChunkSize; - TMVA::Experimental::RTensor fRemainderChunkTensor; - std::size_t fRemainderChunkTensorRow = 0; + // shuffle the identity permutation to create a new permutation + if (fShuffle) { + std::shuffle(indices.begin(), indices.end(), g); + } -public: - /// \brief Constructor for the RChunkLoader - /// \param rdf - /// \param chunkSize - /// \param cols - /// \param filters - /// \param vecSizes - /// \param vecPadding - RChunkLoaderFilters(ROOT::RDF::RNode &rdf, TMVA::Experimental::RTensor &chunkTensor, - const std::size_t chunkSize, const std::vector &cols, std::size_t numEntries, - std::size_t numAllEntries, const std::vector &vecSizes = {}, - const float vecPadding = 0.0) - : f_rdf(rdf), - fChunkTensor(chunkTensor), - fChunkSize(chunkSize), - fCols(cols), - fNumEntries(numEntries), - fNumAllEntries(numAllEntries), - fVecSizes(vecSizes), - fVecPadding(vecPadding), - fNumColumns(cols.size()), - fPartOfChunkSize(chunkSize / 5), - fRemainderChunkTensor(std::vector{fPartOfChunkSize, fNumColumns}) - { + // use the permuation to shuffle the vector of block sizes + std::vector PermutedBlockSizes(BlockSizes.size()); + for (int i = 0; i < BlockSizes.size(); ++i) { + PermutedBlockSizes[i] = BlockSizes[indices[i]]; + } + + // create a vector for storing the boundaries of the blocks + std::vector BlockBoundaries(BlockSizes.size()); + + // get the boundaries of the blocks with the partial sum of the block sizes + // insert 0 at the beginning for the lower boundary of the first block + std::partial_sum(PermutedBlockSizes.begin(), PermutedBlockSizes.end(), 
BlockBoundaries.begin()); + BlockBoundaries.insert(BlockBoundaries.begin(), 0); + + // distribute the neighbouring block boundaries into pairs to get the intervals for the blocks + std::vector> BlockIntervals; + for (size_t i = 0; i < BlockBoundaries.size() - 1; ++i) { + BlockIntervals.emplace_back(BlockBoundaries[i], BlockBoundaries[i + 1]); + } + + // use the inverse of the permutation above to order the block intervals in the same order as + // the original vector of block sizes + std::vector> UnpermutedBlockIntervals(BlockIntervals.size()); + for (int i = 0; i < BlockIntervals.size(); ++i) { + UnpermutedBlockIntervals[indices[i]] = BlockIntervals[i]; + } + + // distribute the block intervals between training and validation + fTraining->BlockIntervals.insert(fTraining->BlockIntervals.begin(), UnpermutedBlockIntervals.begin(), + UnpermutedBlockIntervals.begin() + fTraining->NumberOfBlocks); + fValidation->BlockIntervals.insert(fValidation->BlockIntervals.begin(), + UnpermutedBlockIntervals.begin() + fTraining->NumberOfBlocks, + UnpermutedBlockIntervals.end()); + + // distribute the different block interval types for training and validation + fTraining->DistributeBlockIntervals(); + fValidation->DistributeBlockIntervals(); } - /// \brief Load a chunk of data using the RChunkLoaderFunctor - /// \param chunkTensor - /// \param currentRow - /// \return A pair of size_t defining the number of events processed and how many passed all filters - std::pair LoadChunk(std::size_t currentRow) + ////////////////////////////////////////////////////////////////////////// + /// \brief Create training chunks consisting of block intervals of different types + void CreateTrainingChunksIntervals() { - for (std::size_t i = 0; i < fRemainderChunkTensorRow; i++) { - std::copy(fRemainderChunkTensor.GetData() + (i * fNumColumns), - fRemainderChunkTensor.GetData() + ((i + 1) * fNumColumns), - fChunkTensor.GetData() + (i * fNumColumns)); + + std::random_device rd; + std::mt19937 g; + + if (fSetSeed == 0) { + g.seed(rd()); + } else { + g.seed(fSetSeed); } - RChunkLoaderFunctorFilters func(fChunkTensor, fRemainderChunkTensor, fRemainderChunkTensorRow, - fChunkSize, fRemainderChunkTensorRow * fNumColumns, fVecSizes, - fVecPadding); + // shuffle the block intervals within each type of block + if (fShuffle) { + std::shuffle(fTraining->FullBlockIntervalsInFullChunks.begin(), + fTraining->FullBlockIntervalsInFullChunks.end(), g); + std::shuffle(fTraining->LeftoverBlockIntervalsInFullChunks.begin(), + fTraining->LeftoverBlockIntervalsInFullChunks.end(), g); + std::shuffle(fTraining->FullBlockIntervalsInLeftoverChunks.begin(), + fTraining->FullBlockIntervalsInLeftoverChunks.end(), g); + std::shuffle(fTraining->LeftoverBlockIntervalsInLeftoverChunks.begin(), + fTraining->LeftoverBlockIntervalsInLeftoverChunks.end(), g); + } + + // reset the chunk intervals and sizes before each epoch + fTraining->ChunksIntervals = {}; + fTraining->ChunksSizes = {}; + + // create the chunks, each consisting of block intervals + fTraining->CreateChunksIntervals(); + + if (fShuffle) { + std::shuffle(fTraining->ChunksIntervals.begin(), fTraining->ChunksIntervals.end(), g); + } + + fTraining->SizeOfChunks(); + } + + ////////////////////////////////////////////////////////////////////////// + /// \brief Create validation chunks consisting of block intervals of different types + void CreateValidationChunksIntervals() + { + std::random_device rd; + std::mt19937 g; - std::size_t passedEvents = 0; - std::size_t processedEvents = 0; + if (fSetSeed == 
0) { + g.seed(rd()); + } else { + g.seed(fSetSeed); + } - while ((passedEvents < fChunkSize && passedEvents < fNumEntries) && currentRow < fNumAllEntries) { - ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, currentRow, currentRow + fPartOfChunkSize); - auto report = f_rdf.Report(); + if (fShuffle) { + std::shuffle(fValidation->FullBlockIntervalsInFullChunks.begin(), + fValidation->FullBlockIntervalsInFullChunks.end(), g); + std::shuffle(fValidation->LeftoverBlockIntervalsInFullChunks.begin(), + fValidation->LeftoverBlockIntervalsInFullChunks.end(), g); + std::shuffle(fValidation->FullBlockIntervalsInLeftoverChunks.begin(), + fValidation->FullBlockIntervalsInLeftoverChunks.end(), g); + std::shuffle(fValidation->LeftoverBlockIntervalsInLeftoverChunks.begin(), + fValidation->LeftoverBlockIntervalsInLeftoverChunks.end(), g); + } - f_rdf.Foreach(func, fCols); + fValidation->ChunksIntervals = {}; + fValidation->ChunksSizes = {}; - processedEvents += report.begin()->GetAll(); - passedEvents += (report.end() - 1)->GetPass(); + fValidation->CreateChunksIntervals(); - currentRow += fPartOfChunkSize; - func.SetEntries() = passedEvents; - func.SetOffset() = passedEvents * fNumColumns; + if (fShuffle) { + std::shuffle(fValidation->ChunksIntervals.begin(), fValidation->ChunksIntervals.end(), g); } - fRemainderChunkTensorRow = passedEvents > fChunkSize ? passedEvents - fChunkSize : 0; + fValidation->SizeOfChunks(); + } + + ////////////////////////////////////////////////////////////////////////// + /// \brief Load the nth chunk from the training dataset into a tensor + /// \param[in] TrainChunkTensor RTensor for the training chunk + /// \param[in] chunk Index of the chunk in the dataset + void LoadTrainingChunk(TMVA::Experimental::RTensor &TrainChunkTensor, std::size_t chunk) + { + + std::random_device rd; + std::mt19937 g; + + if (fSetSeed == 0) { + g.seed(rd()); + } else { + g.seed(fSetSeed); + } - return std::make_pair(processedEvents, passedEvents); + std::size_t chunkSize = fTraining->ChunksSizes[chunk]; + + if (chunk < fTraining->Chunks) { + TMVA::Experimental::RTensor Tensor({chunkSize, fNumChunkCols}); + TrainChunkTensor = TrainChunkTensor.Resize({{chunkSize, fNumChunkCols}}); + + // make an identity permutation map + std::vector indices(chunkSize); + std::iota(indices.begin(), indices.end(), 0); + + // shuffle the identity permutation to create a new permutation + if (fShuffle) { + std::shuffle(indices.begin(), indices.end(), g); + } + + // fill a chunk by looping over the blocks in a chunk (see RChunkConstructor) + std::size_t chunkEntry = 0; + std::vector> BlocksInChunk = fTraining->ChunksIntervals[chunk]; + + std::sort(BlocksInChunk.begin(), BlocksInChunk.end(), + [](const std::pair& a, const std::pair& b) { + return a.first < b.first; + }); + + for (std::size_t i = 0; i < BlocksInChunk.size(); i++) { + + // Use the block start and end entry to load into the chunk if the dataframe is not filtered + if (fNotFiltered) { + RChunkLoaderFunctor func(Tensor, fNumChunkCols, fVecSizes, fVecPadding, chunkEntry); + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, BlocksInChunk[i].first, BlocksInChunk[i].second); + + f_rdf.Foreach(func, fCols); + chunkEntry += BlocksInChunk[i].second - BlocksInChunk[i].first; + } + + // use the entry column of the dataframe as a map to load the entries that passed the filters + else { + std::size_t blockSize = BlocksInChunk[i].second - BlocksInChunk[i].first; + for (std::size_t j = 0; j < blockSize; j++) { + RChunkLoaderFunctor func(Tensor, fNumChunkCols, 
fVecSizes, fVecPadding, chunkEntry); + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, (*fEntries)[BlocksInChunk[i].first + j], + (*fEntries)[BlocksInChunk[i].first + j + 1]); + f_rdf.Foreach(func, fCols); + chunkEntry++; + } + } + } + + // shuffle data in RTensor with the permutation map defined above + for (std::size_t i = 0; i < chunkSize; i++) { + std::copy(Tensor.GetData() + indices[i] * fNumChunkCols, + Tensor.GetData() + (indices[i] + 1) * fNumChunkCols, + TrainChunkTensor.GetData() + i * fNumChunkCols); + } + } } - std::size_t LastChunk() + ////////////////////////////////////////////////////////////////////////// + /// \brief Load the nth chunk from the validation dataset into a tensor + /// \param[in] ValidationChunkTensor RTensor for the validation chunk + /// \param[in] chunk Index of the chunk in the dataset + void LoadValidationChunk(TMVA::Experimental::RTensor &ValidationChunkTensor, std::size_t chunk) { - for (std::size_t i = 0; i < fRemainderChunkTensorRow; i++) { - std::copy(fRemainderChunkTensor.GetData() + (i * fNumColumns), - fRemainderChunkTensor.GetData() + ((i + 1) * fNumColumns), - fChunkTensor.GetData() + (i * fNumColumns)); + + std::random_device rd; + std::mt19937 g; + + if (fSetSeed == 0) { + g.seed(rd()); + } else { + g.seed(fSetSeed); } - return fRemainderChunkTensorRow; + std::size_t chunkSize = fValidation->ChunksSizes[chunk]; + + if (chunk < fValidation->Chunks) { + TMVA::Experimental::RTensor Tensor({chunkSize, fNumChunkCols}); + ValidationChunkTensor = ValidationChunkTensor.Resize({{chunkSize, fNumChunkCols}}); + + // make an identity permutation map + std::vector indices(chunkSize); + std::iota(indices.begin(), indices.end(), 0); + + // shuffle the identity permutation to create a new permutation + if (fShuffle) { + std::shuffle(indices.begin(), indices.end(), g); + } + + std::size_t chunkEntry = 0; + std::vector> BlocksInChunk = fValidation->ChunksIntervals[chunk]; + + std::sort(BlocksInChunk.begin(), BlocksInChunk.end(), + [](const std::pair& a, const std::pair& b) { + return a.first < b.first; + }); + + for (std::size_t i = 0; i < BlocksInChunk.size(); i++) { + + // use the block start and end entry to load into the chunk if the dataframe is not filtered + if (fNotFiltered) { + RChunkLoaderFunctor func(Tensor, fNumChunkCols, fVecSizes, fVecPadding, chunkEntry); + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, BlocksInChunk[i].first, BlocksInChunk[i].second); + f_rdf.Foreach(func, fCols); + chunkEntry += BlocksInChunk[i].second - BlocksInChunk[i].first; + } + + // use the entry column of the dataframe as a map to load the entries that passed the filters + else { + std::size_t blockSize = BlocksInChunk[i].second - BlocksInChunk[i].first; + for (std::size_t j = 0; j < blockSize; j++) { + RChunkLoaderFunctor func(Tensor, fNumChunkCols, fVecSizes, fVecPadding, chunkEntry); + ROOT::Internal::RDF::ChangeBeginAndEndEntries(f_rdf, (*fEntries)[BlocksInChunk[i].first + j], + (*fEntries)[BlocksInChunk[i].first + j + 1]); + + f_rdf.Foreach(func, fCols); + chunkEntry++; + } + } + } + + // shuffle data in RTensor with the permutation map defined above + for (std::size_t i = 0; i < chunkSize; i++) { + std::copy(Tensor.GetData() + indices[i] * fNumChunkCols, + Tensor.GetData() + (indices[i] + 1) * fNumChunkCols, + ValidationChunkTensor.GetData() + i * fNumChunkCols); + } + } } + + std::vector GetTrainingChunkSizes() { return fTraining->ChunksSizes; } + std::vector GetValidationChunkSizes() { return fValidation->ChunksSizes; } + + std::size_t 
GetNumTrainingEntries() { return fNumTrainEntries; } + std::size_t GetNumValidationEntries() { return fNumValidationEntries; } + + void CheckIfUnique(TMVA::Experimental::RTensor &Tensor) + { + auto tensorSize = Tensor.GetSize(); + TMVA::Experimental::RTensor SqueezeTensor = Tensor.Reshape({1, tensorSize}).Squeeze(); + + std::list allEntries; + for (int i = 0; i < tensorSize; i++) { + allEntries.push_back(SqueezeTensor(0, i)); + } + allEntries.sort(); + allEntries.unique(); + if (allEntries.size() == tensorSize) { + std::cout << "Tensor consists of only unique elements" << std::endl; + } + }; + + void CheckIfOverlap(TMVA::Experimental::RTensor &Tensor1, TMVA::Experimental::RTensor &Tensor2) + { + auto tensorSize1 = Tensor1.GetSize(); + TMVA::Experimental::RTensor SqueezeTensor1 = Tensor1.Reshape({1, tensorSize1}).Squeeze(); + + std::list allEntries1; + for (int i = 0; i < tensorSize1; i++) { + allEntries1.push_back(SqueezeTensor1(0, i)); + } + + auto tensorSize2 = Tensor2.GetSize(); + TMVA::Experimental::RTensor SqueezeTensor2 = Tensor2.Reshape({1, tensorSize2}).Squeeze(); + + std::list allEntries2; + for (int i = 0; i < tensorSize2; i++) { + allEntries2.push_back(SqueezeTensor2(0, i)); + } + + std::set result; + + // Call the set_intersection(), which computes the + // intersection of set1 and set2 and + // inserts the result into the 'result' set + std::set set1(allEntries1.begin(), allEntries1.end()); + std::set set2(allEntries2.begin(), allEntries2.end()); + std::set_intersection(set1.begin(), set1.end(), set2.begin(), set2.end(), inserter(result, result.begin())); + // std::list result = intersection(allEntries1, allEntries2); + + if (result.size() == 0) { + std::cout << "No overlap between the tensors" << std::endl; + } else { + std::cout << "Intersection between tensors: "; + for (int num : result) { + std::cout << num << " "; + } + std::cout << std::endl; + } + }; + + std::size_t GetNumTrainingChunks() { return fTraining->Chunks; } + + std::size_t GetNumValidationChunks() { return fValidation->Chunks; } }; + } // namespace Internal } // namespace Experimental } // namespace TMVA diff --git a/tutorials/machine_learning/RBatchGenerator_NumPy.py b/tutorials/machine_learning/RBatchGenerator_NumPy.py index 6640a1505bec2..e914a44a29d56 100644 --- a/tutorials/machine_learning/RBatchGenerator_NumPy.py +++ b/tutorials/machine_learning/RBatchGenerator_NumPy.py @@ -14,24 +14,31 @@ file_name = str(ROOT.gROOT.GetTutorialDir()) + "/machine_learning/data/Higgs_data.root" batch_size = 128 -chunk_size = 5_000 +chunk_size = 5000 +block_size = 400 rdataframe = ROOT.RDataFrame(tree_name, file_name) +target = "Type" + +num_of_epochs = 2 + gen_train, gen_validation = ROOT.TMVA.Experimental.CreateNumPyGenerators( rdataframe, - batch_size, + batch_size, chunk_size, - validation_split=0.3, - shuffle=True, - drop_remainder=False + block_size, + target = target, + validation_split = 0.3, + shuffle = True, + drop_remainder = True ) -# Loop through training set -for i, b in enumerate(gen_train): - print(f"Training batch {i} => {b.shape}") - +for i in range(num_of_epochs): + # Loop through training set + for i, (x_train, y_train) in enumerate(gen_train): + print(f"Training batch {i + 1} => x: {x_train.shape}, y: {y_train.shape}") -# Loop through Validation set -for i, b in enumerate(gen_validation): - print(f"Validation batch {i} => {b.shape}") + # Loop through Validation set + for i, (x_validation, y_validation) in enumerate(gen_validation): + print(f"Validation batch {i + 1} => x: {x_validation.shape}, y: 
{y_validation.shape}") diff --git a/tutorials/machine_learning/RBatchGenerator_PyTorch.py b/tutorials/machine_learning/RBatchGenerator_PyTorch.py index 406adfac23212..e85ddd3be90c6 100644 --- a/tutorials/machine_learning/RBatchGenerator_PyTorch.py +++ b/tutorials/machine_learning/RBatchGenerator_PyTorch.py @@ -15,7 +15,8 @@ file_name = str(ROOT.gROOT.GetTutorialDir()) + "/machine_learning/data/Higgs_data.root" batch_size = 128 -chunk_size = 5_000 +chunk_size = 5000 +block_size = 300 rdataframe = ROOT.RDataFrame(tree_name, file_name) @@ -25,10 +26,13 @@ # as PyTorch tensors. gen_train, gen_validation = ROOT.TMVA.Experimental.CreatePyTorchGenerators( rdataframe, - batch_size, + batch_size, chunk_size, - target=target, - validation_split=0.3, + block_size, + target = target, + validation_split = 0.3, + shuffle = True, + drop_remainder=True, ) # Get a list of the columns used for training @@ -56,7 +60,9 @@ def calc_accuracy(targets, pred): number_of_epochs = 2 -for _ in range(number_of_epochs): +for i in range(number_of_epochs): + print("Epoch ", i) + model.train() # Loop through the training set and train model for i, (x_train, y_train) in enumerate(gen_train): # Make prediction and calculate loss @@ -73,14 +79,15 @@ def calc_accuracy(targets, pred): print(f"Training => accuracy: {accuracy}") - ################################################################# - # Validation - ################################################################# + # ################################################################# + # # Validation + # ################################################################# + model.eval() # Evaluate the model on the validation set - for i, (x_train, y_train) in enumerate(gen_validation): + for i, (x_val, y_val) in enumerate(gen_validation): # Make prediction and calculate accuracy - pred = model(x_train) - accuracy = calc_accuracy(y_train, pred) + pred = model(x_val) + accuracy = calc_accuracy(y_val, pred) print(f"Validation => accuracy: {accuracy}") diff --git a/tutorials/machine_learning/RBatchGenerator_TensorFlow.py b/tutorials/machine_learning/RBatchGenerator_TensorFlow.py index 097f6dc458224..d418d1ef7dfc3 100644 --- a/tutorials/machine_learning/RBatchGenerator_TensorFlow.py +++ b/tutorials/machine_learning/RBatchGenerator_TensorFlow.py @@ -18,19 +18,22 @@ file_name = str(ROOT.gROOT.GetTutorialDir()) + "/machine_learning/data/Higgs_data.root" batch_size = 128 -chunk_size = 5_000 +chunk_size = 5000 +block_size = 300 rdataframe = ROOT.RDataFrame(tree_name, file_name) - -target = "Type" +target = ["Type"] # Returns two TF.Dataset for training and validation batches. 
ds_train, ds_valid = ROOT.TMVA.Experimental.CreateTFDatasets( rdataframe, - batch_size, + batch_size, chunk_size, - validation_split=0.3, - target=target, + block_size, + target = target, + validation_split = 0.3, + shuffle = True, + drop_remainder = True ) num_of_epochs = 2 @@ -61,9 +64,10 @@ tf.keras.layers.Dense(1, activation=tf.nn.sigmoid), ] ) + loss_fn = tf.keras.losses.BinaryCrossentropy() model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"]) -# Train model model.fit(ds_train_repeated, steps_per_epoch=train_batches_per_epoch, validation_data=ds_valid_repeated,\ validation_steps=validation_batches_per_epoch, epochs=num_of_epochs) + diff --git a/tutorials/machine_learning/RBatchGenerator_filters_vectors.py b/tutorials/machine_learning/RBatchGenerator_filters_vectors.py index c497748466e04..7666e2ee2868f 100644 --- a/tutorials/machine_learning/RBatchGenerator_filters_vectors.py +++ b/tutorials/machine_learning/RBatchGenerator_filters_vectors.py @@ -13,7 +13,6 @@ import ROOT - tree_name = "test_tree" file_name = ( ROOT.gROOT.GetTutorialDir().Data() @@ -22,6 +21,7 @@ chunk_size = 50 # Defines the size of the chunks batch_size = 5 # Defines the size of the returned batches +block_size = 10 # Defines the size of the blocks that builds up a chunk rdataframe = ROOT.RDataFrame(tree_name, file_name) @@ -36,9 +36,10 @@ filteredrdf, batch_size, chunk_size, + block_size, validation_split=0.3, max_vec_sizes=max_vec_sizes, - shuffle=True, + shuffle=False, ) print(f"Columns: {ds_train.columns}")
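For reference, a minimal end-to-end sketch of the Python-facing API exercised by these tutorials; the tree and file names are hypothetical, and only the CreateNumPyGenerators call shape shown above is assumed:

    import ROOT

    # Hypothetical input; any TTree-backed RDataFrame with a "Type" branch would do.
    rdataframe = ROOT.RDataFrame("test_tree", "my_file.root")

    gen_train, gen_validation = ROOT.TMVA.Experimental.CreateNumPyGenerators(
        rdataframe,
        128,        # batch_size
        5000,       # chunk_size
        500,        # block_size: chunks are stitched together from blocks of this size
        target="Type",
        validation_split=0.3,
        shuffle=True,
        drop_remainder=True,
    )

    for epoch in range(2):
        # each epoch re-iterates the generators, as in the tutorials above
        for x_train, y_train in gen_train:
            pass   # train a model on the batch here
        for x_val, y_val in gen_validation:
            pass   # evaluate on the validation batch here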