Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Commit

Permalink
return write duration
Browse files Browse the repository at this point in the history
  • Loading branch information
mneedham committed Feb 12, 2019
1 parent 1207fbb commit 7914cd5
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,51 +94,45 @@ public void export(Stream<SimilarityResult> similarityPairs, long batchSize) {

ArrayBlockingQueue<List<SimilarityResult>> outQueue = new ArrayBlockingQueue<>(queueSize);

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> inQueueBatchCountFuture = executor.submit(() -> {
AtomicInteger inQueueBatchCount = new AtomicInteger(0);
stream.parallel().forEach(partition -> {
IntSet nodesInPartition = new IntHashSet();
for (DisjointSetStruct.InternalResult internalResult : partition) {
nodesInPartition.add(internalResult.internalNodeId);
}

List<SimilarityResult> inPartition = new ArrayList<>();
List<SimilarityResult> outPartition = new ArrayList<>();
AtomicInteger inQueueBatchCount = new AtomicInteger(0);
stream.parallel().forEach(partition -> {
IntSet nodesInPartition = new IntHashSet();
for (DisjointSetStruct.InternalResult internalResult : partition) {
nodesInPartition.add(internalResult.internalNodeId);
}

for (DisjointSetStruct.InternalResult result : partition) {
int nodeId = result.internalNodeId;
graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId), idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);
List<SimilarityResult> inPartition = new ArrayList<>();
List<SimilarityResult> outPartition = new ArrayList<>();

if (nodesInPartition.contains(targetNodeId)) {
inPartition.add(similarityRelationship);
} else {
outPartition.add(similarityRelationship);
}
for (DisjointSetStruct.InternalResult result : partition) {
int nodeId = result.internalNodeId;
graph.forEachRelationship(nodeId, Direction.OUTGOING, (sourceNodeId, targetNodeId, relationId, weight) -> {
SimilarityResult similarityRelationship = new SimilarityResult(idMap.toOriginalNodeId(sourceNodeId),
idMap.toOriginalNodeId(targetNodeId), -1, -1, -1, weight);

return false;
});
}
if (nodesInPartition.contains(targetNodeId)) {
inPartition.add(similarityRelationship);
} else {
outPartition.add(similarityRelationship);
}

if (!inPartition.isEmpty()) {
int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
inQueueBatchCount.addAndGet(inQueueBatches);
}
return false;
});
}

if (!outPartition.isEmpty()) {
put(outQueue, outPartition);
}
});
return inQueueBatchCount.get();
if (!inPartition.isEmpty()) {
int inQueueBatches = writeSequential(inPartition.stream(), batchSize);
inQueueBatchCount.addAndGet(inQueueBatches);
}

if (!outPartition.isEmpty()) {
put(outQueue, outPartition);
}
});

