Skip to content

Commit 3d2c4df

Browse files
committed
Introduce AccessSupportRssChecker to reject the un-support application earlier
1 parent 4ce1aa8 commit 3d2c4df

File tree

4 files changed

+170
-1
lines changed

4 files changed

+170
-1
lines changed

coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.uniffle.common.config.ConfigOptions;
2424
import org.apache.uniffle.common.config.ConfigUtils;
2525
import org.apache.uniffle.common.config.RssBaseConf;
26+
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
2627
import org.apache.uniffle.coordinator.conf.ClientConfParser;
2728
import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
2829
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
@@ -92,7 +93,8 @@ public class CoordinatorConf extends RssBaseConf {
9293
.asList()
9394
.defaultValues(
9495
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker",
95-
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker")
96+
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker",
97+
AccessSupportRssChecker.class.getCanonicalName())
9698
.withDescription("Access checkers");
9799
public static final ConfigOption<Integer> COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC =
98100
ConfigOptions.key("rss.coordinator.access.candidates.updateIntervalSec")
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.access.checker;
19+
20+
import java.io.IOException;
21+
22+
import org.apache.hadoop.io.serializer.JavaSerialization;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import org.apache.uniffle.common.util.Constants;
27+
import org.apache.uniffle.coordinator.AccessManager;
28+
import org.apache.uniffle.coordinator.access.AccessCheckResult;
29+
import org.apache.uniffle.coordinator.access.AccessInfo;
30+
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
31+
32+
/**
33+
* AccessSupportRssChecker checks whether the extra properties support rss, for example, the
34+
* serializer is java, rss is not supported.
35+
*/
36+
public class AccessSupportRssChecker extends AbstractAccessChecker {
37+
private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class);
38+
39+
public AccessSupportRssChecker(AccessManager accessManager) throws Exception {
40+
super(accessManager);
41+
}
42+
43+
@Override
44+
public AccessCheckResult check(AccessInfo accessInfo) {
45+
String serializer = accessInfo.getExtraProperties().get("serializer");
46+
if (JavaSerialization.class.getName().equals(serializer)) {
47+
String msg = String.format("Denied by AccessSupportRssChecker, accessInfo[%s].", accessInfo);
48+
if (LOG.isDebugEnabled()) {
49+
LOG.debug("serializer is {}, {}", serializer, msg);
50+
}
51+
CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc();
52+
return new AccessCheckResult(false, msg);
53+
}
54+
55+
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
56+
}
57+
58+
@Override
59+
public void close() throws IOException {}
60+
}

coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class CoordinatorMetrics {
4343
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
4444
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
4545
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
46+
private static final String TOTAL_SUPPORT_RSS_DENIED_REQUEST = "total_support_rss_denied_request";
4647
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
4748
public static final String APP_NUM_TO_USER = "app_num";
4849
public static final String USER_LABEL = "user_name";
@@ -57,6 +58,7 @@ public class CoordinatorMetrics {
5758
public static Counter counterTotalCandidatesDeniedRequest;
5859
public static Counter counterTotalQuotaDeniedRequest;
5960
public static Counter counterTotalLoadDeniedRequest;
61+
public static Counter counterTotalSupportRssDeniedRequest;
6062
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();
6163

6264
private static MetricsManager metricsManager;
@@ -118,5 +120,7 @@ private static void setUpMetrics() {
118120
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
119121
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
120122
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
123+
counterTotalSupportRssDeniedRequest =
124+
metricsManager.addCounter(TOTAL_SUPPORT_RSS_DENIED_REQUEST);
121125
}
122126
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.coordinator.checker;
19+
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.HashSet;
23+
import java.util.Map;
24+
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.io.serializer.JavaSerialization;
27+
import org.apache.hadoop.io.serializer.WritableSerialization;
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.apache.uniffle.coordinator.AccessManager;
33+
import org.apache.uniffle.coordinator.ApplicationManager;
34+
import org.apache.uniffle.coordinator.ClusterManager;
35+
import org.apache.uniffle.coordinator.CoordinatorConf;
36+
import org.apache.uniffle.coordinator.SimpleClusterManager;
37+
import org.apache.uniffle.coordinator.access.AccessInfo;
38+
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
39+
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
40+
41+
import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
42+
import static org.junit.jupiter.api.Assertions.assertFalse;
43+
import static org.junit.jupiter.api.Assertions.assertTrue;
44+
import static org.mockito.Mockito.mock;
45+
46+
public class AccessSupportRssCheckerTest {
47+
48+
@BeforeEach
49+
public void setUp() {
50+
CoordinatorMetrics.register();
51+
}
52+
53+
@AfterEach
54+
public void clear() {
55+
CoordinatorMetrics.clear();
56+
}
57+
58+
@Test
59+
public void test() throws Exception {
60+
ClusterManager clusterManager = mock(SimpleClusterManager.class);
61+
62+
CoordinatorConf conf = new CoordinatorConf();
63+
conf.set(
64+
COORDINATOR_ACCESS_CHECKERS,
65+
Collections.singletonList(AccessSupportRssChecker.class.getName()));
66+
Map<String, String> properties = new HashMap<>();
67+
68+
/** case1: check success when the serializer config is empty. */
69+
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
70+
AccessManager accessManager =
71+
new AccessManager(
72+
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
73+
AccessSupportRssChecker checker =
74+
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
75+
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
76+
assertTrue(checker.check(accessInfo).isSuccess());
77+
}
78+
79+
/** case2: check failed when the serializer config is JavaSerialization. */
80+
properties.put("serializer", JavaSerialization.class.getCanonicalName());
81+
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
82+
AccessManager accessManager =
83+
new AccessManager(
84+
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
85+
AccessSupportRssChecker checker =
86+
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
87+
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
88+
assertFalse(checker.check(accessInfo).isSuccess());
89+
}
90+
91+
/** case3: check success when the serializer config is other than JavaSerialization. */
92+
properties.put("serializer", WritableSerialization.class.getCanonicalName());
93+
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
94+
AccessManager accessManager =
95+
new AccessManager(
96+
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
97+
AccessSupportRssChecker checker =
98+
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
99+
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
100+
assertTrue(checker.check(accessInfo).isSuccess());
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)