forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 8
/
repository.go
53 lines (47 loc) · 1.39 KB
/
repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package eventsource
import (
"sort"
"sync"
)
// SliceRepository is an example repository that uses a slice as storage for past events.
// Events are kept in memory, one slice per channel.
type SliceRepository struct {
	// events maps a channel name to the events stored for it. Add inserts at
	// the position reported by indexOfEvent, which keeps each slice ordered
	// by ascending Id() (plain string comparison).
	events map[string][]Event
	// lock guards events. It is stored as a pointer, so the copies of the
	// struct made by value-receiver methods still share the same mutex.
	lock *sync.RWMutex
}
// NewSliceRepository returns a ready-to-use SliceRepository: its event map is
// allocated and empty, and it owns a freshly allocated lock.
func NewSliceRepository() *SliceRepository {
	repo := new(SliceRepository)
	repo.events = map[string][]Event{}
	repo.lock = new(sync.RWMutex)
	return repo
}
// indexOfEvent returns the position of the first event stored for channel
// whose Id() is >= id, or the number of stored events when there is none
// (0 for a channel with no events). The binary search relies on the
// channel's events being ordered by ascending Id().
//
// It does not take repo.lock itself; Replay and Add call it with the lock held.
func (repo SliceRepository) indexOfEvent(channel, id string) int {
	history := repo.events[channel]
	reachedID := func(pos int) bool {
		return history[pos].Id() >= id
	}
	return sort.Search(len(history), reachedID)
}
// Replay implements the event replay logic for the Repository interface.
//
// It returns an unbuffered channel that delivers, in stored order, every
// event of channel whose Id() is >= id, and is closed afterwards. A channel
// with no stored events yields a channel that is closed without delivering
// anything.
//
// The matching events are copied while the read lock is held and the lock is
// released before the first send, so a slow or stalled consumer cannot block
// Add or later Replay calls. The sending goroutine still runs until every
// event has been received, so callers should drain the returned channel.
func (repo SliceRepository) Replay(channel, id string) (out chan Event) {
	out = make(chan Event)
	go func() {
		defer close(out)

		// Take the snapshot in its own function so the deferred RUnlock runs
		// before any (potentially blocking) send below.
		pending := func() []Event {
			repo.lock.RLock()
			defer repo.lock.RUnlock()
			stored := repo.events[channel]
			// Copy instead of re-slicing: Add writes into the stored slice's
			// backing array in place, which would race with the sends once
			// the lock is released.
			return append([]Event(nil), stored[repo.indexOfEvent(channel, id):]...)
		}()

		for _, event := range pending {
			out <- event
		}
	}()
	return out
}
// Add adds an event to the repository history.
//
// When an event with the same Id() is already stored for channel it is
// replaced in place. Otherwise the event is inserted at the position reported
// by indexOfEvent, which keeps the channel's events ordered by ascending Id()
// (plain string comparison). The write lock is held for the whole call.
func (repo *SliceRepository) Add(channel string, event Event) {
	repo.lock.Lock()
	defer repo.lock.Unlock()

	id := event.Id()
	history := repo.events[channel]
	pos := repo.indexOfEvent(channel, id)

	// Same Id already present: overwrite the stored event.
	if pos < len(history) && history[pos].Id() == id {
		history[pos] = event
		return
	}

	// Grow by one slot, shift the tail right and drop the event into the gap.
	history = append(history, nil)
	copy(history[pos+1:], history[pos:])
	history[pos] = event
	repo.events[channel] = history
}