New snapshot (#81)
* fix temp push

* update version 5.2.1-SNAPSHOT

* fix test case

* fix jetty version, and fix rest api for dataInfoIds

* fix hashcode test

* fix working to init bug

* fix start task log

* fix Watcher can't get provider data; retry and finally return new

* add data server list api

* add server list api

* remove log

* fix issue 21

* add query by id function

* fix issue 22

* delay client off process and sync data process to working status

* fix data connect meta error

* fix inject NotifyDataSyncHandler

* fix start log

* add send sub log

* fix subscriber to send log

* bugfix: #27

* bugfix: #27

* feature: Add monitoring logs #29

* feature: Add monitoring logs #29
(1) bugfix CommonResponse
(2) format

* bugfix: During meta startup, leader may not register itself #30

* bugfix: Sometimes receive "Not leader" response from leader in OnStartingFollowing() #31

* temp add

* add renew request

* data snapshot module

* add calculate digest service

* fix word cache clientid

* data renew module

* data renew/expired module

* add renew datuem request

* add WriteDataAcceptor

* session renew/expired module

* 1. bugfix ReNewDatumHandler: getByConnectId -> getOwnByConnectId
2. refactor DatumCache from static to instance

* add blacklist wrapper and filter

* upgrade jraft version to 1.2.5

* blacklist ut

* add clientoff delay time

* bugfix: The timing of snapshot construction is not right

* rename: ReNew -> Renew

* fix blacklist test case

* rename: unpub -> unPub

* add threadSize and queueSize limit

* bugfix: revert SessionRegistry

* fix sub fetch retry all error, and reset dataInfoId version

* fix client fast chain breakage data can not be cleaned up

* (1) remove logback.xml DEBUG level;
(2) dataServerBootstrapConfig rename;
(3) print conf when startup

* update log

* fix update zero version, and fix log

* add clientOffDelayMs default value

* fix clientOffDelayMs

* Task(DatumSnapshot/Pub/UnPub) add retry strategy

* bugfix DataNodeServiceImpl: retryTimes

* (1)cancelDataTaskListener duplicate
(2)bugfix DataNodeServiceImpl and SessionRegistry

* refactor datum version

* add hessian black list

* bugfix: log "retryTimes"

* bugfix DatumLeaseManager: Consider the situation of connectId loss after data restart; ownConnectId should be calculated dynamically

* add jvm blacklist api

* fix file name

* some code optimization

* data:refactor snapshot

* fix jetty version

* bugfix DatumLeaseManager: If in a non-working state, cannot clean up because the renew request cannot be received at this time.

* remove SessionSerialFilterResource

* WriteDataProcessor add TaskEvent log; Cache print task update

* data bugfix: snapshot must notify session

* fix SubscriberPushEmptyTask default implement

* merge new

* fix protect

* 1. When the pub of connectId is 0, no clearance action is triggered.
2. Print map.size regularly
3. Delete the log: "ConnectId(%s) expired, lastRenewTime is %s, pub.size is 0"

* DataNodeExchanger: print but ignore if from renew module, because renew requests are too frequent

* reduce log of renew

* data bugfix: Data coverage is also allowed when versions are equal. Consistent with session design.

* DatumCache bugfix: Index coverage should be updated after pubMap update

* DatumSnapshotHandler: limit print; do not call dataChangeEventCenter.onChange if no diff

* bugfix unpub NPE (pub may already be cleaned by DatumLeaseManager); LIMITED_LIST_SIZE_FOR_PRINT change to 30

* some code refactor

* add code comment

* fix data working to init, and fix empty push version

* consider unpub as isWriteRequest, reduce snapshot frequency

* RefreshUpdateTime is at the top, otherwise multiple snapshots can be issued concurrently

* update config: reduce retryTimes, increase delayTime, the purpose is to reduce performance consumption

* put resume() in finally code block, avoid lock leak

* modify renewDatumWheelTaskDelay and datumTimeToLiveSec

* When session receives a connection and generates renew tasks, each is delayed by a random time to avoid all of them launching renew at the same moment.

* data: add executor for handler
session: bugfix snapshot
session: refactor wheelTimer of renew to add executor

* add get data log

* snapshot and lastUpdateTimestamp: Specific to dataServerIP

* 1. DataServer: RenewDatumHandler must return GenericResponse, not CommonResponse, or else session will throw a ClassCastException
2. No need to update timestamp after renew
3. snapshot: Need to specify DataServerIP

* add logs

* 1. dataServer: reduce log of snapshotHandler
2. update logs

* dataServer: renew logic should delay for some time after status is WORKING, because data is processed asynchronously after synchronization from other DataServers

* bugfix bean; update log

* ignore renew request log

* fix UT

* fix .travis.yml

* fix version 5.3.0-SNAPSHOT

* fix online notify connect error

* fix push confirm error, and fix datum update version, pub threadpool config, add accesslimit service

* fix push confirm error, and fix datum update version, pub threadpool config, add accesslimit service (#45)

* add switch renew and expire

* implement renew enable/disable switch

* fix data client exchange log

* fix datum fetch connect error

* bugfix CacheService: set version zero when first sub and get datum error

* fix clean task for fetch

* bugfix DatumCache: Forget to clean up the index in datumCache.putSnapshot

* Session & Data: increase WordCache use

* code optimize

* WordCache: registerId do not add WordCache

* fix fetch datum word cache

* fix NotifyFetchDatumHandler npe

* fix test case time

* fix test case

* fix test case

* fix test case

* fix ut case: StopPushDataSwitchTest

* ut case:renew module

* fix ut case:TempPublisherTest

* fix version, and merge new

* bugfix ut case: increase sleep time

* fix ut case:RenewTest

* fix version and fix callback executor, fix log error

* fix ut case:RenewTest format

* fix pom version

* fix ut case: do not run in parallel

* refactor providerdata process

* Memory optimization: Datum.processDatum

* add session notify test

* copy from mybank:
1. Update Subscriber: support for push context
2. increase queueSize of checkPushExecutor
3. fix the isolation function of Gzone and Rzone

* Modify the deny policy of accessDataExecutor of SessionServer

* remove useless code

* fix call back

* fix meta methodhandle cache

* fix push confirm success

* Change the communication between session and data to multi connection

* resolve compile error

* fix processor

* BoltClient: the creation of ConnectionEventAdapter should be inheritable

* fix currentTimeMillis product from source

* add client Invalid check task

* use multiple RpcClient instances instead of one RpcClient with multiple connections, and start a heartbeat thread to keep the connection pool filled, because bolt does not maintain the number of connections in the pool

* refactor TaskListener and use map instead of list in DefaultTaskListenerManager; refactor getSingleTaskDispatcher()

* DataChangeRequestHandler:optimize performance

* refactor: Heartbeat between session and data

* fix: Synex-wh#20 (review)

* update

* BoltClient use just one RpcClient;
remove heartbeat between session and data;

* SyncDataCallback reduce ThreadSize for saving cpu

* reduce NOTIFY_SESSION_CALLBACK_EXECUTOR threadSize

* fix version in DataChangeFetchTask

* 1. filter out the unPubs of datum when first put; otherwise "syncData" or "fetchData" may get a Datum containing unPubs, which will cause errors
2. add jul-to-slf4j for some libs which use jul logging, e.g. hessian

* fix meta mem

* fix test case

* fix temp case

* fix syncConfigRetryInterval 60s

* fix format

Co-authored-by: wukezhu <[email protected]>
Synex-wh and atellwu authored Feb 3, 2020
1 parent 2b9f708 commit da14c5c
Showing 127 changed files with 2,813 additions and 1,786 deletions.
9 changes: 7 additions & 2 deletions pom.xml
@@ -2,8 +2,8 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
@@ -75,7 +75,7 @@
<lookout.version>1.5.2</lookout.version>
<mockito.version>1.10.19</mockito.version>
<powermock.version>1.6.6</powermock.version>
<jraft.version>1.2.5</jraft.version>
<jraft.version>1.2.7.beta1</jraft.version>
<metrics.version>4.0.2</metrics.version>
<commons-io.version>2.4</commons-io.version>
<jetty.version>[9.4.17.v20190418,9.4.19.v20190610]</jetty.version>
@@ -85,6 +85,11 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>1.7.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
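The new jul-to-slf4j dependency is there because some libraries (hessian, per the commit notes) log through java.util.logging; the bridge works by registering a j.u.l. Handler on the root logger that forwards every LogRecord to SLF4J. The mechanism can be sketched with a pure-JDK forwarding handler — `ForwardingHandler` and `JulBridgeSketch` are illustrative stand-ins, the real bridge is `org.slf4j.bridge.SLF4JBridgeHandler`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Illustrative stand-in for SLF4JBridgeHandler: intercepts every j.u.l.
// LogRecord and hands it to another backend (here, a plain list).
class ForwardingHandler extends Handler {
    final List<String> forwarded = new ArrayList<>();

    @Override public void publish(LogRecord record) {
        // The real bridge maps level and logger name to the SLF4J equivalents.
        forwarded.add(record.getLevel() + " " + record.getMessage());
    }
    @Override public void flush() {}
    @Override public void close() {}
}

public class JulBridgeSketch {
    public static ForwardingHandler install() {
        Logger root = Logger.getLogger("");
        // Drop the default console handlers, as
        // SLF4JBridgeHandler.removeHandlersForRootLogger() does.
        for (Handler h : root.getHandlers()) {
            root.removeHandler(h);
        }
        ForwardingHandler bridge = new ForwardingHandler();
        root.addHandler(bridge);
        root.setLevel(Level.INFO);
        return bridge;
    }

    public static void main(String[] args) {
        ForwardingHandler bridge = install();
        // A library logging via j.u.l. now ends up in our backend.
        Logger.getLogger("com.example.hessian").info("serialized ok");
        System.out.println(bridge.forwarded);
    }
}
```

In the real setup, `SLF4JBridgeHandler.install()` must be called once at startup (or configured via logging.properties) for the redirect to take effect.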
@@ -62,7 +62,7 @@ public Datum() {
* @param dataCenter
*/
public Datum(String dataInfoId, String dataCenter) {
this.dataInfoId = dataInfoId;
this.dataInfoId = WordCache.getInstance().getWordCache(dataInfoId);
this.dataCenter = WordCache.getInstance().getWordCache(dataCenter);
updateVersion();
}
@@ -244,7 +244,7 @@ public void setContainsUnPub(boolean containsUnPub) {
this.containsUnPub = containsUnPub;
}

public static Datum processDatum(Datum datum) {
public static Datum internDatum(Datum datum) {
datum.setDataCenter(datum.getDataCenter());
datum.setDataInfoId(datum.getDataInfoId());
datum.setDataId(datum.getDataId());
@@ -253,7 +253,13 @@ public static Datum processDatum(Datum datum) {

Map<String, Publisher> pubMap = datum.getPubMap();
if (pubMap != null && !pubMap.isEmpty()) {
pubMap.forEach((registerId, publisher) -> Publisher.processPublisher(publisher));
pubMap.forEach((registerId, publisher) -> {
// let registerId == pub.getRegisterId in every <registerId, pub>, for reducing old gen memory
// because this Datum is put into Memory directly, by DatumCache.coverDatum
publisher.setRegisterId(registerId);
// change publisher word cache
Publisher.internPublisher(publisher);
});
}

return datum;
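The pubMap loop in internDatum above re-points each publisher's registerId at the very String object used as the map key before the datum is cached long-term, so the deserialized duplicate becomes collectible. A minimal standalone sketch of the idea — `Pub`, `intern`, and `internPubMap` are illustrative names, not the registry's API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Publisher: just holds a registerId field.
class Pub {
    String registerId;
    Pub(String registerId) { this.registerId = registerId; }
}

public class InternDatumSketch {
    // Simple intern pool (the real code routes this through WordCache).
    private static final ConcurrentHashMap<String, String> POOL = new ConcurrentHashMap<>();

    public static String intern(String s) {
        if (s == null) return null;
        String old = POOL.putIfAbsent(s, s);
        return old == null ? s : old;
    }

    // Mirror of the pubMap.forEach in internDatum: make the map key and the
    // object's field reference the same String instance before caching.
    public static Map<String, Pub> internPubMap(Map<String, Pub> pubMap) {
        pubMap.forEach((registerId, pub) -> pub.registerId = intern(registerId));
        return pubMap;
    }

    public static void main(String[] args) {
        Map<String, Pub> pubMap = new HashMap<>();
        String key = "register-1";
        // A distinct but equal String, as would arrive after deserialization.
        pubMap.put(key, new Pub(new String("register-1")));
        internPubMap(pubMap);
        // Key and field now share one instance; the duplicate is collectible.
        System.out.println(pubMap.get(key).registerId == intern(key)); // prints "true"
    }
}
```

Because this Datum goes straight into memory via DatumCache, every deduplicated string here is old-gen memory saved.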
@@ -16,12 +16,12 @@
*/
package com.alipay.sofa.registry.common.model.store;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
*
* @author shangyu.wh
@@ -136,7 +136,7 @@ public String getProcessId() {
* @param processId value to be assigned to property processId
*/
public void setProcessId(String processId) {
this.processId = processId;
this.processId = WordCache.getInstance().getWordCache(processId);
}

/**
@@ -273,7 +273,7 @@ public String getClientId() {
* @param clientId value to be assigned to property clientId
*/
public void setClientId(String clientId) {
this.clientId = clientId;
this.clientId = WordCache.getInstance().getWordCache(clientId);
}

/**
@@ -16,12 +16,12 @@
*/
package com.alipay.sofa.registry.common.model.store;

import java.util.List;

import com.alipay.sofa.registry.common.model.PublishType;
import com.alipay.sofa.registry.common.model.ServerDataBox;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.List;

/**
*
* @author shangyu.wh
@@ -90,8 +90,8 @@ protected String getOtherInfo() {
* @param publisher
* @return
*/
public static Publisher processPublisher(Publisher publisher) {

public static Publisher internPublisher(Publisher publisher) {
publisher.setRegisterId(publisher.getRegisterId());
publisher.setDataInfoId(publisher.getDataInfoId());
publisher.setInstanceId(publisher.getInstanceId());
publisher.setGroup(publisher.getGroup());
@@ -101,13 +101,6 @@ public static Publisher processPublisher(Publisher publisher) {
publisher.setProcessId(publisher.getProcessId());
publisher.setAppName(publisher.getAppName());

if (publisher.getSourceAddress() != null) {
publisher.setSourceAddress(new URL(publisher.getSourceAddress().getIpAddress(),
publisher.getSourceAddress().getPort()));
}

publisher.setAttributes(publisher.getAttributes());

return publisher;
}

@@ -16,13 +16,14 @@
*/
package com.alipay.sofa.registry.common.model.store;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.alipay.sofa.registry.common.model.ElementType;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
*
* @author shangyu.wh
@@ -31,16 +32,16 @@
public class Subscriber extends BaseInfo {

/** UID */
private static final long serialVersionUID = 98433360274932292L;
private static final long serialVersionUID = 98433360274932292L;
/** */
private ScopeEnum scope;
private ScopeEnum scope;
/** */
private ElementType elementType;
private ElementType elementType;

/**
* all dataCenter push dataInfo version
* last push context
*/
private Map<String/*dataCenter*/, Long> lastPushVersions = new ConcurrentHashMap<>();
private Map<String/*dataCenter*/, PushContext> lastPushContexts = new ConcurrentHashMap<>();

/**
* Getter method for property <tt>scope</tt>.
@@ -71,7 +72,11 @@ public ElementType getElementType() {
*/
public boolean checkVersion(String dataCenter, Long version) {

Long oldVersion = lastPushVersions.get(dataCenter);
PushContext lastPushContext = lastPushContexts.get(dataCenter);
if (lastPushContext == null) {
return version != null;
}
Long oldVersion = lastPushContext.pushVersion;
if (oldVersion == null) {
return version != null;
} else {
@@ -88,15 +93,26 @@ public boolean checkVersion(String dataCenter, Long version) {
* @return
*/
public void checkAndUpdateVersion(String dataCenter, Long version) {
checkAndUpdateVersion(dataCenter, version, -1);
}

/**
* check version input greater or equal to current version
* @param version
* @return
*/
public void checkAndUpdateVersion(String dataCenter, Long version, int pubCount) {

while (true) {
Long oldVersion = lastPushVersions.putIfAbsent(dataCenter, version);
PushContext pushContext = new PushContext(version, pubCount);
PushContext oldPushContext = lastPushContexts.putIfAbsent(dataCenter, pushContext);
// Add firstly
if (oldVersion == null) {
if (oldPushContext == null) {
break;
} else {
if (version > oldVersion) {
if (lastPushVersions.replace(dataCenter, oldVersion, version)) {
if (oldPushContext.pushVersion == null
|| (pushContext.pushVersion != null && pushContext.pushVersion > oldPushContext.pushVersion)) {
if (lastPushContexts.replace(dataCenter, oldPushContext, pushContext)) {
break;
}
} else {
@@ -106,6 +122,23 @@ public void checkAndUpdateVersion(String dataCenter, Long version) {
}
}

/**
* If the pushed data is empty, check the last push, for avoid continuous empty datum push
*/
public boolean allowPush(String dataCenter, int pubCount) {
boolean allowPush = true;
// condition of no push:
// 1. last push count is 0 and this time is also 0
// 2. last push is a valid push (version > 1)
if (pubCount == 0) {
PushContext pushContext = lastPushContexts.get(dataCenter);
allowPush = !(pushContext != null && pushContext.pushPubCount == 0
//last push is a valid push
&& pushContext.pushVersion != null && pushContext.pushVersion > ValueConstants.DEFAULT_NO_DATUM_VERSION);
}
return allowPush;
}

/**
* Setter method for property <tt>elementType</tt>.
*
@@ -126,28 +159,10 @@ protected String getOtherInfo() {
final StringBuilder sb = new StringBuilder("scope=");
sb.append(scope).append(",");
sb.append("elementType=").append(elementType).append(",");
sb.append("lastPushVersion=").append(lastPushVersions);
sb.append("pushVersion=").append(lastPushContexts);
return sb.toString();
}

/**
* Getter method for property <tt>lastPushVersions</tt>.
*
* @return property value of lastPushVersions
*/
public Map<String, Long> getLastPushVersions() {
return lastPushVersions;
}

/**
* Setter method for property <tt>lastPushVersions </tt>.
*
* @param lastPushVersions value to be assigned to property lastPushVersions
*/
public void setLastPushVersions(Map<String, Long> lastPushVersions) {
this.lastPushVersions = lastPushVersions;
}

/**
* @see Object#toString()
*/
@@ -156,9 +171,57 @@ public String toString() {
final StringBuilder sb = new StringBuilder("Subscriber{");
sb.append("scope=").append(scope);
sb.append(", elementType=").append(elementType);
sb.append(", lastPushVersions=").append(lastPushVersions);
sb.append(", lastPushContexts=").append(lastPushContexts);
sb.append(", super=").append(super.toString());
sb.append('}');
return sb.toString();
}

/**
* change subscriber word cache
* @param subscriber
* @return
*/
public static Subscriber internSubscriber(Subscriber subscriber) {
subscriber.setRegisterId(subscriber.getRegisterId());
subscriber.setDataInfoId(subscriber.getDataInfoId());
subscriber.setInstanceId(subscriber.getInstanceId());
subscriber.setGroup(subscriber.getGroup());
subscriber.setDataId(subscriber.getDataId());
subscriber.setClientId(subscriber.getClientId());
subscriber.setCell(subscriber.getCell());
subscriber.setProcessId(subscriber.getProcessId());
subscriber.setAppName(subscriber.getAppName());

return subscriber;
}

static class PushContext {
/**
* last pushed dataInfo version
*/
private Long pushVersion;

/**
* push pushed dataInfo pubCount
*/
private int pushPubCount;

public PushContext(Long pushVersion, int pushPubCount) {
this.pushVersion = pushVersion;
this.pushPubCount = pushPubCount;
}

/**
* @see Object#toString()
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("PushContext{");
sb.append("pushVersion=").append(pushVersion);
sb.append(", pushPubCount=").append(pushPubCount);
sb.append('}');
return sb.toString();
}
}
}
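The checkAndUpdateVersion rewrite above is a lock-free monotonic update: putIfAbsent installs the first PushContext, then a replace loop swaps in a strictly newer one and retries when another thread wins the race. The same pattern in isolation, simplified to a plain long version instead of a PushContext (class and method names here are illustrative, not the registry's API):

```java
import java.util.concurrent.ConcurrentHashMap;

public class VersionTable {
    // dataCenter -> last pushed version; values are swapped atomically.
    private final ConcurrentHashMap<String, Long> lastPush = new ConcurrentHashMap<>();

    // Record `version` for `dataCenter` only if it is newer; retry on races.
    public void update(String dataCenter, long version) {
        while (true) {
            Long old = lastPush.putIfAbsent(dataCenter, version);
            if (old == null) {
                return; // first writer wins
            }
            if (version <= old) {
                return; // stale update, keep the newer value
            }
            // replace() only succeeds if the mapping is still `old`;
            // otherwise another thread raced us, so loop and re-read.
            if (lastPush.replace(dataCenter, old, version)) {
                return;
            }
        }
    }

    public Long get(String dataCenter) {
        return lastPush.get(dataCenter);
    }
}
```

The real method additionally carries pubCount in the context and treats a null old version as always replaceable, but the CAS-and-retry skeleton is the same.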
@@ -27,4 +27,23 @@ public class Watcher extends BaseInfo {
public DataType getDataType() {
return DataType.WATCHER;
}

/**
* change watcher word cache
* @param watcher
* @return
*/
public static Watcher internWatcher(Watcher watcher) {
watcher.setRegisterId(watcher.getRegisterId());
watcher.setDataInfoId(watcher.getDataInfoId());
watcher.setInstanceId(watcher.getInstanceId());
watcher.setGroup(watcher.getGroup());
watcher.setDataId(watcher.getDataId());
watcher.setClientId(watcher.getClientId());
watcher.setCell(watcher.getCell());
watcher.setProcessId(watcher.getProcessId());
watcher.setAppName(watcher.getAppName());

return watcher;
}
}
@@ -16,7 +16,8 @@
*/
package com.alipay.sofa.registry.common.model.store;

import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

/**
*
@@ -45,7 +46,7 @@ public static WordCache getInstance() {
/**
* word cache map
*/
private ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
private Interner<String> interners = Interners.newWeakInterner();

/**
*
@@ -56,8 +57,7 @@ public String getWordCache(String s) {
if (s == null) {
return null;
}
String oldValue = map.putIfAbsent(s, s);
return oldValue == null ? s : oldValue;
return interners.intern(s);
}

}
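The WordCache change above swaps the unbounded ConcurrentHashMap pool for Guava's `Interners.newWeakInterner()`, so pooled strings can be reclaimed by GC once no caller still holds them; the old putIfAbsent pool only ever grew. A rough pure-JDK sketch of the weak-interning contract — coarsely synchronized for brevity, whereas Guava's interner is concurrent and handles races without a global lock:

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

public class WeakInterner {
    // WeakHashMap holds keys weakly; the value is a weak reference to the
    // same string, so the entry disappears once no caller references it.
    private final Map<String, WeakReference<String>> pool = new WeakHashMap<>();

    // Return a canonical instance equal to s.
    public synchronized String intern(String s) {
        if (s == null) {
            return null;
        }
        WeakReference<String> ref = pool.get(s);
        String cached = (ref == null) ? null : ref.get();
        if (cached != null) {
            return cached; // reuse the pooled instance
        }
        pool.put(s, new WeakReference<>(s));
        return s;
    }
}
```

Note the value must hold the string weakly: a strong value reference to the key would keep every entry alive and defeat the WeakHashMap.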