Skip to content

Commit 66cbe14

Browse files
committed
fix bug
1 parent 49b6d6e commit 66cbe14

9 files changed

Lines changed: 171 additions & 50 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ public class ConfigOptions {
499499
+ "Listeners not included in the map will use PLAINTEXT by default, which does not require authentication.");
500500

501501
public static final ConfigOption<List<String>> SERVER_SASL_USERS =
502-
key("security.sasl.users")
502+
key("security.sasl.plain.users")
503503
.stringType()
504504
.asList()
505505
.noDefaultValue()

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AppendClusterConfigsProcedure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* Procedure to append values to list-type cluster configurations dynamically.
3333
*
3434
* <p>This procedure appends new values to existing list-type configurations. The APPEND operation
35-
* only works on configurations defined as list types (e.g., {@code security.sasl.users}). The
35+
* only works on configurations defined as list types (e.g., {@code security.sasl.plain.users}). The
3636
* changes are:
3737
*
3838
* <ul>
@@ -46,10 +46,10 @@
4646
*
4747
* <pre>
4848
* -- Append a user to the SASL user list
49-
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret');
49+
* CALL sys.append_cluster_configs('security.sasl.plain.users', 'bob:bob-secret');
5050
*
5151
* -- Append multiple key-value pairs at one time
52-
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
52+
* CALL sys.append_cluster_configs('security.sasl.plain.users', 'bob:bob-secret', 'security.sasl.plain.users', 'alice:alice-secret');
5353
* </pre>
5454
*
5555
* <p><b>Note:</b> APPEND operations are only supported for list-type configuration keys. The server

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SubtractClusterConfigsProcedure.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
* Procedure to subtract (remove) values from list-type cluster configurations dynamically.
3333
*
3434
* <p>This procedure removes specific values from existing list-type configurations. The SUBTRACT
35-
* operation only works on configurations defined as list types (e.g., {@code security.sasl.users}).
36-
* If the list becomes empty after subtraction, the configuration key is removed entirely. The
37-
* changes are:
35+
* operation only works on configurations defined as list types (e.g., {@code
36+
* security.sasl.plain.users}). If the list becomes empty after subtraction, the configuration key
37+
* is removed entirely. The changes are:
3838
*
3939
* <ul>
4040
* <li>Validated by the CoordinatorServer before persistence
@@ -47,10 +47,10 @@
4747
*
4848
* <pre>
4949
* -- Remove a user from the SASL user list
50-
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret');
50+
* CALL sys.subtract_cluster_configs('security.sasl.plain.users', 'bob:bob-secret');
5151
*
5252
* -- Remove multiple key-value pairs at one time
53-
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
53+
* CALL sys.subtract_cluster_configs('security.sasl.plain.users', 'bob:bob-secret', 'security.sasl.plain.users', 'alice:alice-secret');
5454
* </pre>
5555
*
5656
* <p><b>Note:</b> SUBTRACT operations are only supported for list-type configuration keys. The

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,9 @@ void testAddAndDeleteUser() throws Exception {
833833
.collect()) {
834834
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
835835
assertThat(results).hasSize(1);
836-
assertThat((String) results.get(0).getField(1)).contains("bob:bob_pass");
836+
assertThat(results.stream().map(Row::toString).collect(Collectors.toList()))
837+
.containsExactly(
838+
"+I[security.sasl.plain.users, root:password,guest:passwords, bob:bob_pass, DYNAMIC_SERVER_CONFIG]");
837839
}
838840

839841
// Verify "bob" can authenticate by creating a catalog with bob's credentials
@@ -854,24 +856,18 @@ void testAddAndDeleteUser() throws Exception {
854856
bobCatalog, ConfigOptions.SERVER_SASL_USERS.key()))
855857
.collect()) {
856858
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
857-
assertThat(results).hasSize(1);
859+
assertThat(results.stream().map(Row::toString).collect(Collectors.toList()))
860+
.containsExactly(
861+
"+I[security.sasl.plain.users, root:password,guest:passwords,bob:bob_pass, DYNAMIC_SERVER_CONFIG]");
858862
}
859863
tEnv.executeSql("drop catalog " + bobCatalog);
860864

