-
Notifications
You must be signed in to change notification settings - Fork 2
/
tx.go
125 lines (114 loc) · 2.73 KB
/
tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package tikv
import (
"context"
"encoding/json"
"fmt"
"github.com/autom8ter/myjson/kv"
"github.com/autom8ter/myjson/kv/kvutil"
tikvErr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
type tikvTx struct {
txn *transaction.KVTxn
opts kv.TxOpts
db *tikvKV
entries []kv.CDC
}
func (t *tikvTx) NewIterator(kopts kv.IterOpts) (kv.Iterator, error) {
if kopts.Reverse {
if kopts.Seek == nil {
iter, err := t.txn.IterReverse(kvutil.NextPrefix(kopts.UpperBound))
if err != nil {
return nil, err
}
// iter.Seek(kopts.Seek) // TODO: how to seek?
return &tikvIterator{iter: iter, opts: kopts}, nil
}
iter, err := t.txn.IterReverse(kvutil.NextPrefix(kopts.Seek))
if err != nil {
return nil, err
}
return &tikvIterator{iter: iter, opts: kopts}, nil
}
iter, err := t.txn.Iter(kopts.Prefix, kvutil.NextPrefix(kopts.UpperBound))
if err != nil {
return nil, err
}
// iter.Seek(kopts.Seek) // TODO: how to seek?
return &tikvIterator{iter: iter, opts: kopts}, nil
}
func (t *tikvTx) Get(ctx context.Context, key []byte) ([]byte, error) {
{
val, _ := t.db.cache.Get(ctx, string(key)).Result()
if val != "" {
return []byte(val), nil
}
}
val, err := t.txn.Get(ctx, key)
if err != nil {
if tikvErr.IsErrNotFound(err) {
return nil, nil
}
return nil, err
}
return val, err
}
func (t *tikvTx) Set(ctx context.Context, key, value []byte) error {
if t.opts.IsReadOnly {
return fmt.Errorf("writes forbidden in read-only transaction")
}
if err := t.txn.Set(key, value); err != nil {
return err
}
t.entries = append(t.entries, kv.CDC{
Operation: kv.SETOP,
Key: key,
Value: value,
})
return nil
}
func (t *tikvTx) Delete(ctx context.Context, key []byte) error {
if t.opts.IsReadOnly {
return fmt.Errorf("writes forbidden in read-only transaction")
}
if err := t.txn.Delete(key); err != nil {
return err
}
t.db.cache.Del(ctx, string(key))
t.entries = append(t.entries, kv.CDC{
Operation: kv.DELOP,
Key: key,
})
return nil
}
func (t *tikvTx) Rollback(ctx context.Context) error {
t.entries = []kv.CDC{}
return t.txn.Rollback()
}
func (t *tikvTx) Commit(ctx context.Context) error {
if err := t.txn.Commit(ctx); err != nil {
return err
}
var toSet = map[string][]byte{}
for _, e := range t.entries {
bits, err := json.Marshal(e)
if err != nil {
return err
}
t.db.cache.Publish(ctx, string(e.Key), bits)
switch e.Operation {
case kv.DELOP:
toSet[string(e.Key)] = []byte("")
case kv.SETOP:
toSet[string(e.Key)] = e.Value
}
}
if err := t.db.cache.MSet(ctx, toSet).Err(); err != nil {
return err
}
t.entries = []kv.CDC{}
return nil
}
func (t *tikvTx) Close(ctx context.Context) {
t.entries = []kv.CDC{}
}