Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock local changes to assert transactions #203

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
minSdk = "24"
compileSdk = "34"
targetSdk = "34"
agp = "8.4.1"
agp = "8.4.2"
connectKotlin = "0.6.1"
okhttp = "4.12.0"
coroutines = "1.8.1"
Expand Down
50 changes: 50 additions & 0 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,54 @@ class ClientTest {
collectJobs.forEach(Job::cancel)
}
}

@Test
fun test_duplicated_local_changes_not_sent_to_server() {
withTwoClientsAndDocuments(detachDocuments = false) { c1, c2, d1, d2, _ ->
val d1Events = mutableListOf<Document.Event>()
val d2Events = mutableListOf<Document.Event>()
val collectJobs = listOf(
launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filter { it is RemoteChange || it is LocalChange }
.collect(d1Events::add)
},
launch(start = CoroutineStart.UNDISPATCHED) {
d2.events.filter { it is RemoteChange || it is LocalChange }
.collect(d2Events::add)
},
)

listOf(
d1.updateAsync { root, _ ->
root.setNewTree(
"t",
element("doc") {
element("p") { text { "12" } }
element("p") { text { "34" } }
},
)
},
c1.syncAsync(),
c1.syncAsync(),
c1.syncAsync(),
c1.detachAsync(d1),
)

withTimeout(GENERAL_TIMEOUT) {
while (d2Events.isEmpty()) {
delay(50)
}
}
assertIs<RemoteChange>(d2Events.first())
assertTreesXmlEquals("<doc><p>12</p><p>34</p></doc>", d1, d2)

delay(500)

assertEquals(1, d2Events.size)

c2.detachAsync(d2).await()

collectJobs.forEach(Job::cancel)
}
}
}
200 changes: 109 additions & 91 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient

Expand Down Expand Up @@ -109,6 +111,11 @@ public class Client @VisibleForTesting internal constructor(
"x-shard-key" to listOf("${options.apiKey.orEmpty()}/$value"),
)

private val mutexForDocuments = mutableMapOf<Document.Key, Mutex>()

private val Document.mutex
get() = mutexForDocuments.getOrPut(key) { Mutex() }

public constructor(
host: String,
options: Options = Options(),
Expand Down Expand Up @@ -236,32 +243,35 @@ public class Client @VisibleForTesting internal constructor(
SyncResult(
document,
runCatching {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}
document.mutex.withLock {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}

document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
}.onFailure {
coroutineContext.ensureActive()
Expand Down Expand Up @@ -478,36 +488,38 @@ public class Client @VisibleForTesting internal constructor(
require(document.status == DocumentStatus.Detached) {
"document is not detached"
}
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
document.mutex.withLock {
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}
val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
}
SUCCESS
}
}
Expand All @@ -525,30 +537,33 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

document.updateAsync { _, presence ->
presence.clear()
}.await()
document.updateAsync { _, presence ->
presence.clear()
}.await()

val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
SUCCESS
}
Expand Down Expand Up @@ -586,24 +601,27 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
SUCCESS
}
}
Expand Down
38 changes: 16 additions & 22 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

/**
Expand Down Expand Up @@ -128,8 +126,6 @@ public class Document(
.takeIf { status == DocumentStatus.Attached }
.orEmpty()

private val changeMutex = Mutex()

/**
* Executes the given [updater] to update this document.
*/
Expand Down Expand Up @@ -259,29 +255,27 @@ public class Document(
* 3. Do Garbage collection.
*/
internal suspend fun applyChangePack(pack: ChangePack): Unit = withContext(dispatcher) {
changeMutex.withLock {
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}

val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
}

checkPoint = checkPoint.forward(pack.checkPoint)
checkPoint = checkPoint.forward(pack.checkPoint)

pack.minSyncedTicket?.let(::garbageCollect)
pack.minSyncedTicket?.let(::garbageCollect)

if (pack.isRemoved) {
status = DocumentStatus.Removed
}
if (pack.isRemoved) {
status = DocumentStatus.Removed
}
}

Expand Down
Loading