@raminqaf

What is the purpose of the change

This pull request makes the FRESHNESS clause optional when creating materialized tables. When freshness is not specified, the system uses configurable defaults based on the refresh mode: materialized-table.default-freshness.continuous (default: 3 minutes) for CONTINUOUS mode, or materialized-table.default-freshness.full (default: 1 hour) for FULL mode. This provides users with more flexibility and allows catalogs to implement custom freshness and refresh mode determination logic.
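As a rough illustration of the fallback described above, the sketch below picks a freshness value from the refresh mode when the user supplies none. The class, enum, and method names are hypothetical and not part of the Flink API. Only the two default values (3 minutes and 1 hour) come from this description, and in Flink they would be read from configuration rather than hard-coded.

```java
import java.time.Duration;
import java.util.Optional;

public class DefaultFreshnessSketch {
    /** Hypothetical stand-in for the two refresh modes named in this PR. */
    public enum RefreshMode { CONTINUOUS, FULL }

    // Defaults quoted in the PR description; hard-coded here only for illustration.
    public static final Duration DEFAULT_CONTINUOUS = Duration.ofMinutes(3);
    public static final Duration DEFAULT_FULL = Duration.ofHours(1);

    /** Returns the user-specified freshness, or the mode-specific default when absent. */
    public static Duration resolve(Optional<Duration> userFreshness, RefreshMode mode) {
        Duration modeDefault =
                mode == RefreshMode.CONTINUOUS ? DEFAULT_CONTINUOUS : DEFAULT_FULL;
        return userFreshness.orElse(modeDefault);
    }

    public static void main(String[] args) {
        // No FRESHNESS clause: the default depends on the refresh mode.
        System.out.println(resolve(Optional.empty(), RefreshMode.CONTINUOUS)); // PT3M
        System.out.println(resolve(Optional.empty(), RefreshMode.FULL));       // PT1H
        // An explicit FRESHNESS always wins over the defaults.
        System.out.println(resolve(Optional.of(Duration.ofSeconds(30)), RefreshMode.FULL)); // PT30S
    }
}
```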

Brief change log

  • Introduced MaterializedTableEnricher interface for pluggable freshness and refresh mode resolution logic
  • Implemented DefaultMaterializedTableEnricher with threshold-based refresh mode determination
  • Added two configuration options:
    • materialized-table.default-freshness.continuous (default: 3 minutes)
    • materialized-table.default-freshness.full (default: 1 hour)
  • Made FRESHNESS clause optional in CREATE MATERIALIZED TABLE syntax
  • Updated SqlCreateMaterializedTableConverter to use the enricher for determining freshness and refresh mode
  • Refactored refresh mode determination from being computed once in converter to being computed on-demand via interface default methods
  • Updated documentation to reflect optional freshness and mode-specific defaults
  • Maintained backward compatibility with existing catalog implementations
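To make the enricher idea in the change log more concrete, here is a minimal sketch of a pluggable hook with a threshold-based implementation. It is not the code from this PR: the `Enricher`, `Result`, and `ThresholdEnricher` types are hypothetical, the 30-minute threshold is an assumed value, and the choice to fall back to CONTINUOUS when neither freshness nor mode is given is also an assumption.

```java
import java.time.Duration;
import java.util.Optional;

public class EnricherSketch {
    public enum RefreshMode { CONTINUOUS, FULL }

    /** Hypothetical result holder; both fields are non-null after enrichment. */
    public static final class Result {
        public final Duration freshness;
        public final RefreshMode mode;

        public Result(Duration freshness, RefreshMode mode) {
            this.freshness = freshness;
            this.mode = mode;
        }
    }

    /** Hypothetical pluggable hook that a catalog could replace with its own logic. */
    public interface Enricher {
        Result enrich(Optional<Duration> freshness, Optional<RefreshMode> mode);
    }

    /** Threshold-based variant: a freshness below the threshold implies CONTINUOUS. */
    public static final class ThresholdEnricher implements Enricher {
        private final Duration threshold;
        private final Duration defaultContinuous;
        private final Duration defaultFull;

        public ThresholdEnricher(
                Duration threshold, Duration defaultContinuous, Duration defaultFull) {
            this.threshold = threshold;
            this.defaultContinuous = defaultContinuous;
            this.defaultFull = defaultFull;
        }

        @Override
        public Result enrich(Optional<Duration> freshness, Optional<RefreshMode> mode) {
            if (freshness.isPresent()) {
                Duration f = freshness.get();
                RefreshMode derived =
                        f.compareTo(threshold) < 0 ? RefreshMode.CONTINUOUS : RefreshMode.FULL;
                // An explicitly requested mode takes precedence over the derived one.
                return new Result(f, mode.orElse(derived));
            }
            // Assumption: with no freshness and no mode, fall back to CONTINUOUS.
            RefreshMode m = mode.orElse(RefreshMode.CONTINUOUS);
            return new Result(m == RefreshMode.CONTINUOUS ? defaultContinuous : defaultFull, m);
        }
    }

    public static void main(String[] args) {
        Enricher enricher = new ThresholdEnricher(
                Duration.ofMinutes(30), Duration.ofMinutes(3), Duration.ofHours(1));
        Result r = enricher.enrich(Optional.of(Duration.ofHours(2)), Optional.empty());
        System.out.println(r.freshness + " " + r.mode); // PT2H FULL
    }
}
```

Keeping the decision behind an interface is what would let a catalog substitute its own policy without touching the converter.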

Verifying this change

This change is already covered by existing tests, such as:

  • SqlMaterializedTableNodeToOperationConverterTest.testFullRefreshMode() - validates refresh mode determination
  • SqlMaterializedTableNodeToOperationConverterTest.testContinuousRefreshMode() - validates continuous mode behavior
  • Existing materialized table integration tests verify end-to-end functionality

Additionally, this change maintains backward compatibility with existing materialized table implementations through default interface methods in CatalogMaterializedTable.
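The backward-compatibility argument rests on a standard Java mechanism, which the sketch below shows in isolation. The `Table` interface and its implementations are hypothetical and much simpler than `CatalogMaterializedTable`. The point is only that a method added with a `default` body does not break implementations written before it existed.

```java
import java.time.Duration;
import java.util.Optional;

public class DefaultMethodSketch {
    /** Hypothetical, simplified stand-in for a catalog table interface. */
    public interface Table {
        String name();

        // Newly added method with a default body: implementations that predate it
        // still compile and simply inherit this behavior.
        default Optional<Duration> freshness() {
            return Optional.empty();
        }
    }

    /** A "legacy" implementation that knows nothing about freshness(). */
    public static final class LegacyTable implements Table {
        @Override
        public String name() {
            return "legacy";
        }
    }

    /** A newer implementation that opts in by overriding the default. */
    public static final class FreshTable implements Table {
        @Override
        public String name() {
            return "fresh";
        }

        @Override
        public Optional<Duration> freshness() {
            return Optional.of(Duration.ofMinutes(5));
        }
    }

    public static void main(String[] args) {
        System.out.println(new LegacyTable().freshness().isPresent()); // false
        System.out.println(new FreshTable().freshness().get());        // PT5M
    }
}
```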

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (CatalogMaterializedTable interface)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs
    • Updated materialized table documentation (overview.md and statements.md)
    • Added JavaDocs for new interfaces and classes
    • Updated configuration documentation to include new options

@raminqaf changed the title to [FLINK-38532][table][FLIP-551] Make FRESHNESS Optional for Materialized Tables on Oct 20, 2025
## Data Freshness

Data freshness defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. Freshness is not a guarantee. Instead, it is a target that Flink attempts to meet. Data in materialized table is refreshed as closely as possible within the freshness.

also Chinese doc should be updated

nits on the wording (up to you whether to address them):

  • Data in materialized table -> The data in materialized table
  • within the freshness. -> within the freshness target.

@davidradl commented Oct 20, 2025

I wonder if it would be helpful to relate freshness to checkpointing. The code picks up a freshness value from CheckpointingOptions.CHECKPOINTING_INTERVAL, but this is not referenced in the docs. I suggest making the user aware of how this config value affects freshness.

* non-null values for both properties.
*/
@Experimental
public class EnrichmentResult {

Not sure this is the right name or package.
It is inside org.apache.flink.table.catalog, but it relates only to one specific use case of materialized tables.

I will rename this to MaterializedTableEnrichmentResult and update the FLIP accordingly. I chose this package because IntervalFreshness is declared in the same package.

Comment on lines 51 to 61
        validateCronConstraints(intervalFreshness, SECOND_CRON_UPPER_BOUND);
        break;
    case MINUTE:
        validateCronConstraints(intervalFreshness, MINUTE_CRON_UPPER_BOUND);
        break;
    case HOUR:
        validateCronConstraints(intervalFreshness, HOUR_CRON_UPPER_BOUND);
        break;
    case DAY:
        validateDayConstraints(intervalFreshness);
        break;

I wonder: what is the reason for having all these interval freshness validations spread between this class and IntervalFreshness?
Can we have them in one place?

@raminqaf commented Oct 20, 2025

I think this validation lives here because it is used by the internal API. I can move it into the IntervalFreshness class and get rid of IntervalFreshnessUtils entirely. WDYT?
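For discussion, a consolidated version could look roughly like the sketch below, where a value class validates itself in its factory method so no separate utility class is needed. Everything here is hypothetical: `FreshnessSketch` is not `IntervalFreshness`, and the concrete rules (upper bounds of 60, 60, and 24, and exactly one day) are assumptions for illustration, since the bodies of `validateCronConstraints` and `validateDayConstraints` are not shown in the quoted lines.

```java
public final class FreshnessSketch {
    public enum Unit { SECOND, MINUTE, HOUR, DAY }

    private final long value;
    private final Unit unit;

    private FreshnessSketch(long value, Unit unit) {
        this.value = value;
        this.unit = unit;
    }

    /** Single entry point: an instance can only be created if validation passes. */
    public static FreshnessSketch of(long value, Unit unit) {
        validate(value, unit);
        return new FreshnessSketch(value, unit);
    }

    // All unit-specific checks live here instead of in a separate utility class.
    private static void validate(long value, Unit unit) {
        switch (unit) {
            case SECOND:
            case MINUTE:
                checkBelow(value, 60, unit); // assumed upper bound
                break;
            case HOUR:
                checkBelow(value, 24, unit); // assumed upper bound
                break;
            case DAY:
                if (value != 1) { // assumed rule: only a single day is allowed
                    throw new IllegalArgumentException("DAY freshness must be 1, got " + value);
                }
                break;
            default:
                throw new IllegalStateException("Unknown unit: " + unit);
        }
    }

    private static void checkBelow(long value, long upperBound, Unit unit) {
        if (value <= 0 || value >= upperBound) {
            throw new IllegalArgumentException(
                    unit + " freshness must be in (0, " + upperBound + "), got " + value);
        }
    }

    public long value() {
        return value;
    }

    public Unit unit() {
        return unit;
    }

    public static void main(String[] args) {
        System.out.println(of(30, Unit.MINUTE).value()); // 30
        try {
            of(90, Unit.MINUTE);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One trade-off to weigh: if some checks only apply in a particular refresh mode, validating unconditionally in the factory would be too strict, and a mode-aware validation method on the class might fit better.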

@raminqaf force-pushed the FLINK-38532 branch 3 times, most recently from ab10ec2 to e3be129, Oct 20, 2025 10:33
@github-actions bot added the community-reviewed label Oct 20, 2025