
[FLINK]Improve Watermark Semantics in Gluten Flink #12340

Description

@lgbo-ustc


Improve Watermark Semantics in Gluten Flink

Background

Gluten Flink currently supports the basic propagation of watermark timestamps through the native execution path, but it does not yet fully match Flink's runtime watermark semantics. The current implementation can emit native StatefulWatermark events and translate them into Flink Watermark events, but several parts of the event-time runtime contract are either incomplete or missing.

This issue tracks the work required to gradually improve watermark support in Gluten Flink while minimizing risk. The proposed approach is to start with low-risk Java-side runtime fixes, then move to native operator semantics, and finally address source-level coordination and watermark alignment.

Goals

  • Preserve existing data record routing semantics unless a task explicitly changes them.
  • Improve watermark observability and runtime behavior step by step.
  • Align Gluten Flink watermark behavior with Flink's event-time semantics.
  • Add unit and end-to-end tests for each milestone.

Tasks

  • Complete the basic watermark event path (tracked in #12341, "[FLINK]Update Watermark Gauge in Gluten Output Collector")

    • Update GlutenOutputCollector.emitWatermark(...) to update WatermarkGauge.
    • Update GlutenOutputCollector.collect(...) to update the gauge before forwarding watermark elements.
    • Add unit tests to verify watermark gauge advancement for source/operator watermark output.
    • Keep existing semantics unchanged; this task should only fix observability and basic runtime behavior.
  • Implement WatermarkStatus propagation

    • Implement Java-side emitWatermarkStatus(WatermarkStatus) and broadcast it to all downstream outputs.
    • Add processWatermarkStatus support in GlutenOneInputOperator and GlutenTwoInputOperator.
    • Add notifyWatermarkStatus(id, status[, inputIndex]) to velox4j JNI.
    • Add processWatermarkStatus to native StatefulTask and StatefulOperator.
    • Extend CombinedWatermarkStatus to support active/idle state updates.
    • Ensure Flink IDLE and ACTIVE statuses can pass through the Gluten operator chain.
  • Implement idle source handling

    • Honor idleTimeout_ in the native WatermarkAssigner.
    • Detect idle input through either a native timer or a Java mailbox timer.
    • Emit WatermarkStatus.IDLE when the source becomes idle.
    • Emit WatermarkStatus.ACTIVE when new data arrives after an idle period.
    • Exclude idle inputs from min-watermark calculation in multi-input operators.
    • Emit idle status downstream when all inputs are idle.
    • Ensure idle sources do not block global watermark progress.
  • Implement periodic watermark emission

    • Align with Flink's AUTO_WATERMARK_INTERVAL semantics.
    • Add periodic emission, because the current native path emits watermarks only when data arrives and crosses a threshold.
    • Evaluate implementation option A: register a Java processing-time timer and periodically call native onPeriodicEmit.
    • Evaluate implementation option B: use a native timer service to trigger the watermark assigner periodically.
    • Track maxTimestampSeen and lastEmittedWatermark in WatermarkAssigner.
    • Emit the current advanceable watermark on each interval to avoid stalls with sparse input.
    • Match Flink WatermarkGenerator.onPeriodicEmit behavior.
  • Fix null semantics for watermark expressions

    • Check the null flags of timestampVector, the output of the watermark expression.
    • Skip null watermark results instead of reading raw values or failing unexpectedly.
    • Clarify whether the rowtime field itself may be null; do not conflate rowtime nullability with watermark expression nullability.
    • Add SQL tests such as WATERMARK FOR ts AS CASE WHEN ... THEN ts - INTERVAL ... ELSE NULL END.
    • Align expression-based watermark behavior with Flink planner/runtime semantics.
  • Complete end-of-input and final watermark handling

    • Implement bounded-input finish interfaces in Java Gluten operators.
    • Add native finishInput(inputIndex) and finishInput() APIs.
    • When all inputs finish, advance the operator's event time to the final watermark.
    • Flush event-time windows, joins, and timers.
    • Emit Watermark.MAX_WATERMARK.
    • Drain pending output before closing operators.
    • Fix the commented-out max-watermark flush in WindowAggregator::close().
    • Unify end-of-input behavior across LocalWindowAggregator, WindowAggregator, and WindowJoin.
    • Ensure bounded streams and batch-like streams produce complete final window results.
  • Complete source-level watermark support

    • Define the intended source support scope.
    • Short term: support source-level watermarks in the legacy GlutenSourceFunction.
    • Medium term: maintain per-split watermarks inside native connector splits.
    • Long term: restore or emulate Flink SourceCoordinator semantics.
    • For Kafka source, add partition/split-level watermark support.
    • For Kafka source, add split idle support.
    • For Kafka source, add source idle support.
    • Ensure multi-partition sources do not block watermark progress when one partition is idle.
  • Implement watermark alignment

    • Treat this as a separate large milestone and implement it after the previous tasks.
    • Decide the architecture first.
    • Option A: preserve Flink SourceCoordinator and only offload reader-side data reading to native.
    • Option B: implement alignment in a Gluten-owned source coordinator.
    • Support source reporting of current watermark.
    • Support coordinator-side group minimum watermark calculation.
    • Support pausing sources/splits that are too far ahead.
    • Support resuming paused sources/splits.
    • Support checkpoint and restore of alignment state.
    • Prefer option A if possible: the current Gluten source path has been rewritten onto the legacy source interface, so reimplementing Source API semantics would be costly.
    • Align behavior with Flink's new Source API watermark alignment semantics.
  • Complete control event and multi-output semantics

    • Implement or explicitly reject LatencyMarker forwarding.
    • Implement or explicitly reject RecordAttributes forwarding.
    • Implement side-output watermark/status propagation, or reject unsupported side-output plans during planning/offload.
    • Avoid throwing RuntimeException("Not implemented for gluten") at runtime for unsupported stream events when the plan can be rejected earlier.
    • Prefer planner-time fail-fast behavior for unsupported events.
  • Add tests and acceptance coverage

    • Java unit test: watermark gauge advancement.
    • Java unit test: watermark status forwarding.
    • Java unit test: one-input and two-input watermark merging.
    • Java unit test: idle input does not block watermark progress.
    • Java unit test: final watermark handling.
    • Native unit test: WatermarkAssigner periodic emission.
    • Native unit test: null watermark expression handling.
    • Native unit test: CombinedWatermarkStatus active/idle behavior.
    • Native unit test: window final flush.
    • E2E SQL test: Kafka/Nexmark idle source.
    • E2E SQL test: tumble, hop, and cumulate windows.
    • E2E SQL test: two-input join.
    • E2E SQL test: bounded source final results.
    • E2E SQL test: watermark progress after checkpoint restore.
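For the first task (watermark gauge), the intended ordering can be sketched in plain Java. SketchWatermarkGauge, DownstreamWatermarkOutput, and GaugeUpdatingCollector are hypothetical stand-ins, not the actual Gluten or Flink classes; the gauge only mirrors the setCurrentWatermark(long)/getValue() pair that Flink's WatermarkGauge exposes. The point is that the gauge is updated before the watermark is forwarded.

```java
// Hypothetical stand-in for Flink's WatermarkGauge; kept local so the sketch compiles alone.
final class SketchWatermarkGauge {
    private volatile long currentWatermark = Long.MIN_VALUE;

    void setCurrentWatermark(long watermark) {
        currentWatermark = watermark;
    }

    long getValue() {
        return currentWatermark;
    }
}

// Hypothetical downstream output that receives forwarded watermarks.
interface DownstreamWatermarkOutput {
    void emitWatermark(long timestamp);
}

// Hypothetical collector: update the gauge first, then forward, so the metric
// reflects every watermark that leaves the operator.
final class GaugeUpdatingCollector {
    private final SketchWatermarkGauge gauge;
    private final DownstreamWatermarkOutput downstream;

    GaugeUpdatingCollector(SketchWatermarkGauge gauge, DownstreamWatermarkOutput downstream) {
        this.gauge = gauge;
        this.downstream = downstream;
    }

    void emitWatermark(long timestamp) {
        gauge.setCurrentWatermark(timestamp);
        downstream.emitWatermark(timestamp);
    }
}
```

The same two steps would apply in the watermark branch of collect(...).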
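For the WatermarkStatus propagation task, the sketch below is a simplified, hypothetical model of multi-input merging with active/idle state, roughly what the CombinedWatermarkStatus extension would need to track. It is not the Gluten or Flink implementation. It assumes idle inputs are excluded from the minimum, the combined watermark never moves backwards, and the combined status is IDLE only when every input is idle.

```java
import java.util.Arrays;

// Hypothetical model of multi-input watermark merging with per-input active/idle state.
final class CombinedWatermarkTracker {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkTracker(int numInputs) {
        watermarks = new long[numInputs];
        idle = new boolean[numInputs]; // all inputs start ACTIVE
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Records a watermark for one input; returns true if the combined watermark advanced. */
    boolean onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        return recompute();
    }

    /** Applies IDLE (true) or ACTIVE (false); returns true if the combined watermark advanced. */
    boolean onStatus(int input, boolean isIdle) {
        idle[input] = isIdle;
        return recompute();
    }

    /** The combined status is IDLE only when every input is idle. */
    boolean isIdle() {
        for (boolean inputIdle : idle) {
            if (!inputIdle) {
                return false;
            }
        }
        return true;
    }

    long combinedWatermark() {
        return combinedWatermark;
    }

    // Minimum over active inputs only; the combined value never moves backwards.
    private boolean recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive || min <= combinedWatermark) {
            return false; // all inputs idle, or no progress
        }
        combinedWatermark = min;
        return true;
    }
}
```

An operator built on this would emit a watermark whenever onWatermark or onStatus returns true, and broadcast WatermarkStatus.IDLE to all outputs once isIdle() becomes true.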
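For the idle source handling task, one way to drive IDLE/ACTIVE transitions from idleTimeout_ is a small detector fed by a timer. IdlenessDetector and its Transition values are hypothetical; the current time is passed in explicitly so the same logic could sit behind either a Java mailbox timer or a native timer.

```java
// Hypothetical idleness detector; time is supplied by the caller's timer.
final class IdlenessDetector {
    enum Transition { NONE, BECAME_IDLE, BECAME_ACTIVE }

    private final long idleTimeoutMillis;
    private long lastActivityMillis;
    private boolean idle;

    IdlenessDetector(long idleTimeoutMillis, long nowMillis) {
        if (idleTimeoutMillis <= 0) {
            throw new IllegalArgumentException("idle timeout must be positive");
        }
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityMillis = nowMillis;
    }

    /** Call for every input batch; reports BECAME_ACTIVE if the source was idle. */
    Transition onRecord(long nowMillis) {
        lastActivityMillis = nowMillis;
        if (idle) {
            idle = false;
            return Transition.BECAME_ACTIVE;
        }
        return Transition.NONE;
    }

    /** Call from a periodic timer; reports BECAME_IDLE once, when the timeout elapses. */
    Transition onTimer(long nowMillis) {
        if (!idle && nowMillis - lastActivityMillis >= idleTimeoutMillis) {
            idle = true;
            return Transition.BECAME_IDLE;
        }
        return Transition.NONE;
    }
}
```

A caller would map BECAME_IDLE to emitting WatermarkStatus.IDLE and BECAME_ACTIVE to emitting WatermarkStatus.ACTIVE.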
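For the periodic emission task, the sketch below separates per-record tracking from interval-driven emission, using the bounded-out-of-orderness formula from Flink's BoundedOutOfOrdernessWatermarks (maxTimestamp - outOfOrderness - 1). PeriodicWatermarkAssigner is hypothetical, and suppressing non-advancing watermarks via lastEmittedWatermark is an assumption about how duplicates would be handled.

```java
import java.util.OptionalLong;

// Hypothetical periodic assigner tracking maxTimestampSeen and lastEmittedWatermark.
final class PeriodicWatermarkAssigner {
    private final long delayMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    PeriodicWatermarkAssigner(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Per-record hook: only tracks the maximum timestamp, never emits. */
    void onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    /** Called on every AUTO_WATERMARK_INTERVAL tick; returns a watermark only if it advances. */
    OptionalLong onPeriodicEmit() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return OptionalLong.empty(); // no data seen yet
        }
        long candidate = maxTimestampSeen - delayMillis - 1;
        if (candidate <= lastEmittedWatermark) {
            return OptionalLong.empty();
        }
        lastEmittedWatermark = candidate;
        return OptionalLong.of(candidate);
    }
}
```

Because onPeriodicEmit runs on every interval rather than only on data arrival, the watermark for the last record of a sparse burst is still emitted on the next tick.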
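For the null-semantics task, the skip-on-null rule can be shown on a single batch. Here a Long[] stands in for the native timestampVector and its null flags, an assumption made so the sketch is self-contained; NullSafeWatermarkExtractor is hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical extractor: null entries model rows where the watermark expression returned NULL.
final class NullSafeWatermarkExtractor {
    private NullSafeWatermarkExtractor() {}

    /** Returns the max non-null watermark in the batch, or empty if every row is null. */
    static OptionalLong maxNonNullWatermark(Long[] watermarkResults) {
        boolean found = false;
        long max = Long.MIN_VALUE;
        for (Long value : watermarkResults) {
            if (value == null) {
                continue; // NULL result: skip it instead of reading a raw slot
            }
            found = true;
            max = Math.max(max, value);
        }
        return found ? OptionalLong.of(max) : OptionalLong.empty();
    }
}
```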
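For the end-of-input task, the order of the final flush matters: window results fired by the final watermark should reach downstream before MAX_WATERMARK does. EndOfInputTracker and FinalFlushHooks are hypothetical names; MAX_WATERMARK uses Long.MAX_VALUE, the timestamp of Flink's Watermark.MAX_WATERMARK.

```java
// Hypothetical hooks a native-backed operator would expose for the final flush.
interface FinalFlushHooks {
    void advanceEventTime(long watermark); // fire event-time windows, joins, and timers
    void drainPendingOutput();             // forward results produced by the firing
    void emitWatermark(long watermark);    // forward the watermark downstream
}

// Hypothetical end-of-input tracker for an N-input operator.
final class EndOfInputTracker {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final boolean[] finished;
    private final FinalFlushHooks hooks;
    private int remaining;
    private boolean flushed;

    EndOfInputTracker(int numInputs, FinalFlushHooks hooks) {
        this.finished = new boolean[numInputs];
        this.remaining = numInputs;
        this.hooks = hooks;
    }

    /** Marks one input as finished; repeated calls for the same input are ignored. */
    void finishInput(int inputIndex) {
        if (finished[inputIndex]) {
            return;
        }
        finished[inputIndex] = true;
        if (--remaining == 0) {
            finishAll();
        }
    }

    boolean isFlushed() {
        return flushed;
    }

    private void finishAll() {
        if (flushed) {
            return;
        }
        flushed = true;
        hooks.advanceEventTime(MAX_WATERMARK);
        // Results must leave the operator before the final watermark,
        // otherwise downstream operators would treat them as late.
        hooks.drainPendingOutput();
        hooks.emitWatermark(MAX_WATERMARK);
    }
}
```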
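For the source-level watermark task, per-split tracking differs from the fixed-input case because splits (for example Kafka partitions) are added and removed at runtime. SplitWatermarkTracker is a hypothetical sketch; treating a reader with no splits as idle is an assumption of this sketch, not confirmed Flink or Gluten behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical per-split watermark tracker for a source reader.
final class SplitWatermarkTracker {
    // A new split starts active with no watermark yet, so it holds back progress.
    private static final class SplitState {
        long watermark = Long.MIN_VALUE;
        boolean idle;
    }

    private final Map<String, SplitState> splits = new HashMap<>();

    void addSplit(String splitId) {
        splits.putIfAbsent(splitId, new SplitState());
    }

    /** A finished or revoked split stops holding back the source watermark. */
    void removeSplit(String splitId) {
        splits.remove(splitId);
    }

    /** Records progress for a registered split; new data also makes it active again. */
    void onSplitWatermark(String splitId, long watermark) {
        SplitState state = requireSplit(splitId);
        state.watermark = Math.max(state.watermark, watermark);
        state.idle = false;
    }

    void markSplitIdle(String splitId) {
        requireSplit(splitId).idle = true;
    }

    /** Source-level idle: every split is idle (no splits also counts as idle here). */
    boolean isSourceIdle() {
        return splits.values().stream().allMatch(s -> s.idle);
    }

    /** Minimum watermark over active splits; empty when the source is idle. */
    OptionalLong sourceWatermark() {
        return splits.values().stream().filter(s -> !s.idle).mapToLong(s -> s.watermark).min();
    }

    private SplitState requireSplit(String splitId) {
        SplitState state = splits.get(splitId);
        if (state == null) {
            throw new IllegalArgumentException("unknown split: " + splitId);
        }
        return state;
    }
}
```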
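For the watermark alignment task, the coordinator-side arithmetic is small even though the integration is not. The sketch follows the rule described for Flink's watermark alignment (a reader whose watermark exceeds the group minimum plus the maximum allowed drift is paused), but AlignmentCoordinator, integer reader ids, and the map-based snapshot are hypothetical choices, not Flink's SourceCoordinator API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical alignment coordinator for a single alignment group.
final class AlignmentCoordinator {
    private final long maxAllowedDriftMillis;
    private final Map<Integer, Long> reportedWatermarks = new HashMap<>();

    AlignmentCoordinator(long maxAllowedDriftMillis) {
        if (maxAllowedDriftMillis < 0) {
            throw new IllegalArgumentException("drift must be >= 0");
        }
        this.maxAllowedDriftMillis = maxAllowedDriftMillis;
    }

    /** A reader (or split) reports its current watermark, e.g. once per update interval. */
    void reportWatermark(int readerId, long watermark) {
        reportedWatermarks.put(readerId, watermark);
    }

    /** Group minimum plus the allowed drift, saturating at Long.MAX_VALUE. */
    long maxAllowedWatermark() {
        long groupMin = reportedWatermarks.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
        return groupMin > Long.MAX_VALUE - maxAllowedDriftMillis
                ? Long.MAX_VALUE
                : groupMin + maxAllowedDriftMillis;
    }

    /** Readers that are too far ahead and should pause; all others may resume. */
    Set<Integer> readersToPause() {
        long limit = maxAllowedWatermark();
        Set<Integer> paused = new TreeSet<>();
        reportedWatermarks.forEach((id, wm) -> {
            if (wm > limit) {
                paused.add(id);
            }
        });
        return paused;
    }

    /** Copy of the alignment state for checkpointing. */
    Map<Integer, Long> snapshotState() {
        return new HashMap<>(reportedWatermarks);
    }

    void restoreState(Map<Integer, Long> state) {
        reportedWatermarks.clear();
        reportedWatermarks.putAll(state);
    }
}
```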
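For the control event task, planner-time fail-fast could take the form of an offload check that runs before a plan is handed to the native path. StreamEventKind, OperatorSpec, and OffloadValidator are hypothetical; the set of supported events would come from whatever the Gluten operators actually implement.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;

// Hypothetical kinds of stream events an operator may need to forward.
enum StreamEventKind { WATERMARK, WATERMARK_STATUS, LATENCY_MARKER, RECORD_ATTRIBUTES, SIDE_OUTPUT_WATERMARK }

// Hypothetical description of one operator in a candidate plan.
final class OperatorSpec {
    final String name;
    final EnumSet<StreamEventKind> requiredEvents;

    OperatorSpec(String name, EnumSet<StreamEventKind> requiredEvents) {
        this.name = name;
        this.requiredEvents = requiredEvents;
    }
}

// Rejects a plan up front instead of throwing when an unsupported event arrives at runtime.
final class OffloadValidator {
    private final EnumSet<StreamEventKind> supported;

    OffloadValidator(EnumSet<StreamEventKind> supported) {
        this.supported = EnumSet.copyOf(supported);
    }

    /** Empty if the plan can be offloaded; otherwise the reason it must stay on the Flink path. */
    Optional<String> rejectionReason(List<OperatorSpec> plan) {
        for (OperatorSpec op : plan) {
            EnumSet<StreamEventKind> missing = EnumSet.copyOf(op.requiredEvents);
            missing.removeAll(supported);
            if (!missing.isEmpty()) {
                return Optional.of(op.name + " requires unsupported events " + missing);
            }
        }
        return Optional.empty();
    }
}
```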

Gluten version

None

Metadata


Assignees

No one assigned

    Labels

    enhancement (New feature or request)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
