| 1 | +# We can't simply create the SparkApplication object here as we have to wait for Kafka to be ready because |
| 2 | +# * We currently don't restart failed Spark applications (see https://github.com/stackabletech/spark-k8s-operator/issues/157) |
| 3 | +# * We currently auto-create topics and we need all the brokers to be available so that the topic is distributed among all the brokers |
| 4 | +--- |
| 5 | +apiVersion: batch/v1 |
| 6 | +kind: Job |
| 7 | +metadata: |
| 8 | + name: create-spark-ingestion-job |
| 9 | +spec: |
| 10 | + template: |
| 11 | + spec: |
| 12 | + serviceAccountName: demo-serviceaccount |
| 13 | + initContainers: |
| 14 | + - name: wait-for-testdata |
| 15 | + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 |
| 16 | + command: ["bash", "-c", "echo 'Waiting for all kafka brokers to be ready' && kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/instance=kafka -l app.kubernetes.io/name=kafka"] |
| 17 | + containers: |
| 18 | + - name: create-spark-ingestion-job |
| 19 | + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 |
| 20 | + command: ["bash", "-c", "echo 'Submitting Spark job' && kubectl apply -f /tmp/manifest/spark-ingestion-job.yaml"] |
| 21 | + volumeMounts: |
| 22 | + - name: manifest |
| 23 | + mountPath: /tmp/manifest |
| 24 | + volumes: |
| 25 | + - name: manifest |
| 26 | + configMap: |
| 27 | + name: create-spark-ingestion-job-manifest |
| 28 | + restartPolicy: OnFailure |
| 29 | + backoffLimit: 50 |
| 30 | +--- |
| 31 | +apiVersion: v1 |
| 32 | +kind: ConfigMap |
| 33 | +metadata: |
| 34 | + name: create-spark-ingestion-job-manifest |
| 35 | +data: |
| 36 | + spark-ingestion-job.yaml: | |
| 37 | + --- |
| 38 | + apiVersion: spark.stackable.tech/v1alpha1 |
| 39 | + kind: SparkApplication |
| 40 | + metadata: |
| 41 | + name: spark-ingest-into-warehouse |
| 42 | + spec: |
| 43 | + version: "1.0" |
| 44 | + sparkImage: docker.stackable.tech/sbernauer/pyspark-k8s-with-iceberg:latest3 # docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.2.0 |
| 45 | + mode: cluster |
| 46 | + mainApplicationFile: local:///stackable/spark/jobs/spark-ingest-into-warehouse.py |
| 47 | + # deps: |
| 48 | + # packages: |
| 49 | + # - org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.1 |
| 50 | + # - org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 |
| 51 | + sparkConf: |
| 52 | + spark.hadoop.fs.s3a.endpoint: http://minio:9000 |
| 53 | + spark.hadoop.fs.s3a.path.style.access: "true" |
| 54 | + spark.hadoop.fs.s3a.access.key: trino |
| 55 | + spark.hadoop.fs.s3a.secret.key: trinotrino |
| 56 | + spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
| 57 | + spark.sql.catalog.warehouse: org.apache.iceberg.spark.SparkCatalog |
| 58 | + spark.sql.catalog.warehouse.type: hive |
| 59 | + spark.sql.catalog.warehouse.uri: thrift://hive-iceberg:9083 |
| 60 | + volumes: |
| 61 | + - name: script |
| 62 | + configMap: |
| 63 | + name: write-iceberg-table-script |
| 64 | + job: |
| 65 | + resources: |
| 66 | + cpu: |
| 67 | + min: "100m" |
| 68 | + max: "1" |
| 69 | + driver: |
| 70 | + resources: |
| 71 | + cpu: |
| 72 | + min: "1" |
| 73 | + max: "1" |
| 74 | + memory: |
| 75 | + limit: "2Gi" |
| 76 | + volumeMounts: |
| 77 | + - name: script |
| 78 | + mountPath: /stackable/spark/jobs |
| 79 | + executor: |
| 80 | + instances: 4 |
| 81 | + resources: |
| 82 | + cpu: |
| 83 | + min: "2" |
| 84 | + max: "4" |
| 85 | + memory: |
| 86 | + limit: "12Gi" |
| 87 | + volumeMounts: |
| 88 | + - name: script |
| 89 | + mountPath: /stackable/spark/jobs |
| 90 | +--- |
| 91 | +apiVersion: v1 |
| 92 | +kind: ConfigMap |
| 93 | +metadata: |
| 94 | + name: write-iceberg-table-script |
| 95 | +data: |
| 96 | + spark-ingest-into-warehouse.py: | |
| 97 | + from pyspark.sql import SparkSession |
| 98 | + from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType, DoubleType, BooleanType, TimestampType, MapType, ArrayType |
| 99 | + from pyspark.sql.functions import col, from_json, expr |
| 100 | + import time |
| 101 | + from datetime import datetime, timedelta |
| 102 | +
| 103 | + spark = SparkSession.builder.appName("spark-ingest-into-warehouse").getOrCreate() |
| 104 | + # spark.sparkContext.setLogLevel("DEBUG") |
| 105 | +
| 106 | + spark.sql("CREATE SCHEMA IF NOT EXISTS warehouse.water_levels LOCATION 's3a://warehouse/water-levels/'") |
| 107 | + spark.sql("CREATE SCHEMA IF NOT EXISTS warehouse.smart_city LOCATION 's3a://warehouse/smart-city/'") |
| 108 | +
| 109 | + # Todo add PARTITIONED BY (days(timestamp)) |
| 110 | + # Currently fails with org.apache.spark.sql.AnalysisException: days(timestamp) ASC NULLS FIRST is not currently supported |
| 111 | + # Don't forget to add option("fanout-enabled", "true") to iceberg sink as well |
| 112 | + # see https://github.com/apache/iceberg/issues/5625 |
| 113 | + spark.sql("CREATE TABLE IF NOT EXISTS warehouse.water_levels.measurements (station_uuid string, timestamp timestamp, value float) USING iceberg") |
| 114 | + spark.sql("CREATE TABLE IF NOT EXISTS warehouse.water_levels.stations (uuid string, number bigint, short_name string, long_name string, km float, agency string, latitude double, longitude double, water_short_name string, water_long_name string) USING iceberg") |
| 115 | + spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_bike_status (bike_id string, vehicle_type_id string, latitude double, longitude double, is_reserved boolean, is_disabled boolean, last_reported timestamp) USING iceberg") |
| 116 | + spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_station_information (station_id string, name string, latitude double, longitude double) USING iceberg") |
| 117 | + spark.sql("CREATE TABLE IF NOT EXISTS warehouse.smart_city.shared_bikes_station_status (station_id string, num_bikes_available short, is_installed boolean, is_renting boolean, is_returning boolean, vehicle_types_available map<string,short>, last_reported timestamp) USING iceberg") |
| 118 | +
| 119 | + schema = StructType([ |
| 120 | + StructField("station_uuid", StringType(), True), |
| 121 | + StructField("timestamp", TimestampType(), True), |
| 122 | + StructField("value", FloatType(), True), |
| 123 | + ]) |
| 124 | + spark \ |
| 125 | + .readStream \ |
| 126 | + .format("kafka") \ |
| 127 | + .option("kafka.bootstrap.servers", "kafka:9092") \ |
| 128 | + .option("subscribe", "water_levels_measurements") \ |
| 129 | + .option("startingOffsets", "earliest") \ |
| 130 | + .option("maxOffsetsPerTrigger", 50000000) \ |
| 131 | + .load() \ |
| 132 | + .selectExpr("cast(key as string)", "cast(value as string)") \ |
| 133 | + .withColumn("json", from_json(col("value"), schema)) \ |
| 134 | + .select("json.station_uuid", "json.timestamp", "json.value") \ |
| 135 | + .writeStream \ |
| 136 | + .queryName("ingest water_level measurements") \ |
| 137 | + .format("iceberg") \ |
| 138 | + .outputMode("append") \ |
| 139 | + .trigger(processingTime='5 minutes') \ |
| 140 | + .option("path", "warehouse.water_levels.measurements") \ |
| 141 | + .option("checkpointLocation", "s3a://warehouse/water-levels/measurements/checkpoints") \ |
| 142 | + .start() |
| 143 | +
| 144 | + schema = StructType([ \ |
| 145 | + StructField("uuid", StringType(), True), \ |
| 146 | + StructField("number", StringType(), True), \ |
| 147 | + StructField("shortname", StringType(), True), \ |
| 148 | + StructField("longname", StringType(), True), \ |
| 149 | + StructField("km", FloatType(), True), \ |
| 150 | + StructField("agency", StringType(), True), \ |
| 151 | + StructField("latitude", DoubleType(), True), \ |
| 152 | + StructField("longitude", DoubleType(), True), \ |
| 153 | + StructField("water", \ |
| 154 | + StructType([StructField("shortname", StringType(), True), StructField("longname", StringType(), True)]), \ |
| 155 | + True), \ |
| 156 | + ]) |
| 157 | + spark \ |
| 158 | + .readStream \ |
| 159 | + .format("kafka") \ |
| 160 | + .option("kafka.bootstrap.servers", "kafka:9092") \ |
| 161 | + .option("subscribe", "water_levels_stations") \ |
| 162 | + .option("startingOffsets", "earliest") \ |
| 163 | + .option("maxOffsetsPerTrigger", 10000) \ |
| 164 | + .load() \ |
| 165 | + .selectExpr("cast(key as string)", "cast(value as string)") \ |
| 166 | + .withColumn("json", from_json(col("value"), schema)) \ |
| 167 | + .selectExpr( \ |
| 168 | + "json.uuid", \ |
| 169 | + "cast(json.number as bigint) as number", \ |
| 170 | + "json.shortname as short_name", \ |
| 171 | + "json.longname as long_name", \ |
| 172 | + "json.km", "json.agency", \ |
| 173 | + "json.latitude", \ |
| 174 | + "json.longitude", \ |
| 175 | + "json.water.shortname as water_short_name", \ |
| 176 | + "json.water.longname as water_long_name" \ |
| 177 | + ) \ |
| 178 | + .writeStream \ |
| 179 | + .queryName("ingest water_level stations") \ |
| 180 | + .format("iceberg") \ |
| 181 | + .outputMode("append") \ |
| 182 | + .trigger(processingTime='2 minutes') \ |
| 183 | + .option("path", "warehouse.water_levels.stations") \ |
| 184 | + .option("checkpointLocation", "s3a://warehouse/water-levels/stations/checkpoints") \ |
| 185 | + .start() |
| 186 | +
| 187 | + schema = StructType([ \ |
| 188 | + StructField("station_id", StringType(), True), \ |
| 189 | + StructField("lat", DoubleType(), True), \ |
| 190 | + StructField("lon", DoubleType(), True), \ |
| 191 | + StructField("name", StringType(), True), \ |
| 192 | + ]) |
| 193 | + spark \ |
| 194 | + .readStream \ |
| 195 | + .format("kafka") \ |
| 196 | + .option("kafka.bootstrap.servers", "kafka:9092") \ |
| 197 | + .option("subscribe", "shared_bikes_station_information") \ |
| 198 | + .option("startingOffsets", "earliest") \ |
| 199 | + .option("maxOffsetsPerTrigger", 10000) \ |
| 200 | + .load() \ |
| 201 | + .selectExpr("cast(key as string)", "cast(value as string)") \ |
| 202 | + .withColumn("json", from_json(col("value"), schema)) \ |
| 203 | + .selectExpr("json.station_id", "json.name as name", "json.lat as latitude", "json.lon as longitude") \ |
| 204 | + .writeStream \ |
| 205 | + .queryName("ingest smart_city shared_bikes_station_information") \ |
| 206 | + .format("iceberg") \ |
| 207 | + .outputMode("append") \ |
| 208 | + .trigger(processingTime='2 minutes') \ |
| 209 | + .option("path", "warehouse.smart_city.shared_bikes_station_information") \ |
| 210 | + .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_station_information/checkpoints") \ |
| 211 | + .start() |
| 212 | +
| 213 | + schema = StructType([ \ |
| 214 | + StructField("station_id", StringType(), True), \ |
| 215 | + StructField("is_installed", BooleanType(), True), \ |
| 216 | + StructField("last_reported", TimestampType(), True), \ |
| 217 | + StructField("num_bikes_available", ShortType(), True), \ |
| 218 | + StructField("is_renting", BooleanType(), True), \ |
| 219 | + StructField("is_returning", BooleanType(), True), \ |
| 220 | + StructField("vehicle_types_available", ArrayType(StructType([StructField("count", ShortType(), True), StructField("vehicle_type_id", StringType(), True)]), True), True), \ |
| 221 | + ]) |
| 222 | + spark \ |
| 223 | + .readStream \ |
| 224 | + .format("kafka") \ |
| 225 | + .option("kafka.bootstrap.servers", "kafka:9092") \ |
| 226 | + .option("subscribe", "shared_bikes_station_status") \ |
| 227 | + .option("startingOffsets", "earliest") \ |
| 228 | + .option("maxOffsetsPerTrigger", 10000) \ |
| 229 | + .load() \ |
| 230 | + .selectExpr("cast(key as string)", "cast(value as string)") \ |
| 231 | + .withColumn("json", from_json(col("value"), schema)) \ |
| 232 | + .selectExpr( \ |
| 233 | + "json.station_id", \ |
| 234 | + "json.num_bikes_available", \ |
| 235 | + "json.is_installed", \ |
| 236 | + "json.is_renting", \ |
| 237 | + "json.is_returning", \ |
| 238 | + "map_from_arrays(json.vehicle_types_available.vehicle_type_id, json.vehicle_types_available.count) as vehicle_types_available", \ |
| 239 | + "json.last_reported" \ |
| 240 | + ) \ |
| 241 | + .writeStream \ |
| 242 | + .queryName("ingest smart_city shared_bikes_station_status") \ |
| 243 | + .format("iceberg") \ |
| 244 | + .outputMode("append") \ |
| 245 | + .trigger(processingTime='2 minutes') \ |
| 246 | + .option("path", "warehouse.smart_city.shared_bikes_station_status") \ |
| 247 | + .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_station_status/checkpoints") \ |
| 248 | + .start() |
| 249 | +
| 250 | + schema = StructType([ \ |
| 251 | + StructField("lat", DoubleType(), True), \ |
| 252 | + StructField("lon", DoubleType(), True), \ |
| 253 | + StructField("bike_id", StringType(), True), \ |
| 254 | + StructField("is_reserved", BooleanType(), True), \ |
| 255 | + StructField("is_disabled", BooleanType(), True), \ |
| 256 | + StructField("vehicle_type_id", StringType(), True), \ |
| 257 | + StructField("last_reported", TimestampType(), True), \ |
| 258 | + ]) |
| 259 | + spark \ |
| 260 | + .readStream \ |
| 261 | + .format("kafka") \ |
| 262 | + .option("kafka.bootstrap.servers", "kafka:9092") \ |
| 263 | + .option("subscribe", "shared_bikes_bike_status") \ |
| 264 | + .option("startingOffsets", "earliest") \ |
| 265 | + .option("maxOffsetsPerTrigger", 10000) \ |
| 266 | + .load() \ |
| 267 | + .selectExpr("cast(key as string)", "cast(value as string)") \ |
| 268 | + .withColumn("json", from_json(col("value"), schema)) \ |
| 269 | + .selectExpr("json.bike_id", "json.vehicle_type_id", "json.lat as latitude", "json.lon as longitude", "json.is_reserved", "json.is_disabled", "json.last_reported") \ |
| 270 | + .writeStream \ |
| 271 | + .queryName("ingest smart_city shared_bikes_bike_status") \ |
| 272 | + .format("iceberg") \ |
| 273 | + .outputMode("append") \ |
| 274 | + .trigger(processingTime='2 minutes') \ |
| 275 | + .option("path", "warehouse.smart_city.shared_bikes_bike_status") \ |
| 276 | + .option("checkpointLocation", "s3a://warehouse/smart-city/shared_bikes_bike_status/checkpoints") \ |
| 277 | + .start() |
| 278 | +
| 279 | + # key: table name |
| 280 | + # value: compaction strategy |
| 281 | + tables_to_compact = { |
| 282 | + # "water_levels.measurements": ", strategy => 'sort', sort_order => 'station_uuid ASC NULLS LAST,timestamp DESC NULLS LAST'", |
| 283 | + "water_levels.measurements": "", |
| 284 | + "water_levels.stations": "", |
| 285 | + "warehouse.smart_city.shared_bikes_station_information": "", |
| 286 | + "warehouse.smart_city.shared_bikes_station_status": "", |
| 287 | + "warehouse.smart_city.shared_bikes_bike_status": "", |
| 288 | + } |
| 289 | +
| 290 | + while True: |
| 291 | + expire_before = (datetime.now() - timedelta(hours=4)).strftime("%Y-%m-%d %H:%M:%S") |
| 292 | + for table, table_compaction_strategy in tables_to_compact.items(): |
| 293 | + print(f"[{table}] Expiring snapshots older than 4 hours ({expire_before})") |
| 294 | + spark.sql(f"CALL warehouse.system.expire_snapshots(table => '{table}', older_than => TIMESTAMP '{expire_before}', retain_last => 50, stream_results => true)") |
| 295 | +
| 296 | + print(f"[{table}] Removing orphaned files") |
| 297 | + spark.sql(f"CALL warehouse.system.remove_orphan_files(table => '{table}')") |
| 298 | +
| 299 | + print(f"[{table}] Starting compaction") |
| 300 | + spark.sql(f"CALL warehouse.system.rewrite_data_files(table => '{table}'{table_compaction_strategy})") |
| 301 | + print(f"[{table}] Finished compaction") |
| 302 | +
| 303 | + print("All tables compacted. Waiting 25min before scheduling next run...") |
| 304 | + time.sleep(25 * 60) # Assuming compaction takes 5 min run every 30 minutes |
