Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutex for Kotlin/Common #508

Open
wants to merge 4 commits into
base: native-thread-parking
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions atomicfu/api/atomicfu.api
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,20 @@ public final class kotlinx/atomicfu/TraceKt {
public static final fun named (Lkotlinx/atomicfu/TraceBase;Ljava/lang/String;)Lkotlinx/atomicfu/TraceBase;
}

public final class kotlinx/atomicfu/locks/Mutex {
public fun <init> ()V
public fun <init> (Ljava/util/concurrent/locks/ReentrantLock;)V
public final fun getReentrantLock ()Ljava/util/concurrent/locks/ReentrantLock;
Comment on lines +140 to +141

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid exposing the ReentrantLock constructor and accessor for now. What if we do implement our own mutex down the line that performs better?

public final fun isLocked ()Z
public final fun lock ()V
public final fun tryLock ()Z
public final fun unlock ()V
}

public final class kotlinx/atomicfu/locks/MutexKt {
public static final fun withLock (Lkotlinx/atomicfu/locks/Mutex;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
}

public final class kotlinx/atomicfu/parking/KThread {
public static final field Companion Lkotlinx/atomicfu/parking/KThread$Companion;
}
Expand Down
1 change: 1 addition & 0 deletions atomicfu/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ kotlin {

jvmTest {
dependencies {
implementation("org.jetbrains.kotlinx:lincheck:2.35")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-test")
implementation("org.jetbrains.kotlin:kotlin-test-junit")
Expand Down

This file was deleted.

This file was deleted.

79 changes: 79 additions & 0 deletions atomicfu/src/commonMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kotlinx.atomicfu.locks

import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

/**
* Mutual exclusion for Kotlin Multiplatform.
*
* It can protect a shared resource or critical section from multiple thread accesses.
* Threads can acquire the lock by calling [lock] and release the lock by calling [unlock].
*
* When a thread calls [lock] while another thread is locked, it will suspend until the lock is released.
* When multiple threads are waiting for the lock, they will acquire it in a fair order (first in first out).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*
* It is reentrant, meaning the lock holding thread can call [lock] multiple times without suspending.
* To release the lock (after multiple [lock] calls) an equal number of [unlock] calls are required.
*
* This Mutex should not be used in combination with coroutines and `suspend` functions
* as it blocks the waiting thread.
* Use the `Mutex` from the coroutines library instead.
*
* ```Kotlin
* mutex.withLock {
* // Critical section only executed by
* // one thread at a time.
* }
* ```
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is user-visible documentation, and so it must explain the contracts of the data structure. What is a mutex? What would an example of its usage look like? Is it reentrant? Is it fair? It should mention that it's unsuitable for async (suspend) code, blocks the thread, and is only suitable for low-level technical work.

The same goes for every function in the file: all API entries need to be documented. When does each function return? When can it throw an exception? What does it do?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have adjusted the docs, wdyt?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks!

expect class Mutex() {
/**
* Returns `true` if this mutex is locked.
*/
fun isLocked(): Boolean

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it turns out someone does need this function, they can do fun Mutex.isLocked() = if (tryLock()) { unlock(); true } else false (right?), so including this function is purely a performance optimization for a performance problem that may not even exist. So, unless we know of some specific use cases for this function, let's not take on the extra commitment of including it.

Copy link
Collaborator Author

@bbrockbernd bbrockbernd Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. I have tried to keep the api similar to the coroutines mutex's api. But will remove it for now.

P.S. true and false should be swapped right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, this won't work if I am holding the lock myself since it is reentrant.


/**
* Tries to lock this mutex, returning `false` if this mutex is already locked.
*
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
* released at the end of your critical section, and [unlock] is never invoked before a successful
* lock acquisition.
*/
fun tryLock(): Boolean

/**
* Locks the mutex, suspends the thread until the lock is acquired.
*
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
* released at the end of your critical section, and [unlock] is never invoked before a successful
* lock acquisition.
*/
fun lock()

/**
* Releases the lock.
* Throws [IllegalStateException] when the current thread is not holding the lock.
*
* It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
* released at the end of the critical section, and [unlock] is never invoked before a successful
* lock acquisition.
*/
fun unlock()
}

/**
* Executes the given code [block] under this mutex's lock.
*
* @return result of [block]
*/
@OptIn(ExperimentalContracts::class)
inline fun <T> Mutex.withLock(block: () -> T): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
lock()
return try {
block()
} finally {
unlock()
}
}
150 changes: 150 additions & 0 deletions atomicfu/src/concurrentMain/kotlin/kotlinx/atomicfu/locks/Mutex.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package kotlinx.atomicfu.locks

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.parking.ThreadParker
import kotlinx.atomicfu.parking.currentThreadId

internal class NativeMutex {
/**
* Mutex implementation for Kotlin/Native.
* In concurrentMain sourceSet to be testable with Lincheck.
*
* The [state] variable stands for: 0 -> Lock is free
* 1 -> Lock is locked but no waiters
* 4 -> Lock is locked with 3 waiters
*
* The state.incrementAndGet() call makes my claim on the lock.
* The returned value either means I acquired it (when it is 1).
* Or I need to enqueue and park (when it is > 1).
*
* The [holdCount] variable is to enable reentrancy.
*
* Works by using a [parkingQueue].
* When a thread tries to acquire the lock, but finds it is already locked it enqueues by appending to the [parkingQueue].
* On enqueue the parking queue provides the second last node, this node is used to park on.
* When our thread is woken up that means that the thread parked on the thrid last node called unpark on the second last node.
* Since a woken up thread is first inline it means that it's node is the head and can therefore dequeue.
*
* Unlocking happens by calling state.decrementAndGet().
* When the returned value is 0 it means the lock is free and we can simply return.
* If the new state is > 0, then there are waiters. We wake up the first by unparking the head of the queue.
* This even works when a thread is not parked yet,
* since the ThreadParker can be pre-unparked resulting in the parking call to return immediately.
*/
private val parkingQueue = ParkingQueue()
private val owningThread = atomic(-1L)
private val state = atomic(0)
private val holdCount = atomic(0)


fun lock() {
val currentThreadId = currentThreadId()

// Has to be checked in this order!
if (holdCount.value > 0 && currentThreadId == owningThread.value) {
// Is reentring thread
holdCount.incrementAndGet()
return
}

// Otherwise try acquire lock
val newState = state.incrementAndGet()
// If new state 1 than I have acquired lock skipping queue.
if (newState == 1) {
owningThread.value = currentThreadId
holdCount.incrementAndGet()
return
}

// If state larger than 1 -> enqueue and park
// When woken up thread has acquired lock and his node in the queue is therefore at the head.
// Remove head
if (newState > 1) {
val prevNode = parkingQueue.enqueue()
prevNode.parker.park()
parkingQueue.dequeue()
owningThread.value = currentThreadId
holdCount.incrementAndGet()
return
}
}

fun unlock() {
val currentThreadId = currentThreadId()
val currentOwnerId = owningThread.value
if (currentThreadId != currentOwnerId) throw IllegalStateException("Thread is not holding the lock")

// dec hold count
val newHoldCount = holdCount.decrementAndGet()
if (newHoldCount > 0) return
if (newHoldCount < 0) throw IllegalStateException("Thread unlocked more than it locked")

// Lock is released by decrementing (only if decremented to 0)
val currentState = state.decrementAndGet()
if (currentState == 0) return

// If waiters wake up the first in line. The woken up thread will dequeue the node.
if (currentState > 0) {
val nextParker = parkingQueue.getHead()
nextParker.parker.unpark()
return
}
}

fun isLocked(): Boolean {
return state.value > 0
}

fun tryLock(): Boolean {
val currentThreadId = currentThreadId()
if (holdCount.value > 0 && owningThread.value == currentThreadId || state.compareAndSet(0, 1)) {
owningThread.value = currentThreadId
holdCount.incrementAndGet()
return true
}
return false
}

// Based on Micheal-Scott Queue
private class ParkingQueue {
private val head: AtomicRef<Node>
private val tail: AtomicRef<Node>

init {
val first = Node()
head = atomic(first)
tail = atomic(first)
}

fun getHead(): Node {
return head.value
}

fun enqueue(): Node {
while (true) {
val node = Node()
val curTail = tail.value
if (curTail.next.compareAndSet(null, node)) {
tail.compareAndSet(curTail, node)
return curTail
}
else tail.compareAndSet(curTail, curTail.next.value!!)
}
}

fun dequeue() {
while (true) {
val currentHead = head.value
val currentHeadNext = currentHead.next.value ?: throw IllegalStateException("Dequeing parker but already empty, should not be possible")
if (head.compareAndSet(currentHead, currentHeadNext)) return
}
}

}

private class Node {
val parker = ThreadParker()
val next = atomic<Node?>(null)
}
}
Loading