generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
batch.go
151 lines (126 loc) · 3.28 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package ddbds
import (
"context"
"fmt"
"math"
"time"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/ipfs/go-datastore"
)
const (
	// maxBatchChunkAttempts is the maximum number of times a chunk of batch
	// writes is attempted against DynamoDB before giving up; retries happen
	// only when DynamoDB reports unprocessed items (see commitChunk).
	maxBatchChunkAttempts = 3
)
// Batch returns a new, empty write batch backed by this datastore.
// The context argument is unused; cancellation is honored at Commit time.
func (d *DDBDatastore) Batch(_ context.Context) (datastore.Batch, error) {
	b := &batch{
		ds:   d,
		reqs: make(map[datastore.Key][]byte),
	}
	return b, nil
}
// batch accumulates put and delete requests in memory and flushes them to
// DynamoDB when Commit is called. It implements datastore.Batch.
type batch struct {
	// ds is the datastore the batch writes through.
	ds *DDBDatastore
	// if the value exists but is nil, then it's a delete
	reqs map[datastore.Key][]byte
}
// Put queues a write of value under key. Nothing is sent to DynamoDB until
// Commit is called; a later Put or Delete for the same key overwrites this one.
func (b *batch) Put(ctx context.Context, key datastore.Key, value []byte) error {
	b.reqs[key] = value
	return nil
}
// Delete queues a deletion of key. A nil value in reqs is the marker for a
// delete (see the batch struct); the request is sent on Commit.
func (b *batch) Delete(ctx context.Context, key datastore.Key) error {
	b.reqs[key] = nil
	return nil
}
// Commit flushes all queued puts and deletes to DynamoDB, returning the
// first error encountered, if any.
func (b *batch) Commit(ctx context.Context) error {
	// Pre-size the slice: the number of keys is known from the map length,
	// avoiding repeated growth copies from bare append.
	keys := make([]datastore.Key, 0, len(b.reqs))
	for k := range b.reqs {
		keys = append(keys, k)
	}
	return b.commitKeys(ctx, keys)
}
// commitKeys writes the batched requests for the given keys to DynamoDB.
// Keys are split into chunks of 25 (the BatchWriteItem per-call limit) and
// each chunk is committed by its own goroutine. The first error observed is
// returned; canceling the shared context (via the deferred stop) lets any
// remaining workers exit without blocking on the unbuffered errs channel.
func (b *batch) commitKeys(ctx context.Context, keys []datastore.Key) error {
	ctx, stop := context.WithCancel(ctx)
	defer stop()

	// Debugf takes a printf-style format string; the previous call passed
	// structured key/value arguments, which rendered as %!(EXTRA ...) noise.
	log.Debugf("committing batch: %v", keys)

	errs := make(chan error)
	chunks := chunk(len(keys), 25)
	for _, chunk := range chunks {
		writeReqs := make([]*dynamodb.WriteRequest, 0, len(chunk))
		for _, keyIdx := range chunk {
			k := keys[keyIdx]
			v := b.reqs[k]
			if v != nil {
				// non-nil value: a put
				itemMap, err := b.ds.makePutItem(k, v, 0)
				if err != nil {
					return err
				}
				writeReqs = append(writeReqs, &dynamodb.WriteRequest{
					PutRequest: &dynamodb.PutRequest{Item: itemMap},
				})
			} else {
				// nil value: a delete (see the batch struct)
				itemMap, err := b.ds.makeDeleteItemMap(k)
				if err != nil {
					return err
				}
				writeReqs = append(writeReqs, &dynamodb.WriteRequest{
					DeleteRequest: &dynamodb.DeleteRequest{Key: itemMap},
				})
			}
		}
		go b.commitChunk(ctx, errs, writeReqs)
	}
	// Collect exactly one result per launched chunk. On the first failure we
	// return immediately; the deferred stop() unblocks the workers still
	// trying to send on errs.
	for i := 0; i < len(chunks); i++ {
		if err := <-errs; err != nil {
			return err
		}
	}
	return nil
}
// commitChunk writes a single chunk of at most 25 write requests to DynamoDB,
// retrying items DynamoDB reports as unprocessed, with exponential backoff
// and jitter between attempts. The final result (nil on success) is sent on
// errs; if ctx is canceled first the send is abandoned so the goroutine does
// not leak.
func (b *batch) commitChunk(ctx context.Context, errs chan<- error, chunk []*dynamodb.WriteRequest) {
	attempts := 0
	var err error
	defer func() {
		select {
		case errs <- err:
		case <-ctx.Done():
		}
	}()
	var res *dynamodb.BatchWriteItemOutput
	for attempts < maxBatchChunkAttempts {
		attempts++
		batchReq := dynamodb.BatchWriteItemInput{
			RequestItems: map[string][]*dynamodb.WriteRequest{b.ds.table: chunk},
		}
		res, err = b.ds.ddbClient.BatchWriteItemWithContext(ctx, &batchReq)
		if err != nil {
			return
		}
		if len(res.UnprocessedItems[b.ds.table]) == 0 {
			return
		}
		// Retry only the items DynamoDB reported as unprocessed.
		chunk = res.UnprocessedItems[b.ds.table]
		if attempts == maxBatchChunkAttempts {
			// Out of attempts; don't waste a backoff sleep before giving up.
			break
		}
		// Exponential backoff w/ jitter: factor in [0.9:1.1), delays approx
		// 500, 1000, 2000, ... ms. Honor cancellation while waiting.
		jitter := (b.ds.rand.Float64() * 0.2) + 0.9
		delayMS := math.Exp2(float64(attempts)) * 250 * jitter
		delay := time.Duration(delayMS) * time.Millisecond
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			err = ctx.Err()
			return
		}
	}
	// err is always nil here (request errors return early above), so the old
	// "%w"-wrap of err rendered as %!w(<nil>); report the remaining
	// unprocessed item count instead.
	err = fmt.Errorf("reached max attempts (%d) trying to commit batch to DynamoDB, %d items unprocessed", maxBatchChunkAttempts, len(chunk))
}
// chunk partitions the index range [0, n) into consecutive groups of at most
// chunkSize indexes, e.g. chunk(5, 2) -> [[0 1] [2 3] [4]]. It returns nil
// when n <= 0 or chunkSize <= 0 (the old code panicked on a negative
// chunkSize via a negative slice index, and the parameter name shadowed the
// builtin len).
func chunk(n int, chunkSize int) [][]int {
	if n <= 0 || chunkSize <= 0 {
		return nil
	}
	// The number of chunks is known up front; pre-size to avoid growth copies.
	numChunks := (n + chunkSize - 1) / chunkSize
	chunks := make([][]int, numChunks)
	for i := 0; i < n; i++ {
		chunkIdx := i / chunkSize
		chunks[chunkIdx] = append(chunks[chunkIdx], i)
	}
	return chunks
}