Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock local changes to assert transactions #203

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,54 @@ class ClientTest {
collectJobs.forEach(Job::cancel)
}
}

@Test
fun test_duplicated_local_changes_not_sent_to_server() {
withTwoClientsAndDocuments(detachDocuments = false) { c1, c2, d1, d2, _ ->
val d1Events = mutableListOf<Document.Event>()
val d2Events = mutableListOf<Document.Event>()
val collectJobs = listOf(
launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filter { it is RemoteChange || it is LocalChange }
.collect(d1Events::add)
},
launch(start = CoroutineStart.UNDISPATCHED) {
d2.events.filter { it is RemoteChange || it is LocalChange }
.collect(d2Events::add)
},
)

listOf(
d1.updateAsync { root, _ ->
root.setNewTree(
"t",
element("doc") {
element("p") { text { "12" } }
element("p") { text { "34" } }
},
)
},
c1.syncAsync(),
c1.syncAsync(),
c1.syncAsync(),
c1.detachAsync(d1),
)

withTimeout(GENERAL_TIMEOUT) {
while (d2Events.isEmpty()) {
delay(50)
}
}
assertIs<RemoteChange>(d2Events.first())
assertTreesXmlEquals("<doc><p>12</p><p>34</p></doc>", d1, d2)

delay(500)

assert(d2Events.size == 1)

c2.detachAsync(d2).await()

collectJobs.forEach(Job::cancel)
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test test_duplicated_local_changes_not_sent_to_server is well-structured and aligns with the PR's objective to prevent duplicated local changes from being sent to the server. However, consider adding more detailed comments within the test to explain each step, especially the multiple syncAsync calls and their purpose in this context. This will improve maintainability and readability for other developers who may work on this test in the future.

}
200 changes: 109 additions & 91 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient

Expand Down Expand Up @@ -109,6 +111,11 @@ public class Client @VisibleForTesting internal constructor(
"x-shard-key" to listOf("${options.apiKey.orEmpty()}/$value"),
)

private val mutexForDocuments = mutableMapOf<Document.Key, Mutex>()

private val Document.mutex
get() = mutexForDocuments.getOrPut(key) { Mutex() }

public constructor(
host: String,
options: Options = Options(),
Expand Down Expand Up @@ -236,32 +243,35 @@ public class Client @VisibleForTesting internal constructor(
SyncResult(
document,
runCatching {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}
document.mutex.withLock {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}

document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
}.onFailure {
coroutineContext.ensureActive()
Expand Down Expand Up @@ -478,36 +488,38 @@ public class Client @VisibleForTesting internal constructor(
require(document.status == DocumentStatus.Detached) {
"document is not detached"
}
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
document.mutex.withLock {
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}
val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
}
SUCCESS
}
}
Expand All @@ -525,30 +537,33 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

document.updateAsync { _, presence ->
presence.clear()
}.await()
document.updateAsync { _, presence ->
presence.clear()
}.await()

val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
SUCCESS
}
Expand Down Expand Up @@ -586,24 +601,27 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
SUCCESS
}
}
Expand Down
38 changes: 16 additions & 22 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

/**
Expand Down Expand Up @@ -128,8 +126,6 @@ public class Document(
.takeIf { status == DocumentStatus.Attached }
.orEmpty()

private val changeMutex = Mutex()

/**
* Executes the given [updater] to update this document.
*/
Expand Down Expand Up @@ -259,29 +255,27 @@ public class Document(
* 3. Do Garbage collection.
*/
internal suspend fun applyChangePack(pack: ChangePack): Unit = withContext(dispatcher) {
changeMutex.withLock {
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}

val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
}

checkPoint = checkPoint.forward(pack.checkPoint)
checkPoint = checkPoint.forward(pack.checkPoint)

pack.minSyncedTicket?.let(::garbageCollect)
pack.minSyncedTicket?.let(::garbageCollect)

if (pack.isRemoved) {
status = DocumentStatus.Removed
}
if (pack.isRemoved) {
status = DocumentStatus.Removed
}
}

Expand Down
Loading