diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 91d5c1ab22..170c4c6b7b 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -341,12 +341,6 @@ public abstract interface class kotlinx/coroutines/DisposableHandle { public abstract fun dispose ()V } -public final class kotlinx/coroutines/EventLoopKt { - public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z - public static final fun processNextEventInCurrentThread ()J - public static final fun runSingleTaskFromCurrentSystemDispatcher ()J -} - public final class kotlinx/coroutines/ExceptionsKt { public static final fun CancellationException (Ljava/lang/String;Ljava/lang/Throwable;)Ljava/util/concurrent/CancellationException; } diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index 23ef7665b5..7fe4c7333f 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -253,7 +253,7 @@ internal class DispatchedCoroutine( override fun afterResume(state: Any?) { if (tryResume()) return // completed before getResult invocation -- bail out // Resume in a cancellable way because we have to switch back to the original dispatcher - uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) + uCont.intercepted().resumeCancellableWithInternal(recoverResult(state, uCont)) } internal fun getResult(): Any? { diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 4a76dd8601..0a6bcb6509 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -160,6 +160,7 @@ public interface CancellableContinuation : Continuation { * Implementation note: current implementation always returns RESUME_TOKEN or `null` * * @suppress **This is unstable API and it is subject to change.** + * Used in ktor. */ @InternalCoroutinesApi public fun tryResume( @@ -172,6 +173,7 @@ public interface CancellableContinuation : Continuation { * [completeResume] must be invoked with it. * * @suppress **This is unstable API and it is subject to change.** + * Used in ktor. */ @InternalCoroutinesApi public fun tryResumeWithException(exception: Throwable): Any? @@ -180,6 +182,7 @@ public interface CancellableContinuation : Continuation { * Completes the execution of [tryResume] or [tryResumeWithException] on its non-null result. * * @suppress **This is unstable API and it is subject to change.** + * Used in ktor. */ @InternalCoroutinesApi public fun completeResume(token: Any) @@ -191,6 +194,7 @@ public interface CancellableContinuation : Continuation { * Exposed in our ABI since 1.0.0 within `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** + * Used in ktor. */ @InternalCoroutinesApi public fun initCancellability() diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 594cf8bfad..e59045f5d4 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -240,7 +240,7 @@ public interface Job : CoroutineContext.Element { public val children: Sequence /** - * Attaches child job so that this job becomes its parent and + * Attaches a child job so that this job becomes its parent and * returns a handle that should be used to detach it. * * A parent-child relation has the following effect: @@ -256,7 +256,8 @@ public interface Job : CoroutineContext.Element { * lookup a [Job] instance in the parent context and use this function to attach themselves as a child. * They also store a reference to the resulting [ChildHandle] and dispose a handle when they complete. * - * @suppress This is an internal API. This method is too error prone for public API. + * @suppress This is an internal API. This method is too error-prone for public API. + * Used in IntelliJ. */ // ChildJob and ChildHandle are made internal on purpose to further deter 3rd-party impl of Job @InternalCoroutinesApi diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 4c8f54e877..c6383c11de 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -271,14 +271,23 @@ internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext) * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher. * @suppress **This an internal API and should not be used from general code.** */ -@InternalCoroutinesApi -public fun Continuation.resumeCancellableWith( +internal fun Continuation.resumeCancellableWithInternal( result: Result, ): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result) else -> resumeWith(result) } +@InternalCoroutinesApi +@Deprecated( + "This function was intended for internal use only and will be removed. " + + "If you have a use case for it, please file an issue in the issue tracker.", + level = DeprecationLevel.WARNING +) // WARNING in 1.11, ERROR in 1.12, REMOVE in 1.13, was @InternalCoroutinesApi +public fun Continuation.resumeCancellableWith( + result: Result, +): Unit = resumeCancellableWithInternal(result) + internal fun DispatchedContinuation.yieldUndispatched(): Boolean = executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) { run() diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 9b830bd5c9..01315cab4d 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -20,7 +20,7 @@ internal open class ScopeCoroutine( override fun afterCompletion(state: Any?) { // Resume in a cancellable way by default when resuming from another context - uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) + uCont.intercepted().resumeCancellableWithInternal(recoverResult(state, uCont)) } /** diff --git a/kotlinx-coroutines-core/common/src/internal/Synchronized.common.kt b/kotlinx-coroutines-core/common/src/internal/Synchronized.common.kt index 43777f20db..60342c7028 100644 --- a/kotlinx-coroutines-core/common/src/internal/Synchronized.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/Synchronized.common.kt @@ -11,11 +11,8 @@ import kotlin.contracts.* @InternalCoroutinesApi public expect open class SynchronizedObject() // marker abstract class -/** - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public expect inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T +@PublishedApi +internal expect inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T /** * @suppress **This an internal API and should not be used from general code.** diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt index 1e87d767af..bf9ad2e26a 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt @@ -13,7 +13,7 @@ import kotlin.coroutines.intrinsics.* */ @InternalCoroutinesApi public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) { - createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) + createCoroutineUnintercepted(completion).intercepted().resumeCancellableWithInternal(Result.success(Unit)) } /** @@ -23,7 +23,7 @@ public fun (suspend () -> T).startCoroutineCancellable(completion: Continuat internal fun (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation, ) = runSafely(completion) { - createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit)) + createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWithInternal(Result.success(Unit)) } /** @@ -32,7 +32,7 @@ internal fun (suspend (R) -> T).startCoroutineCancellable( */ internal fun Continuation.startCoroutineCancellable(fatalCompletion: Continuation<*>) = runSafely(fatalCompletion) { - intercepted().resumeCancellableWith(Result.success(Unit)) + intercepted().resumeCancellableWithInternal(Result.success(Unit)) } /** diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index a13338c32b..3b8999ced5 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -162,11 +162,8 @@ public typealias ProcessResultFunction = (clauseObject: Any, param: Any?, clause * or [SelectInstance.selectInRegistrationPhase], should be processed in case of this `select` * cancellation while dispatching. Unfortunately, we cannot pass this function only in [SelectInstance.trySelect], * as [SelectInstance.selectInRegistrationPhase] can be called when the coroutine is already cancelled. - * - * @suppress **This is unstable API, and it is subject to change.** */ -@InternalCoroutinesApi -public typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) -> +internal typealias OnCancellationConstructor = (select: SelectInstance<*>, param: Any?, internalResult: Any?) -> (Throwable, Any?, CoroutineContext) -> Unit /** diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Synchronized.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Synchronized.kt index e828f67588..22e8f598d5 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Synchronized.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Synchronized.kt @@ -11,5 +11,5 @@ public actual open class SynchronizedObject /** * @suppress **This an internal API and should not be used from general code.** */ -@InternalCoroutinesApi -public actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = block() +@PublishedApi +internal actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = block() diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..f2834f926e 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,9 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* -import kotlinx.coroutines.scheduling.CoroutineScheduler - internal actual abstract class EventLoopImplPlatform: EventLoop() { protected abstract val thread: Thread @@ -25,101 +21,4 @@ internal class BlockingEventLoop( internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) -/** - * Processes next event in the current thread's event loop. - * - * The result of this function is to be interpreted like this: - * - `<= 0` -- there are potentially more events for immediate processing; - * - `> 0` -- a number of nanoseconds to wait for the next scheduled event; - * - [Long.MAX_VALUE] -- no more events or no thread-local event loop. - * - * Sample usage of this function: - * - * ``` - * while (waitingCondition) { - * val time = processNextEventInCurrentThread() - * LockSupport.parkNanos(time) - * } - * ``` - * - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public fun processNextEventInCurrentThread(): Long = - // This API is used in Ktor for serverless integration where a single thread awaits a blocking call - // (and, to avoid actual blocking, does something via this call), see #850 - ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE - internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() - -/** - * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]). - * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again - * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all. - * - * ### Invariants - * - * - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher, - * [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended - * up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO] - * **are not** executed[1]. - * - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only. - * - * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API, - * and we cannot leave this method without leaving thread in its original state. - * - * ### Rationale - * - * This is an internal API that is intended to replace IDEA's core FJP decomposition. - * The following API is provided by IDEA core: - * ``` - * runDecomposedTaskAndJoinIt { // <- non-suspending call - * // spawn as many tasks as needed - * // these tasks can also invoke 'runDecomposedTaskAndJoinIt' - * } - * ``` - * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself, - * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions, - * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks - * until an arbitrary condition is satisfied. - * - * See #3439 for additional details. - * - * ### Limitations and caveats - * - * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread - * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds - * even when work arrives immediately after returning from this method. - * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that - * work to current dispatcher may arrive **later** from external sources [1] - * - * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires - * a tremendous effort. - * - * @throws IllegalStateException if the current thread is not system dispatcher thread - */ -@InternalCoroutinesApi -@DelicateCoroutinesApi -@PublishedApi -internal fun runSingleTaskFromCurrentSystemDispatcher(): Long { - val thread = Thread.currentThread() - if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread") - return thread.runSingleTask() -} - -/** - * Checks whether the given thread belongs to Dispatchers.IO. - * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread - * may change this status when switching between tasks. - * - * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic - * purposes, and is declared as an extension only to avoid top-level scope pollution. - */ -@InternalCoroutinesApi -@DelicateCoroutinesApi -@PublishedApi -internal fun Thread.isIoDispatcherThread(): Boolean { - if (this !is CoroutineScheduler.Worker) return false - return isIo() -} - diff --git a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt index 47273d6ceb..6f7c6674a5 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt @@ -11,6 +11,6 @@ public actual typealias SynchronizedObject = Any /** * @suppress **This an internal API and should not be used from general code.** */ -@InternalCoroutinesApi -public actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = +@PublishedApi +internal actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = kotlin.synchronized(lock, block) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3430ebadec..ca43d50979 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -753,28 +753,6 @@ internal class CoroutineScheduler( tryReleaseCpu(WorkerState.TERMINATED) } - /** - * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details. - * This is a fine-tailored method for a specific use-case not expected to be used widely. - */ - fun runSingleTask(): Long { - val stateSnapshot = state - val isCpuThread = state == WorkerState.CPU_ACQUIRED - val task = if (isCpuThread) { - findCpuTask() - } else { - findBlockingTask() - } - if (task == null) { - if (minDelayUntilStealableTaskNs == 0L) return -1L - return minDelayUntilStealableTaskNs - } - runSafely(task) - if (!isCpuThread) decrementBlockingTasks() - assert { state == stateSnapshot } - return 0L - } - fun isIo() = state == WorkerState.BLOCKING // Counterpart to "tryUnpark" @@ -927,20 +905,12 @@ internal class CoroutineScheduler( return findBlockingTask() } - // NB: ONLY for runSingleTask method private fun findBlockingTask(): Task? { return localQueue.pollBlocking() ?: globalBlockingQueue.removeFirstOrNull() ?: trySteal(STEAL_BLOCKING_ONLY) } - // NB: ONLY for runSingleTask method - private fun findCpuTask(): Task? { - return localQueue.pollCpu() - ?: globalBlockingQueue.removeFirstOrNull() - ?: trySteal(STEAL_CPU_ONLY) - } - private fun findAnyTask(scanLocalQueue: Boolean): Task? { /* * Anti-starvation mechanism: probabilistically poll either local diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index a048d75d55..ca6cc2323f 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -120,7 +120,7 @@ internal class WorkQueue { fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { val task = when (stealingMode) { STEAL_ANY -> pollBuffer() - else -> stealWithExclusiveMode(stealingMode) + else -> stealWithExclusiveMode(onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY) } if (task != null) { @@ -131,10 +131,9 @@ internal class WorkQueue { } // Steal only tasks of a particular kind, potentially invoking full queue scan - private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? { + private fun stealWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? { var start = consumerIndex.value val end = producerIndex.value - val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY // Bail out if there is no blocking work for us while (start != end) { if (onlyBlocking && blockingTasksInBuffer.value == 0) return null @@ -148,10 +147,6 @@ internal class WorkQueue { // NB: ONLY for runSingleTask method fun pollBlocking(): Task? = pollWithExclusiveMode(onlyBlocking = true /* only blocking */) - // Polls for CPU task, invoked only by the owner - // NB: ONLY for runSingleTask method - fun pollCpu(): Task? = pollWithExclusiveMode(onlyBlocking = false /* only cpu */) - private fun pollWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? { while (true) { // Poll the slot val lastScheduled = lastScheduledTask.value ?: break @@ -175,7 +170,7 @@ internal class WorkQueue { return null } - private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? { + private fun tryExtractFromTheMiddle(index: Int, /* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? { val arrayIndex = index and MASK val value = buffer[arrayIndex] if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) { diff --git a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt index 551d1977c0..9c764a0c97 100644 --- a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt @@ -41,7 +41,7 @@ class EventLoopsTest : TestBase() { GlobalScope.launch(Dispatchers.Unconfined) { expect(2) GlobalScope.launch(Dispatchers.Unconfined) { - // this gets scheduled into outer unconfined loop + // this gets scheduled into the outer unconfined loop expect(4) } expect(3) // ^^ executed before the above unconfined @@ -57,9 +57,9 @@ class EventLoopsTest : TestBase() { assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) expect(2) // now runBlocking inside default executor thread --> should use outer event loop - DefaultExecutor.enqueue(Runnable { + DefaultExecutor.enqueue { expect(4) // will execute when runBlocking runs loop - }) + } expect(3) runBlocking { expect(5) @@ -68,30 +68,12 @@ class EventLoopsTest : TestBase() { finish(6) } - /** - * Simple test for [processNextEventInCurrentThread] API use-case. - */ - @Test - fun testProcessNextEventInCurrentThreadSimple() = runTest { - expect(1) - val event = EventSync() - // this coroutine fires event - launch { - expect(3) - event.fireEvent() - } - // main coroutine waits for event (same thread!) - expect(2) - event.blockingAwait() - finish(4) - } - @Test fun testSecondThreadRunBlocking() = runTest { val testThread = Thread.currentThread() val testContext = coroutineContext val event = EventSync() // will signal completion - var thread = thread { + val thread = thread { runBlocking { // outer event loop // Produce string "OK" val ch = produce { send("OK") } @@ -103,27 +85,8 @@ class EventLoopsTest : TestBase() { } event.fireEvent() // done thread } - event.blockingAwait() // wait for thread to complete - thread.join() // it is safe to join thread now - } - - /** - * Test for [processNextEventInCurrentThread] API use-case with delay. - */ - @Test - fun testProcessNextEventInCurrentThreadDelay() = runTest { - expect(1) - val event = EventSync() - // this coroutine fires event - launch { - expect(3) - delay(100) - event.fireEvent() - } - // main coroutine waits for event (same thread!) - expect(2) - event.blockingAwait() - finish(4) + event.blockingAwait() // wait for the thread to complete + thread.join() // it is safe to join the thread now } /** @@ -153,7 +116,7 @@ class EventLoopsTest : TestBase() { fun blockingAwait() { check(waitingThread.getAndSet(Thread.currentThread()) == null) while (!fired.getAndSet(false)) { - val time = processNextEventInCurrentThread() + val time = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE LockSupport.parkNanos(time) } waitingThread.value = null diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt deleted file mode 100644 index dbc6609101..0000000000 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt +++ /dev/null @@ -1,86 +0,0 @@ -package kotlinx.coroutines.scheduling - -import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* -import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS -import org.junit.Test -import java.util.* -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.CountDownLatch -import java.util.concurrent.CyclicBarrier -import java.util.concurrent.atomic.AtomicInteger -import kotlin.random.* -import kotlin.random.Random -import kotlin.test.* -import kotlin.time.* - -class CoroutineSchedulerInternalApiStressTest : TestBase() { - - @Test(timeout = 120_000L) - fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) { - val ioTaskMarker = ThreadLocal.withInitial { false } - runTest { - val jobToComplete = Job() - val expectedIterations = 100 - val completionLatch = CountDownLatch(1) - val tasksToCompleteJob = AtomicInteger(expectedIterations) - val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap()) - val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap()) - - val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) - val spawners = ArrayList() - repeat(AVAILABLE_PROCESSORS - 1) { - // Launch CORES - 1 spawners - spawners += launch(Dispatchers.Default) { - barrier.await() - repeat(expectedIterations) { - launch { - val tasksLeft = tasksToCompleteJob.decrementAndGet() - if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place - observedDefaultThreads.add(Thread.currentThread()) - if (tasksLeft == 0) { - // Verify threads first - try { - assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) - } finally { - jobToComplete.complete() - } - } - } - - // Sometimes launch an IO task to mess with a scheduler - if (Random.nextInt(0..9) == 0) { - launch(Dispatchers.IO) { - ioTaskMarker.set(true) - observedIoThreads.add(Thread.currentThread()) - assertTrue(Thread.currentThread().isIoDispatcherThread()) - } - } - } - completionLatch.await() - } - } - - withContext(Dispatchers.Default) { - barrier.await() - var timesHelped = 0 - while (!jobToComplete.isCompleted) { - val result = runSingleTaskFromCurrentSystemDispatcher() - assertFalse(ioTaskMarker.get()) - if (result == 0L) { - ++timesHelped - continue - } else if (result >= 0L) { - Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) - } else { - Thread.sleep(10) - } - } - completionLatch.countDown() - assertEquals(100, timesHelped) - assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString()) - } - } - } -} - diff --git a/kotlinx-coroutines-core/native/src/internal/Synchronized.kt b/kotlinx-coroutines-core/native/src/internal/Synchronized.kt index 43ff8bd9fc..46b7058134 100644 --- a/kotlinx-coroutines-core/native/src/internal/Synchronized.kt +++ b/kotlinx-coroutines-core/native/src/internal/Synchronized.kt @@ -10,8 +10,5 @@ import kotlinx.atomicfu.locks.withLock as withLock2 @InternalCoroutinesApi public actual typealias SynchronizedObject = kotlinx.atomicfu.locks.SynchronizedObject -/** - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block) +@PublishedApi +internal actual inline fun synchronizedImpl(lock: SynchronizedObject, block: () -> T): T = lock.withLock2(block) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index c93c8de3fa..63253cc43d 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -104,7 +104,6 @@ private class PublisherAsFlow( collectImpl(scope.coroutineContext, SendingCollector(scope.channel)) } -@Suppress("ReactiveStreamsSubscriberImplementation") private class ReactiveSubscriber( capacity: Int, onBufferOverflow: BufferOverflow, @@ -163,7 +162,6 @@ internal fun Publisher.injectCoroutineContext(coroutineContext: Coroutine * Adapter that transforms [Flow] into TCK-complaint [Publisher]. * [cancel] invocation cancels the original flow. */ -@Suppress("ReactiveStreamsPublisherImplementation") private class FlowAsPublisher( private val flow: Flow, private val context: CoroutineContext @@ -229,7 +227,7 @@ public class FlowSubscription( subscriber.onNext(value) // Suspend if needed before requesting the next value if (requested.decrementAndGet() <= 0) { - suspendCancellableCoroutine { + suspendCancellableCoroutine { producer.value = it } } else {