diff --git a/docs/content/docs/dev/table/sql-gateway/overview.md b/docs/content/docs/dev/table/sql-gateway/overview.md index 0a40983dbaee7..69208053320af 100644 --- a/docs/content/docs/dev/table/sql-gateway/overview.md +++ b/docs/content/docs/dev/table/sql-gateway/overview.md @@ -250,6 +250,12 @@ $ ./sql-gateway -Dkey=value Duration Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval. + +
sql-gateway.read-only
+ false + Boolean + When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations. +
sql-gateway.worker.threads.max
500 diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java index c838f436b0f9a..4de82bc30824b 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/config/SqlGatewayServiceConfigOptions.java @@ -97,4 +97,11 @@ public class SqlGatewayServiceConfigOptions { .withDescription( "Keepalive time for an idle worker thread. When the number of workers exceeds min workers, " + "excessive threads are killed after this time interval."); + + public static final ConfigOption SQL_GATEWAY_READ_ONLY_MODE = + key("sql-gateway.read-only") + .booleanType() + .defaultValue(false) + .withDescription( + "When enabled, the SQL Gateway operates in read-only mode and will reject all modify operations."); } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index acf5ac7d77205..d3c6336cacb8c 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -60,6 +60,7 @@ import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.results.FunctionInfo; import org.apache.flink.table.gateway.api.results.TableInfo; @@ -681,6 +682,17 @@ private ResultFetcher callModifyOperations( TableEnvironmentInternal tableEnv, OperationHandle handle, List modifyOperations) { + // Check if SQL Gateway is in read-only mode + Configuration configuration = sessionContext.getSessionConf().clone(); + configuration.addAll(executionConfig); + boolean isReadOnlyMode = + configuration.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE); + + if (isReadOnlyMode) { + throw new SqlExecutionException( + "SQL Gateway is in read-only mode. Modify operations are not allowed."); + } + TableResultInternal result = tableEnv.executeInternal(modifyOperations); // DeleteFromFilterOperation doesn't have a JobClient if (modifyOperations.size() == 1 diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index d64ed8f35f8c4..99b7123c3c6a1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -110,6 +110,7 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE; import static org.apache.flink.table.functions.FunctionKind.OTHER; import static org.apache.flink.table.functions.FunctionKind.SCALAR; +import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_READ_ONLY_MODE; import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession; @@ -1048,6 +1049,37 @@ void testGetOperationSchemaWhenOperationGetError() throws Exception { .satisfies(anyCauseMatches(SqlGatewayException.class, msg))); } + @Test + void testReadOnlyModeWhenOperationInsertError() { + Configuration config = new Configuration(MINI_CLUSTER.getClientConfiguration()); + config.set(SQL_GATEWAY_READ_ONLY_MODE, true); + + String pipelineName = "test-job"; + config.set(PipelineOptions.NAME, pipelineName); + + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');"; + String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');"; + String selectSql = "SELECT * FROM source;"; + + service.executeStatement(sessionHandle, sourceDdl, -1, config); + service.executeStatement(sessionHandle, sinkDdl, -1, config); + service.executeStatement(sessionHandle, selectSql, -1, config); + + OperationHandle operationHandle = + service.executeStatement( + sessionHandle, + String.format("INSERT INTO sink '%s';", selectSql), + -1, + config); + + assertThatThrownBy(() -> fetchAllResults(service, sessionHandle, operationHandle)) + .satisfies( + anyCauseMatches( + SqlExecutionException.class, + "SQL Gateway is in read-only mode. Modify operations are not allowed.")); + } + // -------------------------------------------------------------------------------------------- private OperationHandle submitDefaultOperation(