From db598db50ba2e45d22147a38860db5d729712032 Mon Sep 17 00:00:00 2001 From: Aphyr Date: Sun, 2 Oct 2022 22:07:30 -0400 Subject: [PATCH] Add an etcdctl-based client This client shells out to etcdctl and supports the list-append workload only--I haven't implemented any other features. I'm only doing this because the etcd team refuse to believe a bug I reported could actually be in etcd; they insist it must be in jetcd, and the jetcd maintainers haven't found anything, and there goes my entire Sunday https://github.com/etcd-io/etcd/issues/14092#issuecomment-1260571355 --- project.clj | 1 + src/jepsen/etcd.clj | 7 +- src/jepsen/etcd/append.clj | 3 +- src/jepsen/etcd/client.clj | 154 +++++++++++++++++++-------- src/jepsen/etcd/client/etcdctl.clj | 163 +++++++++++++++++++++++++++++ src/jepsen/etcd/client/support.clj | 6 ++ src/jepsen/etcd/client/txn.clj | 72 +++---------- src/jepsen/etcd/db.clj | 9 +- src/jepsen/etcd/lock.clj | 6 +- src/jepsen/etcd/nemesis.clj | 5 +- src/jepsen/etcd/register.clj | 2 +- src/jepsen/etcd/set.clj | 2 +- src/jepsen/etcd/support.clj | 27 ++++- src/jepsen/etcd/watch.clj | 2 +- 14 files changed, 339 insertions(+), 120 deletions(-) create mode 100644 src/jepsen/etcd/client/etcdctl.clj create mode 100644 src/jepsen/etcd/client/support.clj diff --git a/project.clj b/project.clj index 6b3e7f4..a5ca73c 100644 --- a/project.clj +++ b/project.clj @@ -13,6 +13,7 @@ ]] [io.netty/netty-codec-http2 "4.1.74.Final"] [io.netty/netty-handler-proxy "4.1.74.Final"] + [cheshire "5.11.0"] ] :jvm-opts ["-Djava.awt.headless=true" "-server" diff --git a/src/jepsen/etcd.clj b/src/jepsen/etcd.clj index 8fcd8e9..9a92cea 100644 --- a/src/jepsen/etcd.clj +++ b/src/jepsen/etcd.clj @@ -148,7 +148,12 @@ (def cli-opts "Additional command line options." - [[nil "--corrupt-check" "If set, enables etcd's experimental corruption checking options"] + [[nil "--client-type TYPE" "What kind of client should we use? Either jetcd or etcdctl." 
+ :default :jetcd + :parse-fn keyword + :validate [#{:etcdctl :jetcd} (cli/one-of #{:etcdctl :jetcd})]] + + [nil "--corrupt-check" "If set, enables etcd's experimental corruption checking options"] [nil "--lazyfs" "Mounts etcd in a lazyfs, and causes the kill nemesis to also wipe our unfsynced data files."] diff --git a/src/jepsen/etcd/append.clj b/src/jepsen/etcd/append.clj index 19cb956..cec0718 100644 --- a/src/jepsen/etcd/append.clj +++ b/src/jepsen/etcd/append.clj @@ -55,6 +55,7 @@ "Takes a transaction with reads, and constructs a collection of guards verifying each read's revision is intact." [t] + (info :guards (select-keys t [:reads :read-revision])) (map (fn [k] (if-let [v (get (:reads t) k)] ; If the key existed, we go by its modification revision @@ -110,7 +111,7 @@ (defrecord TxnClient [conn] client/Client (open! [this test node] - (assoc this :conn (c/client node))) + (assoc this :conn (c/client test node))) (setup! [_ test]) diff --git a/src/jepsen/etcd/client.clj b/src/jepsen/etcd/client.clj index e227191..e0b8f2c 100644 --- a/src/jepsen/etcd/client.clj +++ b/src/jepsen/etcd/client.clj @@ -3,9 +3,13 @@ (:refer-clojure :exclude [await get swap!]) (:require [clojure.core :as c] [clojure.tools.logging :refer [info warn]] - [jepsen.etcd [support :as support]] - [jepsen.etcd.client.txn :as t] + [jepsen.codec :as codec] + [jepsen.etcd [support :as etcd.support]] + [jepsen.etcd.client [etcdctl :as etcdctl] + [support :as s] + [txn :as t]] [jepsen.util :refer [coll]] + [potemkin] [slingshot.slingshot :refer [try+ throw+]]) (:import (java.lang AutoCloseable) (java.net URI) @@ -44,6 +48,7 @@ (io.etcd.jetcd.maintenance StatusResponse) (io.etcd.jetcd.op Cmp Cmp$Op + CmpTarget Op Op$PutOp Op$GetOp) @@ -64,10 +69,35 @@ "A default timeout, in ms." 5000) +(def put-option-with-prev-kv + "A put option which returns the previous kv" + (.. 
(PutOption/newBuilder) withPrevKV build)) + ; Serialization -(def ->bytes t/->bytes) -(def bytes-> t/bytes->) +(def ^Charset utf8 (Charset/forName "UTF-8")) + +(defn ^ByteSequence str->bytes + "Converts a string to bytes, as UTF-8" + [^String s] + (ByteSequence/from (str s) utf8)) + +(defn bytes->str + [^ByteSequence bs] + (.toString bs utf8)) + +(defn ^ByteSequence ->bytes + "Coerces any object to a ByteSequence" + [x] + (if (instance? ByteSequence x) + x + (ByteSequence/from (codec/encode x)))) + +(defn bytes-> + "Coerces a ByteSequence to any object" + [^ByteSequence bs] + (codec/decode (.getBytes bs))) + ; Coercing responses to Clojure data (defprotocol ToClj @@ -172,16 +202,25 @@ :exception (.getException r)}) ) + ; Opening and closing clients (defn ^Client client - "Builds a client for the given node." - [node] - (.. (Client/builder) - (endpoints (into-array String [(support/client-url node)])) - ; (lazyInitialization false) - ; (loadBalancerPolicy "some string???") - (build))) + "Builds a client for the given node. If given a test map, chooses what kind + of client to create based on (:client-type test)." + ([node] + (.. (Client/builder) + (endpoints (into-array String [(etcd.support/client-url node)])) + ; (lazyInitialization false) + ; (loadBalancerPolicy "some string???") + (build))) + ([test node] + (case (:client-type test) + :jetcd (client node) + :etcdctl (etcdctl/client test node)))) + +(defrecord EtcdCtl [] + ) (defn close! "Closes any client. Ignores ClosedClientExceptions." @@ -381,7 +420,7 @@ "Sets key to value, synchronously." [c k v] (-> c kv-client - (.put (->bytes k) (->bytes v) t/put-option-with-prev-kv) + (.put (->bytes k) (->bytes v) put-option-with-prev-kv) await ->clj)) @@ -418,42 +457,15 @@ first))) (defn txn! - "Right now, transactions are all if statements, so this takes 2 or three - arguments: a test (which may be a collection) of guard ops, a true branch, - and a false branch. Branches may be single ops or collections of ops. 
With - only two args, skips the guard expr." + "Evaluates a transaction on the given client. Takes a predicate, a true + branch, and a false branch, as Clojure structures. See + jepsen.etcd.client.txn for constructors." ([c t-branch] (txn! c nil t-branch)) ([c test t-branch] (txn! c test t-branch nil)) - ([c test t f] - (let [t (coll t) - f (coll f) - res (-> (.. (kv-client c) - (txn) - (If (into-array Cmp (coll test))) - (Then (into-array Op t)) - (Else (into-array Op f)) - (commit)) - await - ->clj) - ; Zip together get/put responses into a single sequence - results (loop [rs (transient []) - ops (seq (if (:succeeded? res) t f)) - gets (:gets res) - puts (:puts res)] - (if ops - (condp instance? (first ops) - Op$PutOp (recur (conj! rs (first puts)) - (next ops) - gets - (next puts)) - Op$GetOp (recur (conj! rs (first gets)) - (next ops) - (next gets) - puts)) - (persistent! rs)))] - (assoc res :results results)))) + ([c test t-branch f-branch] + (s/txn! c test t-branch f-branch))) (defn cas*! "Like cas!, but raw; returns full txn response map." @@ -588,7 +600,7 @@ [client node-or-nodes] (-> client cluster-client - (.addMember (map #(URI/create (support/peer-url %)) (coll node-or-nodes))) + (.addMember (map #(URI/create (etcd.support/peer-url %)) (coll node-or-nodes))) await ->clj)) @@ -616,7 +628,7 @@ [client node] (-> client maintenance-client - (.statusMember (URI/create (support/peer-url node))) + (.statusMember (URI/create (etcd.support/peer-url node))) await ->clj)) @@ -667,3 +679,53 @@ "Returns the most current revision for this client." [c] (-> c (get* "") :header :revision)) + +(defn txn->java + "Turns a transaction AST into an appropriate Java datatype. For instance, + [:put \"foo\" \"bar\"] becomes an Op$PutOp." + [x] + (cond (sequential? x) + (let [[type & args] x] + (case type + := (Cmp. (->bytes (first x)) Cmp$Op/EQUAL (second x)) + :< (Cmp. (->bytes (first x)) Cmp$Op/LESS (second x)) + :> (Cmp. 
(->bytes (first x)) Cmp$Op/GREATER (second x)) + :mod-revision (CmpTarget/modRevision (first x)) + :create-revision (CmpTarget/createRevision (first x)) + :value (CmpTarget/value (->bytes (first x))) + :version (CmpTarget/version (first x)) + :get (Op/get (->bytes (first x)) GetOption/DEFAULT) + :put (Op/put (->bytes (first x)) + (->bytes (second x)) + put-option-with-prev-kv))))) + +(extend-protocol s/Client Client + (txn! [c test t f] + (let [test (coll test) + t (coll t) + f (coll f) + res (-> (.. (kv-client c) + (txn) + (If (into-array Cmp test)) + (Then (into-array Op t)) + (Else (into-array Op f)) + (commit)) + await + ->clj) + ; Zip together get/put responses into a single sequence + results (loop [rs (transient []) + ops (seq (if (:succeeded? res) t f)) + gets (:gets res) + puts (:puts res)] + (if ops + (condp instance? (first ops) + Op$PutOp (recur (conj! rs (first puts)) + (next ops) + gets + (next puts)) + Op$GetOp (recur (conj! rs (first gets)) + (next ops) + (next gets) + puts)) + (persistent! rs)))] + (assoc res :results results)))) diff --git a/src/jepsen/etcd/client/etcdctl.clj b/src/jepsen/etcd/client/etcdctl.clj new file mode 100644 index 0000000..cf382ff --- /dev/null +++ b/src/jepsen/etcd/client/etcdctl.clj @@ -0,0 +1,163 @@ +(ns jepsen.etcd.client.etcdctl + "A partial implementation of a client which shells out to etcdctl on the + given node. We encode values using pr-str to make things easier to debug, + rather than bytes." + (:require [clojure [edn :as edn] + [string :as str]] + [clojure.tools.logging :refer [info warn]] + [cheshire.core :as json] + [jepsen [control :as c] + [util :as util :refer [coll]]] + [jepsen.etcd [support :as support]] + [jepsen.etcd.client.support :as s] + [slingshot.slingshot :refer [try+ throw+]]) + (:import (java.util Base64))) + +(defn etcdctl! + "Runs an etcdctl command, passing the given stdin and returning parsed JSON." 
+ [test node cmd in] + (try+ (let [res (val (first (c/on-nodes test [node] + (fn [_ _] + (support/etcdctl! [cmd :-w :json] + :in in)))))] + (json/parse-string res true)) + ; Rewrite errors + (catch [:type :jepsen.control/nonzero-exit, :exit 1] e + (let [err (:err e) + ;_ (warn :caught err) + first-msg (first (str/split-lines (:err e)))] + (if (re-find #"^\{" first-msg) + ; Maybe JSON? + (let [parsed (json/parse-string (:err e) true) + error (:error parsed)] + ; They squirrel away the actual error message in an error + ; field--msg is often something useless like "retrying of unary + ; invoker failed" + ;(warn parsed) + (throw+ + (condp re-find error + #"duplicate key" + {:definite? true, :type :duplicate-key, :description error}))) + ; Not JSON + (throw+ {:definite? false, :type :etcdctl, :description err})))))) + +(defn parse-header + "Interprets a header" + [header] + {:member-id (:member_id header) + :revision (:revision header) + :raft-term (:raft_term header)}) + +(defn parse-kv + "Interprets a KV response" + [{:keys [key create_revision mod_revision version value]}] + (clojure.lang.MapEntry. + (-> (Base64/getDecoder) + (.decode key) + String. + edn/read-string) + {:value (-> (Base64/getDecoder) + (.decode value) + String. + edn/read-string) + :version version + :create-revision create_revision + :mod-revision mod_revision})) + +(defn parse-kvs + "Interprets a series of KV responses" + [kvs] + (into {} (mapv parse-kv kvs))) + +(defn parse-response + "Interprets a JSON Response object." + [response] + (let [r (:Response response) + ks (keys r) + _ (assert (= 1 (count ks)) + (str "Unexpected multi-key response " (pr-str response))) + k (first ks) + v (get r k)] + (case k + :response_put {:header (parse-header (:header v))} + :response_range {:header (parse-header (:header v)) + :count (:count v) + ; :more? (< (count (:vs v)) (:count v)) + :kvs (parse-kvs (:kvs v))}))) + +(defn parse-res + "Massages a JSON response into the same form the client expects." 
+ [res] + (let [{:keys [header succeeded responses]} res] + {:succeeded? succeeded + :header (parse-header header) + :responses (mapv parse-response responses)})) + +(defn txn->text + "Turns a transaction AST (a Clojure structure) representing a transaction + into text for etcdctl. See + https://chromium.googlesource.com/external/github.com/coreos/etcd/+/release-3.0/etcdctl/README.md#txn-options + for the syntax here." + [x] + (cond ; Hope escaping is sufficient for our purposes; this is a quick hack + (string? x) + (pr-str x) + + ; Integers are encoded with double-quotes too? I guess? + (integer? x) + (str "\"" x "\"") + + (sequential? x) + (let [[type & args] x] + (case type + :txn (let [[pred t-branch f-branch] args] + (str/join "\n" + (concat (map txn->text (coll pred)) + [""] + (map txn->text (coll t-branch)) + [""] + (map txn->text (coll f-branch)) + ["\n"]))) + + ; arrrrgh they encode the syntax tree two incompatible ways between + ; jetcd and the etcdctl text version. Our AST is [:< k [:mod 5]], + ; but we have to spit out mod(k) < 5. Always put the mod (or other + ; fun) first. + (:= :< :>) + (let [[k target] args + [fun value] target] + (str (case fun + :mod-revision "mod" + :value "val" + :version "ver") + "(" (txn->text k) ") " (name type) " " (txn->text value))) + + :put (str "put " (first args) " " (txn->text (pr-str (second args)))) + :get (str "get " (first args)))))) + +(defrecord EtcdctlClient [test node] + s/Client + (txn! + [this pred t-branch f-branch] + (let [txn [:txn pred t-branch f-branch] + _ (info :txn txn "\n" (txn->text txn)) + raw-res (etcdctl! 
test node :txn (txn->text txn)) + _ (info :raw-res (util/pprint-str raw-res)) + res (parse-res raw-res) + ; Parse get/put results + _ (info :parsed (util/pprint-str res)) + res (-> res + (dissoc :responses) + (assoc :results + (->> res + :responses)))] + (info :res (util/pprint-str res)) + res)) + + java.lang.AutoCloseable + (close [this])) + +(defn client + "Constructs a client for the given test and node." + [test node] + (EtcdctlClient. test node)) diff --git a/src/jepsen/etcd/client/support.clj b/src/jepsen/etcd/client/support.clj new file mode 100644 index 0000000..62268ec --- /dev/null +++ b/src/jepsen/etcd/client/support.clj @@ -0,0 +1,6 @@ +(ns jepsen.etcd.client.support + "Basic functions for working with clients.") + +(defprotocol Client + (txn! [client pred t-branch false-branch] + "Takes a predicate test, an optional true branch, and an optional false branch. See jepsen.etcd.client.txn for how to construct these arguments.")) diff --git a/src/jepsen/etcd/client/txn.clj b/src/jepsen/etcd/client/txn.clj index 71dc23f..09db3fe 100644 --- a/src/jepsen/etcd/client/txn.clj +++ b/src/jepsen/etcd/client/txn.clj @@ -1,87 +1,49 @@ (ns jepsen.etcd.client.txn - "Support functions for constructing transactional expressions." - (:refer-clojure :exclude [get < = > compare]) - (:require [jepsen [codec :as codec]]) - (:import (java.nio.charset Charset) - (io.etcd.jetcd ByteSequence) - (io.etcd.jetcd.options GetOption - PutOption) - (io.etcd.jetcd.op Cmp - Cmp$Op - CmpTarget - Op - Op$PutOp - Op$GetOp))) + "Support functions for constructing transactional expressions. These + construct little Clojure ASTs like [:< \"foo\" [:mod-revision 5]]."
+ (:refer-clojure :exclude [get < = > compare])) -(def ^Charset utf8 (Charset/forName "UTF-8")) - -(defn ^ByteSequence str->bytes - "Converts a string to bytes, as UTF-8" - [^String s] - (ByteSequence/from (str s) utf8)) - -(defn bytes->str - [^ByteSequence bs] - (.toString bs utf8)) - -(defn ^ByteSequence ->bytes - "Coerces any object to a ByteSequence" - [x] - (if (instance? ByteSequence x) - x - (ByteSequence/from (codec/encode x)))) - -(defn bytes-> - "Coerces a ByteSequence to any object" - [^ByteSequence bs] - (codec/decode (.getBytes bs))) - -(def put-option-with-prev-kv - "A put option which returns the previous kv" - (.. (PutOption/newBuilder) withPrevKV build)) - -; Transactional Ops -(defn ^Op$GetOp get +(defn get "Constructs a get operation." [k] - (Op/get (->bytes k) GetOption/DEFAULT)) + [:get k]) -(defn ^Op$PutOp put +(defn put "Constructs a put operation." [k v] - (Op/put (->bytes k) (->bytes v) put-option-with-prev-kv)) + [:put k v]) (defn version "A comparison target by version." - [^long v] - (CmpTarget/version v)) + [v] + [:version v]) (defn value "A comparison target by value." [v] - (CmpTarget/value (->bytes v))) + [:value v]) (defn mod-revision "A comparison target by modification revision." - [^long revision] - (CmpTarget/modRevision revision)) + [revision] + [:mod-revision revision]) (defn create-revision "A comparison target by creation revision." - [^long revision] - (CmpTarget/createRevision revision)) + [revision] + [:create-revision revision]) (defn = "Constructs an equality comparison between key and target." [k target] - (Cmp. (->bytes k) Cmp$Op/EQUAL target)) + [:= k target]) (defn < "Constructs an LESS inequality comparison between key and target." [k target] - (Cmp. (->bytes k) Cmp$Op/LESS target)) + [:< k target]) (defn > "Constructs an GREATER inequality comparison between key and target." [k target] - (Cmp. 
(->bytes k) Cmp$Op/GREATER target)) + [:> k target]) diff --git a/src/jepsen/etcd/db.clj b/src/jepsen/etcd/db.clj index d5135f0..72866df 100644 --- a/src/jepsen/etcd/db.clj +++ b/src/jepsen/etcd/db.clj @@ -16,7 +16,7 @@ [support :as s]] [slingshot.slingshot :refer [throw+ try+]])) -(def dir "/opt/etcd") +(def dir s/dir) (def binary "etcd") (def logfile (str dir "/etcd.log")) (def pidfile (str dir "/etcd.pid")) @@ -26,13 +26,6 @@ [node] (str dir "/" node ".etcd")) -(defn etcdctl! - "Runs an etcdctl command on the local node." - [& args] - (c/su - (c/exec (str dir "/etcdctl") - :--endpoints (s/client-url (cn/local-ip)) args))) - (defn wipe! "Wipes data files on the current node." [test node] diff --git a/src/jepsen/etcd/lock.clj b/src/jepsen/etcd/lock.clj index 25474a6..a9529f6 100644 --- a/src/jepsen/etcd/lock.clj +++ b/src/jepsen/etcd/lock.clj @@ -92,7 +92,7 @@ client/Client (open! [this test node] (assoc this - :conn (c/client node) + :conn (c/client test node) :lease+lock (atom nil))) (setup! [this test]) @@ -139,7 +139,7 @@ (defrecord LockingSetClient [conn lock-name latency set] client/Client (open! [this test node] - (assoc this :conn (c/client node))) + (assoc this :conn (c/client test node))) (setup! [this test]) @@ -185,7 +185,7 @@ (defrecord LockingEtcdSetClient [conn lock-name latency k] client/Client (open! [this test node] - (assoc this :conn (c/client node))) + (assoc this :conn (c/client test node))) (setup! [this test]) diff --git a/src/jepsen/etcd/nemesis.clj b/src/jepsen/etcd/nemesis.clj index e444a01..82a9ad0 100644 --- a/src/jepsen/etcd/nemesis.clj +++ b/src/jepsen/etcd/nemesis.clj @@ -11,7 +11,8 @@ [jepsen.nemesis [combined :as nc] [time :as nt]] [jepsen.etcd [client :as client] - [db :as db]] + [db :as db] + [support :as s]] [slingshot.slingshot :refer [try+ throw+]])) (defn member-nemesis @@ -91,7 +92,7 @@ (fn [_ _] (info "Defragmenting") (try+ - (db/etcdctl! :defrag) + (s/etcdctl! 
:defrag) :defragged (catch [:exit 1] e (condp re-find (:err e) diff --git a/src/jepsen/etcd/register.clj b/src/jepsen/etcd/register.clj index 4d060fe..4085456 100644 --- a/src/jepsen/etcd/register.clj +++ b/src/jepsen/etcd/register.clj @@ -15,7 +15,7 @@ (defrecord Client [conn] client/Client (open! [this test node] - (assoc this :conn (c/client node))) + (assoc this :conn (c/client test node))) (setup! [this test]) diff --git a/src/jepsen/etcd/set.clj b/src/jepsen/etcd/set.clj index ba728ee..50df8c6 100644 --- a/src/jepsen/etcd/set.clj +++ b/src/jepsen/etcd/set.clj @@ -9,7 +9,7 @@ (defrecord SetClient [k conn] client/Client (open! [this test node] - (assoc this :conn (c/client node))) + (assoc this :conn (c/client test node))) (setup! [_ test] (c/put! conn k #{})) diff --git a/src/jepsen/etcd/support.clj b/src/jepsen/etcd/support.clj index 2de8e30..b2e8d68 100644 --- a/src/jepsen/etcd/support.clj +++ b/src/jepsen/etcd/support.clj @@ -1,6 +1,10 @@ (ns jepsen.etcd.support (:require [clojure.string :as str] - [jepsen.control.net :as c.net])) + [jepsen [control :as c]] + [jepsen.control [core :as c.core] + [net :as c.net]])) + +(def dir "/opt/etcd") (defn node-url "An HTTP url for connecting to a node on a particular port." @@ -25,3 +29,24 @@ (map (fn [node] (str node "=" (peer-url node)))) (str/join ","))) + +(defn etcdctl! + "Runs an etcdctl command with the current control session, against the local + node. Takes an optional :in argument, after which should come a stdin + string." 
+ [& args] + (let [[command [_ stdin]] (split-with (complement #{:in}) args) + cmd (->> [(str dir "/etcdctl") + :--endpoints (client-url (c.net/local-ip)) + command] + (map c/escape) + (str/join " ")) + action {:cmd cmd, :in stdin}] + (c/su + (-> action + c/wrap-cd + c/wrap-sudo + c/wrap-trace + c/ssh* + c.core/throw-on-nonzero-exit + c/just-stdout)))) diff --git a/src/jepsen/etcd/watch.clj b/src/jepsen/etcd/watch.clj index 0055fed..845a31f 100644 --- a/src/jepsen/etcd/watch.clj +++ b/src/jepsen/etcd/watch.clj @@ -215,7 +215,7 @@ client/Client (open! [this test node] (assoc this - :conn (c/client node) + :conn (c/client test node) :revision (atom 0))) (setup! [this test])