-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
counter.go
140 lines (107 loc) · 2.76 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package goriak
import (
"encoding/json"
"errors"
riak "github.com/basho/riak-go-client"
)
// NewCounter creates a partial Counter whose value and pending increment
// both start at zero.
//
// A Counter created this way can only be stored through SetMap();
// calling Counter.Exec() on it will not work.
func NewCounter() *Counter {
	// The zero value of Counter already has val and increaseBy set to 0.
	return &Counter{}
}
// Counter is a wrapper to handle Riak Counters.
//
// A Counter needs to be initialized by GetMap() to fully function.
type Counter struct {
	// helper is embedded; Exec reads the counter's name, path and key
	// through it.
	helper

	// val is the value reported by Value().
	val int64

	// increaseBy accumulates the increments that Exec sends to Riak.
	increaseBy int64
}
// Increase adds i to the Counter and returns the Counter so calls can be
// chained.
//
// The value reported by Counter.Value() changes immediately, but nothing is
// saved to Riak until the change is executed. Calling Increase on a nil
// Counter is a no-op that returns nil.
func (c *Counter) Increase(i int64) *Counter {
	if c != nil {
		// Track the delta separately so it can be sent to Riak later.
		c.increaseBy += i
		c.val += i
	}
	return c
}
// Value returns the value in the Counter.
//
// A nil Counter reports 0 instead of panicking. Increase returns nil when
// called on a nil Counter, so this keeps chained calls such as
// c.Increase(1).Value() safe.
func (c *Counter) Value() int64 {
	if c == nil {
		return 0
	}
	return c.val
}
// Exec saves changes made to the Counter to Riak.
//
// Exec only works on Counters initialized by GetMap(): the Counter must know
// its name, its key (bucket, bucket type and key) and the path of nested maps
// leading to it.
//
// If the command succeeds the pending increment is reset, and the Counter is
// updated with the value in the response from Riak when the response contains
// the counter at the expected path.
//
// An error is returned when the Counter or the Session is nil, when the name
// or key is missing, when the command cannot be built or executed, or when
// Riak does not report success.
func (c *Counter) Exec(client *Session) error {
	if c == nil {
		return errors.New("Nil Counter")
	}

	if c.name == "" {
		return errors.New("Unknown path to Counter. Retrieve Counter with Get or Set before updating the Counter")
	}

	// Validate c.key
	if c.key.bucket == "" || c.key.bucketType == "" || c.key.key == "" {
		return errors.New("Invalid key in Counter Exec()")
	}

	// Checked after the Counter validation so the existing errors keep their
	// precedence; without it a nil Session panics on client.riak below.
	if client == nil {
		return errors.New("Nil Session")
	}

	outerOp := &riak.MapOperation{}
	op := outerOp

	// Traverse c.path so that we increment the correct counter in nested maps
	for _, subMapName := range c.path {
		op = op.Map(subMapName)
	}

	op.IncrementCounter(c.name, c.increaseBy)

	cmd, err := riak.NewUpdateMapCommandBuilder().
		WithBucket(c.key.bucket).
		WithBucketType(c.key.bucketType).
		WithKey(c.key.key).
		WithMapOperation(outerOp).
		WithReturnBody(true).
		Build()
	if err != nil {
		return err
	}

	if err = client.riak.Execute(cmd); err != nil {
		return err
	}

	res, ok := cmd.(*riak.UpdateMapCommand)
	if !ok {
		return errors.New("Could not convert")
	}

	if !res.Success() {
		return errors.New("Not successful")
	}

	// Riak has applied the increment, so it must not be sent again.
	c.increaseBy = 0

	// Update c.val from the response. Stop as soon as a map on the path is
	// missing: continuing with the parent map could pick up an unrelated
	// counter that happens to share the name at a different nesting level.
	if res.Response == nil {
		return nil
	}

	m := res.Response.Map
	for _, subMapName := range c.path {
		if m == nil {
			return nil
		}
		// Indexing a nil or incomplete Maps field yields nil, which the
		// checks around this loop handle.
		m = m.Maps[subMapName]
	}
	if m == nil {
		return nil
	}

	if resVal, ok := m.Counters[c.name]; ok {
		c.val = resVal
	}

	return nil
}
// MarshalJSON implements json.Marshaler.
//
// The Counter is encoded as a bare JSON number holding its current value.
func (c Counter) MarshalJSON() ([]byte, error) {
	encoded, err := json.Marshal(c.val)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
// UnmarshalJSON implements json.Unmarshaler.
//
// data is decoded as an int64 and stored as the Counter's value. When
// decoding fails the error is returned and the Counter is left unchanged.
func (c *Counter) UnmarshalJSON(data []byte) error {
	// Decode into a temporary so a failed decode never touches c.val.
	var decoded int64
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}

	c.val = decoded
	return nil
}