Skip to content

Commit e5d28e1

Browse files
Managed guest code execution by dedicated thread pool in `ThreadManager' (#12613)
1 parent e260d04 commit e5d28e1

File tree

15 files changed

+333
-343
lines changed

15 files changed

+333
-343
lines changed

engine/runtime-compiler/src/main/java/org/enso/compiler/context/CompilerContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.io.PrintStream;
55
import java.net.URI;
66
import java.util.List;
7+
import java.util.concurrent.ExecutorService;
78
import java.util.concurrent.Future;
89
import java.util.function.Consumer;
910
import java.util.logging.Level;
@@ -68,9 +69,7 @@ RuntimeException formatDiagnostic(
6869
// threads
6970
boolean isCreateThreadAllowed();
7071

71-
Thread createThread(Runnable r);
72-
73-
Thread createSystemThread(Runnable r);
72+
ExecutorService newParsingPool();
7473

7574
// Truffle related
7675

engine/runtime-compiler/src/main/scala/org/enso/compiler/Compiler.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ import java.util.concurrent.{
4545
CompletableFuture,
4646
ExecutorService,
4747
Future,
48-
LinkedBlockingDeque,
49-
ThreadPoolExecutor,
5048
TimeUnit
5149
}
5250
import java.util.logging.Level
@@ -77,16 +75,7 @@ class Compiler(
7775

7876
/** The thread pool that handles parsing of modules. */
7977
private val pool: ExecutorService = if (config.parallelParsing) {
80-
new ThreadPoolExecutor(
81-
Compiler.startingThreadCount,
82-
Compiler.maximumThreadCount,
83-
Compiler.threadKeepalive,
84-
TimeUnit.SECONDS,
85-
new LinkedBlockingDeque[Runnable](),
86-
(runnable: Runnable) => {
87-
context.createThread(runnable)
88-
}
89-
)
78+
context.newParsingPool()
9079
} else null
9180

9281
/** Java accessor */
@@ -1166,7 +1155,6 @@ class Compiler(
11661155
}
11671156

11681157
pool.shutdownNow()
1169-
Thread.sleep(100)
11701158
} else {
11711159
pool.shutdownNow()
11721160
}

engine/runtime-instrument-common/src/main/java/org/enso/interpreter/service/ExecutionService.java

Lines changed: 133 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import java.util.Objects;
2222
import java.util.Optional;
2323
import java.util.UUID;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.Future;
2426
import java.util.function.Consumer;
27+
import java.util.function.Supplier;
2528
import org.enso.common.MethodNames;
2629
import org.enso.compiler.suggestions.SimpleUpdate;
2730
import org.enso.interpreter.instrument.Endpoint;
@@ -68,7 +71,6 @@
6871
* language.
6972
*/
7073
public final class ExecutionService {
71-
7274
private static final String MAIN_METHOD = "main";
7375
private final EnsoContext context;
7476
private final Optional<IdExecutionService> idExecutionInstrument;
@@ -173,37 +175,44 @@ public void execute(
173175
SourceNotFoundException,
174176
UnsupportedMessageException,
175177
UnsupportedTypeException {
176-
SourceSection src = call.getFunction().getSourceSection();
177-
if (src == null) {
178-
throw new SourceNotFoundException(call.getFunction().getName());
179-
}
180-
181-
var callbacks =
182-
new ExecutionCallbacks(
183-
visualizationHolder,
184-
nextExecutionItem,
185-
cache,
186-
methodCallsCache,
187-
syncState,
188-
expressionExecutionState,
189-
onCachedCallback,
190-
onComputedCallback,
191-
funCallCallback,
192-
onExecutedVisualizationCallback,
193-
this.context.isProgressReportEnabled() ? onComputedCallback : null);
194-
Optional<EventBinding<ExecutionEventNodeFactory>> eventNodeFactory =
195-
idExecutionInstrument.map(
196-
service ->
197-
service.bind(module, call.getFunction().getCallTarget(), callbacks, this.timer));
198-
199-
Object p = context.getThreadManager().enter();
200-
try {
201-
var callFn = Function.fullyApplied(execute.getCallTarget(), substituteMissingArguments(call));
202-
RunStateNode.getUncached().execute(null, cacheKey(), cache, callFn);
203-
} finally {
204-
context.getThreadManager().leave(p);
205-
eventNodeFactory.ifPresent(EventBinding::dispose);
206-
}
178+
var pending =
179+
submitExecution(
180+
() -> {
181+
SourceSection src = call.getFunction().getSourceSection();
182+
if (src == null) {
183+
throw new SourceNotFoundException(call.getFunction().getName());
184+
}
185+
186+
var callbacks =
187+
new ExecutionCallbacks(
188+
visualizationHolder,
189+
nextExecutionItem,
190+
cache,
191+
methodCallsCache,
192+
syncState,
193+
expressionExecutionState,
194+
onCachedCallback,
195+
onComputedCallback,
196+
funCallCallback,
197+
onExecutedVisualizationCallback,
198+
this.context.isProgressReportEnabled() ? onComputedCallback : null);
199+
Optional<EventBinding<ExecutionEventNodeFactory>> eventNodeFactory =
200+
idExecutionInstrument.map(
201+
service ->
202+
service.bind(
203+
module, call.getFunction().getCallTarget(), callbacks, this.timer));
204+
205+
try {
206+
var callFn =
207+
Function.fullyApplied(
208+
execute.getCallTarget(), substituteMissingArguments(call));
209+
RunStateNode.getUncached().execute(null, cacheKey(), cache, callFn);
210+
} finally {
211+
eventNodeFactory.ifPresent(EventBinding::dispose);
212+
}
213+
return null;
214+
});
215+
resultOf(pending);
207216
}
208217

209218
/**
@@ -292,12 +301,12 @@ private FunctionCallInstrumentationNode.FunctionCall substituteMissingArguments(
292301
* @return a result of evaluation
293302
*/
294303
public Object evaluateExpression(Module module, String expression) {
295-
Object p = context.getThreadManager().enter();
296-
try {
297-
return invoke.getCallTarget().call(module, expression);
298-
} finally {
299-
context.getThreadManager().leave(p);
300-
}
304+
var future =
305+
submitExecution(
306+
() -> {
307+
return invoke.getCallTarget().call(module, expression);
308+
});
309+
return resultOf(future);
301310
}
302311

303312
/**
@@ -324,14 +333,15 @@ public String toDisplayString(Object receiver) {
324333
* @return the result of calling the function
325334
*/
326335
public Object callFunction(Object fn, Object argument) {
327-
Object p = context.getThreadManager().enter();
328-
try {
329-
var callArgs =
330-
Function.ArgumentsHelper.buildArguments(null, new Object[] {fn, new Object[] {argument}});
331-
return call.getCallTarget().call(callArgs);
332-
} finally {
333-
context.getThreadManager().leave(p);
334-
}
336+
var future =
337+
submitExecution(
338+
() -> {
339+
var callArgs =
340+
Function.ArgumentsHelper.buildArguments(
341+
null, new Object[] {fn, new Object[] {argument}});
342+
return call.getCallTarget().call(callArgs);
343+
});
344+
return resultOf(future);
335345
}
336346

337347
/**
@@ -341,7 +351,7 @@ public Object callFunction(Object fn, Object argument) {
341351
* @param cache the runtime cache
342352
* @param executionCache cache with values provided by main execution
343353
* @param module the module providing scope for the function
344-
* @param function the function object
354+
* @param fn the function object
345355
* @param arguments the sequence of arguments applied to the function
346356
* @return the result of calling the function
347357
*/
@@ -352,56 +362,63 @@ public Object callFunctionWithInstrument(
352362
Module module,
353363
Object function,
354364
Object... arguments) {
355-
UUID nextExecutionItem = null;
356-
CallTarget entryCallTarget =
357-
(function instanceof Function) ? ((Function) function).getCallTarget() : null;
358-
MethodCallsCache methodCallsCache = new MethodCallsCache();
359-
UpdatesSynchronizationState syncState = new UpdatesSynchronizationState();
360-
Consumer<ExpressionCall> funCallCallback = (value) -> {};
361-
Consumer<ExpressionValue> onComputedCallback =
362-
(value) -> context.getLogger().finest("_ON_COMPUTED " + value.getExpressionId());
363-
Consumer<ExpressionValue> onCachedCallback =
364-
(value) -> context.getLogger().finest("_ON_CACHED_VALUE " + value.getExpressionId());
365-
Consumer<ExecutedVisualization> onExecutedVisualizationCallback = (value) -> {};
366-
ExpressionExecutionState expressionExecutionState = new ExpressionExecutionState();
367-
Consumer<ExpressionValue> onProgressCallback =
368-
(value) -> context.getLogger().finest("_ON_PROGRESS " + value.getExpressionId());
369-
370-
var callbacks =
371-
new ExecutionCallbacks(
372-
visualizationHolder,
373-
nextExecutionItem,
374-
cache,
375-
methodCallsCache,
376-
syncState,
377-
expressionExecutionState,
378-
onCachedCallback,
379-
onComputedCallback,
380-
funCallCallback,
381-
onExecutedVisualizationCallback,
382-
onProgressCallback);
383-
Optional<EventBinding<ExecutionEventNodeFactory>> eventNodeFactory =
384-
idExecutionInstrument.map(
385-
service -> service.bind(module, entryCallTarget, callbacks, this.timer));
386-
var ret = new Object[1];
387-
Object p = context.getThreadManager().enter();
388-
try {
389-
State state;
390-
if (function instanceof FunctionCallInstrumentationNode.FunctionCall fnCall) {
391-
state = fnCall.getState();
392-
} else {
393-
var fn = (Function) function;
394-
state = State.create(context);
395-
function = new FunctionCallInstrumentationNode.FunctionCall(fn, state, new Object[0]);
396-
}
397-
var callArgs = new Object[] {function, arguments};
398-
var callFn = Function.fullyApplied(call.getCallTarget(), callArgs);
399-
ret[0] = RunStateNode.getUncached().execute(null, cacheKey(), executionCache, callFn);
400-
} finally {
401-
context.getThreadManager().leave(p);
402-
eventNodeFactory.ifPresent(EventBinding::dispose);
403-
}
404-
return ret[0];
365+
366+
var future =
367+
submitExecution(
368+
() -> {
369+
var fn = function;
370+
UUID nextExecutionItem = null;
371+
CallTarget entryCallTarget =
372+
(fn instanceof Function) ? ((Function) fn).getCallTarget() : null;
373+
MethodCallsCache methodCallsCache = new MethodCallsCache();
374+
UpdatesSynchronizationState syncState = new UpdatesSynchronizationState();
375+
Consumer<ExpressionCall> funCallCallback = (value) -> {};
376+
Consumer<ExpressionValue> onComputedCallback =
377+
(value) -> context.getLogger().finest("_ON_COMPUTED " + value.getExpressionId());
378+
Consumer<ExpressionValue> onCachedCallback =
379+
(value) ->
380+
context.getLogger().finest("_ON_CACHED_VALUE " + value.getExpressionId());
381+
Consumer<ExecutedVisualization> onExecutedVisualizationCallback = (value) -> {};
382+
ExpressionExecutionState expressionExecutionState = new ExpressionExecutionState();
383+
Consumer<ExpressionValue> onProgressCallback =
384+
(value) -> context.getLogger().finest("_ON_PROGRESS " + value.getExpressionId());
385+
386+
var callbacks =
387+
new ExecutionCallbacks(
388+
visualizationHolder,
389+
nextExecutionItem,
390+
cache,
391+
methodCallsCache,
392+
syncState,
393+
expressionExecutionState,
394+
onCachedCallback,
395+
onComputedCallback,
396+
funCallCallback,
397+
onExecutedVisualizationCallback,
398+
onProgressCallback);
399+
Optional<EventBinding<ExecutionEventNodeFactory>> eventNodeFactory =
400+
idExecutionInstrument.map(
401+
service -> service.bind(module, entryCallTarget, callbacks, this.timer));
402+
var ret = new Object[1];
403+
try {
404+
State state;
405+
if (fn instanceof FunctionCallInstrumentationNode.FunctionCall fnCall) {
406+
state = fnCall.getState();
407+
} else {
408+
var tmp = (Function) fn;
409+
state = State.create(context);
410+
fn = new FunctionCallInstrumentationNode.FunctionCall(tmp, state, new Object[0]);
411+
}
412+
var callArgs = new Object[] {fn, arguments};
413+
var callFn = Function.fullyApplied(call.getCallTarget(), callArgs);
414+
ret[0] =
415+
RunStateNode.getUncached().execute(null, cacheKey(), executionCache, callFn);
416+
} finally {
417+
eventNodeFactory.ifPresent(EventBinding::dispose);
418+
}
419+
return ret[0];
420+
});
421+
return resultOf(future);
405422
}
406423

407424
private Type cacheKey() {
@@ -530,8 +547,12 @@ public boolean isExitException(AbstractTruffleException ex) {
530547
* @return a human-readable version of its contents.
531548
*/
532549
public String getExceptionMessage(AbstractTruffleException panic) {
550+
var future = submitExecution(() -> computeExceptionMessage(panic));
551+
return resultOf(future);
552+
}
553+
554+
private String computeExceptionMessage(AbstractTruffleException panic) {
533555
var iop = InteropLibrary.getUncached();
534-
var p = context.getThreadManager().enter();
535556
var payload = panic instanceof PanicException ex ? ex.getPayload() : panic;
536557
try {
537558
// Invoking a member on an Atom that does not have a method `to_display_text` will not
@@ -552,8 +573,6 @@ public String getExceptionMessage(AbstractTruffleException panic) {
552573
} else {
553574
throw e;
554575
}
555-
} finally {
556-
context.getThreadManager().leave(p);
557576
}
558577
}
559578

@@ -562,6 +581,20 @@ static <E extends Throwable> E raise(Class<E> type, Throwable ex) throws E {
562581
throw (E) ex;
563582
}
564583

584+
private <T> Future<T> submitExecution(Supplier<T> c) {
585+
return context.getThreadManager().submit(c);
586+
}
587+
588+
private static <T> T resultOf(Future<T> future) {
589+
try {
590+
return future.get();
591+
} catch (InterruptedException ex) {
592+
throw raise(RuntimeException.class, ex);
593+
} catch (ExecutionException ex) {
594+
throw raise(RuntimeException.class, ex.getCause());
595+
}
596+
}
597+
565598
private static final class ExecuteRootNode extends RootNode {
566599
@Node.Child private InteropLibrary iop = InteropLibrary.getFactory().createDispatched(5);
567600

engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/execution/CommandExecutionEngine.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,16 @@ class CommandExecutionEngine(interpreterContext: InterpreterContext)
4949
logger.debug(
5050
"Executing commands in a separate command pool"
5151
)
52-
interpreterContext.executionService.getContext
53-
.newCachedThreadPool("command-pool", 2, 10, 50, false)
52+
interpreterContext.executionService.getContext.getThreadManager
53+
.newCachedThreadPool("command-pool", 2, 10, 50)
5454
}
5555

5656
private val sequentialExecutionService =
57-
interpreterContext.executionService.getContext.newFixedThreadPool(
58-
1,
59-
"sequential-command-pool",
60-
false
61-
)
57+
interpreterContext.executionService.getContext.getThreadManager
58+
.newFixedThreadPool(
59+
1,
60+
"sequential-command-pool"
61+
)
6262
private val sequentialExecutionContext =
6363
ExecutionContext.fromExecutor(sequentialExecutionService)
6464

0 commit comments

Comments
 (0)