Skip to content

Commit

Permalink
Fix issue24 (#41)
Browse files Browse the repository at this point in the history
* fix temp push

* update version 5.2.1-SNAPSHOT

* fix test case

* fix jetty version,and fix rest api for dataInfoIds

* fix hashcode test

* fix working to init bug

* fix start task log

* fix Watcher can't get providate data,retry and finally return new

* add data server list api

* add server list api

* remove log

* fix isssue 21

* add query by id function

* fix issue 22

* delay client off process and sync data process to working status

* fix data connet meta error

* fix inject NotifyDataSyncHandler

* fix start log

* add send sub log

* fix subscriber to send log

* fix word cache clientid

* add clientoff delay time

* fix clientOffDelayMs

* fix jetty version

* fix version to 5.2.1 release

* fix version

* fix .travis.yml

* fix test case

* fix

* fix test sync case

* fix test case

* fix test case

* fix case

* fix notify online no connect break,and add connect log

* add test case

* add test case

* fix test case

* fix format

* fix resource test case

* fix
  • Loading branch information
Synex-wh authored Aug 1, 2019
1 parent bf57100 commit 5ed7039
Show file tree
Hide file tree
Showing 67 changed files with 2,330 additions and 219 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: java
sudo: false

dist: trusty

jdk:
- oraclejdk8

Expand All @@ -12,4 +14,4 @@ script:
- sh ./tools/check_format.sh

after_success:
- bash <(curl -s https://codecov.io/bash)
- bash <(curl -s https://codecov.io/bash)
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.2.1-SNAPSHOT</version>
Expand Down Expand Up @@ -78,7 +78,7 @@
<jraft.version>1.2.4</jraft.version>
<metrics.version>4.0.2</metrics.version>
<commons-io.version>2.4</commons-io.version>
<jetty.version>[9.4.17.v20190418,)</jetty.version>
<jetty.version>[9.4.17.v20190418,9.4.19.v20190610]</jetty.version>
<main.user.dir>${user.dir}</main.user.dir>
<argLine>-Dnetwork_interface_denylist=docker0</argLine>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,6 @@ public void setRegistrationTimestamp(long registrationTimestamp) {
public String toString() {
final StringBuilder sb = new StringBuilder("DataNode{");
sb.append("ip=").append(getIp());
sb.append(", dataCenter='").append(dataCenter).append('\'');
sb.append(", regionId='").append(regionId).append('\'');
sb.append(", nodeStatus=").append(nodeStatus);
sb.append(", registrationTimestamp=").append(registrationTimestamp);
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ public void setRegionId(String regionId) {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("MetaNode{");
sb.append("nodeUrl=").append(getIp());
sb.append(", dataCenter='").append(dataCenter).append('\'');
sb.append(", regionId='").append(regionId).append('\'');
sb.append(", nodeStatus=").append(nodeStatus);
sb.append("ip=").append(getIp());
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ public URL getNodeUrl() {
return nodeUrl;
}

/**
* get ip address from nodeUrl
* @return
*/
public String getIp() {
return nodeUrl == null ? "" : nodeUrl.getIpAddress();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -167,10 +175,7 @@ public void setNodeStatus(NodeStatus nodeStatus) {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SessionNode{");
sb.append("nodeUrl=").append(nodeUrl);
sb.append(", regionId='").append(regionId).append('\'');
sb.append(", name='").append(name).append('\'');
sb.append(", nodeStatus=").append(nodeStatus);
sb.append("ip=").append(getIp());
sb.append('}');
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void setAttributes(Map<String, String> attributes) {
Map<String, String> newAttributes = new HashMap<>();
if (attributes != null && !attributes.isEmpty()) {
attributes.forEach((key, value) -> newAttributes
.put(WordCache.getInstance().getWordCache(key), value));
.put(key, value));
}
this.attributes = newAttributes;
}
Expand Down Expand Up @@ -273,7 +273,7 @@ public String getClientId() {
* @param clientId value to be assigned to property clientId
*/
public void setClientId(String clientId) {
this.clientId = WordCache.getInstance().getWordCache(clientId);
this.clientId = clientId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alipay.sofa.registry.task.TaskClosure;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -75,6 +76,10 @@ public String getName() {

private long createTime;

private TaskClosure taskClosure;

private final String taskId;

private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

/**
Expand All @@ -84,10 +89,9 @@ public String getName() {
public TaskEvent(TaskType taskType) {
this.taskType = taskType;
this.createTime = System.currentTimeMillis();
taskId = UUID.randomUUID().toString();
}

private TaskClosure taskClosure;

/**
* constructor
* @param eventObj
Expand All @@ -96,6 +100,17 @@ public TaskEvent(TaskType taskType) {
public TaskEvent(Object eventObj, TaskType taskType) {
this.eventObj = eventObj;
this.taskType = taskType;
this.createTime = System.currentTimeMillis();
taskId = UUID.randomUUID().toString();
}

/**
* Getter method for property <tt>taskId</tt>.
*
* @return property value of taskId
*/
public String getTaskId() {
return taskId;
}

/**
Expand Down Expand Up @@ -195,7 +210,12 @@ public long getCreateTime() {

@Override
public String toString() {
return "TaskEvent{" + "eventObj=" + eventObj + ", sendTimeStamp=" + sendTimeStamp
+ ", attributes=" + attributes + '}';
final StringBuilder sb = new StringBuilder("TaskEvent{");
sb.append("eventObj=").append(eventObj);
sb.append(", sendTimeStamp=").append(sendTimeStamp);
sb.append(", attributes=").append(attributes);
sb.append(", taskId='").append(taskId).append('\'');
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.alipay.sofa.registry.server.data.datasync.sync.Scheduler;
import com.alipay.sofa.registry.server.data.datasync.sync.StoreServiceFactory;
import com.alipay.sofa.registry.server.data.datasync.sync.SyncDataServiceImpl;
import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess;
import com.alipay.sofa.registry.server.data.event.EventCenter;
import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler;
import com.alipay.sofa.registry.server.data.event.handler.DataServerChangeEventHandler;
import com.alipay.sofa.registry.server.data.event.handler.LocalDataServerChangeEventHandler;
import com.alipay.sofa.registry.server.data.event.handler.MetaServerChangeEventHandler;
Expand Down Expand Up @@ -76,6 +78,7 @@
import com.alipay.sofa.registry.util.PropertySplitter;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -449,4 +452,28 @@ public DataDigestResource dataDigestResource() {
return new DataDigestResource();
}
}

@Configuration
public static class AfterWorkingProcessConfiguration {

@Autowired
DisconnectEventHandler disconnectEventHandler;

@Autowired
AbstractClientHandler notifyDataSyncHandler;

@Bean(name = "afterWorkProcessors")
public List<AfterWorkingProcess> afterWorkingProcessors() {
List<AfterWorkingProcess> list = new ArrayList<>();
list.add(disconnectEventHandler);
list.add((NotifyDataSyncHandler) notifyDataSyncHandler);
return list;
}

@Bean
public AfterWorkingProcessHandler afterWorkingProcessHandler() {
return new AfterWorkingProcessHandler();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.cache.CacheDigestTask;
import com.alipay.sofa.registry.server.data.datasync.sync.Scheduler;
import com.alipay.sofa.registry.server.data.event.EventCenter;
import com.alipay.sofa.registry.server.data.event.MetaServerChangeEvent;
import com.alipay.sofa.registry.server.data.event.StartTaskEvent;
import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler;
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
import org.glassfish.jersey.server.ResourceConfig;
Expand All @@ -38,10 +40,12 @@
import javax.ws.rs.Path;
import javax.ws.rs.ext.Provider;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
*
Expand Down Expand Up @@ -186,7 +190,13 @@ private void startScheduler() {
try {
if (schedulerStarted.compareAndSet(false, true)) {
syncDataScheduler.startScheduler();
eventCenter.post(StartTaskEvent.getInstance());
// start all startTask except renew task
eventCenter.post(new StartTaskEvent(
Arrays.stream(StartTaskTypeEnum.values()).filter(type->type != StartTaskTypeEnum.RENEW).collect(
Collectors.toSet())));

//start dump log
new CacheDigestTask().start();
}
} catch (Exception e) {
schedulerStarted.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public class DataServerConfig {

private int queueSize;

private int notifyIntervalMs;
private int notifyIntervalMs = 500;

private int clientOffDelayMs;
private int clientOffDelayMs = 1000;

private int notifyTempDataIntervalMs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.event.handler.AfterWorkingProcessHandler;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.google.common.collect.Sets;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -53,8 +54,13 @@ public class DataServerCache {
@Autowired
private DataServerConfig dataServerConfig;

@Autowired
private AfterWorkingProcessHandler afterWorkingProcessHandler;

/** current dataServer list and version */
private volatile DataServerChangeItem dataServerChangeItem = new DataServerChangeItem();

/** new input dataServer list and version */
private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem();

private final AtomicBoolean HAS_NOTIFY_ALL = new AtomicBoolean(
Expand Down Expand Up @@ -242,13 +248,14 @@ private void updateDataServerStatus() {
Map<String, LocalServerStatusEnum> map = nodeStatusMap.get(curVersion.get());
if (map != null) {
Set<String> ips = map.keySet();
if (!ips.containsAll(newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet())) {
LOGGER.info(
"nodeStatusMap not contains all push list,nodeStatusMap {} push {}",
nodeStatusMap,
newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet());
Set<String> itemIps = newDataServerChangeItem.getServerMap()
.get(dataServerConfig.getLocalDataCenter()).keySet();
if (!ips.containsAll(itemIps)) {

LOGGER
.info(
"nodeStatusMap not contains all push list,nodeStatusMap {},push {},diff {}",
nodeStatusMap, itemIps, Sets.difference(ips, itemIps));
return;
}
} else {
Expand All @@ -265,6 +272,9 @@ private void updateDataServerStatus() {

//after working status,must clean this map,because calculate backupTriad need add not working node,see LocalDataServerChangeEventHandler getToBeSyncMap
resetStatusMapToWorking();

//after working process
afterWorkingProcessHandler.afterWorkingProcess();
}
}

Expand Down Expand Up @@ -351,8 +361,12 @@ public Long getDataCenterNewVersion(String dataCenter) {
}
}

public BackupTriad calculateOldBackupTriad(String dataInfoId, String dataCenter,
DataServerConfig dataServerBootstrapConfig) {
/**
* calculate ConsistentHash base current data server list
* @param dataCenter
* @return
*/
public ConsistentHash<DataNode> calculateOldConsistentHash(String dataCenter) {
Map<String, Map<String, DataNode>> dataServerMap = dataServerChangeItem.getServerMap();
Map<String, DataNode> dataNodeMap = dataServerMap.get(dataCenter);

Expand All @@ -361,12 +375,9 @@ public BackupTriad calculateOldBackupTriad(String dataInfoId, String dataCenter,
Collection<DataNode> dataServerNodes = dataNodeMap.values();

ConsistentHash<DataNode> consistentHash = new ConsistentHash<>(
dataServerBootstrapConfig.getNumberOfReplicas(), dataServerNodes);

List<DataNode> list = consistentHash.getNUniqueNodesFor(dataInfoId,
dataServerBootstrapConfig.getStoreNodes());
dataServerConfig.getNumberOfReplicas(), dataServerNodes);

return new BackupTriad(dataInfoId, list);
return consistentHash;
} else {
LOGGER.warn("Calculate Old BackupTriad,old dataServer list is empty!");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
*/
public class DataChangeHandler implements InitializingBean {

private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeHandler.class);
private static final Logger LOGGER = LoggerFactory
.getLogger(DataChangeHandler.class);

private static final Logger LOGGER_START = LoggerFactory.getLogger("DATA-START-LOGS");

@Autowired
private DataServerConfig dataServerBootstrapConfig;
Expand Down Expand Up @@ -74,7 +77,6 @@ public void start() {
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
LOGGER.info("[DataChangeHandler] begin to notify datum in queue:{}", name);
executor.execute(() -> {
while (true) {
try {
Expand All @@ -85,7 +87,7 @@ public void start() {
}
}
});
LOGGER.info("[DataChangeHandler] notify datum in queue:{} success", name);
LOGGER_START.info("[DataChangeHandler] notify datum in queue:{} success", name);
}
}

Expand Down
Loading

0 comments on commit 5ed7039

Please sign in to comment.