Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
public abstract fun dispose ()V
}

public final class kotlinx/coroutines/EventLoopKt {
public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z
public static final fun processNextEventInCurrentThread ()J
public static final fun runSingleTaskFromCurrentSystemDispatcher ()J
}

public final class kotlinx/coroutines/ExceptionsKt {
public static final fun CancellationException (Ljava/lang/String;Ljava/lang/Throwable;)Ljava/util/concurrent/CancellationException;
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ internal class DispatchedCoroutine<in T>(
override fun afterResume(state: Any?) {
if (tryResume()) return // completed before getResult invocation -- bail out
// Resume in a cancellable way because we have to switch back to the original dispatcher
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
uCont.intercepted().resumeCancellableWithInternal(recoverResult(state, uCont))
}

internal fun getResult(): Any? {
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* Implementation note: current implementation always returns RESUME_TOKEN or `null`
*
* @suppress **This is unstable API and it is subject to change.**
* Used in ktor.
*/
@InternalCoroutinesApi
public fun <R: T> tryResume(
Expand All @@ -172,6 +173,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* [completeResume] must be invoked with it.
*
* @suppress **This is unstable API and it is subject to change.**
* Used in ktor.
*/
@InternalCoroutinesApi
public fun tryResumeWithException(exception: Throwable): Any?
Expand All @@ -180,6 +182,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result.
*
* @suppress **This is unstable API and it is subject to change.**
* Used in ktor.
*/
@InternalCoroutinesApi
public fun completeResume(token: Any)
Expand All @@ -191,6 +194,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* Exposed in our ABI since 1.0.0 within `suspendCancellableCoroutine` body.
*
* @suppress **This is unstable API and it is subject to change.**
* Used in ktor.
*/
@InternalCoroutinesApi
public fun initCancellability()
Expand Down
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public interface Job : CoroutineContext.Element {
public val children: Sequence<Job>

/**
* Attaches child job so that this job becomes its parent and
* Attaches a child job so that this job becomes its parent and
* returns a handle that should be used to detach it.
*
* A parent-child relation has the following effect:
Expand All @@ -256,7 +256,8 @@ public interface Job : CoroutineContext.Element {
* lookup a [Job] instance in the parent context and use this function to attach themselves as a child.
* They also store a reference to the resulting [ChildHandle] and dispose a handle when they complete.
*
* @suppress This is an internal API. This method is too error prone for public API.
* @suppress This is an internal API. This method is too error-prone for public API.
* Used in IntelliJ.
*/
// ChildJob and ChildHandle are made internal on purpose to further deter 3rd-party impl of Job
@InternalCoroutinesApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,23 @@ internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext)
* It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher.
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
internal fun <T> Continuation<T>.resumeCancellableWithInternal(
result: Result<T>,
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}

@InternalCoroutinesApi
@Deprecated(
"This function was intended for internal use only and will be removed. " +
"If you have a use case for it, please file an issue in the issue tracker.",
level = DeprecationLevel.WARNING
) // WARNING in 1.11, ERROR in 1.12, REMOVE in 1.13, was @InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
): Unit = resumeCancellableWithInternal(result)

internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
run()
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/internal/Scopes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal open class ScopeCoroutine<in T>(

override fun afterCompletion(state: Any?) {
// Resume in a cancellable way by default when resuming from another context
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
uCont.intercepted().resumeCancellableWithInternal(recoverResult(state, uCont))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import kotlin.contracts.*
@InternalCoroutinesApi
public expect open class SynchronizedObject() // marker abstract class

/**
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public expect inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T
@PublishedApi
internal expect inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T

/**
* @suppress **This an internal API and should not be used from general code.**
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import kotlin.coroutines.intrinsics.*
*/
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWithInternal(Result.success(Unit))
}

/**
Expand All @@ -23,7 +23,7 @@ public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuat
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
) = runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWithInternal(Result.success(Unit))
}

/**
Expand All @@ -32,7 +32,7 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
*/
internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
runSafely(fatalCompletion) {
intercepted().resumeCancellableWith(Result.success(Unit))
intercepted().resumeCancellableWithInternal(Result.success(Unit))
}

/**
Expand Down
5 changes: 1 addition & 4 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,8 @@ public typealias ProcessResultFunction = (clauseObject: Any, param: Any?, clause
* or [SelectInstance.selectInRegistrationPhase], should be processed in case of this `select`
* cancellation while dispatching. Unfortunately, we cannot pass this function only in [SelectInstance.trySelect],
* as [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled.
*
* @suppress **This is unstable API, and it is subject to change.**
*/
@InternalCoroutinesApi
public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) ->
internal typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) ->
(Throwable, Any?, CoroutineContext) -> Unit

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ public actual open class SynchronizedObject
/**
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = block()
@PublishedApi
internal actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = block()
101 changes: 0 additions & 101 deletions kotlinx-coroutines-core/jvm/src/EventLoop.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package kotlinx.coroutines

import kotlinx.coroutines.Runnable
import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.scheduling.CoroutineScheduler

internal actual abstract class EventLoopImplPlatform: EventLoop() {

protected abstract val thread: Thread
Expand All @@ -25,101 +21,4 @@ internal class BlockingEventLoop(

internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())

/**
* Processes next event in the current thread's event loop.
*
* The result of this function is to be interpreted like this:
* - `<= 0` -- there are potentially more events for immediate processing;
* - `> 0` -- a number of nanoseconds to wait for the next scheduled event;
* - [Long.MAX_VALUE] -- no more events or no thread-local event loop.
*
* Sample usage of this function:
*
* ```
* while (waitingCondition) {
* val time = processNextEventInCurrentThread()
* LockSupport.parkNanos(time)
* }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun processNextEventInCurrentThread(): Long =
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
// (and, to avoid actual blocking, does something via this call), see #850
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE

internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()

/**
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
* Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
* (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
*
* ### Invariants
*
* - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
* **are not** executed[1].
* - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
*
* [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
* and we cannot leave this method without leaving thread in its original state.
*
* ### Rationale
*
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
* The following API is provided by IDEA core:
* ```
* runDecomposedTaskAndJoinIt { // <- non-suspending call
* // spawn as many tasks as needed
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
* }
* ```
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
* 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
* until an arbitrary condition is satisfied.
*
* See #3439 for additional details.
*
* ### Limitations and caveats
*
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
* - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
* even when work arrives immediately after returning from this method.
* - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
* work to current dispatcher may arrive **later** from external sources [1]
*
* [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
* a tremendous effort.
*
* @throws IllegalStateException if the current thread is not system dispatcher thread
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
val thread = Thread.currentThread()
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
return thread.runSingleTask()
}

/**
* Checks whether the given thread belongs to Dispatchers.IO.
* Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
* may change this status when switching between tasks.
*
* This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
* purposes, and is declared as an extension only to avoid top-level scope pollution.
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun Thread.isIoDispatcherThread(): Boolean {
if (this !is CoroutineScheduler.Worker) return false
return isIo()
}

4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public actual typealias SynchronizedObject = Any
/**
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T =
@PublishedApi
internal actual inline fun <T> synchronizedImpl(lock: SynchronizedObject, block: () -> T): T =
kotlin.synchronized(lock, block)
30 changes: 0 additions & 30 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -753,28 +753,6 @@ internal class CoroutineScheduler(
tryReleaseCpu(WorkerState.TERMINATED)
}

/**
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
* This is a fine-tailored method for a specific use-case not expected to be used widely.
*/
fun runSingleTask(): Long {
val stateSnapshot = state
val isCpuThread = state == WorkerState.CPU_ACQUIRED
val task = if (isCpuThread) {
findCpuTask()
} else {
findBlockingTask()
}
if (task == null) {
if (minDelayUntilStealableTaskNs == 0L) return -1L
return minDelayUntilStealableTaskNs
}
runSafely(task)
if (!isCpuThread) decrementBlockingTasks()
assert { state == stateSnapshot }
return 0L
}

fun isIo() = state == WorkerState.BLOCKING

// Counterpart to "tryUnpark"
Expand Down Expand Up @@ -927,20 +905,12 @@ internal class CoroutineScheduler(
return findBlockingTask()
}

// NB: ONLY for runSingleTask method
private fun findBlockingTask(): Task? {
return localQueue.pollBlocking()
?: globalBlockingQueue.removeFirstOrNull()
?: trySteal(STEAL_BLOCKING_ONLY)
}

// NB: ONLY for runSingleTask method
private fun findCpuTask(): Task? {
return localQueue.pollCpu()
?: globalBlockingQueue.removeFirstOrNull()
?: trySteal(STEAL_CPU_ONLY)
}

private fun findAnyTask(scanLocalQueue: Boolean): Task? {
/*
* Anti-starvation mechanism: probabilistically poll either local
Expand Down
11 changes: 3 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ internal class WorkQueue {
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
val task = when (stealingMode) {
STEAL_ANY -> pollBuffer()
else -> stealWithExclusiveMode(stealingMode)
else -> stealWithExclusiveMode(onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY)
}

if (task != null) {
Expand All @@ -131,10 +131,9 @@ internal class WorkQueue {
}

// Steal only tasks of a particular kind, potentially invoking full queue scan
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
private fun stealWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
var start = consumerIndex.value
val end = producerIndex.value
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
// Bail out if there is no blocking work for us
while (start != end) {
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
Expand All @@ -148,10 +147,6 @@ internal class WorkQueue {
// NB: ONLY for runSingleTask method
fun pollBlocking(): Task? = pollWithExclusiveMode(onlyBlocking = true /* only blocking */)

// Polls for CPU task, invoked only by the owner
// NB: ONLY for runSingleTask method
fun pollCpu(): Task? = pollWithExclusiveMode(onlyBlocking = false /* only cpu */)

private fun pollWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
while (true) { // Poll the slot
val lastScheduled = lastScheduledTask.value ?: break
Expand All @@ -175,7 +170,7 @@ internal class WorkQueue {
return null
}

private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
private fun tryExtractFromTheMiddle(index: Int, /* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
val arrayIndex = index and MASK
val value = buffer[arrayIndex]
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
Expand Down
Loading