Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wire a context in most of the Datastore methods #161

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions autobatch/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package autobatch

import (
"context"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
Expand Down Expand Up @@ -34,16 +36,16 @@ func NewAutoBatching(d ds.Batching, size int) *Datastore {
}

// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, k ds.Key) error {
d.buffer[k] = op{delete: true}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
Expand All @@ -52,21 +54,21 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
return o.value, nil
}

return d.child.Get(k)
return d.child.Get(ctx, k)
}

// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error {
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
return d.Flush(ctx)
}
return nil
}

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore
func (d *Datastore) Sync(prefix ds.Key) error {
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
b, err := d.child.Batch()
if err != nil {
return err
Expand All @@ -79,9 +81,9 @@ func (d *Datastore) Sync(prefix ds.Key) error {

var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
Expand All @@ -90,11 +92,11 @@ func (d *Datastore) Sync(prefix ds.Key) error {
delete(d.buffer, k)
}

return b.Commit()
return b.Commit(ctx)
}

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
func (d *Datastore) Flush(ctx context.Context) error {
b, err := d.child.Batch()
if err != nil {
return err
Expand All @@ -103,9 +105,9 @@ func (d *Datastore) Flush() error {
for k, o := range d.buffer {
var err error
if o.delete {
err = b.Delete(k)
err = b.Delete(ctx, k)
} else {
err = b.Put(k, o.value)
err = b.Put(ctx, k, o.value)
}
if err != nil {
return err
Expand All @@ -114,21 +116,21 @@ func (d *Datastore) Flush() error {
// clear out buffer
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

return b.Commit()
return b.Commit(ctx)
}

// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) {
o, ok := d.buffer[k]
if ok {
return !o.delete, nil
}

return d.child.Has(k)
return d.child.Has(ctx, k)
}

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) {
o, ok := d.buffer[k]
if ok {
if o.delete {
Expand All @@ -137,17 +139,17 @@ func (d *Datastore) GetSize(k ds.Key) (int, error) {
return len(o.value), nil
}

return d.child.GetSize(k)
return d.child.GetSize(ctx, k)
}

// Query performs a query
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
err := d.Flush()
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
err := d.Flush(ctx)
if err != nil {
return nil, err
}

return d.child.Query(q)
return d.child.Query(ctx, q)
}

// DiskUsage implements the PersistentDatastore interface.
Expand All @@ -156,7 +158,7 @@ func (d *Datastore) DiskUsage() (uint64, error) {
}

func (d *Datastore) Close() error {
err1 := d.Flush()
err1 := d.Flush(context.Background())
err2 := d.child.Close()
if err1 != nil {
return err1
Expand Down
51 changes: 28 additions & 23 deletions autobatch/autobatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autobatch

import (
"bytes"
"context"
"fmt"
"testing"

Expand All @@ -14,6 +15,8 @@ func TestAutobatch(t *testing.T) {
}

func TestFlushing(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 16)

Expand All @@ -24,15 +27,15 @@ func TestFlushing(t *testing.T) {
v := []byte("hello world")

for _, k := range keys {
err := d.Put(k, v)
err := d.Put(ctx, k, v)
if err != nil {
t.Fatal(err)
}
}

// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
val, err := d.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,36 +46,36 @@ func TestFlushing(t *testing.T) {
}

// Not flushed
_, err := child.Get(keys[0])
_, err := child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete works.
err = d.Delete(keys[14])
err = d.Delete(ctx, keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
_, err = d.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}

// Still not flushed
_, err = child.Get(keys[0])
_, err = child.Get(ctx, keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Final put flushes.
err = d.Put(ds.NewKey("test16"), v)
err = d.Put(ctx, ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}

// should be flushed now, try to get keys from child datastore
for _, k := range keys[:14] {
val, err := child.Get(k)
val, err := child.Get(ctx, k)
if err != nil {
t.Fatal(err)
}
Expand All @@ -83,18 +86,18 @@ func TestFlushing(t *testing.T) {
}

// Never flushed the deleted key.
_, err = child.Get(keys[14])
_, err = child.Get(ctx, keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete doesn't flush
err = d.Delete(keys[0])
err = d.Delete(ctx, keys[0])
if err != nil {
t.Fatal(err)
}

val, err := child.Get(keys[0])
val, err := child.Get(ctx, keys[0])
if err != nil {
t.Fatal(err)
}
Expand All @@ -105,22 +108,24 @@ func TestFlushing(t *testing.T) {
}

func TestSync(t *testing.T) {
ctx := context.Background()

child := ds.NewMapDatastore()
d := NewAutoBatching(child, 100)

put := func(key ds.Key) {
if err := d.Put(key, []byte(key.String())); err != nil {
if err := d.Put(ctx, key, []byte(key.String())); err != nil {
t.Fatal(err)
}
}
del := func(key ds.Key) {
if err := d.Delete(key); err != nil {
if err := d.Delete(ctx, key); err != nil {
t.Fatal(err)
}
}

get := func(d ds.Datastore, key ds.Key) {
val, err := d.Get(key)
val, err := d.Get(ctx, key)
if err != nil {
t.Fatal(err)
}
Expand All @@ -130,21 +135,21 @@ func TestSync(t *testing.T) {
}
}
invalidGet := func(d ds.Datastore, key ds.Key) {
if _, err := d.Get(key); err != ds.ErrNotFound {
if _, err := d.Get(ctx, key); err != ds.ErrNotFound {
t.Fatal("should not have found value")
}
}

// Test if Syncing Puts works
internalSyncTest(t, d, child, put, del, get, invalidGet)
internalSyncTest(t, ctx, d, child, put, del, get, invalidGet)

// Test if Syncing Deletes works
internalSyncTest(t, d, child, del, put, invalidGet, get)
internalSyncTest(t, ctx, d, child, del, put, invalidGet, get)
}

// This function can be used to test Sync Puts and Deletes
// For clarity comments are written as if op = Put and undoOp = Delete
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
func internalSyncTest(t *testing.T, ctx context.Context, d, child ds.Datastore, op, undoOp func(ds.Key),
checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {
var keys []ds.Key
keymap := make(map[ds.Key]int)
Expand Down Expand Up @@ -185,7 +190,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkUndoOp(child, ds.NewKey("0"))

// Sync the tree "0/*/*"
if err := d.Sync(ds.NewKey("0")); err != nil {
if err := d.Sync(ctx, ds.NewKey("0")); err != nil {
t.Fatal(err)
}

Expand All @@ -196,7 +201,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

// Sync the tree "1/1/*"
if err := d.Sync(ds.NewKey("1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("1/1")); err != nil {
t.Fatal(err)
}

Expand All @@ -207,7 +212,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

// Sync the tree "3/1/1"
if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
if err := d.Sync(ctx, ds.NewKey("3/1/1")); err != nil {
t.Fatal(err)
}

Expand All @@ -217,7 +222,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
// Verify no other keys were synchronized
checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

if err := d.Sync(ds.Key{}); err != nil {
if err := d.Sync(ctx, ds.Key{}); err != nil {
t.Fatal(err)
}

Expand All @@ -231,7 +236,7 @@ func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Ke
op(deletedKey)

// Sync it
if err := d.Sync(deletedKey); err != nil {
if err := d.Sync(ctx, deletedKey); err != nil {
t.Fatal(err)
}

Expand Down
Loading