Skip to content

Commit

Permalink
Add an etcdctl-based client
Browse files Browse the repository at this point in the history
This client shells out to etcdctl and supports the list-append workload
only--I haven't implemented any other features. I'm only doing this
because the etcd team refuse to believe a bug I reported could actually
be in etcd; they insist it must be in jetcd, and the jetcd maintainers
haven't found anything, and <sigh> there goes my entire Sunday

etcd-io/etcd#14092 (comment)
  • Loading branch information
aphyr committed Oct 3, 2022
1 parent 0ab9e75 commit db598db
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 120 deletions.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
]]
[io.netty/netty-codec-http2 "4.1.74.Final"]
[io.netty/netty-handler-proxy "4.1.74.Final"]
[cheshire "5.11.0"]
]
:jvm-opts ["-Djava.awt.headless=true"
"-server"
Expand Down
7 changes: 6 additions & 1 deletion src/jepsen/etcd.clj
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@

(def cli-opts
"Additional command line options."
[[nil "--corrupt-check" "If set, enables etcd's experimental corruption checking options"]
[[nil "--client-type TYPE" "What kind of client should we use? Either jetcd or etcdctl."
:default :jetcd
:parse-fn keyword
:validate [#{:etcdctl :jetcd} (cli/one-of #{:etcdctl :jetcd})]]

[nil "--corrupt-check" "If set, enables etcd's experimental corruption checking options"]

[nil "--lazyfs" "Mounts etcd in a lazyfs, and causes the kill nemesis to also wipe our unfsynced data files."]

Expand Down
3 changes: 2 additions & 1 deletion src/jepsen/etcd/append.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"Takes a transaction with reads, and constructs a collection of guards
verifying each read's revision is intact."
[t]
(info :guards (select-keys t [:reads :read-revision]))
(map (fn [k]
(if-let [v (get (:reads t) k)]
; If the key existed, we go by its modification revision
Expand Down Expand Up @@ -110,7 +111,7 @@
(defrecord TxnClient [conn]
client/Client
(open! [this test node]
(assoc this :conn (c/client node)))
(assoc this :conn (c/client test node)))

(setup! [_ test])

Expand Down
154 changes: 108 additions & 46 deletions src/jepsen/etcd/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
(:refer-clojure :exclude [await get swap!])
(:require [clojure.core :as c]
[clojure.tools.logging :refer [info warn]]
[jepsen.etcd [support :as support]]
[jepsen.etcd.client.txn :as t]
[jepsen.codec :as codec]
[jepsen.etcd [support :as etcd.support]]
[jepsen.etcd.client [etcdctl :as etcdctl]
[support :as s]
[txn :as t]]
[jepsen.util :refer [coll]]
[potemkin]
[slingshot.slingshot :refer [try+ throw+]])
(:import (java.lang AutoCloseable)
(java.net URI)
Expand Down Expand Up @@ -44,6 +48,7 @@
(io.etcd.jetcd.maintenance StatusResponse)
(io.etcd.jetcd.op Cmp
Cmp$Op
CmpTarget
Op
Op$PutOp
Op$GetOp)
Expand All @@ -64,10 +69,35 @@
"A default timeout, in ms."
5000)

(def put-option-with-prev-kv
"A put option which returns the previous kv"
(.. (PutOption/newBuilder) withPrevKV build))

; Serialization

(def ->bytes t/->bytes)
(def bytes-> t/bytes->)
(def ^Charset utf8 (Charset/forName "UTF-8"))

(defn ^ByteSequence str->bytes
"Converts a string to bytes, as UTF-8"
[^String s]
(ByteSequence/from (str s) utf8))

(defn bytes->str
[^ByteSequence bs]
(.toString bs utf8))

(defn ^ByteSequence ->bytes
"Coerces any object to a ByteSequence"
[x]
(if (instance? ByteSequence x)
x
(ByteSequence/from (codec/encode x))))

(defn bytes->
"Coerces a ByteSequence to any object"
[^ByteSequence bs]
(codec/decode (.getBytes bs)))


; Coercing responses to Clojure data
(defprotocol ToClj
Expand Down Expand Up @@ -172,16 +202,25 @@
:exception (.getException r)})
)


; Opening and closing clients

(defn ^Client client
"Builds a client for the given node."
[node]
(.. (Client/builder)
(endpoints (into-array String [(support/client-url node)]))
; (lazyInitialization false)
; (loadBalancerPolicy "some string???")
(build)))
"Builds a client for the given node. If given a test map, chooses what kind
of client to create based on (:client-type test)."
([node]
(.. (Client/builder)
(endpoints (into-array String [(etcd.support/client-url node)]))
; (lazyInitialization false)
; (loadBalancerPolicy "some string???")
(build)))
([test node]
(case (:client-type test)
:jetcd (client node)
:etcdctl (etcdctl/client test node))))

(defrecord EtcdCtl []
)

(defn close!
"Closes any client. Ignores ClosedClientExceptions."
Expand Down Expand Up @@ -381,7 +420,7 @@
"Sets key to value, synchronously."
[c k v]
(-> c kv-client
(.put (->bytes k) (->bytes v) t/put-option-with-prev-kv)
(.put (->bytes k) (->bytes v) put-option-with-prev-kv)
await
->clj))

Expand Down Expand Up @@ -418,42 +457,15 @@
first)))

(defn txn!
"Right now, transactions are all if statements, so this takes 2 or three
arguments: a test (which may be a collection) of guard ops, a true branch,
and a false branch. Branches may be single ops or collections of ops. With
only two args, skips the guard expr."
"Evaluates a transaction on the given client. Takes a predicate, a true
branch, and a false branch, as Clojure structures. See
jepsen.etcd.client.txn for constructors."
([c t-branch]
(txn! c nil t-branch))
([c test t-branch]
(txn! c test t-branch nil))
([c test t f]
(let [t (coll t)
f (coll f)
res (-> (.. (kv-client c)
(txn)
(If (into-array Cmp (coll test)))
(Then (into-array Op t))
(Else (into-array Op f))
(commit))
await
->clj)
; Zip together get/put responses into a single sequence
results (loop [rs (transient [])
ops (seq (if (:succeeded? res) t f))
gets (:gets res)
puts (:puts res)]
(if ops
(condp instance? (first ops)
Op$PutOp (recur (conj! rs (first puts))
(next ops)
gets
(next puts))
Op$GetOp (recur (conj! rs (first gets))
(next ops)
(next gets)
puts))
(persistent! rs)))]
(assoc res :results results))))
([c test t-branch f-branch]
(s/txn! c test t-branch f-branch)))

(defn cas*!
"Like cas!, but raw; returns full txn response map."
Expand Down Expand Up @@ -588,7 +600,7 @@
[client node-or-nodes]
(-> client
cluster-client
(.addMember (map #(URI/create (support/peer-url %)) (coll node-or-nodes)))
(.addMember (map #(URI/create (etcd.support/peer-url %)) (coll node-or-nodes)))
await
->clj))

Expand Down Expand Up @@ -616,7 +628,7 @@
[client node]
(-> client
maintenance-client
(.statusMember (URI/create (support/peer-url node)))
(.statusMember (URI/create (etcd.support/peer-url node)))
await
->clj))

Expand Down Expand Up @@ -667,3 +679,53 @@
"Returns the most current revision for this client."
[c]
(-> c (get* "") :header :revision))

(defn txn->java
"Turns a transaction AST into an appropriate Java datatype. For instance,
[:put \"foo\" \"bar\"] becomes an Op$PutOp."
[x]
(cond (sequential? x)
(let [[type & args] x]
(case type
:= (Cmp. (->bytes (first x)) Cmp$Op/EQUAL (second x))
:< (Cmp. (->bytes (first x)) Cmp$Op/LESS (second x))
:> (Cmp. (->bytes (first x)) Cmp$Op/GREATER (second x))
:mod-revision (CmpTarget/modRevision (first x))
:create-revision (CmpTarget/createRevision (first x))
:value (CmpTarget/value (->bytes (first x)))
:version (CmpTarget/version (first x))
:get (Op/get (->bytes (first x)) GetOption/DEFAULT)
:put (Op/put (->bytes (first x))
(->bytes (second x))
put-option-with-prev-kv)))))

(extend-protocol s/Client Client
(txn! [c test t f]
(let [test (coll test)
t (coll t)
f (coll f)
res (-> (.. (kv-client c)
(txn)
(If (into-array Cmp test))
(Then (into-array Op t))
(Else (into-array Op f))
(commit))
await
->clj)
; Zip together get/put responses into a single sequence
results (loop [rs (transient [])
ops (seq (if (:succeeded? res) t f))
gets (:gets res)
puts (:puts res)]
(if ops
(condp instance? (first ops)
Op$PutOp (recur (conj! rs (first puts))
(next ops)
gets
(next puts))
Op$GetOp (recur (conj! rs (first gets))
(next ops)
(next gets)
puts))
(persistent! rs)))]
(assoc res :results results))))
Loading

0 comments on commit db598db

Please sign in to comment.