Commit

fix rate dump
dzdx committed Feb 7, 2021
1 parent b0f39e7 commit 5ab5d33
Showing 2 changed files with 85 additions and 76 deletions.
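
The change is the same in both files: the digest dump, previously scheduled with a fixed-delay ScheduledThreadPoolExecutor, moves into a private dump() method driven by a java.util.Timer whose first execution is aligned to the next 5-minute wall-clock boundary, so the periodic dumps fire at predictable timestamps (presumably the 'rate' the commit title refers to).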

CacheDigestTask.java

@@ -24,19 +24,16 @@
 import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
 import com.alipay.sofa.registry.server.data.cache.DatumCache;
 import com.alipay.sofa.registry.util.ConcurrentUtils;
-import com.alipay.sofa.registry.util.NamedThreadFactory;
+import org.apache.commons.lang.time.DateUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.PostConstruct;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
- *
  * @author qian.lqlq
  * @version $Id: CacheDigestTask.java, v 0.1 2018-04-27 17:40 qian.lqlq Exp $
  */
@@ -56,42 +53,52 @@ public void init() {
             LOGGER.info("cache digest off with intervalSecs={}", intervalSec);
             return;
         }
-        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,
-            new NamedThreadFactory("CacheDigestTask"));
-        executor.scheduleWithFixedDelay(() -> {
-            try {
-                Map<String, Map<String, Datum>> allMap = datumCache.getAll();
-                if (!allMap.isEmpty()) {
-                    for (Entry<String, Map<String, Datum>> dataCenterEntry : allMap.entrySet()) {
-                        String dataCenter = dataCenterEntry.getKey();
-                        Map<String, Datum> datumMap = dataCenterEntry.getValue();
-                        LOGGER.info("size of datum in {} is {}", dataCenter, datumMap.size());
-                        for (Entry<String, Datum> dataInfoEntry : datumMap.entrySet()) {
-                            String dataInfoId = dataInfoEntry.getKey();
-                            Datum data = dataInfoEntry.getValue();
-                            Map<String, Publisher> pubMap = data.getPubMap();
-                            StringBuilder pubStr = new StringBuilder(1024);
-                            if (!CollectionUtils.isEmpty(pubMap)) {
-                                for (Publisher publisher : pubMap.values()) {
-                                    pubStr.append(logPublisher(publisher)).append(";");
-                                }
-                            }
-                            LOGGER.info("[Datum]{},{},{},[{}]", dataInfoId,
-                                data.getVersion(), dataCenter, pubStr.toString());
-                            // avoid io is busy
-                            ConcurrentUtils.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
-                        }
-                        int pubCount = datumMap.values().stream().mapToInt(Datum::publisherSize).sum();
-                        LOGGER.info("size of publisher in {} is {}", dataCenter, pubCount);
-                    }
-                } else {
-                    LOGGER.info("datum cache is empty");
-                }
-            } catch (Throwable t) {
-                LOGGER.error("cache digest error", t);
-            }
-        }, intervalSec, intervalSec, TimeUnit.SECONDS);
+        Date firstDate = new Date();
+        firstDate = DateUtils.round(firstDate, Calendar.MINUTE);
+        firstDate.setMinutes(firstDate.getMinutes() / 5 * 5 + 5);
+        Timer timer = new Timer("CacheDigestTask", true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                dump();
+            }
+        };
+        timer.scheduleAtFixedRate(task, firstDate, intervalSec * 1000);
+    }
+
+    private void dump() {
+        try {
+            Map<String, Map<String, Datum>> allMap = datumCache.getAll();
+            if (!allMap.isEmpty()) {
+                for (Entry<String, Map<String, Datum>> dataCenterEntry : allMap.entrySet()) {
+                    String dataCenter = dataCenterEntry.getKey();
+                    Map<String, Datum> datumMap = dataCenterEntry.getValue();
+                    LOGGER.info("size of datum in {} is {}", dataCenter, datumMap.size());
+                    for (Entry<String, Datum> dataInfoEntry : datumMap.entrySet()) {
+                        String dataInfoId = dataInfoEntry.getKey();
+                        Datum data = dataInfoEntry.getValue();
+                        Map<String, Publisher> pubMap = data.getPubMap();
+                        StringBuilder pubStr = new StringBuilder(1024);
+                        if (!CollectionUtils.isEmpty(pubMap)) {
+                            for (Publisher publisher : pubMap.values()) {
+                                pubStr.append(logPublisher(publisher)).append(";");
+                            }
+                        }
+                        LOGGER.info("[Datum]{},{},{},[{}]", dataInfoId,
+                            data.getVersion(), dataCenter, pubStr.toString());
+                        // avoid io is busy
+                        ConcurrentUtils.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+                    }
+                    int pubCount = datumMap.values().stream().mapToInt(Datum::publisherSize).sum();
+                    LOGGER.info("size of publisher in {} is {}", dataCenter, pubCount);
+                }
+            } else {
+                LOGGER.info("datum cache is empty");
+            }
+        } catch (Throwable t) {
+            LOGGER.error("cache digest error", t);
+        }
+    }
 
     private String logPublisher(Publisher publisher) {
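
Both files compute the same first fire time: DateUtils.round snaps the current time to the nearest minute, and the deprecated Date.setMinutes(minutes / 5 * 5 + 5) then moves it forward to the next 5-minute boundary, with minute 60 rolling into the next hour. The sketch below shows that computation in isolation; class and method names are hypothetical, and java.util.Calendar stands in for commons-lang's DateUtils.round to keep it dependency-free.

import java.util.Calendar;
import java.util.Date;

// Sketch of the first-fire-time computation used by the new schedule.
public class FirstFireTimeDemo {

    // Equivalent of: DateUtils.round(now, Calendar.MINUTE), then minutes = min / 5 * 5 + 5.
    static Date nextFiveMinuteBoundary(Date now) {
        Calendar cal = Calendar.getInstance();
        cal.setTime(now);
        // DateUtils.round to MINUTE rounds half-up on the seconds.
        if (cal.get(Calendar.SECOND) >= 30) {
            cal.add(Calendar.MINUTE, 1);
        }
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        // Floor to the 5-minute grid, then step forward one slot. Using add()
        // mirrors Date.setMinutes(60), which rolls into the next hour.
        cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) / 5 * 5);
        cal.add(Calendar.MINUTE, 5);
        return cal.getTime();
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println("now   = " + now);                        // e.g. 10:07:40
        System.out.println("first = " + nextFiveMinuteBoundary(now)); // e.g. 10:10:00
    }
}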

SessionCacheDigestTask.java

@@ -26,22 +26,15 @@
 import com.alipay.sofa.registry.server.session.store.DataStore;
 import com.alipay.sofa.registry.server.session.store.Interests;
 import com.alipay.sofa.registry.util.ConcurrentUtils;
-import com.alipay.sofa.registry.util.NamedThreadFactory;
+import org.apache.commons.lang.time.DateUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import javax.annotation.PostConstruct;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
- *
  * @author xiaojian.xj
 * @version $Id: SessionCacheDigestTask.java, v 0.1 2020年08月03日 14:37 xiaojian.xj Exp $
 */
@@ -50,10 +43,10 @@ public class SessionCacheDigestTask {
     private static final Logger LOGGER = LoggerFactory.getLogger("CACHE-DIGEST");
 
     @Autowired
-    private DataStore           sessionDataStore;
+    private DataStore sessionDataStore;
 
     @Autowired
-    private Interests           sessionInterests;
+    private Interests sessionInterests;
 
     @Autowired
     private SessionServerConfig sessionServerConfig;
@@ -65,35 +58,44 @@ public void init() {
             LOGGER.info("cache digest off with intervalSec={}", intervalSec);
             return;
         }
-        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,
-            new NamedThreadFactory("CacheDigestTask"));
-        executor.scheduleWithFixedDelay(() -> {
-
-            try {
-                Collection<String> storeDataInfoIds = sessionDataStore.getDataInfoIds();
-                Collection<String> interestDataInfoIds = sessionInterests.getDataInfoIds();
-                Set<String> dataInfoIds = new HashSet<>(storeDataInfoIds.size()+interestDataInfoIds.size());
-
-                dataInfoIds.addAll(storeDataInfoIds);
-                dataInfoIds.addAll(interestDataInfoIds);
-
-                dataInfoIds.stream().forEach(dataInfoId -> {
-                    Collection<Publisher> publishers = sessionDataStore.getDatas(dataInfoId);
-                    Collection<Subscriber> subscribers = sessionInterests.getDatas(dataInfoId);
-
-                    LOGGER.info("[dataInfo] {}; {}; {}; {}; [{}]; [{}]",
-                        sessionServerConfig.getSessionServerDataCenter(), dataInfoId,
-                        publishers.size(), subscribers.size(),
-                        logPubOrSub(publishers), logPubOrSub(subscribers));
-                    // avoid io is too busy
-                    ConcurrentUtils.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
-                });
-
-            } catch (Throwable t) {
-                LOGGER.error("[CacheDigestTask] cache digest error", t);
-            }
-
-        }, intervalSec, intervalSec, TimeUnit.SECONDS);
+        Date firstDate = new Date();
+        firstDate = DateUtils.round(firstDate, Calendar.MINUTE);
+        firstDate.setMinutes(firstDate.getMinutes() / 5 * 5 + 5);
+        Timer timer = new Timer("CacheDigestTask", true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                dump();
+            }
+        };
+        timer.scheduleAtFixedRate(task, firstDate, intervalSec * 1000);
+    }
+
+    private void dump() {
+        try {
+            Collection<String> storeDataInfoIds = sessionDataStore.getDataInfoIds();
+            Collection<String> interestDataInfoIds = sessionInterests.getDataInfoIds();
+            Set<String> dataInfoIds = new HashSet<>(storeDataInfoIds.size() + interestDataInfoIds.size());
+
+            dataInfoIds.addAll(storeDataInfoIds);
+            dataInfoIds.addAll(interestDataInfoIds);
+
+            dataInfoIds.stream().forEach(dataInfoId -> {
+                Collection<Publisher> publishers = sessionDataStore.getDatas(dataInfoId);
+                Collection<Subscriber> subscribers = sessionInterests.getDatas(dataInfoId);
+
+                LOGGER.info("[dataInfo] {}; {}; {}; {}; [{}]; [{}]",
+                    sessionServerConfig.getSessionServerDataCenter(), dataInfoId,
+                    publishers.size(), subscribers.size(),
+                    logPubOrSub(publishers), logPubOrSub(subscribers));
+                // avoid io is too busy
+                ConcurrentUtils.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+            });
+
+        } catch (Throwable t) {
+            LOGGER.error("[CacheDigestTask] cache digest error", t);
+        }
+
+    }
 
     private String logPubOrSub(Collection<? extends BaseInfo> infos) {
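
One behavioral note on the swap from ScheduledThreadPoolExecutor to java.util.Timer: scheduleAtFixedRate keeps runs on the wall-clock grid, but a Timer thread terminates if a task lets an exception escape, silently cancelling every later run. That makes the catch (Throwable t) inside dump() load-bearing. A minimal, self-contained demo of the failure mode, with hypothetical names:

import java.util.Timer;
import java.util.TimerTask;

// Demonstrates why dump() swallows Throwable: one escaping exception
// kills the Timer thread, so no further runs are ever scheduled.
public class TimerFailureDemo {
    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer("demo", false);
        timer.scheduleAtFixedRate(new TimerTask() {
            private int runs = 0;

            @Override
            public void run() {
                System.out.println("run " + (++runs));
                if (runs == 2) {
                    throw new RuntimeException("boom"); // terminates the Timer thread
                }
            }
        }, 0, 200);
        Thread.sleep(1000); // prints "run 1" and "run 2"; nothing after the throw
        timer.cancel();
    }
}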
