Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DELETE with CAS #589

Merged
merged 8 commits into from
Jan 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion command/agent/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,28 @@ func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args
return nil, nil
}

// Check for cas value
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, err
}
applyReq.DirEnt.ModifyIndex = casVal
applyReq.Op = structs.KVSDeleteCAS
}

// Make the RPC
var out bool
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
return nil, nil

// Only use the out value if this was a CAS
if applyReq.Op == structs.KVSDeleteCAS {
return out, nil
} else {
return true, nil
}
}

// missingKey checks if the key is missing
Expand Down
92 changes: 90 additions & 2 deletions command/agent/kvs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package agent
import (
"bytes"
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
)

func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
Expand Down Expand Up @@ -183,6 +184,93 @@ func TestKVSEndpoint_Recurse(t *testing.T) {
}
}

func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()

testutil.WaitForLeader(t, srv.agent.RPC, "dc1")

{
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/test", buf)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if res := obj.(bool); !res {
t.Fatalf("should work")
}
}

req, err := http.NewRequest("GET", "/v1/kv/test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
d := obj.(structs.DirEntries)[0]

// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte("zip"))
req, err := http.NewRequest("DELETE",
fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex-1), buf)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if res := obj.(bool); res {
t.Fatalf("should NOT work")
}
}

// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte("zip"))
req, err := http.NewRequest("DELETE",
fmt.Sprintf("/v1/kv/test?cas=%d", d.ModifyIndex), buf)
if err != nil {
t.Fatalf("err: %v", err)
}

resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

if res := obj.(bool); !res {
t.Fatalf("should work")
}
}

// Verify the delete
req, _ = http.NewRequest("GET", "/v1/kv/test", nil)
resp = httptest.NewRecorder()
obj, _ = srv.KVSEndpoint(resp, req)
if obj != nil {
t.Fatalf("should be destroyed")
}
}

func TestKVSEndpoint_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
Expand Down
7 changes: 7 additions & 0 deletions consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
return c.state.KVSSet(index, &req.DirEnt)
case structs.KVSDelete:
return c.state.KVSDelete(index, req.DirEnt.Key)
case structs.KVSDeleteCAS:
act, err := c.state.KVSDeleteCheckAndSet(index, req.DirEnt.Key, req.DirEnt.ModifyIndex)
if err != nil {
return err
} else {
return act
}
case structs.KVSDeleteTree:
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
case structs.KVSCAS:
Expand Down
60 changes: 60 additions & 0 deletions consul/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,66 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
}
}

func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()

req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}

// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}

// Run the check-and-set
req.Op = structs.KVSDeleteCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp.(bool) != true {
t.Fatalf("resp: %v", resp)
}

// Verify key is gone
_, d, err = fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("bad: %v", d)
}
}

func TestFSM_KVSCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions consul/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,42 @@ func (s *StateStore) KVSDelete(index uint64, key string) error {
return s.kvsDeleteWithIndex(index, "id", key)
}

// KVSDeleteCheckAndSet is used to perform an atomic delete check-and-set
func (s *StateStore) KVSDeleteCheckAndSet(index uint64, key string, casIndex uint64) (bool, error) {
tx, err := s.tables.StartTxn(false)
if err != nil {
return false, err
}
defer tx.Abort()

// Get the existing node
res, err := s.kvsTable.GetTxn(tx, "id", key)
if err != nil {
return false, err
}

// Get the existing node if any
var exist *structs.DirEntry
if len(res) > 0 {
exist = res[0].(*structs.DirEntry)
}

// Use the casIndex as the constraint. A modify time of 0 means
// we are doign a delete-if-not-exists (odd...), while any other
// value means we expect that modify time.
if casIndex == 0 {
return exist == nil, nil
} else if casIndex > 0 && (exist == nil || exist.ModifyIndex != casIndex) {
return false, nil
}

// Do the actual delete
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", key); err != nil {
return false, err
}
return true, tx.Commit()
}

// KVSDeleteTree is used to delete all keys with a given prefix
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
if prefix == "" {
Expand Down
50 changes: 50 additions & 0 deletions consul/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,56 @@ func TestKVSDelete(t *testing.T) {
}
}

func TestKVSDeleteCheckAndSet(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()

// CAS should fail, no entry
ok, err := store.KVSDeleteCheckAndSet(1000, "/foo", 100)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("unexpected commit")
}

// CAS should work, no entry
ok, err = store.KVSDeleteCheckAndSet(1000, "/foo", 0)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected failure")
}

// Make an entry
d := &structs.DirEntry{Key: "/foo"}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}

// Constrain on a wrong modify time
ok, err = store.KVSDeleteCheckAndSet(1001, "/foo", 42)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("unexpected commit")
}

// Constrain on a correct modify time
ok, err = store.KVSDeleteCheckAndSet(1002, "/foo", 1000)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("expected commit")
}
}

func TestKVSCheckAndSet(t *testing.T) {
store, err := testStateStore()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions consul/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ type KVSOp string
const (
KVSSet KVSOp = "set"
KVSDelete = "delete"
KVSDeleteCAS = "delete-cas" // Delete with check-and-set
KVSDeleteTree = "delete-tree"
KVSCAS = "cas" // Check-and-set
KVSLock = "lock" // Lock a key
Expand Down
14 changes: 11 additions & 3 deletions website/source/docs/agent/http.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,17 @@ then the update has not taken place.
### DELETE method

Lastly, the `DELETE` method can be used to delete a single key or all
keys sharing a prefix. If the "?recurse" query parameter is provided,
then all keys with the prefix are deleted, otherwise only the specified
key.
keys sharing a prefix. There are a number of patameters that can
be used with a DELETE request:

* ?recurse : This is used to delete all keys which have the specified prefix.
Without this, only a key with an exact match will be deleted.

* ?cas=\<index\> : This flag is used to turn the `DELETE` into a Check-And-Set
operation. This is very useful as it allows clients to build more complex
synchronization primitives on top. If the index is 0, then Consul will only
delete the key if it does not already exist (noop). If the index is non-zero, then
the key is only deleted if the index matches the `ModifyIndex` of that key.

## <a name="agent"></a> Agent

Expand Down