Skip to content

Commit

Permalink
Merge pull request #1291 from hashicorp/f-memdb
Browse files Browse the repository at this point in the history
New memdb-based state store
  • Loading branch information
slackpad committed Oct 21, 2015
2 parents b1a335d + ff1eec0 commit 7601a16
Show file tree
Hide file tree
Showing 47 changed files with 9,180 additions and 7,701 deletions.
58 changes: 33 additions & 25 deletions api/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestLock_LockUnlock(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Should loose leadership
// Should lose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
Expand Down Expand Up @@ -105,32 +105,40 @@ func TestLock_DeleteKey(t *testing.T) {
c, s := makeClient(t)
defer s.Stop()

lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}

// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
// This uncovered some issues around special-case handling of low index
// numbers where it would work with a low number but fail for higher
// ones, so we loop this a bit to sweep the index up out of that
// territory.
for i := 0; i < 10; i++ {
func() {
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}

go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()

// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()

// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}()
}
}

Expand Down
3 changes: 2 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -94,7 +95,7 @@ type Agent struct {
eventBuf []*UserEvent
eventIndex int
eventLock sync.RWMutex
eventNotify consul.NotifyGroup
eventNotify state.NotifyGroup

shutdown bool
shutdownCh chan struct{}
Expand Down
6 changes: 3 additions & 3 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ RPC:
}

// Add the node record
records := d.formatNodeRecord(&out.NodeServices.Node, out.NodeServices.Node.Address,
records := d.formatNodeRecord(out.NodeServices.Node, out.NodeServices.Node.Address,
req.Question[0].Name, qType, d.config.NodeTTL)
if records != nil {
resp.Answer = append(resp.Answer, records...)
Expand Down Expand Up @@ -585,7 +585,7 @@ func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, res
handled[addr] = struct{}{}

// Add the node record
records := d.formatNodeRecord(&node.Node, addr, qName, qType, ttl)
records := d.formatNodeRecord(node.Node, addr, qName, qType, ttl)
if records != nil {
resp.Answer = append(resp.Answer, records...)
}
Expand Down Expand Up @@ -626,7 +626,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
}

// Add the extra record
records := d.formatNodeRecord(&node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
records := d.formatNodeRecord(node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
if records != nil {
resp.Extra = append(resp.Extra, records...)
}
Expand Down
4 changes: 4 additions & 0 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
if !reflect.DeepEqual(serv, srv1) {
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "svc_id1":
if serv.ID != "svc_id1" ||
Expand Down Expand Up @@ -455,6 +457,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
t.Fatalf("should not be permitted")
Expand Down Expand Up @@ -581,6 +584,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {

// All the checks should match
for _, chk := range checks.HealthChecks {
chk.CreateIndex, chk.ModifyIndex = 0, 0
switch chk.CheckID {
case "mysql":
if !reflect.DeepEqual(chk, chk1) {
Expand Down
20 changes: 14 additions & 6 deletions consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,20 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLGet"),
state.GetQueryWatch("ACLGet"),
func() error {
index, acl, err := state.ACLGet(args.ACL)
if err != nil {
return err
}

reply.Index = index
if acl != nil {
reply.ACLs = structs.ACLs{acl}
} else {
reply.ACLs = nil
}
return err
return nil
})
}

Expand Down Expand Up @@ -194,10 +198,14 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLList"),
state.GetQueryWatch("ACLList"),
func() error {
var err error
reply.Index, reply.ACLs, err = state.ACLList()
return err
index, acls, err := state.ACLList()
if err != nil {
return err
}

reply.Index, reply.ACLs = index, acls
return nil
})
}
8 changes: 4 additions & 4 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func TestACL_filterServices(t *testing.T) {
func TestACL_filterServiceNodes(t *testing.T) {
// Create some service nodes
nodes := structs.ServiceNodes{
structs.ServiceNode{
&structs.ServiceNode{
Node: "node1",
ServiceName: "foo",
},
Expand All @@ -748,7 +748,7 @@ func TestACL_filterServiceNodes(t *testing.T) {
func TestACL_filterNodeServices(t *testing.T) {
// Create some node services
services := structs.NodeServices{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Services: map[string]*structs.NodeService{
Expand Down Expand Up @@ -778,10 +778,10 @@ func TestACL_filterCheckServiceNodes(t *testing.T) {
// Create some nodes
nodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Service: structs.NodeService{
Service: &structs.NodeService{
ID: "foo",
Service: "foo",
},
Expand Down
55 changes: 40 additions & 15 deletions consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,19 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}

// Get the local state
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Nodes"),
state.GetQueryWatch("Nodes"),
func() error {
reply.Index, reply.Nodes = state.Nodes()
index, nodes, err := state.Nodes()
if err != nil {
return err
}

reply.Index, reply.Nodes = index, nodes
return nil
})
}
Expand All @@ -136,13 +142,19 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}

// Get the current nodes
// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Services"),
state.GetQueryWatch("Services"),
func() error {
reply.Index, reply.Services = state.Services()
index, services, err := state.Services()
if err != nil {
return err
}

reply.Index, reply.Services = index, services
return c.srv.filterACL(args.Token, reply)
})
}
Expand All @@ -160,15 +172,23 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(&args.QueryOptions,
err := c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"),
state.GetQueryWatch("ServiceNodes"),
func() error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(args.ServiceName)
}
if err != nil {
return err
}
reply.Index, reply.ServiceNodes = index, services
return c.srv.filterACL(args.Token, reply)
})

Expand Down Expand Up @@ -198,11 +218,16 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs

// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeServices"),
state.GetQueryWatch("NodeServices"),
func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
index, services, err := state.NodeServices(args.Node)
if err != nil {
return err
}
reply.Index, reply.NodeServices = index, services
return c.srv.filterACL(args.Token, reply)
})
}
Loading

0 comments on commit 7601a16

Please sign in to comment.