diff --git a/Sources/Core/Client.swift b/Sources/Core/Client.swift index 9919ca5e..43b44a61 100644 --- a/Sources/Core/Client.swift +++ b/Sources/Core/Client.swift @@ -611,11 +611,19 @@ public actor Client { Task { if let stream = self.attachmentMap[docKey]?.remoteWatchStream?.responseStream { + await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected) + do { for try await response in stream { await self.handleWatchDocumentsResponse(docKey: docKey, response: response) } } catch { + await self.attachmentMap[docKey]?.doc.resetOnlineClients() + await self.attachmentMap[docKey]?.doc.publishInitializedEvent() + await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.disconnected) + + Logger.debug("[WD] c:\"\(self.key)\" unwatches") + if let status = error as? GRPCStatus, status.code == .cancelled { // Canceled by Client by detach. so there is No need to reconnect. } else { @@ -665,7 +673,9 @@ public actor Client { switch body { case .initialization(let initialization): var onlineClients = Set() - for pbClientID in initialization.clientIds { + let actorID = await self.attachmentMap[docKey]?.doc.actorID + + for pbClientID in initialization.clientIds.filter({ $0 != actorID }) { onlineClients.insert(pbClientID) } @@ -716,12 +726,6 @@ public actor Client { self.attachmentMap[docKey]?.watchLoopReconnectTimer?.invalidate() self.attachmentMap[docKey]?.watchLoopReconnectTimer = nil - - Logger.debug("[WD] c:\"\(self.key)\" unwatches") - - Task { - await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.disconnected) - } } private func onStreamDisconnect(_ docKey: DocumentKey) throws { diff --git a/Sources/Document/DocEvent.swift b/Sources/Document/DocEvent.swift index 26c2ab47..a17cfcf5 100644 --- a/Sources/Document/DocEvent.swift +++ b/Sources/Document/DocEvent.swift @@ -222,7 +222,8 @@ public struct InitializedEvent: DocEvent { /** * InitializedEvent type */ - public var value: [PeerElement] + public let source: OpSource = .local + public let value: [PeerElement] } public struct WatchedEvent: DocEvent { diff --git a/Sources/Document/Document.swift b/Sources/Document/Document.swift index e00f58ac..7d884bf3 100644 --- a/Sources/Document/Document.swift +++ b/Sources/Document/Document.swift @@ -125,7 +125,7 @@ public actor Document { throw YorkieError.documentRemoved(message: "\(self) is removed.") } - guard let actorID = self.changeID.getActorID() else { + guard let actorID = self.actorID else { throw YorkieError.unexpected(message: "actor ID is null.") } @@ -327,6 +327,10 @@ public actor Document { // TODOs also apply into root. } + var actorID: ActorID? { + self.changeID.getActorID() + } + /** * `getKey` returns the key of this document. * @@ -547,7 +551,7 @@ public actor Document { } let event = StatusChangedEvent(source: status == .removed ? .remote : .local, - value: StatusInfo(status: status, actorID: status == .attached ? self.changeID.getActorID() : nil)) + value: StatusInfo(status: status, actorID: status == .attached ? self.actorID : nil)) self.publish(event) } @@ -581,6 +585,10 @@ public actor Document { self.publish(SyncStatusChangedEvent(value: status)) } + func publishInitializedEvent() { + self.publish(InitializedEvent(value: self.getPresences())) + } + /** * `publish` triggers an event in this document, which can be received by * callback functions from document.subscribe(). @@ -593,7 +601,7 @@ public actor Document { callback(event, self) } - if let id = self.changeID.getActorID() { + if let id = self.actorID { var isMine = false var isOthers = false @@ -679,6 +687,14 @@ public actor Document { self.onlineClients = onlineClients } + /** + * `resetOnlineClients` resets the online client set. + * + */ + func resetOnlineClients() { + self.onlineClients = Set() + } + /** * `addOnlineClient` adds the given clientID into the online client set. */ @@ -704,7 +720,7 @@ public actor Document { * `getMyPresence` returns the presence of the current client. */ public func getMyPresence() -> [String: Any]? { - guard self.status == .attached, let id = self.changeID.getActorID() else { + guard self.status == .attached, let id = self.actorID else { return nil } @@ -715,6 +731,10 @@ public actor Document { * `getPresence` returns the presence of the given clientID. */ public func getPresence(_ clientID: ActorID) -> [String: Any]? { + if clientID == self.actorID { + return self.getMyPresence() + } + guard self.onlineClients.contains(clientID) else { return nil } @@ -735,10 +755,12 @@ public actor Document { public func getPresences(_ excludeMyself: Bool = false) -> [PeerElement] { var presences = [PeerElement]() - let excludeID = excludeMyself == true ? self.changeID.getActorID() : nil + if !excludeMyself, let actorID, let presence = getMyPresence() { + presences.append(PeerElement(actorID, presence)) + } for clientID in self.onlineClients { - if clientID != excludeID, let presence = getPresence(clientID) { + if let presence = getPresence(clientID) { presences.append((clientID, presence)) } } diff --git a/Tests/Integration/PresenceTests.swift b/Tests/Integration/PresenceTests.swift index 59230ee1..df5e908d 100644 --- a/Tests/Integration/PresenceTests.swift +++ b/Tests/Integration/PresenceTests.swift @@ -401,6 +401,62 @@ final class PresenceSubscribeTests: XCTestCase { try await c2.deactivate() } + func test_should_not_be_accessible_to_other_clients_presence_when_the_stream_is_disconnected() async throws { + let docKey = "\(self.description)-\(Date().description)".toDocKey + + let c1 = Client(rpcAddress) + let c2 = Client(rpcAddress) + try await c1.activate() + try await c2.activate() + let c2ID = await c2.id! + + let doc1 = Document(key: docKey) + + try await c1.attach(doc1, ["name": "a"]) + + let expect1 = expectation(description: "sub 1") + + await doc1.subscribePresence { event, _ in + if let event = event as? WatchedEvent, + event.value.clientID == c2ID + { + expect1.fulfill() + } + } + + let expect2 = expectation(description: "sub 2") + + await doc1.subscribeConnection { event, _ in + if let event = event as? ConnectionChangedEvent, + event.value == .disconnected + { + expect2.fulfill() + } + } + + let doc2 = Document(key: docKey) + try await c2.attach(doc2, ["name": "b"]) + + await fulfillment(of: [expect1], timeout: 5, enforceOrder: false) + await doc1.unsubscribePresence() + + var presence = await doc1.getPresence(c2ID) + + XCTAssertEqual(presence?["name"] as? String, "b") + + try await c1.changeSyncMode(doc1, .manual) + + await fulfillment(of: [expect2], timeout: 5, enforceOrder: false) + await doc1.unsubscribeConnection() + + presence = await doc1.getPresence(c2ID) + + XCTAssert(presence == nil) + + try await c1.deactivate() + try await c2.deactivate() + } + func test_should_receive_presence_changed_event_for_final_presence_if_there_are_multiple_presence_changes_within_doc_update() async throws { let docKey = "\(self.description)-\(Date().description)".toDocKey