Skip to content

Commit

Permalink
update libkv & etcd packages (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw authored Oct 20, 2017
1 parent c1740e1 commit e300c56
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 117 deletions.
8 changes: 4 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 1 addition & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@

[[constraint]]
name = "github.com/coreos/etcd"
version = "3.1.8"
version = "3.2.9"

[[constraint]]
name = "github.com/coreos/pkg"
version = "3.0.0"

[[constraint]]
name = "github.com/docker/libkv"
version = "0.2.1"

[[constraint]]
name = "github.com/golang/mock"

Expand Down
8 changes: 4 additions & 4 deletions functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (f *Functions) RegisterFunction(fn *Function) (*Function, error) {
return nil, err
}

_, err := f.DB.Get(string(fn.ID))
_, err := f.DB.Get(string(fn.ID), &store.ReadOptions{Consistent: true})
if err == nil {
return nil, &ErrAlreadyRegistered{fn.ID}
}
Expand All @@ -45,7 +45,7 @@ func (f *Functions) RegisterFunction(fn *Function) (*Function, error) {

// UpdateFunction updates function configuration.
func (f *Functions) UpdateFunction(fn *Function) (*Function, error) {
_, err := f.DB.Get(string(fn.ID))
_, err := f.DB.Get(string(fn.ID), &store.ReadOptions{Consistent: true})
if err != nil {
return nil, &ErrNotFound{fn.ID}
}
Expand All @@ -71,7 +71,7 @@ func (f *Functions) UpdateFunction(fn *Function) (*Function, error) {

// GetFunction returns function from configuration.
func (f *Functions) GetFunction(id FunctionID) (*Function, error) {
kv, err := f.DB.Get(string(id))
kv, err := f.DB.Get(string(id), &store.ReadOptions{Consistent: true})
if err != nil {
return nil, &ErrNotFound{id}
}
Expand All @@ -89,7 +89,7 @@ func (f *Functions) GetFunction(id FunctionID) (*Function, error) {
func (f *Functions) GetAllFunctions() ([]*Function, error) {
fns := []*Function{}

kvs, err := f.DB.List("")
kvs, err := f.DB.List("", &store.ReadOptions{Consistent: true})
if err != nil {
return nil, err
}
Expand Down
22 changes: 11 additions & 11 deletions functions/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestRegisterFunction(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(nil, errors.New("KV func not found"))
db.EXPECT().Get("testid", &store.ReadOptions{Consistent: true}).Return(nil, errors.New("KV func not found"))
db.EXPECT().Put("testid", []byte(`{"functionId":"testid","provider":{"type":"http","url":"http://example.com"}}`), nil).Return(nil)
service := &Functions{DB: db, Log: zap.NewNop()}

Expand All @@ -42,7 +42,7 @@ func TestRegisterFunction_AlreadyExistsError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(nil, nil)
db.EXPECT().Get("testid", gomock.Any()).Return(nil, nil)
service := &Functions{DB: db, Log: zap.NewNop()}

_, err := service.RegisterFunction(&Function{ID: "testid", Provider: &Provider{Type: HTTPEndpoint, URL: "http://example.com"}})
Expand All @@ -55,7 +55,7 @@ func TestRegisterFunction_PutError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(nil, errors.New("KV func not found"))
db.EXPECT().Get("testid", gomock.Any()).Return(nil, errors.New("KV func not found"))
db.EXPECT().Put("testid", []byte(`{"functionId":"testid","provider":{"type":"http","url":"http://example.com"}}`), nil).Return(errors.New("KV put error"))
service := &Functions{DB: db, Log: zap.NewNop()}

Expand All @@ -69,7 +69,7 @@ func TestUpdateFunction(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
db.EXPECT().Get("testid", &store.ReadOptions{Consistent: true}).Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
db.EXPECT().Put("testid", []byte(`{"functionId":"testid","provider":{"type":"http","url":"http://example1.com"}}`), nil).Return(nil)
service := &Functions{DB: db, Log: zap.NewNop()}

Expand All @@ -83,7 +83,7 @@ func TestUpdateFunction_ValidationError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
db.EXPECT().Get("testid", gomock.Any()).Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
service := &Functions{DB: db, Log: zap.NewNop()}

_, err := service.UpdateFunction(&Function{ID: "testid", Provider: &Provider{Type: HTTPEndpoint}})
Expand All @@ -96,7 +96,7 @@ func TestUpdateFunction_NotFoundError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(nil, errors.New("KV not found"))
db.EXPECT().Get("testid", gomock.Any()).Return(nil, errors.New("KV not found"))
service := &Functions{DB: db, Log: zap.NewNop()}

_, err := service.UpdateFunction(&Function{ID: "testid", Provider: &Provider{Type: HTTPEndpoint, URL: "http://example.com"}})
Expand All @@ -109,7 +109,7 @@ func TestUpdateFunction_PutError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
db.EXPECT().Get("testid", gomock.Any()).Return(&store.KVPair{Value: []byte(`{"functionId":"testid", "provider":{"type":"http","url":"http://example.com"}}`)}, nil)
db.EXPECT().Put("testid", []byte(`{"functionId":"testid","provider":{"type":"http","url":"http://example1.com"}}`), nil).Return(errors.New("KV put error"))
service := &Functions{DB: db, Log: zap.NewNop()}

Expand All @@ -123,7 +123,7 @@ func TestGetFunction(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(&store.KVPair{Value: []byte(`{"functionId":"testid"}`)}, nil)
db.EXPECT().Get("testid", &store.ReadOptions{Consistent: true}).Return(&store.KVPair{Value: []byte(`{"functionId":"testid"}`)}, nil)
service := &Functions{DB: db, Log: zap.NewNop()}

function, _ := service.GetFunction(FunctionID("testid"))
Expand All @@ -136,7 +136,7 @@ func TestGetFunction_NotFound(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().Get("testid").Return(nil, errors.New("KV func not found"))
db.EXPECT().Get("testid", gomock.Any()).Return(nil, errors.New("KV func not found"))
service := &Functions{DB: db, Log: zap.NewNop()}

_, err := service.GetFunction(FunctionID("testid"))
Expand All @@ -153,7 +153,7 @@ func TestGetAllFunctions(t *testing.T) {
&store.KVPair{Value: []byte(`{"functionId":"f2"}`)},
}
db := mock.NewMockStore(ctrl)
db.EXPECT().List("").Return(kvs, nil)
db.EXPECT().List("", &store.ReadOptions{Consistent: true}).Return(kvs, nil)
service := &Functions{DB: db, Log: zap.NewNop()}

list, _ := service.GetAllFunctions()
Expand All @@ -166,7 +166,7 @@ func TestGetAllFunctions_ListError(t *testing.T) {
defer ctrl.Finish()

db := mock.NewMockStore(ctrl)
db.EXPECT().List("").Return([]*store.KVPair{}, errors.New("KV list err"))
db.EXPECT().List("", gomock.Any()).Return([]*store.KVPair{}, errors.New("KV list err"))
service := &Functions{DB: db, Log: zap.NewNop()}

_, err := service.GetAllFunctions()
Expand Down
40 changes: 20 additions & 20 deletions internal/kv/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,37 +80,37 @@ func (_mr *_MockStoreRecorder) DeleteTree(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteTree", arg0)
}

func (_m *MockStore) Exists(_param0 string) (bool, error) {
ret := _m.ctrl.Call(_m, "Exists", _param0)
func (_m *MockStore) Exists(_param0 string, _param1 *store.ReadOptions) (bool, error) {
ret := _m.ctrl.Call(_m, "Exists", _param0, _param1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStoreRecorder) Exists(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Exists", arg0)
func (_mr *_MockStoreRecorder) Exists(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Exists", arg0, arg1)
}

func (_m *MockStore) Get(_param0 string) (*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "Get", _param0)
func (_m *MockStore) Get(_param0 string, _param1 *store.ReadOptions) (*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "Get", _param0, _param1)
ret0, _ := ret[0].(*store.KVPair)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStoreRecorder) Get(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Get", arg0)
func (_mr *_MockStoreRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Get", arg0, arg1)
}

func (_m *MockStore) List(_param0 string) ([]*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "List", _param0)
func (_m *MockStore) List(_param0 string, _param1 *store.ReadOptions) ([]*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "List", _param0, _param1)
ret0, _ := ret[0].([]*store.KVPair)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStoreRecorder) List(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "List", arg0)
func (_mr *_MockStoreRecorder) List(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "List", arg0, arg1)
}

func (_m *MockStore) NewLock(_param0 string, _param1 *store.LockOptions) (store.Locker, error) {
Expand All @@ -134,24 +134,24 @@ func (_mr *_MockStoreRecorder) Put(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Put", arg0, arg1, arg2)
}

func (_m *MockStore) Watch(_param0 string, _param1 <-chan struct{}) (<-chan *store.KVPair, error) {
ret := _m.ctrl.Call(_m, "Watch", _param0, _param1)
func (_m *MockStore) Watch(_param0 string, _param1 <-chan struct{}, _param2 *store.ReadOptions) (<-chan *store.KVPair, error) {
ret := _m.ctrl.Call(_m, "Watch", _param0, _param1, _param2)
ret0, _ := ret[0].(<-chan *store.KVPair)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStoreRecorder) Watch(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Watch", arg0, arg1)
func (_mr *_MockStoreRecorder) Watch(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Watch", arg0, arg1, arg2)
}

func (_m *MockStore) WatchTree(_param0 string, _param1 <-chan struct{}) (<-chan []*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "WatchTree", _param0, _param1)
func (_m *MockStore) WatchTree(_param0 string, _param1 <-chan struct{}, _param2 *store.ReadOptions) (<-chan []*store.KVPair, error) {
ret := _m.ctrl.Call(_m, "WatchTree", _param0, _param1, _param2)
ret0, _ := ret[0].(<-chan []*store.KVPair)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStoreRecorder) WatchTree(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "WatchTree", arg0, arg1)
func (_mr *_MockStoreRecorder) WatchTree(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "WatchTree", arg0, arg1, arg2)
}
20 changes: 10 additions & 10 deletions internal/kv/prefixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (ps *PrefixedStore) Put(key string, value []byte, options *store.WriteOptio
}

// Get passes requests to the underlying libkv implementation, appending the root to paths for isolation.
func (ps *PrefixedStore) Get(key string) (*store.KVPair, error) {
return ps.kv.Get(ps.root + key)
func (ps *PrefixedStore) Get(key string, options *store.ReadOptions) (*store.KVPair, error) {
return ps.kv.Get(ps.root+key, options)
}

// Delete passes requests to the underlying libkv implementation, appending the root to paths for isolation.
Expand All @@ -39,18 +39,18 @@ func (ps *PrefixedStore) Delete(key string) error {
}

// Exists passes requests to the underlying libkv implementation, appending the root to paths for isolation.
func (ps *PrefixedStore) Exists(key string) (bool, error) {
return ps.kv.Exists(ps.root + key)
func (ps *PrefixedStore) Exists(key string, options *store.ReadOptions) (bool, error) {
return ps.kv.Exists(ps.root+key, options)
}

// Watch passes requests to the underlying libkv implementation, appending the root to paths for isolation.
func (ps *PrefixedStore) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
return ps.kv.Watch(ps.root+key, stopCh)
func (ps *PrefixedStore) Watch(key string, stopCh <-chan struct{}, options *store.ReadOptions) (<-chan *store.KVPair, error) {
return ps.kv.Watch(ps.root+key, stopCh, options)
}

// WatchTree passes requests to the underlying libkv implementation, appending the root to paths for isolation.
func (ps *PrefixedStore) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
return ps.kv.WatchTree(ps.root+directory, stopCh)
func (ps *PrefixedStore) WatchTree(directory string, stopCh <-chan struct{}, options *store.ReadOptions) (<-chan []*store.KVPair, error) {
return ps.kv.WatchTree(ps.root+directory, stopCh, options)
}

// NewLock passes requests to the underlying libkv implementation, appending the root to paths for isolation.
Expand All @@ -59,8 +59,8 @@ func (ps *PrefixedStore) NewLock(key string, options *store.LockOptions) (store.
}

// List passes requests to the underlying libkv implementation, appending the root to paths for isolation.
func (ps *PrefixedStore) List(directory string) ([]*store.KVPair, error) {
prefixed, err := ps.kv.List(ps.root + directory)
func (ps *PrefixedStore) List(directory string, options *store.ReadOptions) ([]*store.KVPair, error) {
prefixed, err := ps.kv.List(ps.root+directory, options)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/kv/prefixed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/golang/mock/gomock"
"github.com/serverless/event-gateway/functions/mock"
"github.com/serverless/event-gateway/internal/kv/mock"
"github.com/serverless/libkv/store"
"github.com/stretchr/testify/assert"
)
Expand All @@ -20,10 +20,10 @@ func TestPrefixedStoreList(t *testing.T) {
&store.KVPair{Key: "testroot/testdir/key2", Value: []byte("value2")},
}
kv := mock.NewMockStore(ctrl)
kv.EXPECT().List("testroot/testdir").Return(kvs, nil)
kv.EXPECT().List("testroot/testdir", &store.ReadOptions{Consistent: true}).Return(kvs, nil)
ps := NewPrefixedStore("testroot", kv)

values, err := ps.List("testdir")
values, err := ps.List("testdir", &store.ReadOptions{Consistent: true})
assert.Nil(t, err)
assert.Equal(t, []*store.KVPair{
&store.KVPair{Key: "testdir/key1", Value: []byte("value1")},
Expand All @@ -36,10 +36,10 @@ func TestPrefixedStoreList_Error(t *testing.T) {
defer ctrl.Finish()

kv := mock.NewMockStore(ctrl)
kv.EXPECT().List("testroot/key").Return(nil, errors.New("KV error"))
kv.EXPECT().List("testroot/key", nil).Return(nil, errors.New("KV error"))
ps := NewPrefixedStore("testroot", kv)

values, err := ps.List("key")
values, err := ps.List("key", nil)
assert.Nil(t, values)
assert.EqualError(t, err, "KV error")
}
4 changes: 2 additions & 2 deletions internal/kv/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (w *Watcher) watchRoot(outgoingEvents chan event, shutdown chan struct{}) {
}

// populate directory if it doesn't exist
exists, err := w.kv.Exists(w.path)
exists, err := w.kv.Exists(w.path, nil)
if err != nil {
w.log.Error("Could not access database.",
zap.String("event", "db"),
Expand All @@ -114,7 +114,7 @@ func (w *Watcher) watchRoot(outgoingEvents chan event, shutdown chan struct{}) {
}

// create watch chan for this directory
events, err := w.kv.WatchTree(w.path, shutdown)
events, err := w.kv.WatchTree(w.path, shutdown, nil)
if err != nil {
w.log.Error("Could not watch directory.",
zap.String("event", "db"),
Expand Down
2 changes: 1 addition & 1 deletion subscriptions/mock/mock.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
//go:generate mockgen -package mock -destination ./store.go github.com/docker/libkv/store Store
//go:generate mockgen -package mock -destination ./store.go github.com/serverless /libkv/store Store

package mock
Loading

0 comments on commit e300c56

Please sign in to comment.