From 738d2207f965ae5f6e888bda0aa5f357c2dad8f4 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Wed, 19 Oct 2022 20:06:58 -0700
Subject: [PATCH 1/3] wip

---
 dbldatagen/data_generator.py | 57 +++++++++++++++++----------
 tests/test_streaming.py      | 76 ++++++++++++++++++++++++++++++++++--
 2 files changed, 110 insertions(+), 23 deletions(-)

diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index 0ae8d396..c98a397a 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -778,29 +778,46 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
             if ColumnGenerationSpec.SEED_COLUMN != "id":
                 df1 = df1.withColumnRenamed("id", ColumnGenerationSpec.SEED_COLUMN)
 
+        else:
+            df1 = self._getStreamingBaseDataFrame(startId, options)
+
+        return df1
+
+    def _getStreamingBaseDataFrame(self, startId=0, options=None):
+        end_id = self._rowCount + startId
+        id_partitions = (self.partitions if self.partitions is not None
+                         else self.sparkSession.sparkContext.defaultParallelism)
+
+        status = f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions"
+        self.logger.info(status)
+        self.executionHistory.append(status)
+
+        df1 = (self.sparkSession.readStream
+               .format("rate"))
+        if options is not None:
+            if "rowsPerSecond" not in options:
+                options['rowsPerSecond'] = 1
+            if "numPartitions" not in options:
+                options['numPartitions'] = id_partitions
         else:
-            status = (f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions")
-            self.logger.info(status)
-            self.executionHistory.append(status)
+            options = {
+                "rowsPerSecond": 1,
+                "numPartitions": id_partitions
+            }
 
-            df1 = (self.sparkSession.readStream
-                   .format("rate"))
-            if options is not None:
-                if "rowsPerSecond" not in options:
-                    options['rowsPerSecond'] = 1
-                if "numPartitions" not in options:
-                    options['numPartitions'] = id_partitions
-
-                for k, v in options.items():
-                    df1 = df1.option(k, v)
-                df1 = df1.load().withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
-            else:
-                df1 = (df1.option("rowsPerSecond", 1)
-                       .option("numPartitions", id_partitions)
-                       .load()
-                       .withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
-                       )
+        age_limit_interval = None
+
+        if "ageLimit" in options:
+            age_limit_interval = options.pop("ageLimit")
+            assert age_limit_interval is not None and float(age_limit_interval) > 0.0, "invalid age limit"
+
+        for k, v in options.items():
+            df1 = df1.option(k, v)
+        df1 = df1.load().withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
+
+        if age_limit_interval is not None:
+            df1 = df1.where(f"""abs(cast(now() as double) - cast(`timestamp` as double ))
+                               < cast({age_limit_interval} as double)""")
 
         return df1
 
diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index a4a51b39..9a99ccd2 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -13,14 +13,14 @@ class TestStreaming(unittest.TestCase):
     row_count = 100000
     column_count = 10
-    time_to_run = 15
+    time_to_run = 8
     rows_per_second = 5000
 
     def test_streaming(self):
         time_now = int(round(time.time() * 1000))
         base_dir = "/tmp/testdatagenerator_{}".format(time_now)
-        test_dir = os.path.join(base_dir, "data")
-        checkpoint_dir = os.path.join(base_dir, "checkpoint")
+        test_dir = os.path.join(base_dir, "data1")
+        checkpoint_dir = os.path.join(base_dir, "checkpoint1")
         print(time_now, test_dir, checkpoint_dir)
 
         new_data_rows = 0
@@ -80,3 +80,73 @@ def test_streaming(self):
 
         # check that we have at least one second of data
         self.assertGreater(new_data_rows, self.rows_per_second)
+
+    def test_streaming_with_age_limit(self):
+        print(spark.version)
+
+        time_now = int(round(time.time() * 1000))
+        base_dir = "/tmp/testdatagenerator2_{}".format(time_now)
+        test_dir = os.path.join(base_dir, "data")
+        checkpoint_dir = os.path.join(base_dir, "checkpoint")
+        print(time_now, test_dir, checkpoint_dir)
+
+        new_data_rows = 0
+
+        try:
+            os.makedirs(test_dir)
+            os.makedirs(checkpoint_dir)
+
+            testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count,
+                                             partitions=4, seedMethod='hash_fieldname')
+                            .withIdOutput()
+                            .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
+                                        numColumns=self.column_count)
+                            .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
+                            .withColumn("code2", IntegerType(), minValue=0, maxValue=10)
+                            .withColumn("code3", StringType(), values=['a', 'b', 'c'])
+                            .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True)
+                            .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1])
+
+                            )
+
+            dfTestData = testDataSpec.build(withStreaming=True,
+                                            options={'rowsPerSecond': self.rows_per_second,
+                                                     'ageLimit': 1})
+
+            (dfTestData
+             .writeStream
+             .format("parquet")
+             .outputMode("append")
+             .option("path", test_dir)
+             .option("checkpointLocation", checkpoint_dir)
+             .start())
+
+            start_time = time.time()
+            time.sleep(self.time_to_run)
+
+            # note stopping the stream may produce exceptions - these can be ignored
+            recent_progress = []
+            for x in spark.streams.active:
+                recent_progress.append(x.recentProgress)
+                print(x)
+                x.stop()
+
+            end_time = time.time()
+
+            # read newly written data
+            df2 = spark.read.format("parquet").load(test_dir)
+
+            new_data_rows = df2.count()
+
+            print("read {} rows from newly written data".format(new_data_rows))
+        finally:
+            shutil.rmtree(base_dir)
+
+        print("*** Done ***")
+
+        print("elapsed time (seconds)", end_time - start_time)
+
+        # check that we have at least one second of data
+        self.assertGreater(new_data_rows, int(self.rows_per_second / 4))
+
+

