Skip to content

Commit

Permalink
add rpc custom thread pool monitor (#908)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuailiwu authored Mar 4, 2022
1 parent c1018f7 commit aa8478d
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.boot.log.SofaBootRpcLoggerFactory;
Expand Down Expand Up @@ -53,6 +54,8 @@ public class RpcThreadPoolMonitor {

private Thread monitor;

private String poolName = "";

public RpcThreadPoolMonitor(String loggerName) {
this(null, loggerName, DEFAULT_SLEEP_TIME);
}
Expand Down Expand Up @@ -99,6 +102,9 @@ public void run() {
sb.append("active:" + activeSize + ", ");
sb.append("idle:" + (poolSize - activeSize) + ", ");
sb.append("poolSize:" + poolSize);
if (StringUtils.isNotBlank(poolName)) {
sb.append(", poolName: " + poolName);
}
logger.info(sb.toString());
}
} catch (Throwable throwable) {
Expand Down Expand Up @@ -133,6 +139,14 @@ public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}

public void setPoolName(String poolName) {
this.poolName = poolName;
}

public String getPoolName() {
return poolName;
}

public void stop() {
synchronized (this) {
this.active = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,21 @@
import com.alipay.sofa.rpc.boot.log.SofaBootRpcLoggerFactory;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.config.UserThreadPoolManager;
import com.alipay.sofa.rpc.log.LogCodes;
import com.alipay.sofa.rpc.server.Server;
import com.alipay.sofa.rpc.server.UserThreadPool;
import com.alipay.sofa.rpc.server.bolt.BoltServer;
import com.alipay.sofa.rpc.server.triple.TripleServer;
import org.slf4j.Logger;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -44,54 +50,56 @@
*/
public class ServerConfigContainer {

private static final Logger LOGGER = SofaBootRpcLoggerFactory
.getLogger(ServerConfigContainer.class);
private static final Logger LOGGER = SofaBootRpcLoggerFactory
.getLogger(ServerConfigContainer.class);

private SofaBootRpcProperties sofaBootRpcProperties;
private SofaBootRpcProperties sofaBootRpcProperties;
/**
* bolt ServerConfig
*/
private volatile ServerConfig boltServerConfig;
private final Object BOLT_LOCK = new Object();
private volatile ServerConfig boltServerConfig;
private final Object BOLT_LOCK = new Object();

/**
* rest ServerConfig
*/
private volatile ServerConfig restServerConfig;
private final Object REST_LOCK = new Object();
private volatile ServerConfig restServerConfig;
private final Object REST_LOCK = new Object();

/**
* dubbo ServerConfig
*/
private volatile ServerConfig dubboServerConfig;
private final Object DUBBO_LOCK = new Object();
private volatile ServerConfig dubboServerConfig;
private final Object DUBBO_LOCK = new Object();

/**
* h2c ServerConfig
*/
private volatile ServerConfig h2cServerConfig;
private final Object H2C_LOCK = new Object();
private volatile ServerConfig h2cServerConfig;
private final Object H2C_LOCK = new Object();

/**
* http ServerConfig
*/
private volatile ServerConfig httpServerConfig;
private final Object HTTP_LOCK = new Object();
private volatile ServerConfig httpServerConfig;
private final Object HTTP_LOCK = new Object();

/**
* http ServerConfig
*/
private volatile ServerConfig tripleServerConfig;
private final Object TRIPLE_LOCK = new Object();
private volatile ServerConfig tripleServerConfig;
private final Object TRIPLE_LOCK = new Object();

//custom server configs
private Map<String, ServerConfig> customServerConfigs = new ConcurrentHashMap<String, ServerConfig>();
private Map<String, ServerConfig> customServerConfigs = new ConcurrentHashMap<String, ServerConfig>();

private RpcThreadPoolMonitor boltThreadPoolMonitor = new RpcThreadPoolMonitor(
LoggerConstant.BOLT_THREAD_LOGGER_NAME);
private RpcThreadPoolMonitor boltThreadPoolMonitor = new RpcThreadPoolMonitor(
LoggerConstant.BOLT_THREAD_LOGGER_NAME);

private RpcThreadPoolMonitor tripleThreadPoolMonitor = new RpcThreadPoolMonitor(
LoggerConstant.TRIPLE_THREAD_LOGGER_NAME);
private RpcThreadPoolMonitor tripleThreadPoolMonitor = new RpcThreadPoolMonitor(
LoggerConstant.TRIPLE_THREAD_LOGGER_NAME);

private List<RpcThreadPoolMonitor> customThreadPoolMonitorList = new ArrayList<>();

public ServerConfigContainer(SofaBootRpcProperties sofaBootRpcProperties) {
this.sofaBootRpcProperties = sofaBootRpcProperties;
Expand Down Expand Up @@ -156,6 +164,29 @@ public void startServers() {
}
}

startCustomThreadPoolMonitor();
}

private void startCustomThreadPoolMonitor() {
Set<UserThreadPool> userThreadPoolSet = UserThreadPoolManager.getUserThreadPoolSet();
if (!userThreadPoolSet.isEmpty()) {
Set<String> poolNames = new HashSet<>();
for (UserThreadPool pool : userThreadPoolSet) {
RpcThreadPoolMonitor customThreadPoolMonitor = new RpcThreadPoolMonitor(
LoggerConstant.CUSTOM_THREAD_LOGGER_NAME);
customThreadPoolMonitorList.add(customThreadPoolMonitor);
if (poolNames.contains(pool.getThreadPoolName())) {
//use to distinguish some UserThreadPools set same poolName
customThreadPoolMonitor.setPoolName(pool.getThreadPoolName() + "-"
+ pool.hashCode());
} else {
customThreadPoolMonitor.setPoolName(pool.getThreadPoolName());
}
customThreadPoolMonitor.setThreadPoolExecutor(pool.getExecutor());
customThreadPoolMonitor.start();
poolNames.add(pool.getThreadPoolName());
}
}
}

/**
Expand Down Expand Up @@ -584,6 +615,8 @@ public void closeAllServer() {
tripleThreadPoolMonitor.stop();
}

stopCustomThreadPoolMonitor();

destroyServerConfig(boltServerConfig);
destroyServerConfig(restServerConfig);
destroyServerConfig(dubboServerConfig);
Expand All @@ -602,6 +635,15 @@ public void closeAllServer() {
customServerConfigs.clear();
}

private void stopCustomThreadPoolMonitor() {
if (!customThreadPoolMonitorList.isEmpty()) {
for (RpcThreadPoolMonitor monitor : customThreadPoolMonitorList) {
monitor.stop();
}
customThreadPoolMonitorList.clear();
}
}

private void destroyServerConfig(ServerConfig serverConfig) {
if (serverConfig != null) {
Server server = serverConfig.getServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public class LoggerConstant {

public static final String BOLT_THREAD_LOGGER_NAME = "RPC-BOLT-THREADPOOL";
public static final String TRIPLE_THREAD_LOGGER_NAME = "RPC-TRIPLE-THREADPOOL";
public static final String CUSTOM_THREAD_LOGGER_NAME = "RPC-CUSTOM-THREADPOOL";
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@
</layout>
</appender>

<appender name="RPC-CUSTOM-THREADPOOL-LOG-FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="${logging.path}/rpc/custom-threadpool.log"/>
<param name="append" value="true"/>
<param name="encoding" value="${file.encoding}"/>
<param name="threshold" value="${logging.level.com.alipay.sofa.rpc.boot}"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p %-32t %c{2} - %m%n"/>
</layout>
</appender>

<logger name="RPC-BOLT-THREADPOOL">
<level value="${logging.level.com.alipay.sofa.rpc.boot}"/>
<appender-ref ref="RPC-BOLT-THREADPOOL-LOG-FILE"/>
Expand All @@ -55,6 +65,12 @@
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<logger name="RPC-CUSTOM-THREADPOOL">
<level value="${logging.level.com.alipay.sofa.rpc.boot}"/>
<appender-ref ref="RPC-CUSTOM-THREADPOOL-LOG-FILE"/>
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<root>
<level value="${logging.level.com.alipay.sofa.rpc.boot}"/>
<appender-ref ref="ROOT-APPENDER"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
</RollingFile>

<RollingFile name="RPC-CUSTOM-THREADPOOL-LOG-FILE" fileName="${RPC_LOG_PATH}/rpc/custom-threadpool.log" append="true"
filePattern="${RPC_LOG_PATH}/rpc/custom-threadpool.log.%d{yyyy-MM-dd}">
<ThresholdFilter level="${RPC_LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout charset="${RPC_FILE_ENCODING}">
<pattern>%d %-5p %-32t %c{2} - %m%n %throwable</pattern>
</PatternLayout>
<Policies>
<!-- 按天分日志文件:重要的是 filePattern 配置到按照天 -->
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
</Policies>
</RollingFile>
</appenders>

<loggers>
Expand All @@ -72,6 +84,11 @@
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<logger name="RPC-CUSTOM-THREADPOOL" level="${RPC_LOG_LEVEL}" additivity="false">
<appender-ref ref="RPC-CUSTOM-THREADPOOL-LOG-FILE"/>
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<root level="${RPC_LOG_LEVEL}">
<appender-ref ref="ROOT-APPENDER"/>
<appender-ref ref="ERROR-APPENDER"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@
</encoder>
</appender>

<appender name="RPC-CUSTOM-THREADPOOL-LOG-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<append>true</append>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>${logging.path}/rpc/custom-threadpool.log</file>
<!-- 每天生成一个日志文件,保存30天的日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--日志文件输出的文件名:按天回滚 daily -->
<FileNamePattern>${logging.path}/rpc/custom-threadpool.log.%d{yyyy-MM-dd}</FileNamePattern>
<!--日志文件保留天数-->
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d %-5p %-32t %c{2} - %m%n</pattern>
<!-- 编码 -->
<charset>${file.encoding}</charset>
</encoder>
</appender>


<!-- logback loggers -->

Expand All @@ -107,6 +129,11 @@
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<logger name="RPC-CUSTOM-THREADPOOL" level="${logging.level.com.alipay.sofa.rpc.boot}" additivity="false">
<appender-ref ref="RPC-CUSTOM-THREADPOOL-LOG-FILE"/>
<appender-ref ref="ERROR-APPENDER"/>
</logger>

<root level="${logging.level.com.alipay.sofa.rpc.boot}">
<appender-ref ref="ROOT-APPENDER"/>
<appender-ref ref="ERROR-APPENDER"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package com.alipay.sofa.rpc.boot.test.container;

import com.alipay.sofa.rpc.boot.common.RpcThreadPoolMonitor;
import com.alipay.sofa.rpc.config.UserThreadPoolManager;
import com.alipay.sofa.rpc.server.UserThreadPool;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -30,6 +33,11 @@
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ServerConfig;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;

/**
* @author <a href="mailto:[email protected]">LiWei</a>
*/
Expand Down Expand Up @@ -184,4 +192,42 @@ public void testCreateHttpServerConfig() {
Assert.assertEquals(1, serverConfig.getAccepts());
Assert.assertEquals(8, serverConfig.getQueues());
}

@Test
public void testStartCustomThreadPoolMonitor() throws NoSuchMethodException,
IllegalAccessException,
InvocationTargetException, NoSuchFieldException {
UserThreadPoolManager.registerUserThread("service1", new UserThreadPool());
UserThreadPoolManager.registerUserThread("service2", new UserThreadPool());
UserThreadPoolManager.registerUserThread("service3", new UserThreadPool("same-name"));
UserThreadPoolManager.registerUserThread("service4", new UserThreadPool("same-name"));

Method privateStartMethod = serverConfigContainer.getClass().getDeclaredMethod(
"startCustomThreadPoolMonitor");
privateStartMethod.setAccessible(true);
privateStartMethod.invoke(serverConfigContainer);

Field privateField = serverConfigContainer.getClass().getDeclaredField(
"customThreadPoolMonitorList");
privateField.setAccessible(true);
Object value = privateField.get(serverConfigContainer);
List<RpcThreadPoolMonitor> customThreadPoolMonitorList = (List<RpcThreadPoolMonitor>) value;
Assert.assertEquals(customThreadPoolMonitorList.size(), 4);

boolean hasHashCode = false;
for (RpcThreadPoolMonitor monitor : customThreadPoolMonitorList) {
if (monitor.getPoolName().contains("same-name-")) {
hasHashCode = true;
}
}
Assert.assertTrue(hasHashCode);

Method privateStopMethod = serverConfigContainer.getClass().getDeclaredMethod(
"stopCustomThreadPoolMonitor");
privateStopMethod.setAccessible(true);
privateStopMethod.invoke(serverConfigContainer);

Assert.assertEquals(customThreadPoolMonitorList.size(), 0);

}
}
4 changes: 2 additions & 2 deletions sofa-boot-project/sofaboot-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
<!--core-->
<sofa.registry.version>5.4.2</sofa.registry.version>
<tracer.core.version>3.1.0</tracer.core.version>
<rpc.core.version>5.7.7</rpc.core.version>
<rpc.core.version>5.8.3</rpc.core.version>

<!--2rd lib dependency-->
<sofa.common.tools.version>1.3.6</sofa.common.tools.version>
<sofa.bolt.version>1.5.6</sofa.bolt.version>
<sofa.bolt.version>1.5.10</sofa.bolt.version>
<sofa.hessian.version>3.3.12</sofa.hessian.version>

<!--3rd lib dependency-->
Expand Down

0 comments on commit aa8478d

Please sign in to comment.