diff --git a/src/datascript/storage.clj b/src/datascript/core_storage.clj similarity index 60% rename from src/datascript/storage.clj rename to src/datascript/core_storage.clj index 47c7bce3..f052b81e 100644 --- a/src/datascript/storage.clj +++ b/src/datascript/core_storage.clj @@ -9,7 +9,7 @@ (import '[datascript.db Datom] - '[java.util List] + '[java.util List HashSet] '[java.io BufferedOutputStream File FileOutputStream OutputStream PushbackReader] '[me.tonsky.persistent_sorted_set ANode Branch Leaf PersistentSortedSet RefType Settings]) @@ -17,8 +17,24 @@ #uuid "d5695966-036d-6740-8541-d80034219c28") (defprotocol IStorage - (-store [_ addr data]) - (-restore [_ addr])) + (-store [_ addr+data-seq] + "Gives you a sequence of `[addr data]` pairs to serialize and store. + + `addr`s are java.util.UUID. + `data`s are clojure-serializable data structure (maps, keywords, lists, ints etc)") + + (-restore [_ addr] + "Read back and deserialize data stored under single `addr`") + + (-list-addresses [_] + "Return seq that lists all addresses currently stored in your storage. + Will be used during GC to remove keys that are no longer used.") + + (-delete [_ addrs-seq] + "Delete data stored under `addrs` (seq). Will be called during GC")) + +; (defprotocol ICache +; (-compute-if-absent [_ key compute-fn])) (defn output-stream ^OutputStream [^File file] (let [os (FileOutputStream. file)] @@ -34,7 +50,8 @@ (file-storage dir {})) ([dir opts] (.mkdirs (io/file dir)) - (let [name-fn (or (:name-fn opts) str) + (let [addr->filename-fn (or (:addr->filename-fn opts) str) + filename->addr-fn (or (:filename->addr-fn opts) #(UUID/fromString %)) write-fn (or (:write-fn opts) (when-some [freeze-fn (:freeze-fn opts)] @@ -53,15 +70,24 @@ (with-open [rdr (PushbackReader. (io/reader is))] (edn/read rdr))))] (reify IStorage - (-store [_ addr data] - (with-open [os (output-stream (io/file dir (name-fn addr)))] - (write-fn os data))) + (-store [_ addr+data-seq] + (doseq [[addr data] addr+data-seq] + (with-open [os (output-stream (io/file dir (addr->filename-fn addr)))] + (write-fn os data)))) + (-restore [_ addr] - (with-open [is (io/input-stream (io/file dir (name-fn addr)))] - (read-fn is))))))) + (with-open [is (io/input-stream (io/file dir (addr->filename-fn addr)))] + (read-fn is))) + + (-list-addresses [_] + (mapv #(-> ^File % .getName filename->addr-fn) + (.listFiles (io/file dir)))) + + (-delete [_ addrs-seq] + (doseq [addr addrs-seq] + (.delete (io/file dir (addr->filename-fn addr))))))))) -(deftype StorageAdapter [^me.tonsky.persistent_sorted_set.IStorage istorage - ^Settings settings] +(deftype StorageAdapter [istorage ^Settings settings *buffer] me.tonsky.persistent_sorted_set.IStorage (store [_ ^ANode node] (let [addr (squuid) @@ -72,7 +98,8 @@ :keys keys} (instance? Branch node) (assoc :addresses (.addresses ^Branch node)))] - (-store istorage addr data) + ; (-store istorage addr data) + (vswap! *buffer conj! [addr data]) addr)) (restore [_ addr] (let [{:keys [level keys addresses]} (-restore istorage addr) @@ -97,7 +124,8 @@ (validate-storage (:avet db) istorage) (let [eavt ^PersistentSortedSet (:eavt db) settings (.-_settings eavt) - adapter (StorageAdapter. istorage settings) + *buffer (volatile! (transient [])) + adapter (StorageAdapter. istorage settings *buffer) meta (merge {:schema (:schema db) :max-eid (:max-eid db) @@ -106,7 +134,8 @@ :aevt (set/store (:aevt db) adapter) :avet (set/store (:avet db) adapter)} (@#'set/settings->map settings))] - (-store istorage root-addr meta)))) + (vswap! *buffer conj! [root-addr meta]) + (-store istorage (persistent! @*buffer))))) (defn restore ([istorage] @@ -114,8 +143,8 @@ ([istorage opts] (let [root (-restore istorage root-addr) {:keys [schema eavt aevt avet max-eid max-tx]} root - settings (@#'set/map->settings root) - adapter (StorageAdapter. istorage settings)] + settings (@#'set/map->settings root) + adapter (StorageAdapter. istorage settings nil)] (db/restore-db {:schema schema :eavt (set/restore-by db/cmp-datoms-eavt eavt adapter root) @@ -123,3 +152,22 @@ :avet (set/restore-by db/cmp-datoms-avet avet adapter root) :max-eid max-eid :max-tx max-tx})))) + +(defn addresses + "Returns all addresses in use by current db. Anything that is not in + the return set is safe to be deleted" + [db] + (let [set (HashSet.) + visit-fn #(.add set %)] + (.add set root-addr) + (.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn) + (.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn) + (.walkAddresses ^PersistentSortedSet (:avet db) visit-fn) + set)) + +(defn collect-garbage [istorage db] + (let [used (addresses db) + unused (->> (-list-addresses istorage) + (remove #(contains? used %)) + (vec))] + (-delete istorage unused))) diff --git a/test_storage/datascript/test_storage.clj b/test_storage/datascript/test_storage.clj index b3c72292..343c56ac 100644 --- a/test_storage/datascript/test_storage.clj +++ b/test_storage/datascript/test_storage.clj @@ -89,6 +89,10 @@ (d/store streaming-transit-msgpack-storage db) ;; 6.3 sec (def db' (d/restore streaming-edn-storage)) + + (count (d/addresses db')) + (count (d/-list-addresses streaming-edn-storage)) + (d/collect-garbage streaming-edn-storage db') (first (:eavt db'))