Skip to content

Commit

Permalink
[new] [#153] PoC: transducer support on thaw
Browse files Browse the repository at this point in the history
Note: also considered (but ultimately rejected) idea of a separate
`*thaw-mapfn*` opt that operates directly on every `thaw-from-in!`
result.

This (transducer) approach is more flexible, and covers the most
common use cases just fine. Having both seems excessive.
  • Loading branch information
ptaoussanis committed Aug 2, 2023
1 parent 60bc4e9 commit 89f98b4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 20 deletions.
91 changes: 71 additions & 20 deletions src/taoensso/nippy.clj
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,45 @@

(enc/defonce ^:dynamic *incl-metadata?* "Include metadata when freezing/thawing?" true)

(enc/defonce ^:dynamic *thaw-xform*
"Experimental, subject to change. Feedback welcome.
Transducer to use when thawing standard Clojure collection types
(vectors, maps, sets, lists, etc.).
Allows fast+flexible inspection and manipulation of data being thawed.
Key-val style data structures like maps will provide `MapEntry` args
to reducing function. Use `map-entry?`, `key`, `val` utils for these.
Example transducers:
(map (fn [x] (println x) x)) ; Print each coll item thawed
(comp
(map (fn [x] (if (= x :secret) :redacted x))) ; Replace secrets
(remove (fn [x] ; Remove maps with a truthy :remove?
(or
(and (map? x) (:remove? x))
(and (map-entry? x) (= (key x) :remove?) (val y)))))))
Note that while this is a very powerful feature, correctly writing
and debugging transducers and reducing fns can be tricky.
To help, if Nippy encounters an errors while applying your xform, it
will throw a detailed `ExceptionInfo` with message
\"Error thrown via `*thaw-xform*`\" to help you debug."
nil)

(comment
(binding [*thaw-xform*
(comp
(map (fn [x] (println x) x))
(map (fn [x] (if (= x 1) 0 x)))
(map (fn [x] (/ 1 0))))]

(thaw (freeze [1 1 0 1 1]))))

;;;; Java Serializable config
;; Unfortunately quite a bit of complexity to do this safely

