Skip to content

Commit

Permalink
replace individual endpoint_cnt read from store with 1 bulk read
Browse files Browse the repository at this point in the history
getNetworksFromStore reads networks and endpoint_cnt from the kvstores.
endpoint_cnt especially is read in a for-loop for each network and that
causes a lot of stress in poorly performing KV-Stores.
This fix eases the load on the kvstore by fetching all the endpoint_cnt
in a single read and the operation is performed on it.

Signed-off-by: Madhu Venugopal <madhu@docker.com>
  • Loading branch information
mavenugo committed Feb 2, 2017
1 parent ca62711 commit c109fcf
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 28 deletions.
42 changes: 36 additions & 6 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type DataStore interface {
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
List(string, KVObject) ([]KVObject, error)
// Map returns a Map of KVObjects
Map(key string, kvObject KVObject) (map[string]KVObject, error)
// Scope returns the scope of the store
Scope() string
// KVStore returns access to the KV Store
Expand Down Expand Up @@ -512,40 +514,68 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
return ds.cache.list(kvObject)
}

var kvol []KVObject
cb := func(key string, val KVObject) {
kvol = append(kvol, val)
}
err := ds.iterateKVPairsFromStore(key, kvObject, cb)
if err != nil {
return nil, err
}
return kvol, nil
}

func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
// Bail out right away if the kvObject does not implement KVConstructor
ctor, ok := kvObject.(KVConstructor)
if !ok {
return nil, fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
}

// Make sure the parent key exists
if err := ds.ensureParent(key); err != nil {
return nil, err
return err
}

kvList, err := ds.store.List(key)
if err != nil {
return nil, err
return err
}

var kvol []KVObject
for _, kvPair := range kvList {
if len(kvPair.Value) == 0 {
continue
}

dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
return nil, err
return err
}

// Make sure the object has a correct view of the DB index in
// case we need to modify it and update the DB.
dstO.SetIndex(kvPair.LastIndex)
callback(kvPair.Key, dstO)
}

return nil
}

kvol = append(kvol, dstO)
func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
if ds.sequential {
ds.Lock()
defer ds.Unlock()
}

kvol := make(map[string]KVObject)
cb := func(key string, val KVObject) {
// Trim the leading & trailing "/" to make it consistent across all stores
kvol[strings.Trim(key, "/")] = val
}
err := ds.iterateKVPairsFromStore(key, kvObject, cb)
if err != nil {
return nil, err
}
return kvol, nil
}

Expand Down
28 changes: 15 additions & 13 deletions endpoint_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,23 @@ type tableEntry struct {
}

func (ep *endpoint) Info() EndpointInfo {
n, err := ep.getNetworkFromStore()
if err != nil {
return nil
}

ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return nil
}

sb, ok := ep.getSandbox()
if !ok {
// endpoint hasn't joined any sandbox.
// Just return the endpoint
return ep
n, err := ep.getNetworkFromStore()
if err != nil {
return nil
}

ep, err = n.getEndpointFromStore(ep.ID())
if err != nil {
return nil
}
sb, ok = ep.getSandbox()
if !ok {
// endpoint hasn't joined any sandbox.
// Just return the endpoint
return ep
}
}

if epi := sb.getEndpoint(ep.ID()); epi != nil {
Expand Down
22 changes: 13 additions & 9 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package libnetwork

import (
"fmt"
"strings"

"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
Expand Down Expand Up @@ -152,21 +153,24 @@ func (c *controller) getNetworksFromStore() ([]*network, error) {
continue
}

kvep, err := store.Map(datastore.Key(epCntKeyPrefix), &endpointCnt{})
if err != nil {
if err != datastore.ErrKeyNotFound {
logrus.Warnf("failed to get endpoint_count map for scope %s: %v", store.Scope(), err)
}
}

for _, kvo := range kvol {
n := kvo.(*network)
n.Lock()
n.ctrlr = c
n.Unlock()

ec := &endpointCnt{n: n}
err = store.GetObject(datastore.Key(ec.Key()...), ec)
if err != nil && !n.inDelete {
logrus.Warnf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
continue
// Trim the leading & trailing "/" to make it consistent across all stores
if val, ok := kvep[strings.Trim(datastore.Key(ec.Key()...), "/")]; ok {
ec = val.(*endpointCnt)
ec.n = n
n.epCnt = ec
}

n.Lock()
n.epCnt = ec
n.scope = store.Scope()
n.Unlock()
nl = append(nl, n)
Expand Down

0 comments on commit c109fcf

Please sign in to comment.