Locality-aware chunk distribution #3102

Open · wants to merge 11 commits into base: main
ApplyTask.java (new file)
@@ -0,0 +1,244 @@
package com.fastasyncworldedit.core.queue.implementation;

import com.fastasyncworldedit.core.Fawe;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.queue.Filter;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.math.BlockVector3;
import com.sk89q.worldedit.regions.Region;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ApplyTask<F extends Filter> extends RecursiveAction implements Runnable {

private static final Logger LOGGER = LogManagerCompat.getLogger();

private static final int INITIAL_REGION_SHIFT = 5;
private static final int SHIFT_REDUCTION = 1;

private final CommonState<F> commonState;
private final ApplyTask<F> before;
private final int minChunkX;
private final int minChunkZ;
private final int maxChunkX;
private final int maxChunkZ;
// Note: shift == INITIAL_REGION_SHIFT means we are in the root node.
// compute() relies on that when triggering onCompletion()
private final int shift;

@Override
public void run() {
compute();
}

private record CommonState<F extends Filter>(
F originalFilter,
Region region,
ParallelQueueExtent parallelQueueExtent,
ConcurrentMap<Thread, ThreadState<F>> stateCache,
boolean full,
boolean[] faweExceptionReasonsUsed
) {

}

private static final class ThreadState<F extends Filter> {

private final SingleThreadQueueExtent queue;
private final F filter;
private ChunkFilterBlock block;

private ThreadState(SingleThreadQueueExtent queue, F filter) {
this.queue = queue;
this.filter = filter;
}

}

ApplyTask(
final Region region,
final F filter,
final ParallelQueueExtent parallelQueueExtent,
final boolean full, final boolean[] faweExceptionReasonsUsed
) {
this.commonState = new CommonState<>(
filter,
region.clone(), // clone only once, assuming the filter doesn't modify that clone
parallelQueueExtent,
new ConcurrentHashMap<>(),
full,
faweExceptionReasonsUsed
);
this.before = null;
final BlockVector3 minimumPoint = region.getMinimumPoint();
this.minChunkX = minimumPoint.x() >> 4;
this.minChunkZ = minimumPoint.z() >> 4;
final BlockVector3 maximumPoint = region.getMaximumPoint();
this.maxChunkX = maximumPoint.x() >> 4;
this.maxChunkZ = maximumPoint.z() >> 4;
this.shift = INITIAL_REGION_SHIFT;

}

private ApplyTask(
final CommonState<F> commonState,
final ApplyTask<F> before,
final int minChunkX,
final int maxChunkX,
final int minChunkZ,
final int maxChunkZ,
final int higherShift
) {
this.commonState = commonState;
this.minChunkX = minChunkX;
this.maxChunkX = maxChunkX;
this.minChunkZ = minChunkZ;
this.maxChunkZ = maxChunkZ;
this.before = before;
this.shift = Math.max(0, higherShift - SHIFT_REDUCTION);
}

@Override
protected void compute() {
if (this.minChunkX != this.maxChunkX || this.minChunkZ != this.maxChunkZ) {
ApplyTask<F> subtask = null;
int minRegionX = this.minChunkX >> this.shift;
int minRegionZ = this.minChunkZ >> this.shift;
int maxRegionX = this.maxChunkX >> this.shift;
int maxRegionZ = this.maxChunkZ >> this.shift;
// This task covers multiple regions. Create one subtask per region
for (int regionX = minRegionX; regionX <= maxRegionX; regionX++) {
for (int regionZ = minRegionZ; regionZ <= maxRegionZ; regionZ++) {
if (shouldProcessDirectly()) {
// assume we should do a bigger batch of work here - the other threads are busy for a while
processRegion(regionX, regionZ, this.shift);
continue;
}
if (this.shift == 0 && !this.commonState.region.containsChunk(regionX, regionZ)) {
// if shift == 0, region coords are chunk coords
continue; // chunks not intersecting with the region don't need a task
}

// creating more tasks will likely help parallelism as other threads aren't *that* busy
subtask = new ApplyTask<>(
this.commonState,
subtask,
regionX << this.shift,
((regionX + 1) << this.shift) - 1,
regionZ << this.shift,
((regionZ + 1) << this.shift) - 1,
this.shift
);
subtask.fork();
}
}
// try processing tasks in reverse order if not processed already, otherwise "wait" for completion
while (subtask != null) {
if (subtask.tryUnfork()) {
subtask.invoke();
} else {
subtask.join();
}
subtask = subtask.before;
}
} else {
// we reached a task for a single chunk, let's process it
processChunk(this.minChunkX, this.minChunkZ);
}
if (this.shift == INITIAL_REGION_SHIFT) {
onCompletion();
}
}

private boolean shouldProcessDirectly() {
return ForkJoinTask.getSurplusQueuedTaskCount() > Math.max(3, 1 << this.shift);
}

private void processRegion(int regionX, int regionZ, int shift) {
final ThreadState<F> state = getState();
this.commonState.parallelQueueExtent.enter(state.queue);
try {
for (int chunkX = regionX << shift; chunkX <= ((regionX + 1) << shift) - 1; chunkX++) {
for (int chunkZ = regionZ << shift; chunkZ <= ((regionZ + 1) << shift) - 1; chunkZ++) {
if (!this.commonState.region.containsChunk(chunkX, chunkZ)) {
continue; // chunks not intersecting with the region must not be processed
}
applyChunk(chunkX, chunkZ, state);
}
}
} finally {
this.commonState.parallelQueueExtent.exit();
}

}

@SuppressWarnings("unchecked")
private ThreadState<F> getState() {
return this.commonState.stateCache.computeIfAbsent(
Thread.currentThread(),
__ -> new ThreadState<>(
(SingleThreadQueueExtent) this.commonState.parallelQueueExtent.getNewQueue(),
(F) this.commonState.originalFilter.fork()
)
);
}

private void processChunk(int chunkX, int chunkZ) {
final ThreadState<F> state = getState();
this.commonState.parallelQueueExtent.enter(state.queue);
try {
applyChunk(chunkX, chunkZ, state);
} finally {
this.commonState.parallelQueueExtent.exit();
}
}

private void applyChunk(int chunkX, int chunkZ, ThreadState<F> state) {
try {
state.block = state.queue.apply(
state.block,
state.filter,
this.commonState.region,
chunkX,
chunkZ,
this.commonState.full
);
} catch (Throwable t) {
if (t instanceof FaweException faweException) {
Fawe.handleFaweException(this.commonState.faweExceptionReasonsUsed, faweException, LOGGER);
} else if (t.getCause() instanceof FaweException faweException) {
Fawe.handleFaweException(this.commonState.faweExceptionReasonsUsed, faweException, LOGGER);
} else {
throw t;
}
}
}

private void onCompletion() {
for (ForkJoinTask<?> task : flushQueues()) {
if (task.tryUnfork()) {
task.invoke();
} else {
task.join();
}
}
}

private ForkJoinTask<?>[] flushQueues() {
final Collection<ThreadState<F>> values = this.commonState.stateCache.values();
ForkJoinTask<?>[] tasks = new ForkJoinTask[values.size()];
int i = values.size() - 1;
for (final ThreadState<F> value : values) {
tasks[i] = ForkJoinTask.adapt(value.queue::flush).fork();
i--;
}
return tasks;
}

}
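
For reference, the subdivision arithmetic in compute() works like this: the root task (shift = INITIAL_REGION_SHIFT = 5) carves the selection's chunk bounding box into 32x32-chunk tiles, and every subtask re-tiles its own area with the shift reduced by SHIFT_REDUCTION, down to single chunks at shift 0, which is presumably what gives the PR its locality-aware distribution. A minimal, self-contained sketch of that tiling (illustrative names only, not part of the PR):

// Illustrative only: mirrors the arithmetic in ApplyTask.compute(); not FAWE code.
public final class TilingDemo {

    static final int INITIAL_REGION_SHIFT = 5; // root tiles span 2^5 = 32 chunks per side
    static final int SHIFT_REDUCTION = 1;      // each recursion level halves the tile side

    // Print the tiles one task would create for a chunk bounding box at a given shift.
    static void tile(int minChunkX, int minChunkZ, int maxChunkX, int maxChunkZ, int shift) {
        for (int regionX = minChunkX >> shift; regionX <= maxChunkX >> shift; regionX++) {
            for (int regionZ = minChunkZ >> shift; regionZ <= maxChunkZ >> shift; regionZ++) {
                System.out.printf(
                        "shift=%d tile(%d,%d) covers chunks [%d..%d] x [%d..%d]%n",
                        shift, regionX, regionZ,
                        regionX << shift, ((regionX + 1) << shift) - 1,
                        regionZ << shift, ((regionZ + 1) << shift) - 1
                );
            }
        }
    }

    public static void main(String[] args) {
        // A 60x40 chunk selection: the root task (shift 5) creates 2x2 = 4 subtasks of 32x32 chunks;
        // each subtask then re-tiles its area with shift 4 (16x16 tiles), and so on down to shift 0.
        tile(0, 0, 59, 39, INITIAL_REGION_SHIFT);
        tile(0, 0, 31, 31, INITIAL_REGION_SHIFT - SHIFT_REDUCTION);
    }
}
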
ParallelQueueExtent.java
@@ -1,6 +1,5 @@
package com.fastasyncworldedit.core.queue.implementation;

import com.fastasyncworldedit.core.Fawe;
import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.NullExtent;
@@ -49,7 +48,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

public class ParallelQueueExtent extends PassthroughExtent {

@@ -65,8 +63,6 @@ public class ParallelQueueExtent extends PassthroughExtent {
private final boolean fastmode;
private final SideEffectSet sideEffectSet;
private int changes;
private int lastException = Integer.MIN_VALUE;
private int exceptionCount = 0;

public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode, @Nullable SideEffectSet sideEffectSet) {
super(handler.getQueue(world, new BatchProcessorHolder(), new BatchProcessorHolder()));
@@ -100,11 +96,11 @@ public static void setCurrentExtent(Extent extent) {
FaweThread.setCurrentExtent(extent);
}

private void enter(Extent extent) {
void enter(Extent extent) {
FaweThread.setCurrentExtent(extent);
}

private void exit() {
void exit() {
FaweThread.clearCurrentExtent();
}

@@ -129,13 +125,12 @@ public boolean cancel() {
}

@SuppressWarnings("rawtypes")
private IQueueExtent<IQueueChunk> getNewQueue() {
IQueueExtent<IQueueChunk> getNewQueue() {
SingleThreadQueueExtent queue = (SingleThreadQueueExtent) handler.getQueue(world, this.processor, this.postProcessor);
queue.setFastMode(fastmode);
queue.setSideEffectSet(sideEffectSet);
queue.setFaweExceptionArray(faweExceptionReasonsUsed);
queue.setTargetSize(Settings.settings().QUEUE.TARGET_SIZE * Settings.settings().QUEUE.THREAD_TARGET_SIZE_PERCENT / 100);
enter(queue);
return queue;
}

@@ -158,62 +153,16 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
getExtent().flush();
filter.finish();
} else {
final ForkJoinTask[] tasks = IntStream.range(0, size).mapToObj(i -> handler.submit(() -> {
try {
final Filter newFilter = filter.fork();
final Region newRegion = region.clone();
// Create a chunk that we will reuse/reset for each operation
final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue();
synchronized (queue) {
try {
ChunkFilterBlock block = null;
while (true) {
// Get the next chunk posWeakChunk
final int chunkX;
final int chunkZ;
synchronized (chunksIter) {
if (!chunksIter.hasNext()) {
break;
}
final BlockVector2 pos = chunksIter.next();
chunkX = pos.x();
chunkZ = pos.z();
}
block = queue.apply(block, newFilter, newRegion, chunkX, chunkZ, full);
}
queue.flush();
filter.finish();
} catch (Throwable t) {
if (t instanceof FaweException) {
Fawe.handleFaweException(faweExceptionReasonsUsed, (FaweException) t, LOGGER);
} else if (t.getCause() instanceof FaweException) {
Fawe.handleFaweException(faweExceptionReasonsUsed, (FaweException) t.getCause(), LOGGER);
} else {
throw t;
}
}
}
} catch (Throwable e) {
String message = e.getMessage();
int hash = message != null ? message.hashCode() : 0;
if (lastException != hash) {
lastException = hash;
exceptionCount = 0;
LOGGER.catching(e);
} else if (exceptionCount < Settings.settings().QUEUE.PARALLEL_THREADS) {
exceptionCount++;
LOGGER.warn(message);
}
} finally {
exit();
}
})).toArray(ForkJoinTask[]::new);
// Join filters
for (ForkJoinTask task : tasks) {
if (task != null) {
task.quietlyJoin();
}
ForkJoinTask<?> task = this.handler.submit(
new ApplyTask<>(region, filter, this, full, this.faweExceptionReasonsUsed)
);
// wait for task to finish
try {
task.join();
} catch (Throwable e) {
LOGGER.catching(e);
}
// Join filters
filter.join();
}
return filter;
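
Both ApplyTask.compute() above and onCompletion() drain forked subtasks with the tryUnfork()/invoke()-else-join() pattern: a subtask still sitting in the current worker's deque is executed inline rather than waited on. A minimal, standalone illustration of that JDK idiom (generic code with made-up names, not FAWE's):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative only: a generic RecursiveAction showing the fork / tryUnfork-or-join drain.
final class DrainDemo extends RecursiveAction {

    private final int from;
    private final int to;

    DrainDemo(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= 1) {
            return; // leaf: real code would process one chunk here
        }
        int mid = (from + to) >>> 1;
        DrainDemo left = new DrainDemo(from, mid);
        DrainDemo right = new DrainDemo(mid, to);
        left.fork();
        right.fork();
        // Drain in reverse order: run a task locally if nobody stole it yet, otherwise wait for it.
        for (DrainDemo sub : new DrainDemo[]{right, left}) {
            if (sub.tryUnfork()) {
                sub.invoke();
            } else {
                sub.join();
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool.commonPool().invoke(new DrainDemo(0, 1 << 10));
    }
}
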
QueueHandler.java
@@ -34,6 +34,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
@@ -50,7 +51,13 @@ public abstract class QueueHandler implements Trimable, Runnable {
Settings.settings().QUEUE.PARALLEL_THREADS,
new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Primary - %s"),
null,
false
false,
Settings.settings().QUEUE.PARALLEL_THREADS,
Settings.settings().QUEUE.PARALLEL_THREADS,
0,
pool -> true,
60,
TimeUnit.SECONDS
);

/**
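
The six new arguments correspond to the extended ForkJoinPool constructor introduced in Java 9 (parallelism, factory, handler, asyncMode, corePoolSize, maximumPoolSize, minimumRunnable, saturate, keepAliveTime, unit). A hedged, parameter-by-parameter sketch; the surrounding field declaration is outside the visible hunk, so the method name and placeholder values below are assumptions:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hedged sketch: maps the constructor arguments in the diff onto the Java 9+ ForkJoinPool
// constructor. "parallelThreads" stands in for Settings.settings().QUEUE.PARALLEL_THREADS.
final class PoolSketch {

    static ForkJoinPool build(int parallelThreads, ForkJoinPool.ForkJoinWorkerThreadFactory factory) {
        return new ForkJoinPool(
                parallelThreads,        // parallelism
                factory,                // worker thread factory (FaweForkJoinWorkerThreadFactory in the PR)
                null,                   // UncaughtExceptionHandler: none
                false,                  // asyncMode: keep LIFO processing of locally queued tasks
                parallelThreads,        // corePoolSize: number of workers to keep alive
                parallelThreads,        // maximumPoolSize: hard cap, no extra compensation threads
                0,                      // minimumRunnable: no workers reserved as "unblocked"
                pool -> true,           // saturate: accept saturation instead of throwing RejectedExecutionException
                60, TimeUnit.SECONDS    // keepAliveTime for surplus idle workers
        );
    }
}
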