diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1d0ba1f83..072f5e7a9 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ minSdk = "24" compileSdk = "34" targetSdk = "34" -agp = "8.4.1" +agp = "8.4.2" connectKotlin = "0.6.1" okhttp = "4.12.0" coroutines = "1.8.1" diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index e61a1e019..35930ce37 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -840,4 +840,54 @@ class ClientTest { collectJobs.forEach(Job::cancel) } } + + @Test + fun test_duplicated_local_changes_not_sent_to_server() { + withTwoClientsAndDocuments(detachDocuments = false) { c1, c2, d1, d2, _ -> + val d1Events = mutableListOf() + val d2Events = mutableListOf() + val collectJobs = listOf( + launch(start = CoroutineStart.UNDISPATCHED) { + d1.events.filter { it is RemoteChange || it is LocalChange } + .collect(d1Events::add) + }, + launch(start = CoroutineStart.UNDISPATCHED) { + d2.events.filter { it is RemoteChange || it is LocalChange } + .collect(d2Events::add) + }, + ) + + listOf( + d1.updateAsync { root, _ -> + root.setNewTree( + "t", + element("doc") { + element("p") { text { "12" } } + element("p") { text { "34" } } + }, + ) + }, + c1.syncAsync(), + c1.syncAsync(), + c1.syncAsync(), + c1.detachAsync(d1), + ) + + withTimeout(GENERAL_TIMEOUT) { + while (d2Events.isEmpty()) { + delay(50) + } + } + assertIs(d2Events.first()) + assertTreesXmlEquals("

12

34

", d1, d2) + + delay(500) + + assertEquals(1, d2Events.size) + + c2.detachAsync(d2).await() + + collectJobs.forEach(Job::cancel) + } + } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index a64df393e..9baa4cc23 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -72,6 +72,8 @@ import kotlinx.coroutines.flow.fold import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import okhttp3.OkHttpClient @@ -109,6 +111,11 @@ public class Client @VisibleForTesting internal constructor( "x-shard-key" to listOf("${options.apiKey.orEmpty()}/$value"), ) + private val mutexForDocuments = mutableMapOf() + + private val Document.mutex + get() = mutexForDocuments.getOrPut(key) { Mutex() } + public constructor( host: String, options: Options = Options(), @@ -236,32 +243,35 @@ public class Client @VisibleForTesting internal constructor( SyncResult( document, runCatching { - val request = pushPullChangesRequest { - clientId = requireClientId().value - changePack = document.createChangePack().toPBChangePack() - documentId = documentID - pushOnly = syncMode == SyncMode.RealtimePushOnly - } - val response = service.pushPullChanges( - request, - document.key.documentBasedRequestHeader, - ).getOrThrow() - val responsePack = response.changePack.toChangePack() - // NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with - // PushPull, ignore the response when the syncMode is PushOnly. - val currentSyncMode = attachments.value[document.key]?.syncMode - if (responsePack.hasChanges && - currentSyncMode == SyncMode.RealtimePushOnly || - currentSyncMode == SyncMode.RealtimeSyncOff - ) { - return@runCatching - } + document.mutex.withLock { + val request = pushPullChangesRequest { + clientId = requireClientId().value + changePack = document.createChangePack().toPBChangePack() + documentId = documentID + pushOnly = syncMode == SyncMode.RealtimePushOnly + } + val response = service.pushPullChanges( + request, + document.key.documentBasedRequestHeader, + ).getOrThrow() + val responsePack = response.changePack.toChangePack() + // NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with + // PushPull, ignore the response when the syncMode is PushOnly. + val currentSyncMode = attachments.value[document.key]?.syncMode + if (responsePack.hasChanges && + currentSyncMode == SyncMode.RealtimePushOnly || + currentSyncMode == SyncMode.RealtimeSyncOff + ) { + return@runCatching + } - document.applyChangePack(responsePack) - // NOTE(chacha912): If a document has been removed, watchStream should - // be disconnected to not receive an event for that document. - if (document.status == DocumentStatus.Removed) { - attachments.value -= document.key + document.applyChangePack(responsePack) + // NOTE(chacha912): If a document has been removed, watchStream should + // be disconnected to not receive an event for that document. + if (document.status == DocumentStatus.Removed) { + attachments.value -= document.key + mutexForDocuments.remove(document.key) + } } }.onFailure { coroutineContext.ensureActive() @@ -478,36 +488,38 @@ public class Client @VisibleForTesting internal constructor( require(document.status == DocumentStatus.Detached) { "document is not detached" } - document.setActor(requireClientId()) - document.updateAsync { _, presence -> - presence.put(initialPresence) - }.await() - - val request = attachDocumentRequest { - clientId = requireClientId().value - changePack = document.createChangePack().toPBChangePack() - } - val response = service.attachDocument( - request, - document.key.documentBasedRequestHeader, - ).getOrElse { - ensureActive() - return@async Result.failure(it) - } - val pack = response.changePack.toChangePack() - document.applyChangePack(pack) + document.mutex.withLock { + document.setActor(requireClientId()) + document.updateAsync { _, presence -> + presence.put(initialPresence) + }.await() - if (document.status == DocumentStatus.Removed) { - return@async SUCCESS - } + val request = attachDocumentRequest { + clientId = requireClientId().value + changePack = document.createChangePack().toPBChangePack() + } + val response = service.attachDocument( + request, + document.key.documentBasedRequestHeader, + ).getOrElse { + ensureActive() + return@async Result.failure(it) + } + val pack = response.changePack.toChangePack() + document.applyChangePack(pack) - document.status = DocumentStatus.Attached - attachments.value += document.key to Attachment( - document, - response.documentId, - syncMode, - ) - waitForInitialization(document.key) + if (document.status == DocumentStatus.Removed) { + return@async SUCCESS + } + + document.status = DocumentStatus.Attached + attachments.value += document.key to Attachment( + document, + response.documentId, + syncMode, + ) + waitForInitialization(document.key) + } SUCCESS } } @@ -525,30 +537,33 @@ public class Client @VisibleForTesting internal constructor( check(isActive) { "client is not active" } - val attachment = attachments.value[document.key] - ?: throw IllegalArgumentException("document is not attached") + document.mutex.withLock { + val attachment = attachments.value[document.key] + ?: throw IllegalArgumentException("document is not attached") - document.updateAsync { _, presence -> - presence.clear() - }.await() + document.updateAsync { _, presence -> + presence.clear() + }.await() - val request = detachDocumentRequest { - clientId = requireClientId().value - changePack = document.createChangePack().toPBChangePack() - documentId = attachment.documentID - } - val response = service.detachDocument( - request, - document.key.documentBasedRequestHeader, - ).getOrElse { - ensureActive() - return@async Result.failure(it) - } - val pack = response.changePack.toChangePack() - document.applyChangePack(pack) - if (document.status != DocumentStatus.Removed) { - document.status = DocumentStatus.Detached - attachments.value -= document.key + val request = detachDocumentRequest { + clientId = requireClientId().value + changePack = document.createChangePack().toPBChangePack() + documentId = attachment.documentID + } + val response = service.detachDocument( + request, + document.key.documentBasedRequestHeader, + ).getOrElse { + ensureActive() + return@async Result.failure(it) + } + val pack = response.changePack.toChangePack() + document.applyChangePack(pack) + if (document.status != DocumentStatus.Removed) { + document.status = DocumentStatus.Detached + attachments.value -= document.key + mutexForDocuments.remove(document.key) + } } SUCCESS } @@ -586,24 +601,27 @@ public class Client @VisibleForTesting internal constructor( check(isActive) { "client is not active" } - val attachment = attachments.value[document.key] - ?: throw IllegalArgumentException("document is not attached") + document.mutex.withLock { + val attachment = attachments.value[document.key] + ?: throw IllegalArgumentException("document is not attached") - val request = removeDocumentRequest { - clientId = requireClientId().value - changePack = document.createChangePack(forceRemove = true).toPBChangePack() - documentId = attachment.documentID - } - val response = service.removeDocument( - request, - document.key.documentBasedRequestHeader, - ).getOrElse { - ensureActive() - return@async Result.failure(it) + val request = removeDocumentRequest { + clientId = requireClientId().value + changePack = document.createChangePack(forceRemove = true).toPBChangePack() + documentId = attachment.documentID + } + val response = service.removeDocument( + request, + document.key.documentBasedRequestHeader, + ).getOrElse { + ensureActive() + return@async Result.failure(it) + } + val pack = response.changePack.toChangePack() + document.applyChangePack(pack) + attachments.value -= document.key + mutexForDocuments.remove(document.key) } - val pack = response.changePack.toChangePack() - document.applyChangePack(pack) - attachments.value -= document.key SUCCESS } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index 6224c490e..85eb53ee0 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -51,8 +51,6 @@ import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext /** @@ -128,8 +126,6 @@ public class Document( .takeIf { status == DocumentStatus.Attached } .orEmpty() - private val changeMutex = Mutex() - /** * Executes the given [updater] to update this document. */ @@ -259,29 +255,27 @@ public class Document( * 3. Do Garbage collection. */ internal suspend fun applyChangePack(pack: ChangePack): Unit = withContext(dispatcher) { - changeMutex.withLock { - if (pack.hasSnapshot) { - applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot)) - } else if (pack.hasChanges) { - applyChanges(pack.changes) - } + if (pack.hasSnapshot) { + applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot)) + } else if (pack.hasChanges) { + applyChanges(pack.changes) + } - val iterator = localChanges.iterator() - while (iterator.hasNext()) { - val change = iterator.next() - if (change.id.clientSeq > pack.checkPoint.clientSeq) { - break - } - iterator.remove() + val iterator = localChanges.iterator() + while (iterator.hasNext()) { + val change = iterator.next() + if (change.id.clientSeq > pack.checkPoint.clientSeq) { + break } + iterator.remove() + } - checkPoint = checkPoint.forward(pack.checkPoint) + checkPoint = checkPoint.forward(pack.checkPoint) - pack.minSyncedTicket?.let(::garbageCollect) + pack.minSyncedTicket?.let(::garbageCollect) - if (pack.isRemoved) { - status = DocumentStatus.Removed - } + if (pack.isRemoved) { + status = DocumentStatus.Removed } }