-
Notifications
You must be signed in to change notification settings - Fork 0
/
storager_fsm.go
157 lines (133 loc) · 3.43 KB
/
storager_fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package storager
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"io"
"github.com/golang/snappy"
"github.com/weedge/xdis-storager/openkv"
)
// SnapshotHead is the fixed-size header that SaveSnapshotWithHead writes in
// front of the record stream and RecoverFromSnapshotWithHead reads back first.
type SnapshotHead struct {
	// CommitID is encoded on the wire as a big-endian uint64 (see Read/Write).
	// NOTE(review): presumably the ID of the last commit covered by the
	// snapshot — confirm against callers.
	CommitID uint64
}
// Read decodes the header from r: exactly eight bytes holding CommitID as a
// big-endian uint64. On failure the error from the reader is returned
// unchanged (io.EOF when nothing could be read, io.ErrUnexpectedEOF when the
// input ends part-way) and h is left unmodified.
func (h *SnapshotHead) Read(r io.Reader) error {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return err
	}
	h.CommitID = binary.BigEndian.Uint64(raw[:])
	return nil
}
// Write encodes the header to w as eight bytes: CommitID in big-endian order.
// The error reported by w, if any, is returned as-is.
func (h *SnapshotHead) Write(w io.Writer) error {
	var raw [8]byte
	binary.BigEndian.PutUint64(raw[:], h.CommitID)
	_, err := w.Write(raw[:])
	return err
}
// snapshotFieldTooLargeError is returned by SaveSnapshotWithHead when a
// compressed key or value is longer than its fixed-width length prefix can
// express. Writing such a record would truncate the length and leave the rest
// of the stream undecodable.
type snapshotFieldTooLargeError struct {
	field string // "key" or "value"
}

// Error implements the error interface.
func (e snapshotFieldTooLargeError) Error() string {
	return "snapshot: compressed " + e.field + " is too large for its length prefix"
}