Integer inQueueBatches = null;
try {
inQueueBatches = inQueueBatchCountFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

int inQueueBatches = inQueueBatchCount.get();


int outQueueBatches = writeSequential(outQueue.stream().flatMap(Collection::stream), batchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import org.HdrHistogram.DoubleHistogram;
import org.neo4j.graphalgo.core.ProcedureConfiguration;
import org.neo4j.graphalgo.core.ProcedureConstants;
import org.neo4j.graphalgo.core.utils.ParallelUtil;
import org.neo4j.graphalgo.core.utils.Pools;
import org.neo4j.graphalgo.core.utils.QueueBasedSpliterator;
import org.neo4j.graphalgo.core.utils.TerminationFlag;
import org.neo4j.graphalgo.core.utils.*;
import org.neo4j.graphalgo.impl.util.TopKConsumer;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.api.KernelTransaction;
Expand Down Expand Up @@ -77,7 +74,76 @@ Long getWriteBatchSize(ProcedureConfiguration configuration) {
return configuration.get("writeBatchSize", 10000L);
}


public class SimilarityResultBuilder {
protected long writeDuration = -1;
protected boolean write = false;
private int nodes;
private String writeRelationshipType;
private String writeProperty;
private AtomicLong similarityPairs;
private DoubleHistogram histogram;


public SimilarityResultBuilder withWriteDuration(long writeDuration) {
this.writeDuration = writeDuration;
return this;
}

public SimilarityResultBuilder withWrite(boolean write) {
this.write = write;
return this;
}

/**
* returns an AutoClosable which measures the time
* until it gets closed. Saves the duration as writeMillis
*
* @return
*/
public ProgressTimer timeWrite() {
return ProgressTimer.start(this::withWriteDuration);
}

public SimilaritySummaryResult build() {
return SimilaritySummaryResult.from(nodes, similarityPairs, writeRelationshipType, writeProperty, write, histogram, writeDuration);
}

public SimilarityResultBuilder nodes(int nodes) {
this.nodes = nodes;
return this;
}

public SimilarityResultBuilder write(boolean write) {
this.write = write;
return this;
}

public SimilarityResultBuilder writeRelationshipType(String writeRelationshipType) {
this.writeRelationshipType = writeRelationshipType;
return this;
}

public SimilarityResultBuilder writeProperty(String writeProperty) {
this.writeProperty = writeProperty;
return this;
}

public SimilarityResultBuilder similarityPairs(AtomicLong similarityPairs) {
this.similarityPairs = similarityPairs;
return this;
}

public SimilarityResultBuilder histogram(DoubleHistogram histogram) {
this.histogram = histogram;
return this;
}
}

Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult> stream, int length, ProcedureConfiguration configuration, boolean write, String writeRelationshipType, String writeProperty, boolean writeParallel) {
SimilarityResultBuilder builder = new SimilarityResultBuilder();
builder.nodes(length).write(write).writeRelationshipType(writeRelationshipType).writeProperty(writeProperty);

long writeBatchSize = getWriteBatchSize(configuration);
AtomicLong similarityPairs = new AtomicLong();
DoubleHistogram histogram = new DoubleHistogram(5);
Expand All @@ -88,24 +154,31 @@ Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult

if (write) {
if (writeParallel) {
ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
try (ProgressTimer timer = builder.timeWrite()) {
ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter(api, log, writeRelationshipType, writeProperty, length);
parallelSimilarityExporter.export(stream.peek(recorder), writeBatchSize);
}

} else {
SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
similarityExporter.export(stream.peek(recorder), writeBatchSize);
try (ProgressTimer timer = builder.timeWrite()) {
SimilarityExporter similarityExporter = new SimilarityExporter(api, log, writeRelationshipType, writeProperty);
similarityExporter.export(stream.peek(recorder), writeBatchSize);
}
}

} else {
stream.forEach(recorder);
}

return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
builder.similarityPairs(similarityPairs).histogram(histogram);
return Stream.of(builder.build());

// return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
}

Stream<SimilaritySummaryResult> emptyStream(String writeRelationshipType, String writeProperty) {
return Stream.of(SimilaritySummaryResult.from(0, new AtomicLong(0), writeRelationshipType,
writeProperty, false, new DoubleHistogram(5)));
writeProperty, false, new DoubleHistogram(5), -1));
}

Double getSimilarityCutoff(ProcedureConfiguration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

public class SimilaritySummaryResult {

public final long writeDuration;
public final long nodes;
public final long similarityPairs;
public final boolean write;
Expand All @@ -46,7 +47,7 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
boolean write, String writeRelationshipType, String writeProperty,
double min, double max, double mean, double stdDev,
double p25, double p50, double p75, double p90, double p95,
double p99, double p999, double p100) {
double p99, double p999, double p100, long writeDuration) {
this.nodes = nodes;
this.similarityPairs = similarityPairs;
this.write = write;
Expand All @@ -64,9 +65,10 @@ public SimilaritySummaryResult(long nodes, long similarityPairs,
this.p99 = p99;
this.p999 = p999;
this.p100 = p100;
this.writeDuration = writeDuration;
}

static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram) {
static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, String writeRelationshipType, String writeProperty, boolean write, DoubleHistogram histogram, long writeDuration) {
return new SimilaritySummaryResult(
length,
similarityPairs.get(),
Expand All @@ -84,7 +86,8 @@ static SimilaritySummaryResult from(long length, AtomicLong similarityPairs, Str
histogram.getValueAtPercentile(95D),
histogram.getValueAtPercentile(99D),
histogram.getValueAtPercentile(99.9D),
histogram.getValueAtPercentile(100D)
histogram.getValueAtPercentile(100D),
writeDuration
);
}
}

0 comments on commit 7914cd5

Please sign in to comment.