-
Notifications
You must be signed in to change notification settings - Fork 29
/
lww_e_set.go
151 lines (121 loc) · 2.68 KB
/
lww_e_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package crdt
import (
"encoding/json"
"errors"
"time"
"github.com/benbjohnson/clock"
)
// LWWSet is a last-write-wins element set. It records a timestamp for every
// added value and every removed value, and decides membership by comparing
// the two.
type LWWSet struct {
	// addMap maps a value to the timestamp stored by Add or adopted by Merge.
	addMap map[interface{}]time.Time
	// rmMap maps a value to the timestamp stored by Remove or adopted by Merge.
	rmMap map[interface{}]time.Time
	// bias decides the outcome of Contains when a value's add and remove
	// timestamps are equal.
	bias BiasType
	// clock supplies the timestamps recorded by Add and Remove.
	clock clock.Clock
}
// BiasType names the policy an LWWSet uses to resolve an add and a remove of
// the same value that carry equal timestamps.
type BiasType string

const (
	// BiasAdd keeps the value in the set when its add and remove timestamps
	// are equal.
	BiasAdd BiasType = "a"
	// BiasRemove drops the value from the set when its add and remove
	// timestamps are equal.
	BiasRemove BiasType = "r"
)
var (
	// ErrNoSuchBias is returned by NewLWWSetWithBias when the requested bias
	// is neither BiasAdd nor BiasRemove.
	ErrNoSuchBias = errors.New("no such bias found")
)
// NewLWWSet returns an empty LWWSet configured with BiasAdd. It delegates to
// NewLWWSetWithBias and passes its results through unchanged.
func NewLWWSet() (*LWWSet, error) {
	return NewLWWSetWithBias(BiasAdd)
}
// NewLWWSetWithBias returns an empty LWWSet that resolves equal add/remove
// timestamps according to bias. It returns ErrNoSuchBias, and a nil set, when
// bias is neither BiasAdd nor BiasRemove.
func NewLWWSetWithBias(bias BiasType) (*LWWSet, error) {
	switch bias {
	case BiasAdd, BiasRemove:
		// Recognised bias; continue below and build the set.
	default:
		return nil, ErrNoSuchBias
	}

	set := new(LWWSet)
	set.addMap = make(map[interface{}]time.Time)
	set.rmMap = make(map[interface{}]time.Time)
	set.bias = bias
	set.clock = clock.New()
	return set, nil
}
// Add records value as added at the current time reported by the set's
// clock, replacing any add timestamp previously stored for value.
func (s *LWWSet) Add(value interface{}) {
	s.addMap[value] = s.clock.Now()
}
// Remove records value as removed at the current time reported by the set's
// clock, replacing any remove timestamp previously stored for value. The add
// record, if any, is left in place.
func (s *LWWSet) Remove(value interface{}) {
	s.rmMap[value] = s.clock.Now()
}
// Contains reports whether value is currently a member of the set.
//
// A value with no add record is absent, even if a remove record exists for
// it. A value with an add record and no remove record is present. When both
// records exist the later timestamp wins, and equal timestamps are resolved
// by the set's bias: BiasAdd keeps the value, BiasRemove drops it.
func (s *LWWSet) Contains(value interface{}) bool {
	addedAt, wasAdded := s.addMap[value]
	if !wasAdded {
		return false
	}

	removedAt, wasRemoved := s.rmMap[value]
	if !wasRemoved {
		return true
	}

	if s.bias == BiasAdd {
		// The add survives unless it is strictly older than the removal.
		return !addedAt.Before(removedAt)
	}
	if s.bias == BiasRemove {
		// The add survives only when it is strictly newer than the removal.
		return removedAt.Before(addedAt)
	}

	// Unrecognised bias: report the value as absent. The constructor in this
	// file rejects such values, so this is a defensive fallback.
	return false
}
// Merge folds the state of r into s. For every value recorded in r's add or
// remove map, s adopts r's timestamp when s has no record for that value or
// holds an earlier one; otherwise s keeps its own. r is not modified, and a
// nil r is a no-op.
func (s *LWWSet) Merge(r *LWWSet) {
	if r == nil {
		return
	}
	mergeLatestTimestamps(s.addMap, r.addMap)
	mergeLatestTimestamps(s.rmMap, r.rmMap)
}

// mergeLatestTimestamps copies into dst every entry of src that dst lacks or
// holds with an earlier timestamp.
func mergeLatestTimestamps(dst, src map[interface{}]time.Time) {
	for value, ts := range src {
		// The presence check matters: a missing key yields the zero time,
		// which must not be mistaken for a real local timestamp.
		if cur, ok := dst[value]; !ok || cur.Before(ts) {
			dst[value] = ts
		}
	}
}
// lwwesetJSON is the JSON document produced by LWWSet.MarshalJSON.
type lwwesetJSON struct {
	// T is the type tag; MarshalJSON sets it to "lww-e-set".
	T string `json:"type"`
	// B is the set's bias converted to a string.
	B string `json:"bias"`
	// E lists one entry per value that has an add or a remove record.
	E []elJSON `json:"e"`
}
// elJSON is the JSON form of a single value together with its timestamps.
type elJSON struct {
	// Elem is the value itself.
	Elem interface{} `json:"el"`
	// TAdd is the add timestamp in Unix seconds; omitted from the JSON when
	// zero.
	TAdd int64 `json:"ta,omitempty"`
	// TDel is the remove timestamp in Unix seconds; omitted from the JSON
	// when zero.
	TDel int64 `json:"td,omitempty"`
}
// MarshalJSON encodes the set as a JSON object carrying the type tag
// "lww-e-set", the bias, and one entry per value that has an add or a remove
// record. Timestamps are written as Unix seconds. Values with an add record
// are emitted first, followed by values that only have a remove record;
// within each group the order follows map iteration and is not stable.
func (s *LWWSet) MarshalJSON() ([]byte, error) {
	entries := make([]elJSON, 0, len(s.addMap))

	// Values with an add record, plus their remove timestamp when present.
	for value, addedAt := range s.addMap {
		entry := elJSON{Elem: value, TAdd: addedAt.Unix()}
		if removedAt, removed := s.rmMap[value]; removed {
			entry.TDel = removedAt.Unix()
		}
		entries = append(entries, entry)
	}

	// Values that were only ever removed; the rest were handled above.
	for value, removedAt := range s.rmMap {
		if _, added := s.addMap[value]; !added {
			entries = append(entries, elJSON{Elem: value, TDel: removedAt.Unix()})
		}
	}

	return json.Marshal(&lwwesetJSON{
		T: "lww-e-set",
		B: string(s.bias),
		E: entries,
	})
}