Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up methods related to presence #594

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/converter/to_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// SnapshotToBytes converts the given document to byte array.
func SnapshotToBytes(obj *crdt.Object, presences *innerpresence.Map) ([]byte, error) {
func SnapshotToBytes(obj *crdt.Object, presences map[string]innerpresence.Presence) ([]byte, error) {
pbElem, err := toJSONElement(obj)
if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,11 @@ func ToDocumentSummary(summary *types.DocumentSummary) (*api.DocumentSummary, er
}

// ToPresences converts the given model to Protobuf format.
func ToPresences(presences *innerpresence.Map) map[string]*api.Presence {
func ToPresences(presences map[string]innerpresence.Presence) map[string]*api.Presence {
pbPresences := make(map[string]*api.Presence)
presences.Range(func(k string, v innerpresence.Presence) bool {
for k, v := range presences {
pbPresences[k] = ToPresence(v)
return true
})
}
return pbPresences
}

Expand Down
13 changes: 9 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (c *Client) Watch(
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClientSet(clientIDs...)
doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
Expand All @@ -450,19 +450,22 @@ func (c *Client) Watch(
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentsWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.OnlinePresence(cli.String()) == nil {
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.OnlinePresence(cli.String()),
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentsUnwatchedEvent:
p := doc.OnlinePresence(cli.String())
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Expand Down Expand Up @@ -521,6 +524,8 @@ func (c *Client) Watch(
t := PresenceChanged
if e.Type == document.WatchedEvent {
t = DocumentWatched
} else if e.Type == document.UnwatchedEvent {
t = DocumentUnwatched
}
rch <- WatchResponse{Type: t, Presences: e.Presences}
case <-ctx.Done():
Expand Down
52 changes: 29 additions & 23 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
// enabling real-time synchronization.
WatchedEvent DocEventType = "watched"

// UnwatchedEvent means that the client has disconnected from the server,
// disabling real-time synchronization.
UnwatchedEvent DocEventType = "unwatched"

// PresenceChangedEvent means that the presences of the clients who are editing
// the document have changed.
PresenceChangedEvent DocEventType = "presence-changed"
Expand Down Expand Up @@ -282,48 +286,50 @@ func (d *Document) ensureClone() error {
return nil
}

// Presences returns the presence map of this document.
func (d *Document) Presences() map[string]innerpresence.Presence {
// TODO(hackerwins): We need to use client key instead of actor ID for exposing presence.
presences := make(map[string]innerpresence.Presence)
d.doc.presences.Range(func(key string, value innerpresence.Presence) bool {
presences[key] = value
return true
})
return presences
// MyPresence returns the presence of the actor.
func (d *Document) MyPresence() innerpresence.Presence {
return d.doc.MyPresence()
}

// Presence returns the presence of the given client.
// If the client is not online, it returns nil.
func (d *Document) Presence(clientID string) innerpresence.Presence {
return d.doc.Presence(clientID)
}

// MyPresence returns the presence of the actor.
func (d *Document) MyPresence() innerpresence.Presence {
return d.doc.MyPresence()
// PresenceForTest returns the presence of the given client
// regardless of whether the client is online or not.
func (d *Document) PresenceForTest(clientID string) innerpresence.Presence {
return d.doc.PresenceForTest(clientID)
}

// SetOnlineClientSet sets the online client set.
func (d *Document) SetOnlineClientSet(clientIDs ...string) {
d.doc.SetOnlineClientSet(clientIDs...)
// Presences returns the presence map of online clients.
func (d *Document) Presences() map[string]innerpresence.Presence {
// TODO(hackerwins): We need to use client key instead of actor ID for exposing presence.
return d.doc.Presences()
}

// AllPresences returns the presence map of all clients
// regardless of whether the client is online or not.
func (d *Document) AllPresences() map[string]innerpresence.Presence {
return d.doc.AllPresences()
}

// AddOnlineClient adds the given client to the online client set.
// SetOnlineClients sets the online clients.
func (d *Document) SetOnlineClients(clientIDs ...string) {
d.doc.SetOnlineClients(clientIDs...)
}

// AddOnlineClient adds the given client to the online clients.
func (d *Document) AddOnlineClient(clientID string) {
d.doc.AddOnlineClient(clientID)
}

// RemoveOnlineClient removes the given client from the online client set.
// RemoveOnlineClient removes the given client from the online clients.
func (d *Document) RemoveOnlineClient(clientID string) {
d.doc.RemoveOnlineClient(clientID)
}

// OnlinePresence returns the presence of the given client. If the client is not
// online, it returns nil.
func (d *Document) OnlinePresence(clientID string) innerpresence.Presence {
return d.doc.OnlinePresence(clientID)
}

// Events returns the events of this document.
func (d *Document) Events() <-chan DocEvent {
return d.events
Expand Down
9 changes: 6 additions & 3 deletions pkg/document/innerpresence/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func (m *Map) Range(f func(clientID string, presence Presence) bool) {
}

// Load returns the presence for the given clientID.
func (m *Map) Load(clientID string) (Presence, bool) {
func (m *Map) Load(clientID string) Presence {
presence, ok := m.presences.Load(clientID)
if !ok {
return nil, false
return nil
}

return presence.(Presence), true
return presence.(Presence)
}

// LoadOrStore returns the existing presence if exists.
Expand Down Expand Up @@ -140,6 +140,9 @@ func (p Presence) Clear() {

// DeepCopy copies itself deeply.
func (p Presence) DeepCopy() Presence {
if p == nil {
return nil
}
clone := make(map[string]string)
for k, v := range p {
clone[k] = v
Expand Down
92 changes: 63 additions & 29 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,29 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent,
if c.PresenceChange() != nil {
clientID := c.ID().ActorID().String()
if _, ok := d.onlineClients.Load(clientID); ok {
event := DocEvent{
Type: PresenceChangedEvent,
Presences: map[string]innerpresence.Presence{
clientID: c.PresenceChange().Presence,
},
switch c.PresenceChange().ChangeType {
case innerpresence.Put:
eventType := PresenceChangedEvent
if !d.presences.Has(clientID) {
eventType = WatchedEvent
}
event := DocEvent{
Type: eventType,
Presences: map[string]innerpresence.Presence{
clientID: c.PresenceChange().Presence,
},
}
events = append(events, event)
case innerpresence.Clear:
event := DocEvent{
Type: UnwatchedEvent,
Presences: map[string]innerpresence.Presence{
clientID: d.Presence(clientID),
},
}
events = append(events, event)
d.RemoveOnlineClient(clientID)
}

if !d.presences.Has(clientID) {
event.Type = WatchedEvent
}
events = append(events, event)
}
}

Expand All @@ -283,34 +295,56 @@ func (d *InternalDocument) ApplyChanges(changes ...*change.Change) ([]DocEvent,

// MyPresence returns the presence of the actor currently editing the document.
func (d *InternalDocument) MyPresence() innerpresence.Presence {
p := d.presences.LoadOrStore(d.changeID.ActorID().String(), innerpresence.NewPresence())
if d.status != StatusAttached {
return innerpresence.NewPresence()
}
p := d.presences.Load(d.changeID.ActorID().String())
return p.DeepCopy()
}

// Presences returns the map of presences of the actors currently editing the document.
func (d *InternalDocument) Presences() *innerpresence.Map {
return d.presences
}

// OnlinePresence returns the presence of the given client. If the client is not
// online, it returns nil.
func (d *InternalDocument) OnlinePresence(clientID string) innerpresence.Presence {
// Presence returns the presence of the given client.
// If the client is not online, it returns nil.
func (d *InternalDocument) Presence(clientID string) innerpresence.Presence {
if _, ok := d.onlineClients.Load(clientID); !ok {
return nil
}

presence, _ := d.presences.Load(clientID)
return presence
return d.presences.Load(clientID).DeepCopy()
}

// Presence returns the presence of the given client.
func (d *InternalDocument) Presence(clientID string) innerpresence.Presence {
presence, _ := d.presences.Load(clientID)
return presence
// PresenceForTest returns the presence of the given client
// regardless of whether the client is online or not.
func (d *InternalDocument) PresenceForTest(clientID string) innerpresence.Presence {
return d.presences.Load(clientID).DeepCopy()
}

// Presences returns the presence map of online clients.
func (d *InternalDocument) Presences() map[string]innerpresence.Presence {
presences := make(map[string]innerpresence.Presence)
d.onlineClients.Range(func(key, value interface{}) bool {
p := d.presences.Load(key.(string))
if p == nil {
return true
}
presences[key.(string)] = p.DeepCopy()
return true
})
return presences
}

// AllPresences returns the presence map of all clients
// regardless of whether the client is online or not.
func (d *InternalDocument) AllPresences() map[string]innerpresence.Presence {
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
presences := make(map[string]innerpresence.Presence)
d.presences.Range(func(key string, value innerpresence.Presence) bool {
presences[key] = value.DeepCopy()
return true
})
return presences
}

// SetOnlineClientSet sets the online client set.
func (d *InternalDocument) SetOnlineClientSet(ids ...string) {
// SetOnlineClients sets the online clients.
func (d *InternalDocument) SetOnlineClients(ids ...string) {
d.onlineClients.Range(func(key, value interface{}) bool {
d.onlineClients.Delete(key)
return true
Expand All @@ -321,12 +355,12 @@ func (d *InternalDocument) SetOnlineClientSet(ids ...string) {
}
}

// AddOnlineClient adds the given client to the online client set.
// AddOnlineClient adds the given client to the online clients.
func (d *InternalDocument) AddOnlineClient(clientID string) {
d.onlineClients.Store(clientID, true)
}

// RemoveOnlineClient removes the given client from the online client set.
// RemoveOnlineClient removes the given client from the online clients.
func (d *InternalDocument) RemoveOnlineClient(clientID string) {
d.onlineClients.Delete(clientID)
}
2 changes: 1 addition & 1 deletion server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (d *DB) CreateSnapshotInfo(
docID types.ID,
doc *document.InternalDocument,
) error {
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ func (c *Client) CreateSnapshotInfo(
if err != nil {
return err
}
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func pullSnapshot(
}
cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/rpc/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *adminServer) GetSnapshotMeta(
return nil, err
}

snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.Presences())
snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return nil, err
}
Expand Down
Loading
Loading