Expand Down Expand Up @@ -1232,6 +1271,7 @@
(opt->bindings :auto-freeze-compressor #'*auto-freeze-compressor*)
(opt->bindings :custom-readers #'*custom-readers*)
(opt->bindings :incl-metadata? #'*incl-metadata?*)
(opt->bindings :thaw-xform #'*thaw-xform*)
(opt->bindings :serializable-allowlist
(case action
:freeze #'*freeze-serializable-allowlist*
Expand Down Expand Up @@ -1355,13 +1395,34 @@

(defmacro ^:private editable? [coll] `(instance? clojure.lang.IEditableCollection ~coll))

(defn- xform* [xform] (enc/catching-xform {:error/msg "Error thrown via `*thaw-xform*`"} xform))

(defn- transduce-thaw1 [^DataInput in xform n init rf]
(let [rf (if xform ((xform* xform) rf) rf)]
(rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in))) init n))))

(defn- transduce-thaw2 [^DataInput in xform n init rf2 rf1]
(if xform
(let [rf ((xform* xform) rf1)] (rf (enc/reduce-n (fn [acc _] (rf acc (clojure.lang.MapEntry/create (thaw-from-in! in) (thaw-from-in! in)))) init n)))
(let [rf rf2 ] (rf (enc/reduce-n (fn [acc _] (rf acc (thaw-from-in! in) (thaw-from-in! in))) init n)))))

(defn- read-into [to ^DataInput in ^long n]
(if (and (editable? to) (> n 10))
(persistent!
(enc/reduce-n (fn [acc _] (conj! acc (thaw-from-in! in)))
(transient to) n))
(transduce-thaw1 in *thaw-xform* n (transient to) (fn rf ([x] (persistent! x)) ([acc x] (conj! acc x))))
(transduce-thaw1 in *thaw-xform* n to (fn rf ([x] x) ([acc x] (conj acc x))))))

(declare ^:private read-kvs-into)
(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2)))
(defn- read-kvs-into [to ^DataInput in ^long n]
(if (and (editable? to) (> n 10))

(enc/reduce-n (fn [acc _] (conj acc (thaw-from-in! in))) to n)))
(transduce-thaw2 in *thaw-xform* n (transient to)
(fn rf2 ([x] (persistent! x)) ([acc k v] (assoc! acc k v)))
(fn rf1 ([x] (persistent! x)) ([acc kv] (assoc! acc (key kv) (val kv)))))

(transduce-thaw2 in *thaw-xform* n to
(fn rf2 ([x] x) ([acc k v] (assoc acc k v)))
(fn rf1 ([x] x) ([acc kv] (assoc acc (key kv) (val kv)))))))

(defn- read-objects [^objects ary ^DataInput in]
(enc/reduce-n
Expand All @@ -1370,17 +1431,6 @@
ary)
ary (alength ary)))

(defn- read-kvs-into [to ^DataInput in ^long n]
(if (and (editable? to) (> n 10))
(persistent!
(enc/reduce-n (fn [acc _] (assoc! acc (thaw-from-in! in) (thaw-from-in! in)))
(transient to) n))

(enc/reduce-n (fn [acc _] (assoc acc (thaw-from-in! in) (thaw-from-in! in)))
to n)))

(defn- read-kvs-depr [to ^DataInput in] (read-kvs-into to in (quot (.readInt in) 2)))

(def ^:private class-method-sig (into-array Class [IPersistentMap]))

(defn- read-custom! [in prefixed? type-id]
Expand Down Expand Up @@ -1593,8 +1643,8 @@
id-regex (re-pattern (thaw-from-in! in))

id-vec-0 []
id-vec-2 [(thaw-from-in! in) (thaw-from-in! in)]
id-vec-3 [(thaw-from-in! in) (thaw-from-in! in) (thaw-from-in! in)]
id-vec-2 (read-into [] in 2)
id-vec-3 (read-into [] in 3)
id-vec-sm* (read-into [] in (read-sm-count* in))
id-vec-sm_ (read-into [] in (read-sm-count in))
id-vec-md (read-into [] in (read-md-count in))
Expand Down Expand Up @@ -1758,7 +1808,7 @@
(do (throw (ex-info (str "Unrecognized :auto encryptor id: " encryptor-id)
{:encryptor-id encryptor-id})))))

(def ^:private err-msg-unknown-thaw-failure "Decryption/decompression failure, or data unfrozen/damaged.")
(def ^:private err-msg-unknown-thaw-failure "Possible decryption/decompression error, unfrozen/damaged data, etc.")
(def ^:private err-msg-unrecognized-header "Unrecognized (but apparently well-formed) header. Data frozen with newer Nippy version?")

(defn fast-thaw
Expand Down Expand Up @@ -1792,7 +1842,7 @@
([^bytes ba
{:as opts
:keys [v1-compatibility? compressor encryptor password
serializable-allowlist incl-metadata?]
serializable-allowlist incl-metadata? thaw-xform]
:or {compressor :auto
encryptor :auto}}]

Expand Down Expand Up @@ -1820,7 +1870,8 @@
'*auto-freeze-compressor* *auto-freeze-compressor*
'*custom-readers* *custom-readers*
'*incl-metadata?* *incl-metadata?*
'*thaw-serializable-allowlist* *thaw-serializable-allowlist*)}
'*thaw-serializable-allowlist* *thaw-serializable-allowlist*
'*thaw-xform* *thaw-xform*)}

e))))

Expand Down
19 changes: 19 additions & 0 deletions test/taoensso/nippy_tests.clj
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,25 @@

"Don't try to preserve metadata on vars")])

;;;; thaw-xform

(deftest _thaw-xform
[(is (= (binding [nippy/*thaw-xform* nil] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :secret 3 4]))
(is (= (binding [nippy/*thaw-xform* (map (fn [x] (if (= x :secret) :redacted x)))] (thaw (freeze [1 2 :secret 3 4]))) [1 2 :redacted 3 4]))

(is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map-entry? x) (and (= (key x) :x) (val x)))))]
(thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}})))
{:a :A, :b :B, :c {}, :d #{:d1 :d2 {:d3 :D3}}}))

(is (= (binding [nippy/*thaw-xform* (remove (fn [x] (and (map? x) (contains? x :x))))]
(thaw (freeze {:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2 {:d3 :D3, :x :X}}})))
{:a :A, :b :B, :x :X, :c {:x :X}, :d #{:d1 :d2}}))

(is (= (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze []))) []) "rf not run on empty colls")

(let [ex (enc/throws :default (binding [nippy/*thaw-xform* (map (fn [x] (/ 1 0)))] (thaw (freeze [:a :b]))))]
(is (= (-> ex enc/ex-cause enc/ex-cause ex-data :call) '(rf acc in)) "Error thrown via `*thaw-xform*`"))])

;;;; Benchmarks

(deftest _benchmarks
Expand Down

0 comments on commit 89f98b4

Please sign in to comment.