From 94a90e839f69dbee790b77d5dab0ca6e1cb5ae04 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Wed, 19 Oct 2022 22:31:58 -0700
Subject: [PATCH 2/3] wip

---
 dbldatagen/data_generator.py | 96 ++++++++++++++++++++++++++++++------
 1 file changed, 82 insertions(+), 14 deletions(-)

diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index c98a397a..493deb10 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -16,8 +16,18 @@ from .spark_singleton import SparkSingleton
 from .utils import ensure, topologicalSort, DataGenError, deprecated
 
+START_TIMESTAMP_OPTION = "startTimestamp"
+ROWS_PER_SECOND_OPTION = "rowsPerSecond"
+AGE_LIMIT_OPTION = "ageLimit"
+NUM_PARTITIONS_OPTION = "numPartitions"
+ROWS_PER_BATCH_OPTION = "rowsPerBatch"
+STREAMING_SOURCE_OPTION = "streamingSource"
+
 _OLD_MIN_OPTION = 'min'
 _OLD_MAX_OPTION = 'max'
+RATE_SOURCE = "rate"
+RATE_PER_MICRO_BATCH_SOURCE = "rate-micro-batch"
+SPARK_RATE_MICROBATCH_VERSION = "3.2.1"
 
 
 class DataGenerator:
@@ -783,36 +793,94 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
 
         return df1
 
+    def _getStreamingSource(self, options=None):
+        """ get streaming source from options
+
+        :param options: dictionary of options
+        :returns: streaming source if present in options (popping option from options), or default if not present
+
+        Default streaming source is computed based on whether we are running on Spark version 3.2.1 or later
+
+        if using spark version 3.2.1 or later - `rate-micro-batch` is used as source, otherwise `rate` is used as source
+        """
+        if options is not None:
+            if STREAMING_SOURCE_OPTION in options:
+                streaming_source = options.pop(STREAMING_SOURCE_OPTION)
+                assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
+                    f"Invalid streaming source - only ['{RATE_SOURCE}', ['{RATE_PER_MICRO_BATCH_SOURCE}'] supported"
+
+            # if using Spark 3.2.1, then default should be RATE_PER_MICRO_BATCH_SOURCE
+            if self.sparkSession.version >= SPARK_RATE_MICROBATCH_VERSION:
+                streaming_source = RATE_PER_MICRO_BATCH_SOURCE
+            else:
+                streaming_source = RATE_SOURCE
+
+        return streaming_source
+
+    def _getCurrentSparkTimestamp(self, asLong=False):
+        """ get current spark timestamp
+
+        :param asLong: if True, returns current spark timestamp as long, string otherwise
+        """
+        if asLong:
+            return (self.sparkSession.sql(f"select cast(now() as string) as start_timestamp")
+                    .collect()[0]['start_timestamp'])
+        else:
+            return (self.sparkSession.sql(f"select cast(now() as long) as start_timestamp")
+                    .collect()[0]['start_timestamp'])
+
     def _getStreamingBaseDataFrame(self, startId=0, options=None):
+        """Generate base streaming data frame"""
         end_id = self._rowCount + startId
         id_partitions = (self.partitions if self.partitions is not None
                          else self.sparkSession.sparkContext.defaultParallelism)
 