// SaveSnapshotWithHead dumps a point-in-time snapshot of the database to w.
//
// Stream layout, all integers big-endian:
//
//	[SnapshotHead]                              only when h != nil
//	repeated for every entry of the snapshot:
//	  uint16 len | snappy-compressed key
//	  uint32 len | snappy-compressed value
//
// The write lock is held only while the snapshot is created; iteration and
// output happen afterwards. Output goes through a 4096-byte buffered writer
// that is flushed before returning.
//
// ctx is accepted for interface symmetry but is not consulted here.
//
// NOTE(review): RecoverFromSnapshotWithHead always reads a header, so a stream
// written with h == nil cannot be loaded by it.
//
// It returns the first error from snapshot creation, from w, or a
// snapshotFieldTooLargeError if a compressed key exceeds 65535 bytes or a
// compressed value exceeds 4294967295 bytes.
func (s *Storager) SaveSnapshotWithHead(ctx context.Context, h *SnapshotHead, w io.Writer) (err error) {
	const (
		maxKeyLen   = 1<<16 - 1 // largest length a uint16 prefix can carry
		maxValueLen = 1<<32 - 1 // largest length a uint32 prefix can carry
	)

	var snap *openkv.Snapshot
	s.wLock.Lock()
	snap, err = s.odb.NewSnapshot()
	s.wLock.Unlock()
	if err != nil {
		return err
	}
	defer snap.Close()

	wb := bufio.NewWriterSize(w, 4096)
	if h != nil {
		if err = h.Write(wb); err != nil {
			return err
		}
	}

	it := snap.NewIterator()
	defer it.Close()
	it.SeekToFirst()

	// compressBuf is scratch space for snappy: Encode returns a sub-slice of
	// it when it is large enough and a fresh slice otherwise. It is shared by
	// key and value, so each compressed field is written out before the next
	// Encode call overwrites it.
	compressBuf := make([]byte, 4096)
	var lenBuf [4]byte
	for ; it.Valid(); it.Next() {
		rawKey := it.RawKey()
		rawValue := it.RawValue()

		// len(compressed key) | compressed key
		key := snappy.Encode(compressBuf, rawKey)
		if len(key) > maxKeyLen {
			return snapshotFieldTooLargeError{field: "key"}
		}
		binary.BigEndian.PutUint16(lenBuf[:2], uint16(len(key)))
		if _, err = wb.Write(lenBuf[:2]); err != nil {
			return err
		}
		if _, err = wb.Write(key); err != nil {
			return err
		}

		// len(compressed value) | compressed value
		value := snappy.Encode(compressBuf, rawValue)
		if uint64(len(value)) > maxValueLen {
			return snapshotFieldTooLargeError{field: "value"}
		}
		binary.BigEndian.PutUint32(lenBuf[:4], uint32(len(value)))
		if _, err = wb.Write(lenBuf[:4]); err != nil {
			return err
		}
		if _, err = wb.Write(value); err != nil {
			return err
		}
	}

	// Push whatever is still buffered to the underlying writer.
	return wb.Flush()
}
// RecoverFromSnapshotWithHead replaces the database contents with the records
// read from r and returns the snapshot header found at the start of the
// stream. The expected layout is the one produced by SaveSnapshotWithHead with
// a non-nil header.
//
// The write lock is held for the whole operation. The header is parsed before
// any existing data is cleared, so an empty or header-less reader fails
// without touching the database. Once the header is accepted all data is
// cleared via flushAll; a later error leaves the database partially loaded.
//
// Records are applied through a write batch that is committed every 1024
// records and once more at the end.
//
// Errors: an unreadable header yields the reader's error (io.EOF for an empty
// reader). A stream that ends in the middle of a record yields
// io.ErrUnexpectedEOF. Decompression, flush and commit errors are returned
// unchanged. The returned header is nil whenever err is non-nil.
func (s *Storager) RecoverFromSnapshotWithHead(ctx context.Context, r io.Reader) (h *SnapshotHead, err error) {
	s.wLock.Lock()
	defer s.wLock.Unlock()

	rb := bufio.NewReaderSize(r, 4096)

	// Validate the header first: there is no point wiping the database for a
	// stream that cannot even supply its header.
	h = new(SnapshotHead)
	if err = h.Read(rb); err != nil {
		return nil, err
	}

	// clear all data
	if err = s.flushAll(ctx); err != nil {
		return nil, err
	}

	// truncated turns a bare io.EOF seen part-way through a record into
	// io.ErrUnexpectedEOF so callers can tell it apart from a clean end.
	truncated := func(e error) error {
		if e == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return e
	}

	var (
		keyLen     uint16
		valueLen   uint32
		keyBuf     bytes.Buffer
		valueBuf   bytes.Buffer
		key, value []byte
	)
	// Scratch space for snappy.Decode, which returns a sub-slice of these when
	// the decoded data fits and a newly allocated slice otherwise.
	deKeyBuf := make([]byte, 4096)
	deValueBuf := make([]byte, 4096)

	wb := s.odb.NewWriteBatch()
	defer wb.Close()

	const commitEvery = 1024
	n := 0
	for {
		// A clean EOF is only legal on a record boundary, i.e. right here.
		if err = binary.Read(rb, binary.BigEndian, &keyLen); err != nil {
			if err == io.EOF {
				break
			}
			return nil, err
		}

		// read key
		keyBuf.Reset()
		if _, err = io.CopyN(&keyBuf, rb, int64(keyLen)); err != nil {
			return nil, truncated(err)
		}
		if key, err = snappy.Decode(deKeyBuf, keyBuf.Bytes()); err != nil {
			return nil, err
		}

		// read value
		if err = binary.Read(rb, binary.BigEndian, &valueLen); err != nil {
			return nil, truncated(err)
		}
		valueBuf.Reset()
		if _, err = io.CopyN(&valueBuf, rb, int64(valueLen)); err != nil {
			return nil, truncated(err)
		}
		if value, err = snappy.Decode(deValueBuf, valueBuf.Bytes()); err != nil {
			return nil, err
		}

		// NOTE(review): key and value may alias deKeyBuf/deValueBuf, which are
		// overwritten on the next iteration; this assumes wb.Put copies its
		// arguments — confirm against the openkv write batch.
		wb.Put(key, value)
		n++
		if n%commitEvery == 0 {
			if err = wb.Commit(); err != nil {
				return nil, err
			}
		}
	}

	if err = wb.Commit(); err != nil {
		return nil, err
	}
	return h, nil
}