Skip to content

Commit

Permalink
d/addresses, d/collect-garbage, changed IStorage to batch API
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Jul 27, 2023
1 parent e3c3489 commit d673b89
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 16 deletions.
80 changes: 64 additions & 16 deletions src/datascript/storage.clj → src/datascript/core_storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,32 @@

(import
'[datascript.db Datom]
'[java.util List]
'[java.util List HashSet]
'[java.io BufferedOutputStream File FileOutputStream OutputStream PushbackReader]
'[me.tonsky.persistent_sorted_set ANode Branch Leaf PersistentSortedSet RefType Settings])

; Fixed address of the root record: `store` writes the db metadata under it,
; `restore` reads it back first, and `addresses` always reports it as in use.
(def ^:private root-addr
#uuid "d5695966-036d-6740-8541-d80034219c28")

; Storage backend protocol: persist data under addresses, read it back by
; address, list the stored addresses and delete some of them. Per the method
; docstrings below, listing and deletion exist for garbage collection.
; NOTE(review): this span shows a two-argument `-store` + `-restore` pair that
; already closes the form, immediately followed by the batch-style methods.
; It looks like the before/after lines of a commit view shown back to back --
; confirm which set is current before editing.
(defprotocol IStorage
(-store [_ addr data])
(-restore [_ addr]))
(-store [_ addr+data-seq]
"Gives you a sequence of `[addr data]` pairs to serialize and store.
`addr`s are java.util.UUID.
`data`s are clojure-serializable data structure (maps, keywords, lists, ints etc)")

(-restore [_ addr]
"Read back and deserialize data stored under single `addr`")

(-list-addresses [_]
"Return seq that lists all addresses currently stored in your storage.
Will be used during GC to remove keys that are no longer used.")

(-delete [_ addrs-seq]
"Delete data stored under `addrs` (seq). Will be called during GC"))

; (defprotocol ICache
; (-compute-if-absent [_ key compute-fn]))

(defn output-stream ^OutputStream [^File file]
(let [os (FileOutputStream. file)]
Expand All @@ -34,7 +50,8 @@
(file-storage dir {}))
([dir opts]
(.mkdirs (io/file dir))
(let [name-fn (or (:name-fn opts) str)
(let [addr->filename-fn (or (:addr->filename-fn opts) str)
filename->addr-fn (or (:filename->addr-fn opts) #(UUID/fromString %))
write-fn (or
(:write-fn opts)
(when-some [freeze-fn (:freeze-fn opts)]
Expand All @@ -53,15 +70,24 @@
(with-open [rdr (PushbackReader. (io/reader is))]
(edn/read rdr))))]
(reify IStorage
(-store [_ addr data]
(with-open [os (output-stream (io/file dir (name-fn addr)))]
(write-fn os data)))
(-store [_ addr+data-seq]
(doseq [[addr data] addr+data-seq]
(with-open [os (output-stream (io/file dir (addr->filename-fn addr)))]
(write-fn os data))))

(-restore [_ addr]
(with-open [is (io/input-stream (io/file dir (name-fn addr)))]
(read-fn is)))))))
(with-open [is (io/input-stream (io/file dir (addr->filename-fn addr)))]
(read-fn is)))

(-list-addresses [_]
(mapv #(-> ^File % .getName filename->addr-fn)
(.listFiles (io/file dir))))

(-delete [_ addrs-seq]
(doseq [addr addrs-seq]
(.delete (io/file dir (addr->filename-fn addr)))))))))

(deftype StorageAdapter [^me.tonsky.persistent_sorted_set.IStorage istorage
^Settings settings]
(deftype StorageAdapter [istorage ^Settings settings *buffer]
me.tonsky.persistent_sorted_set.IStorage
(store [_ ^ANode node]
(let [addr (squuid)
Expand All @@ -72,7 +98,8 @@
:keys keys}
(instance? Branch node)
(assoc :addresses (.addresses ^Branch node)))]
(-store istorage addr data)
; (-store istorage addr data)
(vswap! *buffer conj! [addr data])
addr))
(restore [_ addr]
(let [{:keys [level keys addresses]} (-restore istorage addr)
Expand All @@ -97,7 +124,8 @@
(validate-storage (:avet db) istorage)
(let [eavt ^PersistentSortedSet (:eavt db)
settings (.-_settings eavt)
adapter (StorageAdapter. istorage settings)
*buffer (volatile! (transient []))
adapter (StorageAdapter. istorage settings *buffer)
meta (merge
{:schema (:schema db)
:max-eid (:max-eid db)
Expand All @@ -106,20 +134,40 @@
:aevt (set/store (:aevt db) adapter)
:avet (set/store (:avet db) adapter)}
(@#'set/settings->map settings))]
(-store istorage root-addr meta))))
(vswap! *buffer conj! [root-addr meta])
(-store istorage (persistent! @*buffer)))))

; Rebuilds a db from `istorage`: reads the root record stored under
; `root-addr`, derives the sorted-set settings from it, restores the :eavt,
; :aevt and :avet indexes through a StorageAdapter, and passes the result
; (plus schema, max-eid and max-tx from the root record) to `db/restore-db`.
; The one-argument arity delegates with an empty options map; `opts` is not
; read anywhere in the visible body.
; NOTE(review): the `settings`/`adapter` bindings appear twice below, once with
; a two-argument and once with a three-argument StorageAdapter constructor.
; This looks like before/after lines of a commit view -- confirm which pair
; is current.
(defn restore
([istorage]
(restore istorage {}))
([istorage opts]
(let [root (-restore istorage root-addr)
{:keys [schema eavt aevt avet max-eid max-tx]} root
settings (@#'set/map->settings root)
adapter (StorageAdapter. istorage settings)]
settings  (@#'set/map->settings root)
; The third constructor argument fills the adapter's `*buffer` field; nil is
; passed here.
adapter   (StorageAdapter. istorage settings nil)]
(db/restore-db
{:schema schema
:eavt (set/restore-by db/cmp-datoms-eavt eavt adapter root)
:aevt (set/restore-by db/cmp-datoms-aevt aevt adapter root)
:avet (set/restore-by db/cmp-datoms-avet avet adapter root)
:max-eid max-eid
:max-tx max-tx}))))

(defn addresses
  "Collects every storage address the given db currently uses and returns
   them as a java.util.HashSet. The root address is always included, then
   the :eavt, :aevt and :avet indexes are walked in that order. Anything
   that is not in the returned set is safe to be deleted."
  [db]
  (let [^HashSet in-use (doto (HashSet.)
                          (.add root-addr))
        ; Handed to walkAddresses for each address; returns whatever
        ; HashSet.add returns.
        record          (fn [addr] (.add in-use addr))]
    (doseq [index-key [:eavt :aevt :avet]]
      (.walkAddresses ^PersistentSortedSet (get db index-key) record))
    in-use))

(defn collect-garbage
  "Deletes from `istorage` every address it lists that `db` does not use.
   The used addresses come from `addresses`; the rest are gathered into one
   vector and passed to `-delete` in a single call, whose result is returned."
  [istorage db]
  (let [live  (addresses db)
        stale (into []
                    (remove (fn [addr] (contains? live addr)))
                    (-list-addresses istorage))]
    (-delete istorage stale)))
4 changes: 4 additions & 0 deletions test_storage/datascript/test_storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
(d/store streaming-transit-msgpack-storage db) ;; 6.3 sec

(def db' (d/restore streaming-edn-storage))

(count (d/addresses db'))
(count (d/-list-addresses streaming-edn-storage))
(d/collect-garbage streaming-edn-storage db')

(first (:eavt db'))

Expand Down

0 comments on commit d673b89

Please sign in to comment.