-        status = f"Generating streaming data frame with ids from {startId} to {end_id} with {id_partitions} partitions"
+        # determine streaming source
+        streaming_source = self._getStreamingSource(options)
+
+        if options is None:
+            if streaming_source == RATE_SOURCE:
+                options = {
+                    ROWS_PER_SECOND_OPTION: 1,
+                }
+            else:
+                options = {
+                    ROWS_PER_BATCH_OPTION: 1,
+                }
+
+        if NUM_PARTITIONS_OPTION not in options:
+            options[NUM_PARTITIONS_OPTION] = id_partitions
+
+        if streaming_source == RATE_PER_MICRO_BATCH_SOURCE:
+            if START_TIMESTAMP_OPTION not in options:
+                options[ START_TIMESTAMP_OPTION] = self._getCurrentSparkTimestamp(asLong=True)
+
+            if ROWS_PER_BATCH_OPTION not in options:
+                options[ ROWS_PER_BATCH_OPTION] = id_partitions
+            status = f"Generating streaming data from rate source with {id_partitions} partitions"
+
+        elif streaming_source == RATE_SOURCE:
+            if ROWS_PER_SECOND_OPTION not in options:
+                options[ ROWS_PER_SECOND_OPTION] = 1
+            status = f"Generating streaming data from rate-micro-batch source with {id_partitions} partitions"
+        else:
+            assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
+                f"Invalid streaming source - only ['{RATE_SOURCE}', ['{RATE_PER_MICRO_BATCH_SOURCE}'] supported"
+
         self.logger.info(status)
         self.executionHistory.append(status)
 
-        df1 = (self.sparkSession.readStream
-               .format("rate"))
-        if options is not None:
-            if "rowsPerSecond" not in options:
-                options['rowsPerSecond'] = 1
-            if "numPartitions" not in options:
-                options['numPartitions'] = id_partitions
-        else:
-            options = {
-                "rowsPerSecond": 1,
-                "numPartitions": id_partitions
-            }
+        df1 = (self.sparkSession.readStream.format(streaming_source))
 
         age_limit_interval = None
 
-        if "ageLimit" in options:
+        if AGE_LIMIT_OPTION in options:
             age_limit_interval = options.pop("ageLimit")
             assert age_limit_interval is not None and float(age_limit_interval) > 0.0, "invalid age limit"
 
         for k, v in options.items():
             df1 = df1.option(k, v)
+
         df1 = df1.load().withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
 
         if age_limit_interval is not None:

From 51adf3968b392662b8dd52054849c8d6a9c56f15 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Thu, 20 Oct 2022 02:34:15 -0700
Subject: [PATCH 3/3] wip

---
 dbldatagen/data_generator.py |  99 ++++++----
 makefile                     |   8 +
 python/dev_require_321.txt   |  33 ++++
 tests/test_streaming.py      | 346 ++++++++++++++++++++++++++++++++---
 4 files changed, 422 insertions(+), 64 deletions(-)
 create mode 100644 python/dev_require_321.txt

diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index 493deb10..7dc9a0d8 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -267,7 +267,7 @@ def explain(self, suppressOutput=False):
         output = ["", "Data generation plan", "====================",
                   f"spec=DateGenerator(name={self.name}, rows={self._rowCount}, startingId={self.starting_id}, partitions={self.partitions})"
-                  , ")", "", f"column build order: {self._buildOrder}", "", "build plan:"]
+                  , ")", "", f"column build order: {self._buildOrder}", "", "build plan:"]
 
         for plan_action in self._buildPlan:
             output.append(" ==> " + plan_action)
@@ -793,7 +793,7 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
 
         return df1
 
-    def _getStreamingSource(self, options=None):
+    def _getStreamingSource(self, options=None, spark_version=None):
         """ get streaming source from options
 
         :param options: dictionary of options
@@ -803,17 +803,22 @@ def _getStreamingSource(self, options=None):
 
         if using spark version 3.2.1 or later - `rate-micro-batch` is used as source, otherwise `rate` is used as source
         """
+        streaming_source = None
         if options is not None:
             if STREAMING_SOURCE_OPTION in options:
-                streaming_source = options.pop(STREAMING_SOURCE_OPTION)
+                streaming_source = options[STREAMING_SOURCE_OPTION]
                 assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
                     f"Invalid streaming source - only ['{RATE_SOURCE}', ['{RATE_PER_MICRO_BATCH_SOURCE}'] supported"
 