861865
// Step 2: Delete user "bob" via subtract_cluster_configs
862-
try (CloseableIterator<Row> resultIterator =
863-
tEnv.executeSql(
864-
String.format(
865-
"Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')",
866-
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
867-
.collect()) {
868-
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
869-
assertThat(results).hasSize(1);
870-
assertThat(results.get(0).getField(0))
871-
.asString()
872-
.contains("Successfully subtracted")
873-
.contains(ConfigOptions.SERVER_SASL_USERS.key());
874-
}
866+
tEnv.executeSql(
867+
String.format(
868+
"Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')",
869+
CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key()))
870+
.await();
875871

876872
// Verify "bob" was deleted from config
877873
try (CloseableIterator<Row> resultIterator =
@@ -882,9 +878,9 @@ void testAddAndDeleteUser() throws Exception {
882878
.collect()) {
883879
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
884880
// After subtracting the only dynamically-added entry, the config may be empty
885-
if (!results.isEmpty()) {
886-
assertThat((String) results.get(0).getField(1)).doesNotContain("bob");
887-
}
881+
assertThat(results.stream().map(Row::toString).collect(Collectors.toList()))
882+
.containsExactly(
883+
"+I[security.sasl.plain.users, root:password,guest:passwords, DYNAMIC_SERVER_CONFIG]");
888884
}
889885

890886
// Verify "bob" can no longer authenticate
@@ -952,11 +948,7 @@ private static Configuration initConfig() {
952948
// set security information.
953949
conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
954950
conf.setString("security.sasl.enabled.mechanisms", "plain");
955-
conf.setString(
956-
"security.sasl.plain.jaas.config",
957-
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required "
958-
+ " user_root=\"password\" "
959-
+ " user_guest=\"password2\";");
951+
conf.setString(ConfigOptions.SERVER_SASL_USERS.key(), "root:password,guest:passwords");
960952
conf.set(ConfigOptions.SUPER_USERS, "User:root");
961953
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
962954
return conf;

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,13 @@ public void validate(Configuration newConfig) throws ConfigException {
106106
if (colonIdx <= 0 || colonIdx == entry.length() - 1) {
107107
throw new ConfigException(
108108
String.format(
109-
"security.sasl.users[%d] must be in 'username:password' format, but got '%s'.",
109+
"security.sasl.plain.users[%d] must be in 'username:password' format, but got '%s'.",
110110
i, entry));
111111
}
112112
String username = entry.substring(0, colonIdx);
113113
if (!uniqueUsernames.add(username)) {
114114
throw new ConfigException(
115-
"security.sasl.users must not contain duplicate usernames: '"
115+
"security.sasl.plain.users must not contain duplicate usernames: '"
116116
+ username
117117
+ "'.");
118118
}
@@ -126,7 +126,7 @@ public void reconfigure(Configuration newConfig) throws ConfigException {
126126

127127
/**
128128
* Enriches the given configuration with a generated JAAS config string derived from the
129-
* security.sasl.users list (format: 'username:password'). If the list is not present, the
129+
* security.sasl.plain.users list (format: 'username:password'). If the list is not present, the
130130
* original configuration is returned unchanged.
131131
*/
132132
private static Configuration enrichWithJaasConfig(Configuration config) {

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ private void buildNettyServer() throws Exception {
242242
configuration.setString(
243243
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT1:mutual,CLIENT2:sasl");
244244
configuration.setString("security.sasl.enabled.mechanisms", "plain");
245-
configuration.setString("security.sasl.users", "root:password,guest:password2");
245+
configuration.setString("security.sasl.plain.users", "root:password,guest:password2");
246246
configuration.set(ConfigOptions.SUPER_USERS, "User:root");
247247
configuration.set(ConfigOptions.AUTHORIZER_ENABLED, true);
248248
// 3 worker threads is enough for this test

fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ void testAddAndDeleteUser() throws Exception {
187187
Configuration serverConfig = new Configuration();
188188
serverConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
189189
serverConfig.setString("security.sasl.enabled.mechanisms", "plain");
190-
serverConfig.setString("security.sasl.users", "admin:admin-secret,alice:alice-secret");
190+
serverConfig.setString(
191+
"security.sasl.plain.users", "admin:admin-secret,alice:alice-secret");
191192
serverConfig.setString(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS.key(), "3");
192193

193194
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
@@ -225,7 +226,8 @@ void testAddAndDeleteUser() throws Exception {
225226
addBobConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
226227
addBobConfig.setString("security.sasl.enabled.mechanisms", "plain");
227228
addBobConfig.setString(
228-
"security.sasl.users", "admin:admin-secret,alice:alice-secret,bob:bob-secret");
229+
"security.sasl.plain.users",
230+
"admin:admin-secret,alice:alice-secret,bob:bob-secret");
229231
plugin.validate(addBobConfig);
230232
plugin.reconfigure(addBobConfig);
231233

@@ -239,7 +241,8 @@ void testAddAndDeleteUser() throws Exception {
239241
removeAdminConfig.setString(
240242
ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
241243
removeAdminConfig.setString("security.sasl.enabled.mechanisms", "plain");
242-
removeAdminConfig.setString("security.sasl.users", "alice:alice-secret,bob:bob-secret");
244+
removeAdminConfig.setString(
245+
"security.sasl.plain.users", "alice:alice-secret,bob:bob-secret");
243246
plugin.validate(removeAdminConfig);
244247
plugin.reconfigure(removeAdminConfig);
245248

fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.slf4j.LoggerFactory;
3737

3838
import java.util.ArrayList;
39-
import java.util.Arrays;
4039
import java.util.List;
4140
import java.util.Map;
4241

@@ -150,8 +149,17 @@ private void prepareIncrementalConfigs(
150149
break;
151150
case APPEND:
152151
validateListType(configPropName);
153-
String existingAppend = configsProps.getOrDefault(configPropName, "");
154-
if (existingAppend.isEmpty()) {
152+
String existingAppend;
153+
if (configsProps.containsKey(configPropName)) {
154+
existingAppend = configsProps.get(configPropName);
155+
} else {
156+
// Fall back to static config
157+
existingAppend =
158+
dynamicServerConfig
159+
.getInitialServerConfigs()
160+
.get(configPropName);
161+
}
162+
if (existingAppend == null || existingAppend.isEmpty()) {
155163
configsProps.put(configPropName, configPropValue);
156164
} else {
157165
configsProps.put(
@@ -160,13 +168,27 @@ private void prepareIncrementalConfigs(
160168
break;
161169
case SUBTRACT:
162170
validateListType(configPropName);
163-
String existingSubtract = configsProps.get(configPropName);
164-
if (existingSubtract != null) {
165-
List<String> items =
166-
new ArrayList<>(Arrays.asList(existingSubtract.split(",")));
167-
items.remove(configPropValue);
171+
String existingSubtract;
172+
if (configsProps.containsKey(configPropName)) {
173+
existingSubtract = configsProps.get(configPropName);
174+
} else {
175+
// Fall back to static config
176+
existingSubtract =
177+
dynamicServerConfig
178+
.getInitialServerConfigs()
179+
.get(configPropName);
180+
}
181+
if (existingSubtract != null && !existingSubtract.isEmpty()) {
182+
List<String> items = new ArrayList<>();
183+
for (String item : existingSubtract.split(",")) {
184+
String trimmed = item.trim();
185+
if (!trimmed.isEmpty()) {
186+
items.add(trimmed);
187+
}
188+
}
189+
items.removeIf(v -> v.equals(configPropValue));
168190
if (items.isEmpty()) {
169-
configsProps.remove(configPropName);
191+
configsProps.put(configPropName, null);
170192
} else {
171193
configsProps.put(configPropName, String.join(",", items));
172194
}

0 commit comments

Comments
 (0)