@@ -35,21 +35,21 @@ Flink SQL 目前支持以下物化表操作:
3535
3636```
3737CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
38-
38+
3939[ ([ <table_constraint> ]) ]
40-
40+
4141[COMMENT table_comment]
42-
42+
4343[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
44-
44+
4545[WITH (key1=val1, key2=val2, ...)]
46-
46+
4747FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
48-
48+
4949[REFRESH_MODE = { CONTINUOUS | FULL }]
50-
50+
5151AS <select_statement>
52-
52+
5353<table_constraint>:
5454 [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
5555```
@@ -69,7 +69,7 @@ AS <select_statement>
6969CREATE MATERIALIZED TABLE my_materialized_table
7070 PARTITIONED BY (ds)
7171 FRESHNESS = INTERVAL ' 1' HOUR
72- AS SELECT
72+ AS SELECT
7373 ds
7474 FROM
7575 ...
@@ -103,9 +103,11 @@ CREATE MATERIALIZED TABLE my_materialized_table
103103
104104` FRESHNESS ` 用于指定物化表的数据新鲜度。
105105
106+ ` FRESHNESS ` 是可选的。如果省略,系统将根据刷新模式使用默认的新鲜度:持续模式使用 ` materialized-table.default-freshness.continuous ` (默认值:3 分钟),全量模式使用 ` materialized-table.default-freshness.full ` (默认值:1 小时)。
107+
106108** 数据新鲜度与刷新模式关系**
107109
108- 数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[ 配置] ({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[ 刷新模式] ({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
110+ 数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。如果未指定,将根据刷新模式使用配置中的默认值。 它有两个作用,首先通过[ 配置] ({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[ 刷新模式] ({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。
109111
110112** FRESHNESS 参数详解**
111113
@@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
128130FRESHNESS = INTERVAL ' 1' DAY
129131```
130132
133+ ** 默认 FRESHNESS 示例:**
134+ (假定 ` materialized-table.default-freshness.continuous ` 为 3 分钟,` materialized-table.default-freshness.full ` 为 1 小时,` materialized-table.refresh-mode.freshness-threshold ` 为 30 分钟)
135+
136+ ``` sql
137+ -- 省略 FRESHNESS,使用持续模式的默认值 3 分钟
138+ -- 对应的刷新管道是一个 checkpoint 间隔为 3 分钟的流处理作业
139+ CREATE MATERIALIZED TABLE my_materialized_table
140+ AS SELECT * FROM source_table;
141+
142+ -- 省略 FRESHNESS 并显式指定全量模式,使用全量模式的默认值 1 小时
143+ -- 对应的刷新管道是一个调度周期为 1 小时的调度工作流
144+ CREATE MATERIALIZED TABLE my_materialized_table_full
145+ REFRESH_MODE = FULL
146+ AS SELECT * FROM source_table;
147+ ```
148+
131149** 不合法的 ` FRESHNESS ` 示例:**
132150
133151``` sql
@@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
147165```
148166
149167<span class =" label label-danger " >注意</span >
168+ - 如果未指定 FRESHNESS,表将根据刷新模式使用默认的新鲜度间隔:持续模式使用 ` materialized-table.default-freshness.continuous ` (默认值:3 分钟),全量模式使用 ` materialized-table.default-freshness.full ` (默认值:1 小时)。
150169- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。
151170- 在持续模式下,数据新鲜度和 ` checkpoint ` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 ` checkpoint ` 性能,建议[ 开启 Changelog] ({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。
152171- 在全量模式下,数据新鲜度会转换为 ` cron ` 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 ` cron ` 表达式语义的一致性。具体支持以下新鲜度间隔:
@@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
168187 FRESHNESS = INTERVAL ' 1' HOUR
169188 REFRESH_MODE = CONTINUOUS
170189 AS SELECT
171- ...
190+ ...
172191
173192-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
174193CREATE MATERIALIZED TABLE my_materialized_table
175194 FRESHNESS = INTERVAL ' 10' MINUTE
176195 REFRESH_MODE = FULL
177196 AS SELECT
178- ...
197+ ...
179198```
180199
181200## AS <select_statement>
@@ -204,22 +223,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
204223 ' partition.fields.ds.date-formatter' = ' yyyy-MM-dd'
205224 )
206225 FRESHNESS = INTERVAL ' 10' SECOND
207- AS
208- SELECT
226+ AS
227+ SELECT
209228 k .ds ,
210229 k .user_id ,
211230 COUNT (* ) AS event_count,
212231 SUM (k .amount ) AS total_amount,
213232 MAX (u .age ) AS max_age
214- FROM
233+ FROM
215234 kafka_catalog .db1 .kafka_table k
216- JOIN
235+ JOIN
217236 user_catalog .db1 .user_table u
218- ON
237+ ON
219238 k .user_id = u .user_id
220- WHERE
239+ WHERE
221240 k .event_type = ' purchase'
222- GROUP BY
241+ GROUP BY
223242 k .ds , k .user_id
224243```
225244
@@ -233,22 +252,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
233252 ' partition.fields.ds.date-formatter' = ' yyyy-MM-dd'
234253 )
235254 FRESHNESS = INTERVAL ' 1' HOUR
236- AS
237- SELECT
255+ AS
256+ SELECT
238257 p .ds ,
239258 p .product_id ,
240259 p .product_name ,
241260 AVG (s .sale_price ) AS avg_sale_price,
242261 SUM (s .quantity ) AS total_quantity
243- FROM
262+ FROM
244263 paimon_catalog .db1 .product_table p
245- LEFT JOIN
264+ LEFT JOIN
246265 paimon_catalog .db1 .sales_table s
247- ON
266+ ON
248267 p .product_id = s .product_id
249- WHERE
268+ WHERE
250269 p .category = ' electronics'
251- GROUP BY
270+ GROUP BY
252271 p .ds , p .product_id , p .product_name
253272```
254273
@@ -276,7 +295,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND
276295
277296` SUSPEND ` 用于暂停物化表的后台刷新管道。
278297
279- ** 示例:**
298+ ** 示例:**
280299
281300``` sql
282301-- 暂停前指定 SAVEPOINT 路径
@@ -297,7 +316,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=
297316
298317` RESUME ` 用于恢复物化表的刷新管道。在恢复时,可以通过 ` WITH ` 子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。
299318
300- ** 示例:**
319+ ** 示例:**
301320
302321``` sql
303322-- 恢复指定的物化表
@@ -358,21 +377,21 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statemen
358377-- 原始物化表定义
359378CREATE MATERIALIZED TABLE my_materialized_table
360379 FRESHNESS = INTERVAL ' 10' SECOND
361- AS
362- SELECT
380+ AS
381+ SELECT
363382 user_id,
364383 COUNT (* ) AS event_count,
365384 SUM (amount) AS total_amount
366- FROM
385+ FROM
367386 kafka_catalog .db1 .events
368- WHERE
387+ WHERE
369388 event_type = ' purchase'
370- GROUP BY
389+ GROUP BY
371390 user_id;
372391
373392-- 修改现有物化表的查询
374393ALTER MATERIALIZED TABLE my_materialized_table
375- AS SELECT
394+ AS SELECT
376395 user_id,
377396 COUNT (* ) AS event_count,
378397 SUM (amount) AS total_amount,
@@ -403,7 +422,3 @@ DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name
403422-- 删除指定的物化表
404423DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;
405424```
406-
407-
408-
409-
0 commit comments