Skip to content

Commit

Permalink
Rip out d.latestPos from Dispatcher; it wasn't used
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Jun 9, 2023
1 parent 1b19d71 commit e861ccb
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 17 deletions.
2 changes: 1 addition & 1 deletion sync3/caches/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewGlobalCache(store *state.Storage) *GlobalCache {
}
}

func (c *GlobalCache) OnRegistered(_ context.Context, _ int64) error {
func (c *GlobalCache) OnRegistered(_ context.Context) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion sync3/caches/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *UserCache) Unsubscribe(id int) {
// externally by sync3.SyncLiveHandler.userCache. It's importatn that we don't spend too
// long inside this function, because it is called within a global lock on the
// sync3.Dispatcher (see sync3.Dispatcher.Register).
func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
func (c *UserCache) OnRegistered(ctx context.Context) error {
// select all spaces the user is a part of to seed the cache correctly. This has to be done in
// the OnRegistered callback which has locking guarantees. This is why...
latestPos, joinedRooms, joinTimings, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID)
Expand Down
20 changes: 5 additions & 15 deletions sync3/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,22 @@ type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
// OnRegistered is called after a successful call to Dispatcher.Register. After
// this call, the receiver will be told about events whose NID is greater than
// latestPos.
OnRegistered(ctx context.Context, latestPos int64) error
// OnRegistered is called after a successful call to Dispatcher.Register
OnRegistered(ctx context.Context) error
}

// Dispatches live events to caches
type Dispatcher struct {
jrt *JoinedRoomsTracker
userToReceiver map[string]Receiver
userToReceiverMu *sync.RWMutex
// latestPos is an eventNID, the largest that this dispatcher has seen.
latestPos int64
}

func NewDispatcher() *Dispatcher {
return &Dispatcher{
jrt: NewJoinedRoomsTracker(),
userToReceiver: make(map[string]Receiver),
userToReceiverMu: &sync.RWMutex{},
latestPos: 0,
}
}

Expand Down Expand Up @@ -73,7 +68,7 @@ func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) er
logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered")
}
d.userToReceiver[userID] = r
return r.OnRegistered(ctx, d.latestPos)
return r.OnRegistered(ctx)
}

func (d *Dispatcher) ReceiverForUser(userID string) Receiver {
Expand Down Expand Up @@ -154,14 +149,9 @@ func (d *Dispatcher) OnNewInitialRoomState(ctx context.Context, roomID string, s
}

func (d *Dispatcher) OnNewEvent(
ctx context.Context, roomID string, event json.RawMessage, pos int64,
ctx context.Context, roomID string, event json.RawMessage, nid int64,
) {
// keep track of the latest position. We don't care about it, but Receivers do if they want
// to atomically load from the global cache and receive updates.
if pos > d.latestPos {
d.latestPos = pos
}
ed := d.newEventData(event, roomID, pos)
ed := d.newEventData(event, roomID, nid)

// update the tracker
targetUser := ""
Expand Down

0 comments on commit e861ccb

Please sign in to comment.