Skip to content

Commit 3b35f2f

Browse files
committed
[FLINK-38532] Move validation logic to IntervalFreshness and remove IntervalFreshnessUtils
1 parent 622d562 commit 3b35f2f

File tree

12 files changed

+190
-176
lines changed

12 files changed

+190
-176
lines changed

docs/content/docs/dev/table/materialized-table/overview.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Materialized Table encompass the following core concepts: Data Freshness, Refres
3434

3535
## Data Freshness
3636

37-
Data freshness defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. Freshness is not a guarantee. Instead, it is a target that Flink attempts to meet. Data in materialized table is refreshed as closely as possible within the freshness.
37+
Data freshness defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. Freshness is not a guarantee. Instead, it is a target that Flink attempts to meet. The data in materialized table is refreshed as closely as possible within the freshness target.
3838

3939
Data freshness is optional when creating a materialized table. If not specified, the system uses the default freshness based on the refresh mode: [materialized-table.default-freshness.continuous]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) (default: 3 minutes) for CONTINUOUS mode, or [materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.
4040

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@
109109
import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.SCHEDULE_TIME_DATE_FORMATTER_DEFAULT;
110110
import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
111111
import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
112+
import static org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron;
112113
import static org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.WORKFLOW_SCHEDULER_PREFIX;
113114
import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
114115
import static org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO;
115116
import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
116117
import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset;
117-
import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron;
118118

119119
/** Manager is responsible for execute the {@link MaterializedTableOperation}. */
120120
@Internal

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1908,7 +1908,8 @@ public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
19081908

19091909
final ResolvedSchema resolvedSchema = table.getUnresolvedSchema().resolve(schemaResolver);
19101910

1911-
final EnrichmentResult enrichmentResult = this.materializedTableEnricher.enrich(table);
1911+
final MaterializedTableEnrichmentResult enrichmentResult =
1912+
this.materializedTableEnricher.enrich(table);
19121913
IntervalFreshness freshness = enrichmentResult.getFreshness();
19131914
RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode();
19141915

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
2323
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
24-
import org.apache.flink.table.utils.IntervalFreshnessUtils;
2524

2625
import java.time.Duration;
2726

@@ -60,7 +59,7 @@ private DefaultMaterializedTableEnricher(
6059
}
6160

6261
@Override
63-
public EnrichmentResult enrich(final CatalogMaterializedTable table) {
62+
public MaterializedTableEnrichmentResult enrich(final CatalogMaterializedTable table) {
6463
// Determine the final freshness value
6564
final IntervalFreshness finalFreshness = deriveFreshness(table);
6665

@@ -69,7 +68,7 @@ public EnrichmentResult enrich(final CatalogMaterializedTable table) {
6968
deriveRefreshMode(
7069
table.getLogicalRefreshMode(), finalFreshness, freshnessThreshold);
7170

72-
return new EnrichmentResult(finalFreshness, finalRefreshMode);
71+
return new MaterializedTableEnrichmentResult(finalFreshness, finalRefreshMode);
7372
}
7473

7574
/**
@@ -110,7 +109,7 @@ public RefreshMode deriveRefreshMode(
110109
}
111110

112111
// Validate that freshness can be converted to cron for FULL mode
113-
IntervalFreshnessUtils.validateFreshnessForCron(freshness);
112+
IntervalFreshness.validateFreshnessForCron(freshness);
114113
return RefreshMode.FULL;
115114
}
116115
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java

Lines changed: 128 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@
3434
@PublicEvolving
3535
public class IntervalFreshness {
3636

37+
private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * * ? *";
38+
private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * * ? *";
39+
private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * ? *";
40+
private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * ? *";
41+
private static final long SECOND_CRON_UPPER_BOUND = 60;
42+
private static final long MINUTE_CRON_UPPER_BOUND = 60;
43+
private static final long HOUR_CRON_UPPER_BOUND = 24;
44+
3745
private final int interval;
3846
private final TimeUnit timeUnit;
3947

@@ -48,18 +56,22 @@ public static IntervalFreshness of(String interval, TimeUnit timeUnit) {
4856
}
4957

5058
private static int validateIntervalInput(final String interval) {
51-
final String errorMessage =
52-
String.format(
53-
"The freshness interval currently only supports positive integer type values. But was: %s",
54-
interval);
5559
final int parsedInt;
5660
try {
5761
parsedInt = Integer.parseInt(interval);
5862
} catch (Exception e) {
63+
final String errorMessage =
64+
String.format(
65+
"The freshness interval currently only supports positive integer type values. But was: %s",
66+
interval);
5967
throw new ValidationException(errorMessage, e);
6068
}
6169

6270
if (parsedInt <= 0) {
71+
final String errorMessage =
72+
String.format(
73+
"The freshness interval currently only supports positive integer type values. But was: %s",
74+
interval);
6375
throw new ValidationException(errorMessage);
6476
}
6577
return parsedInt;
@@ -81,6 +93,118 @@ public static IntervalFreshness ofDay(String interval) {
8193
return IntervalFreshness.of(interval, TimeUnit.DAY);
8294
}
8395

96+
/**
97+
* Validates that the given freshness can be converted to a cron expression in full refresh
98+
* mode. Since freshness and cron expression cannot be converted equivalently, there are
99+
* currently only a limited patterns of freshness that are supported.
100+
*
101+
* @param intervalFreshness the freshness to validate
102+
* @throws ValidationException if the freshness cannot be converted to a valid cron expression
103+
*/
104+
public static void validateFreshnessForCron(IntervalFreshness intervalFreshness) {
105+
switch (intervalFreshness.getTimeUnit()) {
106+
case SECOND:
107+
validateCronConstraints(intervalFreshness, SECOND_CRON_UPPER_BOUND);
108+
break;
109+
case MINUTE:
110+
validateCronConstraints(intervalFreshness, MINUTE_CRON_UPPER_BOUND);
111+
break;
112+
case HOUR:
113+
validateCronConstraints(intervalFreshness, HOUR_CRON_UPPER_BOUND);
114+
break;
115+
case DAY:
116+
validateDayConstraints(intervalFreshness);
117+
break;
118+
default:
119+
throw new ValidationException(
120+
String.format(
121+
"Unknown freshness time unit: %s.",
122+
intervalFreshness.getTimeUnit()));
123+
}
124+
}
125+
126+
/**
127+
* Converts the freshness of materialized table to cron expression in full refresh mode. The
128+
* freshness must first pass validation via {@link #validateFreshnessForCron}.
129+
*
130+
* @param intervalFreshness the freshness to convert
131+
* @return the corresponding cron expression
132+
* @throws ValidationException if the freshness cannot be converted to a valid cron expression
133+
*/
134+
public static String convertFreshnessToCron(IntervalFreshness intervalFreshness) {
135+
// First validate that conversion is possible
136+
validateFreshnessForCron(intervalFreshness);
137+
138+
// Then perform the conversion
139+
switch (intervalFreshness.getTimeUnit()) {
140+
case SECOND:
141+
return String.format(
142+
SECOND_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt());
143+
case MINUTE:
144+
return String.format(
145+
MINUTE_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt());
146+
case HOUR:
147+
return String.format(
148+
HOUR_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt());
149+
case DAY:
150+
return ONE_DAY_CRON_EXPRESSION_TEMPLATE;
151+
default:
152+
throw new ValidationException(
153+
String.format(
154+
"Unknown freshness time unit: %s.",
155+
intervalFreshness.getTimeUnit()));
156+
}
157+
}
158+
159+
private static void validateCronConstraints(
160+
IntervalFreshness intervalFreshness, long cronUpperBound) {
161+
int interval = intervalFreshness.getIntervalInt();
162+
TimeUnit timeUnit = intervalFreshness.getTimeUnit();
163+
// Freshness must be less than cronUpperBound for corresponding time unit when convert it
164+
// to cron expression
165+
if (interval >= cronUpperBound) {
166+
throw new ValidationException(
167+
String.format(
168+
"In full refresh mode, freshness must be less than %s when the time unit is %s.",
169+
cronUpperBound, timeUnit));
170+
}
171+
// Freshness must be factors of cronUpperBound for corresponding time unit
172+
if (cronUpperBound % interval != 0) {
173+
throw new ValidationException(
174+
String.format(
175+
"In full refresh mode, only freshness that are factors of %s are currently supported when the time unit is %s.",
176+
cronUpperBound, timeUnit));
177+
}
178+
}
179+
180+
private static void validateDayConstraints(IntervalFreshness intervalFreshness) {
181+
// Since the number of days in each month is different, only one day of freshness is
182+
// currently supported when the time unit is DAY
183+
int interval = intervalFreshness.getIntervalInt();
184+
if (interval > 1) {
185+
throw new ValidationException(
186+
"In full refresh mode, freshness must be 1 when the time unit is DAY.");
187+
}
188+
}
189+
190+
/**
191+
* Creates an IntervalFreshness from a Duration, choosing the most appropriate time unit.
192+
* Prefers larger units when possible (e.g., 60 seconds → 1 minute).
193+
*/
194+
public static IntervalFreshness fromDuration(Duration duration) {
195+
if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
196+
return new IntervalFreshness((int) duration.toDays(), TimeUnit.DAY);
197+
}
198+
if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
199+
return new IntervalFreshness((int) duration.toHours(), TimeUnit.HOUR);
200+
}
201+
if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
202+
return new IntervalFreshness((int) duration.toMinutes(), TimeUnit.MINUTE);
203+
}
204+
205+
return new IntervalFreshness((int) duration.getSeconds(), TimeUnit.SECOND);
206+
}
207+
84208
/**
85209
* @deprecated Use {@link #getIntervalInt()} instead.
86210
*/
@@ -112,24 +236,6 @@ public Duration toDuration() {
112236
}
113237
}
114238

