Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for atomic transactions spanning multiple KV entries. #2028

Merged
merged 24 commits into from
May 15, 2016
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9edca28
Moves KVS-related state store code out into its own set of files.
May 2, 2016
6c2aeb2
Splits existing KVS operations into *Txn helpers for later reuse.
May 4, 2016
b7ae973
Adds state store support for atomic KVS ops.
May 5, 2016
e491245
Performs basic plumbing of KVS transactions through all the layers.
May 7, 2016
2f51926
Adds an empty get test case.
May 10, 2016
fcb0c20
Adds internal endpoint read ACL support and full unit tests.
May 10, 2016
7a797da
Adds unit tests for HTTP endpoint.
May 10, 2016
44ab1aa
Adds type for API ops and an example transaction.
May 10, 2016
23545f9
Fixes some go vet findings in a unit test.
May 11, 2016
1fefdcb
Terminates pretty responses with a newline.
May 10, 2016
69f58ad
Moves txn code into a new endpoint, not specific to KV.
May 11, 2016
38d0f66
Refactors TxnRequest/TxnResponse into a form that will allow non-KV ops.
May 11, 2016
960b9d6
Switches to "KV" instead of "KV" for the KV operations.
May 11, 2016
4882a9f
De-nests the KV output structure (removes DirEnt member).
May 11, 2016
04d99cd
Makes get fail a transaction if the key doesn't exist.
May 11, 2016
17cd0ac
Adds documentation for the transaction endpoint.
May 11, 2016
8a7428e
Hoists KV processing helper functions up as static functions.
May 12, 2016
9443c6b
Adds a comment for the txnKVS() function.
May 12, 2016
a37bf9d
Adds a read-only optimized path for transactions.
May 13, 2016
fbfb90a
Removes null results for deletes, and preps for more than one result …
May 13, 2016
570d46a
Adds some size limiting features to transactions to help prevent abuse.
May 13, 2016
4bbaf1c
Switches GETs to a filtering model for ACLs.
May 13, 2016
778b975
Adds a get-tree verb to KV transaction operations.
May 13, 2016
6533876
Reduces the number of operations in a transaction to 64.
May 15, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions api/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,42 @@ type KVPair struct {
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair

// KVOp constants give possible operations available in a KVTxn.
type KVOp string

const (
KVSet KVOp = "set"
KVDelete = "delete"
KVDeleteCAS = "delete-cas"
KVDeleteTree = "delete-tree"
KVCAS = "cas"
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)

// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb string
Key string
Value []byte
Flags uint64
Index uint64
Session string
}

// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp

// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}

// KV is used to manipulate the K/V API
type KV struct {
c *Client
Expand Down Expand Up @@ -238,3 +274,113 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}

// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KV *KVTxnOp
}

// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp

// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
}

// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult

// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}

// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError

// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}

// Txn is used to apply multiple KV operations in a single, atomic transaction.
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
func (k *KV) Txn(txn KVTxnOps, q *WriteOptions) (bool, *KVTxnResponse, *WriteMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setWriteOptions(q)

// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvOp := range txn {
ops = append(ops, &TxnOp{KV: kvOp})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()

wm := &WriteMeta{}
wm.RequestTime = rtt

if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}

// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, wm, nil
}

var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}
92 changes: 92 additions & 0 deletions api/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"bytes"
"path"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -445,3 +446,94 @@ func TestClient_AcquireRelease(t *testing.T) {
t.Fatalf("unexpected value: %#v", meta)
}
}

func TestClient_Txn(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

session := c.Session()
kv := c.KV()

// Make a session.
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)

// Acquire and get the key via a transaction, but don't supply a valid
// session.
key := testKey()
value := []byte("test")
txn := KVTxnOps{
&KVTxnOp{
Verb: KVLock,
Key: key,
Value: value,
},
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err := kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if ok {
t.Fatalf("transaction should have failed")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically the previous ~20LOC but added to the function comment block for Txn.


if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 {
t.Fatalf("bad: %v", ret)
}
if ret.Errors[0].OpIndex != 0 ||
!strings.Contains(ret.Errors[0].What, "missing session") ||
!strings.Contains(ret.Errors[1].What, "doesn't exist") {
t.Fatalf("bad: %v", ret.Errors[0])
}

// Now poke in a real session and try again.
txn[0].Session = id
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}

if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 2 {
t.Fatalf("bad: %v", ret)
}
for i, result := range ret.Results {
var expected []byte
if i == 1 {
expected = value
}

if result.Key != key ||
!bytes.Equal(result.Value, expected) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}

// Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}
33 changes: 23 additions & 10 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific))

s.mux.HandleFunc("/v1/txn", s.wrap(s.Txn))

if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down Expand Up @@ -342,28 +344,39 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return
}

prettyPrint := false
if _, ok := req.URL.Query()["pretty"]; ok {
prettyPrint = true
}
// Write out the JSON object
if obj != nil {
var buf []byte
if prettyPrint {
buf, err = json.MarshalIndent(obj, "", " ")
} else {
buf, err = json.Marshal(obj)
}
buf, err = s.marshalJSON(req, obj)
if err != nil {
goto HAS_ERR
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been wanting to do this for a while:

if prettyPrint {
  buf += "\n"
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm-a sneak that in since I'm touching that code as part of this change.

resp.Header().Set("Content-Type", "application/json")
resp.Write(buf)
}
}
return f
}

// marshalJSON marshals the object into JSON, respecting the user's pretty-ness
// configuration.
func (s *HTTPServer) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) {
if _, ok := req.URL.Query()["pretty"]; ok {
buf, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return nil, err
}
buf = append(buf, "\n"...)
return buf, nil
}

buf, err := json.Marshal(obj)
if err != nil {
return nil, err
}
return buf, err
}

// Returns true if the UI is enabled.
func (s *HTTPServer) IsUIEnabled() bool {
return s.uiDir != "" || s.agent.config.EnableUi
Expand Down
1 change: 1 addition & 0 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func testPrettyPrint(pretty string, t *testing.T) {
srv.wrap(handler)(resp, req)

expected, _ := json.MarshalIndent(r, "", " ")
expected = append(expected, "\n"...)
actual, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("err: %s", err)
Expand Down
Loading