Skip to content

Commit ea289f3

Browse files
committed
[server] Manager users for sasl/plain authentication via cluster properties.
1 parent 183685c commit ea289f3

16 files changed

Lines changed: 696 additions & 16 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,16 @@ public class ConfigOptions {
476476
+ "Each listener can be associated with a specific authentication protocol. "
477477
+ "Listeners not included in the map will use PLAINTEXT by default, which does not require authentication.");
478478

479+
public static final ConfigOption<List<String>> SERVER_SASL_USERS =
480+
key("security.sasl.users")
481+
.stringType()
482+
.asList()
483+
.noDefaultValue()
484+
.withDescription(
485+
"List of user credentials for SASL/PLAIN authentication in 'username:password' format. "
486+
+ "For example: 'admin:admin-secret,bob:bob-secret'. "
487+
+ "This is syntactic sugar that auto-generates the JAAS config string.");
488+
479489
public static final ConfigOption<Integer> TABLET_SERVER_ID =
480490
key("tablet-server.id")
481491
.intType()

fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public static Supplier<ClientAuthenticator> loadClientAuthenticatorSupplier(
8383
* authenticators.
8484
*/
8585
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
86-
Configuration configuration) {
86+
Supplier<Configuration> configurationSupplier) {
87+
Configuration configuration = configurationSupplier.get();
8788
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
8889
Map<String, Supplier<ServerAuthenticator>> serverAuthenticators = new HashMap<>();
8990
Map<String, String> protocolMap =
@@ -98,7 +99,9 @@ public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticator
9899

99100
serverAuthenticators.put(
100101
protocolEntry.getKey(),
101-
() -> serverAuthenticatorPlugin.createServerAuthenticator(configuration));
102+
() ->
103+
serverAuthenticatorPlugin.createServerAuthenticator(
104+
configurationSupplier.get()));
102105
}
103106
return serverAuthenticators;
104107
}

fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ void testConflictingAuthenticationPlugin() {
4949
.isExactlyInstanceOf(ValidationException.class)
5050
.hasMessageContaining(errorMsg);
5151
assertThatThrownBy(
52-
() -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration))
52+
() ->
53+
AuthenticationFactory.loadServerAuthenticatorSuppliers(
54+
() -> configuration))
5355
.isExactlyInstanceOf(ValidationException.class)
5456
.hasMessageContaining(errorMsg);
5557
}
@@ -66,7 +68,9 @@ void testNoAuthenticationPlugin() {
6668
.isExactlyInstanceOf(ValidationException.class)
6769
.hasMessageContaining(errorMsg);
6870
assertThatThrownBy(
69-
() -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration))
71+
() ->
72+
AuthenticationFactory.loadServerAuthenticatorSuppliers(
73+
() -> configuration))
7074
.isExactlyInstanceOf(ValidationException.class)
7175
.hasMessageContaining(errorMsg);
7276
}
@@ -79,7 +83,7 @@ void testIdentifierCaseInsensitive() {
7983
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get())
8084
.isInstanceOf(TestIdentifierClientAuthenticator.class);
8185
assertThat(
82-
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
86+
AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration)
8387
.values()
8488
.stream()
8589
.findAny()
@@ -93,7 +97,7 @@ void testIdentifierCaseInsensitive() {
9397
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration2).get())
9498
.isInstanceOf(TestIdentifierClientAuthenticator.class);
9599
assertThat(
96-
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
100+
AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration)
97101
.values()
98102
.stream()
99103
.findAny()
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.cluster.AlterConfig;
21+
import org.apache.fluss.config.cluster.AlterConfigOpType;
22+
23+
import org.apache.flink.table.annotation.ArgumentHint;
24+
import org.apache.flink.table.annotation.DataTypeHint;
25+
import org.apache.flink.table.annotation.ProcedureHint;
26+
import org.apache.flink.table.procedure.ProcedureContext;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Procedure to append values to list-type cluster configurations dynamically.
33+
*
34+
* <p>This procedure appends new values to existing list-type configurations. The APPEND operation
35+
* only works on configurations defined as list types (e.g., {@code security.sasl.users}). The
36+
* changes are:
37+
*
38+
* <ul>
39+
* <li>Validated by the CoordinatorServer before persistence
40+
* <li>Persisted in ZooKeeper for durability
41+
* <li>Applied to all relevant servers (Coordinator and TabletServers)
42+
* <li>Survive server restarts
43+
* </ul>
44+
*
45+
* <p>Usage examples:
46+
*
47+
* <pre>
48+
* -- Append a user to the SASL user list
49+
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret');
50+
*
51+
* -- Append multiple key-value pairs at one time
52+
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
53+
* </pre>
54+
*
55+
* <p><b>Note:</b> APPEND operations are only supported for list-type configuration keys. The server
56+
* will reject the change if the configuration key is not a list type.
57+
*/
58+
public class AppendClusterConfigsProcedure extends ProcedureBase {
59+
60+
@ProcedureHint(
61+
argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
62+
isVarArgs = true)
63+
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
64+
try {
65+
if (configPairs.length == 0) {
66+
throw new IllegalArgumentException(
67+
"config_pairs cannot be null or empty. "
68+
+ "Please specify valid configuration pairs.");
69+
}
70+
71+
if (configPairs.length % 2 != 0) {
72+
throw new IllegalArgumentException(
73+
"config_pairs must be set in pairs. "
74+
+ "Please specify valid configuration pairs.");
75+
}
76+
77+
List<AlterConfig> configList = new ArrayList<>();
78+
List<String> resultMessage = new ArrayList<>();
79+
80+
for (int i = 0; i < configPairs.length; i += 2) {
81+
String configKey = configPairs[i].trim();
82+
if (configKey.isEmpty()) {
83+
throw new IllegalArgumentException(
84+
"Config key cannot be null or empty. "
85+
+ "Please specify a valid configuration key.");
86+
}
87+
String configValue = configPairs[i + 1];
88+
89+
AlterConfig alterConfig =
90+
new AlterConfig(configKey, configValue, AlterConfigOpType.APPEND);
91+
configList.add(alterConfig);
92+
resultMessage.add(
93+
String.format(
94+
"Successfully appended '%s' to configuration '%s'. ",
95+
configValue, configKey));
96+
}
97+
98+
admin.alterClusterConfigs(configList).get();
99+
100+
return resultMessage.toArray(new String[0]);
101+
} catch (IllegalArgumentException e) {
102+
throw e;
103+
} catch (Exception e) {
104+
throw new RuntimeException(
105+
String.format("Failed to append cluster config: %s", e.getMessage()), e);
106+
}
107+
}
108+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ private enum ProcedureEnum {
7373
SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
7474
GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
7575
RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
76+
APPEND_CLUSTER_CONFIGS("sys.append_cluster_configs", AppendClusterConfigsProcedure.class),
77+
SUBTRACT_CLUSTER_CONFIGS(
78+
"sys.subtract_cluster_configs", SubtractClusterConfigsProcedure.class),
7679
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
7780
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class),
7881
REBALANCE("sys.rebalance", RebalanceProcedure.class),
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.cluster.AlterConfig;
21+
import org.apache.fluss.config.cluster.AlterConfigOpType;
22+
23+
import org.apache.flink.table.annotation.ArgumentHint;
24+
import org.apache.flink.table.annotation.DataTypeHint;
25+
import org.apache.flink.table.annotation.ProcedureHint;
26+
import org.apache.flink.table.procedure.ProcedureContext;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Procedure to subtract (remove) values from list-type cluster configurations dynamically.
33+
*
34+
* <p>This procedure removes specific values from existing list-type configurations. The SUBTRACT
35+
* operation only works on configurations defined as list types (e.g., {@code security.sasl.users}).
36+
* If the list becomes empty after subtraction, the configuration key is removed entirely. The
37+
* changes are:
38+
*
39+
* <ul>
40+
* <li>Validated by the CoordinatorServer before persistence
41+
* <li>Persisted in ZooKeeper for durability
42+
* <li>Applied to all relevant servers (Coordinator and TabletServers)
43+
* <li>Survive server restarts
44+
* </ul>
45+
*
46+
* <p>Usage examples:
47+
*
48+
* <pre>
49+
* -- Remove a user from the SASL user list
50+
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret');
51+
*
52+
* -- Remove multiple key-value pairs at one time
53+
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
54+
* </pre>
55+
*
56+
* <p><b>Note:</b> SUBTRACT operations are only supported for list-type configuration keys. The
57+
* server will reject the change if the configuration key is not a list type. Subtracting a value
58+
* that does not exist in the list is a no-op.
59+
*/
60+
public class SubtractClusterConfigsProcedure extends ProcedureBase {
61+
62+
@ProcedureHint(
63+
argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
64+
isVarArgs = true)
65+
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
66+
try {
67+
if (configPairs.length == 0) {
68+
throw new IllegalArgumentException(
69+
"config_pairs cannot be null or empty. "
70+
+ "Please specify valid configuration pairs.");
71+
}
72+
73+
if (configPairs.length % 2 != 0) {
74+
throw new IllegalArgumentException(
75+
"config_pairs must be set in pairs. "
76+
+ "Please specify valid configuration pairs.");
77+
}
78+
79+
List<AlterConfig> configList = new ArrayList<>();
80+
List<String> resultMessage = new ArrayList<>();
81+
82+
for (int i = 0; i < configPairs.length; i += 2) {
83+
String configKey = configPairs[i].trim();
84+
if (configKey.isEmpty()) {
85+
throw new IllegalArgumentException(
86+
"Config key cannot be null or empty. "
87+
+ "Please specify a valid configuration key.");
88+
}
89+
String configValue = configPairs[i + 1];
90+
91+
AlterConfig alterConfig =
92+
new AlterConfig(configKey, configValue, AlterConfigOpType.SUBTRACT);
93+
configList.add(alterConfig);
94+
resultMessage.add(
95+
String.format(
96+
"Successfully subtracted '%s' from configuration '%s'. ",
97+
configValue, configKey));
98+
}
99+
100+
admin.alterClusterConfigs(configList).get();
101+
102+
return resultMessage.toArray(new String[0]);
103+
} catch (IllegalArgumentException e) {
104+
throw e;
105+
} catch (Exception e) {
106+
throw new RuntimeException(
107+
String.format("Failed to subtract cluster config: %s", e.getMessage()), e);
108+
}
109+
}
110+
}

0 commit comments

Comments
 (0)