RnD: Try to implement native SynchronizedObject without LockState allocation (Kotlin#412)

    * Pack "thin" lock state in a single ptr-word
    * Spin when "fat" state is required (it's harder to pack in a single word)
Aleksei.Glushko committed Apr 8, 2024
1 parent 7e640ba commit ab2a74e
Showing 2 changed files with 125 additions and 100 deletions.
4 changes: 4 additions & 0 deletions atomicfu/src/nativeInterop/cinterop/interop.def
@@ -11,6 +11,8 @@ typedef struct lock_support {

typedef struct mutex_node {
    lock_support_t* mutex;
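    /* New fields: presumably bookkeeping for a future fat-lock path (nesting and waiter
       counts kept next to the native mutex); the Kotlin code in this change does not
       use them yet. */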
    int nestedLocks;
    int waiters;
    struct mutex_node* next;
} mutex_node_t;

@@ -24,6 +26,8 @@ lock_support_t* lock_support_init() {

mutex_node_t* mutex_node_init(mutex_node_t* mutexNode) {
    mutexNode->mutex = lock_support_init();
    mutexNode->nestedLocks = 0;
    mutexNode->waiters = 0;
    mutexNode->next = NULL;
    return mutexNode;
}
221 changes: 121 additions & 100 deletions atomicfu/src/nativeMain/kotlin/kotlinx/atomicfu/locks/Synchronized.kt
@@ -2,80 +2,140 @@ package kotlinx.atomicfu.locks

import platform.posix.*
import interop.*
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.*
import kotlin.native.internal.NativePtr
import kotlinx.atomicfu.locks.SynchronizedObject.Status.*
import kotlinx.cinterop.NativePtr
import kotlin.concurrent.AtomicLong
import kotlin.concurrent.AtomicNativePtr
import kotlin.concurrent.AtomicReference
import kotlin.native.concurrent.*
import kotlin.experimental.ExperimentalNativeApi


private val threadCounter = atomic(0)

// TODO assert no overflow?
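// Ids start at 1, so an owner tid of 0 in a thin lock word can safely mean "unlocked".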
@kotlin.native.concurrent.ThreadLocal
private var threadId: UInt = threadCounter.addAndGet(1).toUInt()

public actual open class SynchronizedObject {

protected val lock = AtomicReference(LockState(UNLOCKED, 0, 0))
private enum class LockStatus { UNLOCKED, THIN, FAT }

@OptIn(ExperimentalNativeApi::class)
private value class LockWord private constructor(private val encoded: ULong) {
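// Layout: bit 0 is the "fat" flag. A thin word packs the nested lock count in bits 1..31
// and the owner thread id in bits 32..63; a fat word is the mutex_node pointer with bit 0
// set (the pointer is aligned, so bit 0 is otherwise free).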
companion object {
private const val FAT_BIT_MASK = 1uL

fun unlocked() = Thin(0u, 0u).lockWord

fun fromLong(l: Long) = LockWord(l.toULong())
}

inline val fat: Boolean get() = encoded.and(FAT_BIT_MASK) != 0uL
inline val thin: Boolean get() = !fat

inline val status: LockStatus get() = when {
thin -> if (asThin().isUnlocked()) LockStatus.UNLOCKED else LockStatus.THIN
else -> LockStatus.FAT
}

inline fun asThin() = Thin(this)
inline fun asFat() = Fat(this)

inline fun toLong() = encoded.toLong()

value class Thin internal constructor(val lockWord: LockWord) {
init { assert(lockWord.thin) }

companion object {
// TODO outline some consts

inline operator fun invoke(nested: UInt, ownerTid: UInt): Thin {
if (nested > UInt.MAX_VALUE.shr(1)) throw IllegalArgumentException() // TODO
val nestedPart = nested.shl(1).toULong()
val tidPart = ownerTid.toULong().shl(UInt.SIZE_BITS)
val result = Thin(LockWord(nestedPart.or(tidPart)))
assert(result.nested == nested)
assert(result.ownerTid == ownerTid)
return result
}
}

inline val nested: UInt get() = lockWord.encoded.and(UInt.MAX_VALUE.toULong()).shr(1).toUInt()
inline val ownerTid: UInt get() = lockWord.encoded.and(UInt.MAX_VALUE.toULong().inv()).shr(UInt.SIZE_BITS).toUInt()

internal inline fun isUnlocked() = ownerTid == 0u
}

value class Fat internal constructor(val lockWord: LockWord) {
init { assert(lockWord.fat) }

companion object {
inline operator fun invoke(mutex: CPointer<mutex_node_t>, contended: Boolean = false): Fat {
val mutexPtrValue = mutex.toLong().toULong()
assert(mutexPtrValue.and(FAT_BIT_MASK) == 0uL)
return Fat(LockWord(mutexPtrValue.or(FAT_BIT_MASK)))
}
}

inline val mutex: CPointer<mutex_node_t> get() =
lockWord.encoded.and(FAT_BIT_MASK.inv()).toLong().toCPointer()!!
}

}

// TODO introduce AtomicLockWord
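// The whole lock state lives in this single atomic word, so lock/unlock do not allocate a LockState object.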
private val lockWord = AtomicLong(LockWord.unlocked().toLong())

private inline fun loadLockState() = LockWord.fromLong(lockWord.value)

private fun compareSetAndFreeLock(expected: LockWord, desired: LockWord): Boolean {
return lockWord.compareAndSet(expected.toLong(), desired.toLong())
}

public fun lock() {
val currentThreadId = pthread_self()!!
val currentThreadId = threadId
while (true) {
val state = lock.value
val state = loadLockState()
when (state.status) {
UNLOCKED -> {
val thinLock = LockState(THIN, 1, 0, currentThreadId)
if (lock.compareAndSet(state, thinLock))
LockStatus.UNLOCKED -> {
val thinLock = LockWord.Thin(1u, currentThreadId)
if (compareSetAndFreeLock(state, thinLock.lockWord))
return
}
THIN -> {
if (currentThreadId == state.ownerThreadId) {
LockStatus.THIN -> {
val thinState = state.asThin()
if (currentThreadId == thinState.ownerTid) {
// reentrant lock
val thinNested = LockState(THIN, state.nestedLocks + 1, state.waiters, currentThreadId)
if (lock.compareAndSet(state, thinNested))
val thinNested = LockWord.Thin(thinState.nested + 1u, currentThreadId)
if (compareSetAndFreeLock(state, thinNested.lockWord))
return
} else {
// another thread is trying to take this lock -> allocate native mutex
val mutex = mutexPool.allocate()
mutex.lock()
val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, mutex)
if (lock.compareAndSet(state, fatLock)) {
//block the current thread waiting for the owner thread to release the permit
mutex.lock()
tryLockAfterResume(currentThreadId)
return
} else {
// return permit taken for the owner thread and release mutex back to the pool
mutex.unlock()
mutexPool.release(mutex)
}
// another thread holds the lock -> allocate native mutex
// TODO allocate native mutex
// or just spin
pthread_yield_np()
}
}
FAT -> {
if (currentThreadId == state.ownerThreadId) {
// reentrant lock
val nestedFatLock = LockState(FAT, state.nestedLocks + 1, state.waiters, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, nestedFatLock)) return
} else if (state.ownerThreadId != null) {
val fatLock = LockState(FAT, state.nestedLocks, state.waiters + 1, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, fatLock)) {
fatLock.mutex!!.lock()
tryLockAfterResume(currentThreadId)
return
}
}
LockStatus.FAT -> {
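// Fat lock words are never created in this prototype (contended threads spin in the THIN
// branch above), so reaching this state indicates a bug.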
abort()
}
}
}
}

public fun tryLock(): Boolean {
val currentThreadId = pthread_self()!!
val currentThreadId = threadId
while (true) {
val state = lock.value
if (state.status == UNLOCKED) {
val thinLock = LockState(THIN, 1, 0, currentThreadId)
if (lock.compareAndSet(state, thinLock))
val state = loadLockState()
if (state.status == LockStatus.UNLOCKED) {
val thinLock = LockWord.Thin(1u, currentThreadId)
if (compareSetAndFreeLock(state, thinLock.lockWord))
return true
} else {
if (currentThreadId == state.ownerThreadId) {
val nestedLock = LockState(state.status, state.nestedLocks + 1, state.waiters, currentThreadId, state.mutex)
if (lock.compareAndSet(state, nestedLock))
// FIXME what if fat?
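// asThin() on a fat word would misread the pointer bits (and trip its assert); this only
// works because fat words are never created in this prototype.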
if (currentThreadId == state.asThin().ownerTid) {
val nestedLock = LockWord.Thin(state.asThin().nested + 1u, currentThreadId)
if (compareSetAndFreeLock(state, nestedLock.lockWord))
return true
} else {
return false
@@ -85,71 +145,32 @@ public actual open class SynchronizedObject {
}

public fun unlock() {
val currentThreadId = pthread_self()!!
val currentThreadId = threadId
while (true) {
val state = lock.value
require(currentThreadId == state.ownerThreadId) { "Thin lock may be only released by the owner thread, expected: ${state.ownerThreadId}, real: $currentThreadId" }
val state = loadLockState()
when (state.status) {
THIN -> {
LockStatus.THIN -> {
val thinState = state.asThin()
require(currentThreadId == thinState.ownerTid) { "Thin lock may be only released by the owner thread, expected: ${thinState.ownerTid}, real: $currentThreadId" }
// nested unlock
if (state.nestedLocks == 1) {
val unlocked = LockState(UNLOCKED, 0, 0)
if (lock.compareAndSet(state, unlocked))
if (thinState.nested == 1u) {
val unlocked = LockWord.unlocked()
if (compareSetAndFreeLock(state, unlocked))
return
} else {
val releasedNestedLock =
LockState(THIN, state.nestedLocks - 1, state.waiters, state.ownerThreadId)
if (lock.compareAndSet(state, releasedNestedLock))
val releasedNestedLock = LockWord.Thin(thinState.nested - 1u, thinState.ownerTid)
if (compareSetAndFreeLock(state, releasedNestedLock.lockWord))
return
}
}
FAT -> {
if (state.nestedLocks == 1) {
// last nested unlock -> release completely, resume some waiter
val releasedLock = LockState(FAT, 0, state.waiters - 1, null, state.mutex)
if (lock.compareAndSet(state, releasedLock)) {
releasedLock.mutex!!.unlock()
return
}
} else {
// lock is still owned by the current thread
val releasedLock =
LockState(FAT, state.nestedLocks - 1, state.waiters, state.ownerThreadId, state.mutex)
if (lock.compareAndSet(state, releasedLock))
return
}
LockStatus.FAT -> {
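// As in lock(): fat states are not produced yet, so this path is treated as unreachable.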
require(false)
}
else -> error("It is not possible to unlock the mutex that is not obtained")
}
}
}

private fun tryLockAfterResume(threadId: pthread_t) {
while (true) {
val state = lock.value
val newState = if (state.waiters == 0) // deflate
LockState(THIN, 1, 0, threadId)
else
LockState(FAT, 1, state.waiters, threadId, state.mutex)
if (lock.compareAndSet(state, newState)) {
if (state.waiters == 0) {
state.mutex!!.unlock()
mutexPool.release(state.mutex)
}
return
}
}
}

protected class LockState(
val status: Status,
val nestedLocks: Int,
val waiters: Int,
val ownerThreadId: pthread_t? = null,
val mutex: CPointer<mutex_node_t>? = null
)

protected enum class Status { UNLOCKED, THIN, FAT }

private fun CPointer<mutex_node_t>.lock() = lock(this.pointed.mutex)