115-
/**
116-
* Creates an IntervalFreshness from a Duration, choosing the most appropriate time unit.
117-
* Prefers larger units when possible (e.g., 60 seconds → 1 minute).
118-
*/
119-
public static IntervalFreshness fromDuration(Duration duration) {
120-
if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) {
121-
return IntervalFreshness.ofDay(String.valueOf(duration.toDays()));
122-
}
123-
if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) {
124-
return IntervalFreshness.ofHour(String.valueOf(duration.toHours()));
125-
}
126-
if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) {
127-
return IntervalFreshness.ofMinute(String.valueOf(duration.toMinutes()));
128-
}
129-
130-
return IntervalFreshness.ofSecond(String.valueOf(duration.getSeconds()));
131-
}
132-
133239
@Override
134240
public boolean equals(Object o) {
135241
if (this == o) {

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,5 @@ public interface MaterializedTableEnricher {
4343
* freshness
4444
* @return the enrichment result with resolved, non-null freshness and refresh mode
4545
*/
46-
EnrichmentResult enrich(CatalogMaterializedTable catalogMaterializedTable);
46+
MaterializedTableEnrichmentResult enrich(CatalogMaterializedTable catalogMaterializedTable);
4747
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/EnrichmentResult.java renamed to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
* non-null values for both properties.
3030
*/
3131
@Experimental
32-
public class EnrichmentResult {
32+
public class MaterializedTableEnrichmentResult {
3333

3434
private final IntervalFreshness freshness;
3535
private final RefreshMode refreshMode;
3636

37-
public EnrichmentResult(final IntervalFreshness freshness, final RefreshMode refreshMode) {
37+
public MaterializedTableEnrichmentResult(
38+
final IntervalFreshness freshness, final RefreshMode refreshMode) {
3839
this.freshness = freshness;
3940
this.refreshMode = refreshMode;
4041
}

0 commit comments

Comments
 (0)