Skip to content

Commit

Permalink
Reset online clients when stream is disconnected (#165)
Browse files Browse the repository at this point in the history
* Remove Client eventStream

* Reset online clients when stream is disconnected
  • Loading branch information
humdrum authored May 17, 2024
1 parent cde554c commit fdccb26
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 14 deletions.
18 changes: 11 additions & 7 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,19 @@ public actor Client {

Task {
if let stream = self.attachmentMap[docKey]?.remoteWatchStream?.responseStream {
await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)

do {
for try await response in stream {
await self.handleWatchDocumentsResponse(docKey: docKey, response: response)
}
} catch {
await self.attachmentMap[docKey]?.doc.resetOnlineClients()
await self.attachmentMap[docKey]?.doc.publishInitializedEvent()
await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.disconnected)

Logger.debug("[WD] c:\"\(self.key)\" unwatches")

if let status = error as? GRPCStatus, status.code == .cancelled {
// Canceled by Client by detach. so there is No need to reconnect.
} else {
Expand Down Expand Up @@ -665,7 +673,9 @@ public actor Client {
switch body {
case .initialization(let initialization):
var onlineClients = Set<ActorID>()
for pbClientID in initialization.clientIds {
let actorID = await self.attachmentMap[docKey]?.doc.actorID

for pbClientID in initialization.clientIds.filter({ $0 != actorID }) {
onlineClients.insert(pbClientID)
}

Expand Down Expand Up @@ -716,12 +726,6 @@ public actor Client {

self.attachmentMap[docKey]?.watchLoopReconnectTimer?.invalidate()
self.attachmentMap[docKey]?.watchLoopReconnectTimer = nil

Logger.debug("[WD] c:\"\(self.key)\" unwatches")

Task {
await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.disconnected)
}
}

private func onStreamDisconnect(_ docKey: DocumentKey) throws {
Expand Down
3 changes: 2 additions & 1 deletion Sources/Document/DocEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public struct InitializedEvent: DocEvent {
/**
* InitializedEvent type
*/
public var value: [PeerElement]
public let source: OpSource = .local
public let value: [PeerElement]
}

public struct WatchedEvent: DocEvent {
Expand Down
34 changes: 28 additions & 6 deletions Sources/Document/Document.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public actor Document {
throw YorkieError.documentRemoved(message: "\(self) is removed.")
}

guard let actorID = self.changeID.getActorID() else {
guard let actorID = self.actorID else {
throw YorkieError.unexpected(message: "actor ID is null.")
}

Expand Down Expand Up @@ -327,6 +327,10 @@ public actor Document {
// TODOs also apply into root.
}

var actorID: ActorID? {
self.changeID.getActorID()
}

/**
* `getKey` returns the key of this document.
*
Expand Down Expand Up @@ -547,7 +551,7 @@ public actor Document {
}

let event = StatusChangedEvent(source: status == .removed ? .remote : .local,
value: StatusInfo(status: status, actorID: status == .attached ? self.changeID.getActorID() : nil))
value: StatusInfo(status: status, actorID: status == .attached ? self.actorID : nil))

self.publish(event)
}
Expand Down Expand Up @@ -581,6 +585,10 @@ public actor Document {
self.publish(SyncStatusChangedEvent(value: status))
}

func publishInitializedEvent() {
self.publish(InitializedEvent(value: self.getPresences()))
}

/**
* `publish` triggers an event in this document, which can be received by
* callback functions from document.subscribe().
Expand All @@ -593,7 +601,7 @@ public actor Document {
callback(event, self)
}

if let id = self.changeID.getActorID() {
if let id = self.actorID {
var isMine = false
var isOthers = false

Expand Down Expand Up @@ -679,6 +687,14 @@ public actor Document {
self.onlineClients = onlineClients
}

/**
* `resetOnlineClients` resets the online client set.
*
*/
func resetOnlineClients() {
self.onlineClients = Set<ActorID>()
}

/**
* `addOnlineClient` adds the given clientID into the online client set.
*/
Expand All @@ -704,7 +720,7 @@ public actor Document {
* `getMyPresence` returns the presence of the current client.
*/
public func getMyPresence() -> [String: Any]? {
guard self.status == .attached, let id = self.changeID.getActorID() else {
guard self.status == .attached, let id = self.actorID else {
return nil
}

Expand All @@ -715,6 +731,10 @@ public actor Document {
* `getPresence` returns the presence of the given clientID.
*/
public func getPresence(_ clientID: ActorID) -> [String: Any]? {
if clientID == self.actorID {
return self.getMyPresence()
}

guard self.onlineClients.contains(clientID) else {
return nil
}
Expand All @@ -735,10 +755,12 @@ public actor Document {
public func getPresences(_ excludeMyself: Bool = false) -> [PeerElement] {
var presences = [PeerElement]()

let excludeID = excludeMyself == true ? self.changeID.getActorID() : nil
if !excludeMyself, let actorID, let presence = getMyPresence() {
presences.append(PeerElement(actorID, presence))
}

for clientID in self.onlineClients {
if clientID != excludeID, let presence = getPresence(clientID) {
if let presence = getPresence(clientID) {
presences.append((clientID, presence))
}
}
Expand Down
56 changes: 56 additions & 0 deletions Tests/Integration/PresenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,62 @@ final class PresenceSubscribeTests: XCTestCase {
try await c2.deactivate()
}

func test_should_not_be_accessible_to_other_clients_presence_when_the_stream_is_disconnected() async throws {
let docKey = "\(self.description)-\(Date().description)".toDocKey

let c1 = Client(rpcAddress)
let c2 = Client(rpcAddress)
try await c1.activate()
try await c2.activate()
let c2ID = await c2.id!

let doc1 = Document(key: docKey)

try await c1.attach(doc1, ["name": "a"])

let expect1 = expectation(description: "sub 1")

await doc1.subscribePresence { event, _ in
if let event = event as? WatchedEvent,
event.value.clientID == c2ID
{
expect1.fulfill()
}
}

let expect2 = expectation(description: "sub 2")

await doc1.subscribeConnection { event, _ in
if let event = event as? ConnectionChangedEvent,
event.value == .disconnected
{
expect2.fulfill()
}
}

let doc2 = Document(key: docKey)
try await c2.attach(doc2, ["name": "b"])

await fulfillment(of: [expect1], timeout: 5, enforceOrder: false)
await doc1.unsubscribePresence()

var presence = await doc1.getPresence(c2ID)

XCTAssertEqual(presence?["name"] as? String, "b")

try await c1.changeSyncMode(doc1, .manual)

await fulfillment(of: [expect2], timeout: 5, enforceOrder: false)
await doc1.unsubscribeConnection()

presence = await doc1.getPresence(c2ID)

XCTAssert(presence == nil)

try await c1.deactivate()
try await c2.deactivate()
}

func test_should_receive_presence_changed_event_for_final_presence_if_there_are_multiple_presence_changes_within_doc_update() async throws {
let docKey = "\(self.description)-\(Date().description)".toDocKey

Expand Down

0 comments on commit fdccb26

Please sign in to comment.