Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC for experimental async support #369

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@SuppressWarnings("rawtypes")
class AsyncExecutePicked implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutePicked.class);
private final Executor executor;
private final TaskRepository taskRepository;
private SchedulerClientEventListener earlyExecutionListener;
private final SchedulerClient schedulerClient;
private final StatsRegistry statsRegistry;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
private final Clock clock;
private final Execution pickedExecution;

public AsyncExecutePicked(Executor executor, TaskRepository taskRepository, SchedulerClientEventListener earlyExecutionListener, SchedulerClient schedulerClient, StatsRegistry statsRegistry,
TaskResolver taskResolver, SchedulerState schedulerState, ConfigurableLogger failureLogger,
Clock clock, Execution pickedExecution) {
this.executor = executor;
this.taskRepository = taskRepository;
this.earlyExecutionListener = earlyExecutionListener;
this.schedulerClient = schedulerClient;
this.statsRegistry = statsRegistry;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
this.clock = clock;
this.pickedExecution = pickedExecution;
}

@Override
public void run() {
// FIXLATER: need to cleanup all the references back to scheduler fields
final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock));
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId));
}

private CompletableFuture<CompletionHandler> executePickedExecution(Execution execution) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
return new CompletableFuture<>();
}
if (!(task.get() instanceof AsyncExecutionHandler)) {
throw new IllegalStateException("Should only ever try to execute async when task has an AsyncExecutionHandler");
}

AsyncExecutionHandler asyncHandler = (AsyncExecutionHandler) task.get();
Instant executionStarted = clock.now();
LOG.debug("Executing " + execution);
CompletableFuture<CompletionHandler> completableFuture = asyncHandler.executeAsync(execution.taskInstance, new AsyncExecutionContext(schedulerState, execution, schedulerClient, executor.getExecutorService()));

return completableFuture.whenCompleteAsync((completion, ex) -> {
if (ex != null) {
if (ex instanceof RuntimeException) {
failure(task.get(), execution, ex, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
} else {
failure(task.get(), execution, ex, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
return;
}
LOG.debug("Execution done");
complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);

}, executor.getExecutorService());
}

private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) {
ExecutionComplete completeEvent = ExecutionComplete.success(execution, executionStarted, clock.now());
try {
completion.complete(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " +
"The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e);
}
}

private void failure(Task task, Execution execution, Throwable cause, Instant executionStarted, String errorMessagePrefix) {
String logMessage = errorMessagePrefix + " during execution of task with name '{}'. Treating as failure.";
failureLogger.log(logMessage, cause, task.getName());

ExecutionComplete completeEvent = ExecutionComplete.failure(execution, executionStarted, clock.now(), cause);
try {
task.getFailureHandler().onFailure(completeEvent, new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. " +
"The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.", execution, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public void addToQueue(Runnable r, Runnable afterDone) {
try {
r.run();
} finally {
// For async, these callbacks are run before complete,
// thus allowing queue of ongoing executions to grow without bound
kagkarlsson marked this conversation as resolved.
Show resolved Hide resolved
currentlyInQueueOrProcessing.decrementAndGet();
// Run callbacks after decrementing currentlyInQueueOrProcessing
afterDone.run();
Expand Down Expand Up @@ -99,4 +101,10 @@ public void removeCurrentlyProcessing(UUID executionId) {
LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId);
}
}

public java.util.concurrent.Executor getExecutorService() {
return executorService;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,6 +66,7 @@ public FetchCandidates(Executor executor, TaskRepository taskRepository, Schedul
upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
}

@SuppressWarnings("rawtypes")
@Override
public void run() {
Instant now = clock.now();
Expand All @@ -84,9 +87,25 @@ public void run() {
executor.addToQueue(
() -> {
final Optional<Execution> candidate = new PickDue(e, newDueBatch).call();
candidate.ifPresent(picked -> new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry,
taskResolver, schedulerState, failureLogger,
clock, picked).run());

candidate.ifPresent(picked -> {

// Experimental support for async execution. Peek at Task to see if support async
// Unresolved tasks will be handled further in
final Optional<Task> task = taskResolver.resolve(picked.taskInstance.getTaskName());
if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) {
// Experimental branch
new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).run();

} else {
// The default
new ExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).run();
}
});
},
() -> {
newDueBatch.oneExecutionDone(triggerCheckForNewExecutions::run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.AsyncExecutionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class LockAndFetchCandidates implements PollStrategy {
Expand Down Expand Up @@ -88,10 +91,23 @@ public void run() {
}

for (Execution picked : pickedExecutions) {
executor.addToQueue(
new ExecutePicked(executor, taskRepository, earlyExecutionListener, schedulerClient, statsRegistry,
taskResolver, schedulerState, failureLogger,
clock, picked),
executor.addToQueue(() -> {
// Experimental support for async execution. Peek at Task to see if support async
// Unresolved tasks will be handled further in
final Optional<Task> task = taskResolver.resolve(picked.taskInstance.getTaskName());
if (task.isPresent() && task.get() instanceof AsyncExecutionHandler) {
// Experimental branch
new AsyncExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).run();

} else {
// The default
new ExecutePicked(executor, taskRepository, earlyExecutionListener,
schedulerClient, statsRegistry, taskResolver, schedulerState, failureLogger,
clock, picked).run();
}
},
() -> {
if (moreExecutionsInDatabase.get()
&& executor.getNumberInQueueOrProcessing() <= lowerLimit) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.task;

import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerState;

import java.util.concurrent.Executor;

public class AsyncExecutionContext extends ExecutionContext {
private final Executor executor;

public AsyncExecutionContext(SchedulerState schedulerState, Execution execution, SchedulerClient schedulerClient,
Executor executor) {
super(schedulerState, execution, schedulerClient);
this.executor = executor;
}

public Executor getAsyncExecutor() {
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.task;

import java.util.concurrent.CompletableFuture;

/**
* Experimental
*/
public interface AsyncExecutionHandler<T> extends ExecutionHandler<T> {

CompletableFuture<CompletionHandler<T>> executeAsync(TaskInstance<T> taskInstance, AsyncExecutionContext executionContext);

@Override
default CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
throw new UnsupportedOperationException("Standard blocking execute note supported in this handler.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@
package com.github.kagkarlsson.scheduler.task;

public interface ExecutionHandler<T> {

CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);

}
Loading