-            # if using Spark 3.2.1, then default should be RATE_PER_MICRO_BATCH_SOURCE
-            if self.sparkSession.version >= SPARK_RATE_MICROBATCH_VERSION:
-                streaming_source = RATE_PER_MICRO_BATCH_SOURCE
-            else:
-                streaming_source = RATE_SOURCE
+        if spark_version is None:
+            spark_version = self.sparkSession.version
+
+        if streaming_source is None:
+            # if using Spark 3.2.1, then default should be RATE_PER_MICRO_BATCH_SOURCE
+            if spark_version >= SPARK_RATE_MICROBATCH_VERSION:
+                streaming_source = RATE_PER_MICRO_BATCH_SOURCE
+            else:
+                streaming_source = RATE_SOURCE
 
         return streaming_source
 
@@ -823,61 +828,77 @@ def _getCurrentSparkTimestamp(self, asLong=False):
         :param asLong: if True, returns current spark timestamp as long, string otherwise
         """
         if asLong:
-            return (self.sparkSession.sql(f"select cast(now() as string) as start_timestamp")
-                    .collect()[0]['start_timestamp'])
-        else:
             return (self.sparkSession.sql(f"select cast(now() as long) as start_timestamp")
-                    .collect()[0]['start_timestamp'])
+                    .collect()[0]['start_timestamp'])
+        else:
+            return (self.sparkSession.sql(f"select cast(now() as string) as start_timestamp")
+                    .collect()[0]['start_timestamp'])
 
-    def _getStreamingBaseDataFrame(self, startId=0, options=None):
-        """Generate base streaming data frame"""
-        end_id = self._rowCount + startId
-        id_partitions = (self.partitions if self.partitions is not None
-                         else self.sparkSession.sparkContext.defaultParallelism)
+    def _prepareStreamingOptions(self, options=None, spark_version=None):
+        default_streaming_partitions = (self.partitions if self.partitions is not None
+                                        else self.sparkSession.sparkContext.defaultParallelism)
 
-        # determine streaming source
-        streaming_source = self._getStreamingSource(options)
+        streaming_source = self._getStreamingSource(options, spark_version)
 
         if options is None:
-            if streaming_source == RATE_SOURCE:
-                options = {
-                    ROWS_PER_SECOND_OPTION: 1,
-                }
-            else:
-                options = {
-                    ROWS_PER_BATCH_OPTION: 1,
-                }
+            new_options = ({ROWS_PER_SECOND_OPTION: default_streaming_partitions} if streaming_source == RATE_SOURCE
+                           else {ROWS_PER_BATCH_OPTION: default_streaming_partitions})
+        else:
+            new_options = options.copy()
 
-        if NUM_PARTITIONS_OPTION not in options:
-            options[NUM_PARTITIONS_OPTION] = id_partitions
+        if NUM_PARTITIONS_OPTION in new_options:
+            streaming_partitions = new_options[NUM_PARTITIONS_OPTION]
+        else:
+            streaming_partitions = default_streaming_partitions
+            new_options[NUM_PARTITIONS_OPTION] = streaming_partitions
 
         if streaming_source == RATE_PER_MICRO_BATCH_SOURCE:
-            if START_TIMESTAMP_OPTION not in options:
-                options[ START_TIMESTAMP_OPTION] = self._getCurrentSparkTimestamp(asLong=True)
+            if START_TIMESTAMP_OPTION not in new_options:
+                new_options[START_TIMESTAMP_OPTION] = self._getCurrentSparkTimestamp(asLong=True)
 
-            if ROWS_PER_BATCH_OPTION not in options:
-                options[ ROWS_PER_BATCH_OPTION] = id_partitions
-            status = f"Generating streaming data from rate source with {id_partitions} partitions"
+            if ROWS_PER_BATCH_OPTION not in new_options:
+                # generate one row per partition
+                new_options[ROWS_PER_BATCH_OPTION] = streaming_partitions
 
         elif streaming_source == RATE_SOURCE:
-            if ROWS_PER_SECOND_OPTION not in options:
-                options[ ROWS_PER_SECOND_OPTION] = 1
-            status = f"Generating streaming data from rate-micro-batch source with {id_partitions} partitions"
+            if ROWS_PER_SECOND_OPTION not in new_options:
+                new_options[ROWS_PER_SECOND_OPTION] = streaming_partitions
         else:
             assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
                 f"Invalid streaming source - only ['{RATE_SOURCE}', ['{RATE_PER_MICRO_BATCH_SOURCE}'] supported"
 
+        return streaming_source, new_options
+
+    def _getStreamingBaseDataFrame(self, startId=0, options=None):
+        """Generate base streaming data frame"""
+        end_id = self._rowCount + startId
+
+        # determine streaming source
+        streaming_source, options = self._prepareStreamingOptions(options)
+        partitions = options[NUM_PARTITIONS_OPTION]
+
+        if streaming_source == RATE_SOURCE:
+            status = f"Generating streaming data with rate source with {partitions} partitions"
+        else:
+            status = f"Generating streaming data with rate-micro-batch source with {partitions} partitions"
+
         self.logger.info(status)
         self.executionHistory.append(status)
 
-        df1 = (self.sparkSession.readStream.format(streaming_source))
 
-        age_limit_interval = None
+        if STREAMING_SOURCE_OPTION in options:
+            options.pop(STREAMING_SOURCE_OPTION)
+
         if AGE_LIMIT_OPTION in options:
             age_limit_interval = options.pop("ageLimit")
             assert age_limit_interval is not None and float(age_limit_interval) > 0.0, "invalid age limit"
 
+        assert AGE_LIMIT_OPTION not in options
+        assert STREAMING_SOURCE_OPTION not in options
+
+        df1 = self.sparkSession.readStream.format(streaming_source)
+
         for k, v in options.items():
             df1 = df1.option(k, v)
diff --git a/makefile b/makefile
index 953eab87..9cf158cc 100644
--- a/makefile
+++ b/makefile
@@ -29,6 +29,10 @@ create-dev-env:
 	@echo "$(OK_COLOR)=> making conda dev environment$(NO_COLOR)"
 	conda create -n $(ENV_NAME) python=3.7.5
 
+create-dev-env-321:
+	@echo "$(OK_COLOR)=> making conda dev environment for Spark 3.2.1$(NO_COLOR)"
+	conda create -n $(ENV_NAME) python=3.8.10
+
 create-github-build-env:
 	@echo "$(OK_COLOR)=> making conda dev environment$(NO_COLOR)"
 	conda create -n pip_$(ENV_NAME) python=3.8
@@ -37,6 +41,10 @@ install-dev-dependencies:
 	@echo "$(OK_COLOR)=> installing dev environment requirements$(NO_COLOR)"
 	pip install -r python/dev_require.txt
 
+install-dev-dependencies321:
+	@echo "$(OK_COLOR)=> installing dev environment requirements for Spark 3.2.1$(NO_COLOR)"
+	pip install -r python/dev_require_321.txt
+
 clean-dev-env:
 	@echo "$(OK_COLOR)=> Cleaning dev environment$(NO_COLOR)"
 	@echo "Current version: $(CURRENT_VERSION)"
diff --git a/python/dev_require_321.txt b/python/dev_require_321.txt
new file mode 100644
index 00000000..56846a62
--- /dev/null
+++ b/python/dev_require_321.txt
@@ -0,0 +1,33 @@
+# The following packages are used in building the test data generator framework.
+# All packages used are already installed in the Databricks runtime environment for version 6.5 or later
+numpy==1.20.1
+pandas==1.2.4
+pickleshare==0.7.5
+py4j==0.10.9.3
+pyarrow==4.0.0
+pyspark==3.2.1
+python-dateutil==2.8.1
+six==1.15.0
+
+# The following packages are required for development only
+wheel==0.36.2
+setuptools==52.0.0
+bumpversion
+pytest
+pytest-cov
+pytest-timeout
+rstcheck
+prospector
+
+# The following packages are only required for building documentation and are not required at runtime
+sphinx==5.0.0
+sphinx_rtd_theme
+nbsphinx
+numpydoc==0.8
+pypandoc
+ipython==7.16.3
+recommonmark
+sphinx-markdown-builder
+rst2pdf==0.98
+Jinja2 < 3.1
+
diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index 9a99ccd2..92ac2bed 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -1,7 +1,7 @@
 import os
 import shutil
 import time
-import unittest
+import pytest
 
 from pyspark.sql.types import IntegerType, StringType, FloatType
 
@@ -10,48 +10,344 @@
 spark = dg.SparkSingleton.getLocalInstance("streaming tests")
 
 
-class TestStreaming(unittest.TestCase):
+class TestStreaming:
     row_count = 100000
     column_count = 10
     time_to_run = 8
     rows_per_second = 5000
 
-    def test_streaming(self):
+    def getTestDataSpec(self):
+        testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                         rows=self.row_count,
+                                         partitions=spark.sparkContext.defaultParallelism,
+                                         seedMethod='hash_fieldname')
+                        .withIdOutput()
+                        .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
+                                    numColumns=self.column_count)
+                        .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
+                        .withColumn("code2", IntegerType(), minValue=0, maxValue=10)
+                        .withColumn("code3", StringType(), values=['a', 'b', 'c'])
+                        .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True)
+                        .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1])
+                        )
+        return testDataSpec
+
+    def test_get_current_spark_timestamp(self):
+        testDataSpec = dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                        rows=self.row_count,
+                                        partitions=spark.sparkContext.defaultParallelism,
+                                        seedMethod='hash_fieldname')
+        ts = testDataSpec._getCurrentSparkTimestamp(asLong=False)
+
+        assert type(ts) is str
+        assert ts is not None and len(ts.strip()) > 0
+        print(ts)
+
+    def test_get_current_spark_timestamp2(self):
+        testDataSpec = dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                        rows=self.row_count,
+                                        partitions=spark.sparkContext.defaultParallelism,
+                                        seedMethod='hash_fieldname')
+        ts = testDataSpec._getCurrentSparkTimestamp(asLong=True)
+
+        assert(type(ts) is int)
+        print(ts)
+
+    def test_get_current_spark_version(self):
+        assert spark.version > "3.0.0"
+        assert spark.version <= "6.0.0"
+
+    @pytest.mark.parametrize("options_supplied,expected,spark_version_override",
+                             [(None, "rate" if spark.version < "3.2.1" else "rate-micro-batch", None),
+                              (None, "rate", "3.0.0"),
+                              (None, "rate-micro-batch", "3.2.1"),
+                              ({'streamingSource': 'rate'}, 'rate', None),
+                              ({'streamingSource': 'rate-micro-batch'}, 'rate-micro-batch', None)])
+    def test_streaming_source_options(self, options_supplied, expected, spark_version_override):
+        print("options", options_supplied)
+        testDataSpec = dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                        rows=self.row_count,
+                                        partitions=spark.sparkContext.defaultParallelism,
+                                        seedMethod='hash_fieldname')
+
+        result = testDataSpec._getStreamingSource(options_supplied, spark_version_override)
+        print("Options:", options_supplied, "retval:", result)
+
+        assert result == expected
+
+    @pytest.mark.parametrize("options_supplied,source_expected,options_expected,spark_version_override",
+                             [(None, "rate" if spark.version < "3.2.1" else "rate-micro-batch",
+                               {'numPartitions': spark.sparkContext.defaultParallelism,
+                                'rowsPerBatch': spark.sparkContext.defaultParallelism,
+                                'startTimestamp': "*"} if spark.version >= "3.2.1"
+                               else {'numPartitions': spark.sparkContext.defaultParallelism,
+                                     'rowsPerSecond': spark.sparkContext.defaultParallelism}, None),
+
+                              (None, "rate", {'numPartitions': spark.sparkContext.defaultParallelism,
+                                              'rowsPerSecond': spark.sparkContext.defaultParallelism}, "3.0.0"),
+
+                              (None, "rate-micro-batch",
+                               {'numPartitions': spark.sparkContext.defaultParallelism,
+                                'rowsPerBatch': spark.sparkContext.defaultParallelism,
+                                'startTimestamp': "*"}, "3.2.1"),
+
+                              ({'streamingSource': 'rate'}, 'rate',
+                               {'numPartitions': spark.sparkContext.defaultParallelism,
+                                'streamingSource': 'rate',
+                                'rowsPerSecond': spark.sparkContext.defaultParallelism}, None),
+
+                              ({'streamingSource': 'rate', 'rowsPerSecond': 5000}, 'rate',
+                               {'numPartitions': spark.sparkContext.defaultParallelism,
+                                'streamingSource': 'rate',
+                                'rowsPerSecond': 5000}, None),
+
+                              ({'streamingSource': 'rate', 'numPartitions': 10}, 'rate',
+                               {'numPartitions': 10, 'rowsPerSecond': 10, 'streamingSource': 'rate'}, None),
+
+                              ({'streamingSource': 'rate', 'numPartitions': 10, 'rowsPerSecond': 5000}, 'rate',
+                               {'numPartitions': 10, 'rowsPerSecond': 5000, 'streamingSource': 'rate'}, None),
+
+                              ({'streamingSource': 'rate-micro-batch'}, 'rate-micro-batch',
+                               {'streamingSource': 'rate-micro-batch',
+                                'numPartitions': spark.sparkContext.defaultParallelism,
+                                'startTimestamp': '*',
+                                'rowsPerBatch': spark.sparkContext.defaultParallelism}, None),
+
+                              ({'streamingSource': 'rate-micro-batch', 'numPartitions':20}, 'rate-micro-batch',
+                               {'streamingSource': 'rate-micro-batch',
+                                'numPartitions': 20,
+                                'startTimestamp': '*',
+                                'rowsPerBatch': 20}, None),
+
+                              ({'streamingSource': 'rate-micro-batch', 'numPartitions': 20, 'rowsPerBatch': 4300},
+                               'rate-micro-batch',
+                               {'streamingSource': 'rate-micro-batch',
+                                'numPartitions': 20,
+                                'startTimestamp': '*',
+                                'rowsPerBatch': 4300}, None),
+                              ])
+    def test_prepare_options(self, options_supplied, source_expected, options_expected, spark_version_override):
+        testDataSpec = dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                        rows=self.row_count,
+                                        partitions=spark.sparkContext.defaultParallelism,
+                                        seedMethod='hash_fieldname')
+
+        streaming_source, new_options = testDataSpec._prepareStreamingOptions(options_supplied, spark_version_override)
+        print("Options supplied:", options_supplied, "streamingSource:", streaming_source)
+
+        assert streaming_source == source_expected, "unexpected streaming source"
+
+        if streaming_source == "rate-micro-batch":
+            assert "startTimestamp" in new_options
+            assert "startTimestamp" in options_expected
+            if options_expected["startTimestamp"] == "*":
+                options_expected.pop("startTimestamp")
+                new_options.pop("startTimestamp")
+
+        print("options expected:", options_expected)
+
+        assert new_options == options_expected, "unexpected options"
+
+    @pytest.fixture
+    def getBaseDir(self, request):
         time_now = int(round(time.time() * 1000))
-        base_dir = "/tmp/testdatagenerator_{}".format(time_now)
-        test_dir = os.path.join(base_dir, "data1")
-        checkpoint_dir = os.path.join(base_dir, "checkpoint1")
-        print(time_now, test_dir, checkpoint_dir)
+        base_dir = f"/tmp/testdatagenerator_{request.node.originalname}_{time_now}"
+        yield base_dir
+        print("cleaning base dir")
+        shutil.rmtree(base_dir)
 
-        new_data_rows = 0
+    @pytest.fixture
+    def getCheckpoint(self, getBaseDir, request):
+        checkpoint_dir = os.path.join(getBaseDir, "checkpoint1")
+        os.makedirs(checkpoint_dir)
+
+        yield checkpoint_dir
+        print("cleaning checkpoint dir")
+
+    @pytest.fixture
+    def getDataDir(self, getBaseDir, request):
+        data_dir = os.path.join(getBaseDir, "data1")
+        os.makedirs(data_dir)
+
+        yield data_dir
+        print("cleaning data dir")
+
+
+
+    def test_fixture1(self, getCheckpoint, getDataDir):
+        print(getCheckpoint)
+        print(getDataDir)
+
+    def test_streaming_basic_rate(self, getDataDir, getCheckpoint):
+        test_dir = getDataDir
+        checkpoint_dir = getCheckpoint
 
         try:
-            os.makedirs(test_dir)
-            os.makedirs(checkpoint_dir)
 
-            testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1", rows=self.row_count,
-                                             partitions=4, seedMethod='hash_fieldname')
+            testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                             rows=self.row_count,
+                                             partitions=spark.sparkContext.defaultParallelism,
+                                             seedMethod='hash_fieldname')
+                            .withIdOutput())
+
+            dfTestData = testDataSpec.build(withStreaming=True,
+                                            options={'rowsPerSecond': self.rows_per_second,
+                                                     'ageLimit': 1,
+                                                     'streamingSource': 'rate'})
+
+            (dfTestData.writeStream
+             .option("checkpointLocation", checkpoint_dir)
+             .outputMode("append")
+             .format("parquet")
+             .start(test_dir)
+             )
+
+            start_time = time.time()
+            time.sleep(self.time_to_run)
+
+            # note stopping the stream may produce exceptions - these can be ignored
+            recent_progress = []
+            for x in spark.streams.active:
+                recent_progress.append(x.recentProgress)
+                print(x)
+                x.stop()
+
+            end_time = time.time()
+
+            # read newly written data
+            df2 = spark.read.format("parquet").load(test_dir)
+
+            new_data_rows = df2.count()
+
+            print("read {} rows from newly written data".format(new_data_rows))
+        finally:
+            pass
+
+        print("*** Done ***")
+
+        print("elapsed time (seconds)", end_time - start_time)
+
+        # check that we have at least one second of data
+        assert new_data_rows > self.rows_per_second
+
+    def test_streaming_basic_rate_micro_batch(self, getDataDir, getCheckpoint):
+        test_dir = getDataDir
+        checkpoint_dir = getCheckpoint
+
+        try:
+
+            testDataSpec = (dg.DataGenerator(sparkSession=spark, name="test_data_set1",
+                                             rows=self.row_count,
+                                             partitions=spark.sparkContext.defaultParallelism,
+                                             seedMethod='hash_fieldname')
                             .withIdOutput()
-                            .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
-                                        numColumns=self.column_count)
                             .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
                             .withColumn("code2", IntegerType(), minValue=0, maxValue=10)
                             .withColumn("code3", StringType(), values=['a', 'b', 'c'])
-                            .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True)
-                            .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1])
-
                             )
 
             dfTestData = testDataSpec.build(withStreaming=True,
-                                            options={'rowsPerSecond': self.rows_per_second})
+                                            options={'rowsPerBatch': 1000,
+                                                     'streamingSource': 'rate-micro-batch',
+                                                     'startTimestamp': 0})
 
-            (dfTestData
-             .writeStream
-             .format("parquet")
-             .outputMode("append")
-             .option("path", test_dir)
-             .option("checkpointLocation", checkpoint_dir)
-             .start())
+            (dfTestData.writeStream
+             .option("checkpointLocation", checkpoint_dir)
+             .outputMode("append")
+             .format("parquet")
+             .start(test_dir)
+             )
+
+            start_time = time.time()
+            time.sleep(self.time_to_run)
+
+            # note stopping the stream may produce exceptions - these can be ignored
+            recent_progress = []
+            for x in spark.streams.active:
+                recent_progress.append(x.recentProgress)
+                print(x)
+                x.stop()
+
+            end_time = time.time()
+
+            # read newly written data
+            df2 = spark.read.format("parquet").load(test_dir)
+
+            new_data_rows = df2.count()
+
+            print("read {} rows from newly written data".format(new_data_rows))
+        finally:
+            pass
+
+        print("*** Done ***")
+
+        print("elapsed time (seconds)", end_time - start_time)
+
+        # check that we have at least one second of data
+        assert new_data_rows > self.rows_per_second
+
+
+    def test_streaming_rate_source(self):
+        print(spark.version)
+        test_dir, checkpoint_dir, base_dir = self.getDataAndCheckpoint("test1")
+
+        new_data_rows = 0
+
+        self.makeDataAndCheckpointDirs(test_dir, checkpoint_dir)
+
+        try:
+
+            testDataSpec = self.getTestDataSpec()
+
+            dfTestData = testDataSpec.build(withStreaming=True,
+                                            options={'rowsPerSecond': self.rows_per_second,
+                                                     'ageLimit': 1,
+                                                     'streamingSource': 'rate'})
+
+            start_time = time.time()
+            time.sleep(self.time_to_run)
+
+            # note stopping the stream may produce exceptions - these can be ignored
+            recent_progress = []
+            for x in spark.streams.active:
+                recent_progress.append(x.recentProgress)
+                print(x)
+                x.stop()
+
+            end_time = time.time()
+
+            # read newly written data
+            df2 = spark.read.format("parquet").load(test_dir)
+
+            new_data_rows = df2.count()
+
+            print("read {} rows from newly written data".format(new_data_rows))
+        finally:
+            shutil.rmtree(base_dir)
+
+        print("*** Done ***")
+
+        print("elapsed time (seconds)", end_time - start_time)
+
+        # check that we have at least one second of data
+        self.assertGreater(new_data_rows, self.rows_per_second)
+
+
+    def test_streaming(self):
+        print(spark.version)
+        test_dir, checkpoint_dir, base_dir = self.getDataAndCheckpoint("test1")
+
+        new_data_rows = 0
+
+        self.makeDataAndCheckpointDirs(test_dir, checkpoint_dir)
+
+        try:
+
+            testDataSpec = self.getTestDataSpec()
+
+            dfTestData = testDataSpec.build(withStreaming=True,
+                                            options={'rowsPerSecond': self.rows_per_second,
+                                                     'ageLimit': 1})
 
             start_time = time.time()
             time.sleep(self.time_to_run)