Skip to content

Commit

Permalink
refactor(sync): enable concurrent sync start/stop
Browse files Browse the repository at this point in the history
Removes the concept of ConnectionPolicy, replacing it with `SyncExecutor` operations.
Multiple entities can easily call `syncExecutor.request {}`, which is far easier than having to manage the ConnectionPolicy flags.
  • Loading branch information
vitorhugods committed Feb 21, 2025
1 parent 7c462ea commit 4cb134a
Show file tree
Hide file tree
Showing 34 changed files with 1,414 additions and 1,179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import com.wire.kalium.logic.feature.publicuser.GetAllContactsResult
import kotlinx.coroutines.flow.first

suspend fun getConversations(userSession: UserSessionScope): List<Conversation> {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = userSession.conversations.getConversations().let {
when (it) {
Expand All @@ -53,7 +55,9 @@ suspend fun UserSessionScope.listConversations(): List<Conversation> {
}

suspend fun UserSessionScope.selectConversation(): Conversation {
syncManager.waitUntilLive()
syncExecutor.request {
waitUntilLiveOrFailure()
}

val conversations = listConversations()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.feature.auth.autoVersioningAuth.AutoVersionAuthScop
import com.wire.kalium.logic.feature.client.RegisterClientResult
import com.wire.kalium.logic.feature.client.RegisterClientUseCase
import com.wire.kalium.logic.feature.server.GetServerConfigResult
import com.wire.kalium.util.DelicateKaliumApi
import kotlinx.coroutines.runBlocking

class LoginCommand : CliktCommand(name = "login") {
Expand Down Expand Up @@ -110,6 +111,7 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

@OptIn(DelicateKaliumApi::class)
override fun run(): Unit = runBlocking {
val loginResult = authenticate().let {
if (it !is AuthenticationResult.Success) {
Expand All @@ -132,6 +134,10 @@ class LoginCommand : CliktCommand(name = "login") {
}
}

userSession = currentContext.findOrSetObject { coreLogic.getSessionScope(userId) }
userSession = currentContext.findOrSetObject {
coreLogic.getSessionScope(userId).also {
it.syncExecutor.request { keepSyncAlwaysOn() }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class UpdateSupportedProtocolsCommand : CliktCommand(name = "update-supported-pr
private val userSession by requireObject<UserSessionScope>()

override fun run() = runBlocking {
userSession.syncManager.waitUntilLive()
userSession.syncExecutor.request {
waitUntilLiveOrFailure()
}
userSession.users.updateSupportedProtocols().fold({ failure ->
throw PrintMessage("updating supported protocols failed: $failure")
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,9 @@ internal class MLSConversationDataSource(

private suspend fun keepCommitAndRetry(groupID: GroupID): Either<CoreFailure, Unit> {
kaliumLogger.w("Migrating failed commit to new epoch and re-trying.")

// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
return syncManager.waitUntilLiveOrFailure().flatMap {
commitPendingProposalsWithoutRetry(groupID)
}
Expand All @@ -878,6 +880,9 @@ internal class MLSConversationDataSource(
wrapMLSRequest {
mlsClient.clearPendingCommit(idMapper.toCryptoModel(groupID))
}.flatMap {
// FIXME: Sync Cyclic Dependency.
// This function can be called DURING sync. And at the same time, it waits for Sync.
// Perhaps it should be scheduled for a retry in the future, after Sync is done.
syncManager.waitUntilLiveOrFailure()
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,7 @@ internal interface IncrementalSyncRepository {
*/
val incrementalSyncState: Flow<IncrementalSyncStatus>

/**
* Buffered flow of [ConnectionPolicy].
* - Has a replay size of 1, so the latest
* value is always immediately available for new observers.
* - Doesn't emit repeated values.
* - It has a limited buffer of size [BUFFER_SIZE]
* that will drop the oldest values if the buffer is full
* to prevent emissions from being suspended due to slow
* collectors.
* @see [BufferOverflow]
*/
val connectionPolicyState: Flow<ConnectionPolicy>
suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus)
suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy)

companion object {
// The same default buffer size used by Coroutines channels
Expand All @@ -80,28 +67,13 @@ internal class InMemoryIncrementalSyncRepository(
.asSharedFlow()
.distinctUntilChanged()

private val _connectionPolicy = MutableSharedFlow<ConnectionPolicy>(
replay = 1,
extraBufferCapacity = BUFFER_SIZE,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val connectionPolicyState = _connectionPolicy
.asSharedFlow()
.distinctUntilChanged()

init {
_syncState.tryEmit(IncrementalSyncStatus.Pending)
_connectionPolicy.tryEmit(ConnectionPolicy.KEEP_ALIVE)
}

override suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus) {
logger.i("IncrementalSyncStatus Updated FROM:${_syncState.first()}; TO: $newState")
_syncState.emit(newState)
}

override suspend fun setConnectionPolicy(connectionPolicy: ConnectionPolicy) {
logger.i("IncrementalSync Connection Policy changed: $connectionPolicy")
_connectionPolicy.emit(connectionPolicy)
}
}
Loading

0 comments on commit 4cb134a

Please sign in to comment.