diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 60bce107186f4..470eb9501a4df 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -565,6 +565,12 @@ ], "sqlState" : "22KD3" }, + "CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE" : { + "message" : [ + "Multiple aliases are not supported in the watermark clause." + ], + "sqlState" : "42000" + }, "CANNOT_WRITE_STATE_STORE" : { "message" : [ "Error writing state store files for provider ." diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 8d2c13beff975..7832648c97e82 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -497,6 +497,7 @@ Below is a list of all the keywords in Spark SQL. |DEFAULT|non-reserved|non-reserved|non-reserved| |DEFINED|non-reserved|non-reserved|non-reserved| |DEFINER|non-reserved|non-reserved|non-reserved| +|DELAY|non-reserved|non-reserved|non-reserved| |DELETE|non-reserved|non-reserved|reserved| |DELIMITED|non-reserved|non-reserved|non-reserved| |DESC|non-reserved|non-reserved|non-reserved| @@ -791,6 +792,7 @@ Below is a list of all the keywords in Spark SQL. |VIEW|non-reserved|non-reserved|non-reserved| |VIEWS|non-reserved|non-reserved|non-reserved| |VOID|non-reserved|non-reserved|non-reserved| +|WATERMARK|non-reserved|non-reserved|non-reserved| |WEEK|non-reserved|non-reserved|non-reserved| |WEEKS|non-reserved|non-reserved|non-reserved| |WHEN|reserved|non-reserved|reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index e402067926f2a..793d63485855e 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -206,6 +206,7 @@ DECLARE: 'DECLARE'; DEFAULT: 'DEFAULT'; DEFINED: 'DEFINED'; DEFINER: 'DEFINER'; +DELAY: 'DELAY'; DELETE: 'DELETE'; DELIMITED: 'DELIMITED'; DESC: 'DESC'; @@ -499,6 +500,7 @@ VERSION: 'VERSION'; VIEW: 'VIEW'; VIEWS: 'VIEWS'; VOID: 'VOID'; +WATERMARK: 'WATERMARK'; WEEK: 'WEEK'; WEEKS: 'WEEKS'; WHEN: 'WHEN'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 8efab99d4ec83..d9b860ea46f07 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -376,8 +376,10 @@ createPipelineDatasetHeader ; streamRelationPrimary - : STREAM multipartIdentifier optionsClause? tableAlias #streamTableName - | STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN optionsClause? tableAlias #streamTableName + : STREAM multipartIdentifier optionsClause? watermarkClause? + tableAlias #streamTableName + | STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN + optionsClause? watermarkClause? tableAlias #streamTableName ; setResetStatement @@ -921,6 +923,10 @@ lateralView : LATERAL VIEW (OUTER)? qualifiedName LEFT_PAREN (expression (COMMA expression)*)? RIGHT_PAREN tblName=identifier (AS? colName+=identifier (COMMA colName+=identifier)*)?
; +watermarkClause + : WATERMARK colName=namedExpression DELAY OF delay=interval + ; + setQuantifier : DISTINCT | ALL ; @@ -995,9 +1001,11 @@ identifierComment relationPrimary : streamRelationPrimary #streamRelation | identifierReference temporalClause? - optionsClause? sample? tableAlias #tableName - | LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery - | LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation + optionsClause? sample? watermarkClause? tableAlias #tableName + | LEFT_PAREN query RIGHT_PAREN sample? watermarkClause? + tableAlias #aliasedQuery + | LEFT_PAREN relation RIGHT_PAREN sample? + watermarkClause? tableAlias #aliasedRelation | inlineTable #inlineTableDefault2 | functionTable #tableValuedFunction ; @@ -1006,6 +1014,8 @@ optionsClause : WITH options=propertyList ; +// Unlike all other kinds of relation primary, we do not support watermarkClause for +// inlineTable. inlineTable : VALUES expression (COMMA expression)* tableAlias ; @@ -1042,10 +1052,13 @@ functionTableArgument | functionArgument ; +// This is only used in relationPrimary, where having watermarkClause makes sense. If this rule +// becomes referenced by other clauses, please check whether watermarkClause makes sense for +// those clauses as well. If not, consider separating this rule. functionTable : funcName=functionName LEFT_PAREN (functionTableArgument (COMMA functionTableArgument)*)? - RIGHT_PAREN tableAlias + RIGHT_PAREN watermarkClause? tableAlias ; tableAlias @@ -1793,6 +1806,7 @@ ansiNonReserved | DEFAULT | DEFINED | DEFINER + | DELAY | DELETE | DELIMITED | DESC @@ -2035,6 +2049,7 @@ + | WATERMARK | WEEK | WEEKS | WHILE | WINDOW | WITHOUT | YEAR @@ -2160,6 +2175,7 @@ nonReserved | DEFAULT | DEFINED | DEFINER + | DELAY | DELETE | DELIMITED | DESC @@ -2439,6 +2455,7 @@ nonReserved | VIEW | VIEWS | VOID + | WATERMARK | WEEK | WEEKS | WHILE diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveEventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveEventTimeWatermark.scala new file mode 100644 index 0000000000000..f245ec8707760 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveEventTimeWatermark.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern + +/** + * Resolve [[UnresolvedEventTimeWatermark]] to [[EventTimeWatermark]].
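+ *
+ * For example, a query like the ones exercised in the suites below,
+ * {{{
+ *   SELECT window.start, count(*) AS cnt
+ *   FROM stream_src WATERMARK eventTime DELAY OF INTERVAL 10 seconds
+ *   GROUP BY window(eventTime, '5 seconds')
+ * }}}
+ * parses the WATERMARK clause into an [[UnresolvedEventTimeWatermark]] over the relation. When
+ * the watermark expression is already an attribute of the child's output, this rule wraps the
+ * child in [[EventTimeWatermark]] directly; when it is a derived expression such as
+ * `timestamp_seconds(value) AS eventTime`, a [[Project]] is injected first so that the derived
+ * column is available as an attribute.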
+ */ +object ResolveEventTimeWatermark extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( + _.containsPattern(TreePattern.UNRESOLVED_EVENT_TIME_WATERMARK), ruleId) { + + case u: UnresolvedEventTimeWatermark if u.eventTimeColExpr.resolved && u.childrenResolved => + val uuid = java.util.UUID.randomUUID() + + if (u.eventTimeColExpr.isInstanceOf[MultiAlias]) { + throw new AnalysisException( + errorClass = "CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE", + messageParameters = Map() + ) + } + + val namedExpression = u.eventTimeColExpr match { + case e: NamedExpression => e + case e: Expression => UnresolvedAlias(e) + } + + if (u.child.outputSet.contains(namedExpression)) { + // We don't need a projection since the attribute being referenced is already available + // in the child's output. + EventTimeWatermark(uuid, namedExpression.toAttribute, u.delay, u.child) + } else { + // We need to inject a projection since the matching column is not directly available + // in the child's output. + val proj = Project(Seq(namedExpression, UnresolvedStar(None)), u.child) + val attrRef = proj.projectList.head.toAttribute + EventTimeWatermark(uuid, attrRef, u.delay, proj) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b759c70266f7a..9928284e09050 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.TableWritePrivilege import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{DataType, Metadata, StructType} import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} +import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.ArrayImplicits._ /** @@ -1228,3 +1229,15 @@ case class UnresolvedExecuteImmediate( final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE) } + +case class UnresolvedEventTimeWatermark( + eventTimeColExpr: Expression, + delay: CalendarInterval, + child: LogicalPlan) + extends UnresolvedUnaryNode { + + final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_EVENT_TIME_WATERMARK) + + override protected def withNewChildInternal( + newChild: LogicalPlan): UnresolvedEventTimeWatermark = copy(child = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 1d7cf5455e57b..06c8650596e15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.CollationFactory import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} /** * A collection of implicit conversions that create a DSL for constructing catalyst data structures.
@@ -566,6 +566,17 @@ package object dsl extends SQLConfHelper { } def deduplicate(colNames: Attribute*): LogicalPlan = Deduplicate(colNames, logicalPlan) + + def watermark(expr: Expression, delayThreshold: CalendarInterval): LogicalPlan = { + val namedExpression = expr match { + case e: NamedExpression => e + case e: Expression => UnresolvedAlias(e) + } + val proj = Project(Seq(namedExpression, UnresolvedStar(None)), logicalPlan) + val attrRef = proj.projectList.head.toAttribute + + EventTimeWatermark(java.util.UUID.randomUUID(), attrRef, delayThreshold, proj) + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index e43e32f04fbf1..fe5724b11fd46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2076,6 +2076,22 @@ class AstBuilder extends DataTypeAstBuilder query) } + /** + * Add an [[EventTimeWatermark]] to a logical plan. + */ + private def withWatermark( + ctx: WatermarkClauseContext, + query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + val expression = visitNamedExpression(ctx.namedExpression()) + val delayInterval = visitInterval(ctx.delay) + + val delay = IntervalUtils.fromIntervalString(delayInterval.toString) + require(!IntervalUtils.isNegative(delay), + s"delay threshold (${delayInterval.toString}) should not be negative.") + + UnresolvedEventTimeWatermark(expression, delay, query) + } + /** * Create a single relation referenced in a FROM clause. This method is used when a part of the * join condition is nested, for example: @@ -2252,7 +2268,8 @@ class AstBuilder extends DataTypeAstBuilder val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause)) val table = mayApplyAliasPlan( ctx.tableAlias, relation.optionalMap(ctx.temporalClause)(withTimeTravel)) - table.optionalMap(ctx.sample)(withSample) + val sample = table.optionalMap(ctx.sample)(withSample) + sample.optionalMap(ctx.watermarkClause)(withWatermark) } override def visitVersion(ctx: VersionContext): Option[String] = { @@ -2392,7 +2409,9 @@ class AstBuilder extends DataTypeAstBuilder val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(ident, tvf, aliases) else tvf - tvfAliases.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan) + val watermarkClause = func.watermarkClause() + val tvfWithWatermark = tvfAliases.optionalMap(watermarkClause)(withWatermark) + tvfWithWatermark.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan) }) } @@ -2404,7 +2423,9 @@ class AstBuilder extends DataTypeAstBuilder optionsClause = Option(ctx.optionsClause), writePrivileges = Seq.empty, isStreaming = true) - mayApplyAliasPlan(ctx.tableAlias, tableStreamingRelation) + + val tableWithWatermark = tableStreamingRelation.optionalMap(ctx.watermarkClause)(withWatermark) + mayApplyAliasPlan(ctx.tableAlias, tableWithWatermark) } /** @@ -2447,7 +2468,8 @@ class AstBuilder extends DataTypeAstBuilder */ override def visitAliasedRelation(ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) { val relation = plan(ctx.relation).optionalMap(ctx.sample)(withSample) - mayApplyAliasPlan(ctx.tableAlias, relation) + val watermark = relation.optionalMap(ctx.watermarkClause)(withWatermark) + mayApplyAliasPlan(ctx.tableAlias, watermark) } /** @@ -2460,7 +2482,7 @@ class AstBuilder extends DataTypeAstBuilder */ override def visitAliasedQuery(ctx: 
AliasedQueryContext): LogicalPlan = withOrigin(ctx) { val relation = plan(ctx.query).optionalMap(ctx.sample)(withSample) - if (ctx.tableAlias.strictIdentifier == null) { + val alias = if (ctx.tableAlias.strictIdentifier == null) { // For un-aliased subqueries, use a default alias name that is not likely to conflict with // normal subquery names, so that parent operators can only access the columns in subquery by // unqualified names. Users can still use this special qualifier to access columns if they @@ -2469,6 +2491,7 @@ class AstBuilder extends DataTypeAstBuilder } else { mayApplyAliasPlan(ctx.tableAlias, relation) } + alias.optionalMap(ctx.watermarkClause)(withWatermark) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index c68b8a2c29af9..51418238e31a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -98,6 +98,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" :: "org.apache.spark.sql.catalyst.analysis.ResolveSetVariable" :: "org.apache.spark.sql.catalyst.analysis.ResolveExecuteImmediate" :: + "org.apache.spark.sql.catalyst.analysis.ResolveEventTimeWatermark" :: "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index c35aa7403d767..dbc151c12dda3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -170,6 +170,7 @@ object TreePattern extends Enumeration { // Unresolved Plan patterns (Alphabetically ordered) val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value + val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value val UNRESOLVED_HAVING: Value = Value val UNRESOLVED_HINT: Value = Value val UNRESOLVED_FUNC: Value = Value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala index b8ffa09dfa05c..9b40e6fabdc9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala @@ -595,9 +595,8 @@ class Dataset[T] private[sql]( val parsedDelay = IntervalUtils.fromIntervalString(delayThreshold) require(!IntervalUtils.isNegative(parsedDelay), s"delay threshold ($delayThreshold) should not be negative.") - EliminateEventTimeWatermark( - EventTimeWatermark(util.UUID.randomUUID(), UnresolvedAttribute(eventTime), - parsedDelay, logicalPlan)) + EventTimeWatermark(util.UUID.randomUUID(), UnresolvedAttribute(eventTime), + parsedDelay, logicalPlan) } /** @inheritdoc */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 5abb1e75543ad..c967497b660c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.internal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -246,6 +246,7 @@ abstract class BaseSessionStateBuilder( new InvokeProcedures(session) +: ResolveExecuteImmediate(session, this.catalogManager) +: ExtractSemiStructuredFields +: + ResolveEventTimeWatermark +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkWithWatermarkDefInSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkWithWatermarkDefInSelectSuite.scala new file mode 100644 index 0000000000000..072c9b71d2928 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkWithWatermarkDefInSelectSuite.scala @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.{util => ju} +import java.text.SimpleDateFormat + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC +import org.apache.spark.sql.execution.streaming.runtime.MemoryStream +import org.apache.spark.sql.functions.{col, timestamp_seconds} +import org.apache.spark.sql.streaming.StateStoreMetricsTest + +class EventTimeWatermarkWithWatermarkDefInSelectSuite + extends StateStoreMetricsTest + with BeforeAndAfter + with Logging { + + import testImplicits._ + + after { + sqlContext.streams.active.foreach(_.stop()) + } + + test("event time and watermark metrics with watermark in select DML - case 1") { + // All event time metrics where watermarking is set + val inputData = MemoryStream[Int] + val df = inputData.toDF() + .withColumn("eventTime", timestamp_seconds(col("value"))) + df.createOrReplaceTempView("stream_src") + val aggWithWatermark = spark.sql( + """ + |SELECT + | CAST(window.start AS LONG), CAST(count(*) AS LONG) AS count + |FROM + | stream_src WATERMARK eventTime DELAY OF INTERVAL 10 seconds + |GROUP BY window(eventTime, '5 seconds') + |""".stripMargin) + + testWindowedAggregation(inputData, aggWithWatermark) + } + + test("event time and watermark metrics with watermark in select DML - case 2") { + // All event time metrics where watermarking is set + val inputData = MemoryStream[Int] + val df = inputData.toDF() + df.createOrReplaceTempView("stream_src") + val aggWithWatermark = spark.sql( + """ + |SELECT + | CAST(window.start AS LONG), CAST(count(*) AS LONG) AS count + |FROM + | stream_src + | WATERMARK timestamp_seconds(value) AS eventTime DELAY OF INTERVAL 10 seconds + |GROUP BY window(eventTime, '5 seconds') + |""".stripMargin) + + testWindowedAggregation(inputData, aggWithWatermark) + } + + private def testWindowedAggregation( + inputData: MemoryStream[Int], + dataFrame: DataFrame): Unit = { + testStream(dataFrame)( + AddData(inputData, 15), + CheckAnswer(), + assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0), + AddData(inputData, 10, 12, 14), + CheckAnswer(), + assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5), + AddData(inputData, 25), + CheckAnswer((10, 3)), + assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5) + ) + } + + test("stream-stream join with watermark in select DML - case 1") { + val leftInput = MemoryStream[(Int, Int)] + val rightInput = MemoryStream[(Int, Int)] + + val df1 = leftInput.toDF().toDF("leftKey", "time") + .select($"leftKey", timestamp_seconds($"time") as "leftTime", + ($"leftKey" * 2) as "leftValue") + val df2 = rightInput.toDF().toDF("rightKey", "time") + .select($"rightKey", timestamp_seconds($"time") as "rightTime", + ($"rightKey" * 3) as "rightValue") + + df1.createOrReplaceTempView("stream_left") + df2.createOrReplaceTempView("stream_right") + + val joined = spark.sql( + """ + |SELECT + | leftKey, rightKey, CAST(leftTime AS INTEGER), CAST(rightTime AS INTEGER) + |FROM + | stream_left WATERMARK leftTime DELAY OF INTERVAL 0 second + |FULL OUTER JOIN + | stream_right WATERMARK rightTime DELAY OF INTERVAL 0 second + |ON + | leftKey = rightKey AND leftTime BETWEEN rightTime - INTERVAL 5 SECONDS + | AND rightTime + INTERVAL 5 SECONDS + |""".stripMargin) + + testStreamStreamTimeIntervalJoin(leftInput, rightInput, joined) + } + + test("stream-stream join with watermark in select DML - case 2") { + val leftInput = 
MemoryStream[(Int, Int)] + val rightInput = MemoryStream[(Int, Int)] + + val df1 = leftInput.toDF().toDF("leftKey", "time") + val df2 = rightInput.toDF().toDF("rightKey", "time") + + df1.createOrReplaceTempView("stream_left") + df2.createOrReplaceTempView("stream_right") + + val joined = spark.sql( + """ + |SELECT + | leftKey, rightKey, CAST(leftTime AS INTEGER), CAST(rightTime AS INTEGER) + |FROM + |( + | SELECT + | leftKey, leftTime, leftKey * 2 AS leftValue + | FROM + | stream_left + | WATERMARK timestamp_seconds(time) AS leftTime DELAY OF INTERVAL 0 second + |) + |FULL OUTER JOIN + |( + | SELECT + | rightKey, rightTime, rightKey * 3 AS rightValue + | FROM + | stream_right + | WATERMARK timestamp_seconds(time) AS rightTime DELAY OF INTERVAL 0 second + |) + |ON + | leftKey = rightKey AND leftTime BETWEEN rightTime - INTERVAL 5 SECONDS + | AND rightTime + INTERVAL 5 SECONDS + |""".stripMargin) + + testStreamStreamTimeIntervalJoin(leftInput, rightInput, joined) + } + + private def testStreamStreamTimeIntervalJoin( + leftInput: MemoryStream[(Int, Int)], + rightInput: MemoryStream[(Int, Int)], + dataFrame: DataFrame): Unit = { + testStream(dataFrame)( + AddData(leftInput, (1, 5), (3, 5)), + CheckNewAnswer(), + // states + // left: (1, 5), (3, 5) + // right: nothing + assertNumStateRows(total = 2, updated = 2), + AddData(rightInput, (1, 10), (2, 5)), + // Match left row in the state. + CheckNewAnswer(Row(1, 1, 5, 10)), + // states + // left: (1, 5), (3, 5) + // right: (1, 10), (2, 5) + assertNumStateRows(total = 4, updated = 2), + AddData(rightInput, (1, 9)), + // Match left row in the state. + CheckNewAnswer(Row(1, 1, 5, 9)), + // states + // left: (1, 5), (3, 5) + // right: (1, 10), (2, 5), (1, 9) + assertNumStateRows(total = 5, updated = 1), + // Increase event time watermark to 20s by adding data with time = 30s on both inputs. + AddData(leftInput, (1, 7), (1, 30)), + CheckNewAnswer(Row(1, 1, 7, 9), Row(1, 1, 7, 10)), + // states + // left: (1, 5), (3, 5), (1, 7), (1, 30) + // right: (1, 10), (2, 5), (1, 9) + assertNumStateRows(total = 7, updated = 2), + // Watermark = 30 - 10 = 20, no matched row. + // Generate outer join result for all non-matched rows when the watermark advances. 
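+ // Note: once a buffered row's event time can no longer fall within the join's time range
+ // relative to the advanced watermark, it can never match future input, so the outer join
+ // emits it with nulls and evicts it from state.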
+ AddData(rightInput, (0, 30)), + CheckNewAnswer(Row(3, null, 5, null), Row(null, 2, null, 5)), + // states + // left: (1, 30) + // right: (0, 30) + // + // states evicted + // left: (1, 5), (3, 5), (1, 7) (below watermark = 20) + // right: (1, 10), (2, 5), (1, 9) (below watermark = 20) + assertNumStateRows(total = 2, updated = 1) + ) + } + + test("stream-batch join followed by time window aggregation") { + val inputData = MemoryStream[Int] + val df = inputData.toDF() + .withColumn("eventTime", timestamp_seconds(col("value"))) + df.createOrReplaceTempView("stream_src") + + val batchDf = spark.range(0, 50).map { i => + if (i % 2 == 0) (i, "even") else (i, "odd") + }.toDF("value", "batch_value") + batchDf.createOrReplaceTempView("batch_src") + + val agg = spark.sql( + """ + |SELECT + | CAST(window.start AS LONG), batch_value, CAST(count(*) AS LONG) AS count + |FROM + | stream_src WATERMARK eventTime DELAY OF INTERVAL 10 seconds + |JOIN + | batch_src + |ON + | stream_src.value = batch_src.value + |GROUP BY batch_src.batch_value, window(eventTime, '5 seconds') + |""".stripMargin) + + testStream(agg)( + AddData(inputData, 15), + CheckAnswer(), + AddData(inputData, 10, 11, 14), + CheckAnswer(), + AddData(inputData, 25), + CheckAnswer((10, "even", 2), (10, "odd", 1)) + ) + } + + /** Assert event stats generated on the last batch with data in it. */ + private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { + Execute("AssertEventStats") { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + } + } + + /** Assert event stats generated on the last batch with data in it. */ + private def assertEventStats(min: Long, max: Long, avg: Double, wtrmark: Long): AssertOnQuery = { + assertEventStats { e => + assert(e.get("min") === formatTimestamp(min), s"min value mismatch") + assert(e.get("max") === formatTimestamp(max), s"max value mismatch") + assert(e.get("avg") === formatTimestamp(avg.toLong), s"avg value mismatch") + assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch") + } + } + + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone(UTC)) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala index e921a6bfe2abc..fdde60c7c8756 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala @@ -1006,4 +1006,57 @@ class SqlPipelineSuite extends PipelineTest with SharedSparkSession { parameters = Map.empty ) } + + test("Streaming Table with watermark clause") { + withTempDir { tmpDir => + spark.sql("SELECT * FROM RANGE(3)").write.format("parquet").mode("append") + .save(tmpDir.getCanonicalPath) + + val externalTableIdent = fullyQualifiedIdentifier("t") + spark.sql(s"CREATE TABLE $externalTableIdent (id string, eventTime timestamp)") + + withTable(externalTableIdent.quotedString) { + spark.sql(s"INSERT INTO $externalTableIdent VALUES ('a', timestamp_seconds(1))") + spark.sql(s"INSERT INTO $externalTableIdent VALUES ('b', timestamp_seconds(2))") + spark.sql(s"INSERT INTO $externalTableIdent VALUES ('a', timestamp_seconds(3))") + + val
unresolvedDataflowGraph = unresolvedDataflowGraphFromSql( + sqlText = + s""" + |CREATE STREAMING TABLE b + |AS + |SELECT + | CAST(window.start AS LONG) AS wStart, + | CAST(window.end AS LONG) AS wEnd, + | id, + | count(*) as cnt + |FROM + | STREAM $externalTableIdent WATERMARK eventTime DELAY OF INTERVAL 10 seconds + |GROUP BY window(eventTime, '5 seconds'), id + |""".stripMargin + ) + + val updateContext = new PipelineUpdateContextImpl( + unresolvedDataflowGraph, eventCallback = _ => ()) + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + val datasetFullyQualifiedName = fullyQualifiedIdentifier("b").quotedString + + assert( + spark.sql(s"SELECT * FROM $datasetFullyQualifiedName").collect().toSet == Set() + ) + + spark.sql(s"INSERT INTO $externalTableIdent VALUES ('a', timestamp_seconds(20))") + + updateContext.pipelineExecution.runPipeline() + updateContext.pipelineExecution.awaitCompletion() + + checkAnswer( + spark.sql(s"SELECT * FROM $datasetFullyQualifiedName ORDER BY wStart, wEnd, id"), + Seq(Row(0L, 5L, "a", 2L), Row(0L, 5L, "b", 1L)) + ) + } + } + } }
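For reference, a minimal sketch of the new clause used end to end, mirroring EventTimeWatermarkWithWatermarkDefInSelectSuite above. It assumes a streaming test suite context (with `testImplicits` in scope for MemoryStream); the view name `events` is illustrative only:

  import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
  import org.apache.spark.sql.functions.{col, timestamp_seconds}

  // Register a streaming source as a temp view, then declare the watermark directly in SQL
  // instead of calling Dataset.withWatermark.
  val inputData = MemoryStream[Int]
  inputData.toDF()
    .withColumn("eventTime", timestamp_seconds(col("value")))
    .createOrReplaceTempView("events") // illustrative name

  // The WATERMARK clause attaches to the relation, so the downstream windowed aggregation can
  // use "eventTime" as the event-time column.
  val windowed = spark.sql(
    """
      |SELECT CAST(window.start AS LONG) AS wstart, count(*) AS cnt
      |FROM events WATERMARK eventTime DELAY OF INTERVAL 10 seconds
      |GROUP BY window(eventTime, '5 seconds')
      |""".stripMargin)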