diff --git a/src/datascript/core_storage.clj b/src/datascript/core_storage.clj index f052b81e..452aa66c 100644 --- a/src/datascript/core_storage.clj +++ b/src/datascript/core_storage.clj @@ -1,4 +1,3 @@ - (in-ns 'datascript.core) (require @@ -13,6 +12,7 @@ '[java.io BufferedOutputStream File FileOutputStream OutputStream PushbackReader] '[me.tonsky.persistent_sorted_set ANode Branch Leaf PersistentSortedSet RefType Settings]) +;; datascriptstorageroot23718 (def ^:private root-addr #uuid "d5695966-036d-6740-8541-d80034219c28") @@ -36,7 +36,123 @@ ; (defprotocol ICache ; (-compute-if-absent [_ key compute-fn])) -(defn output-stream ^OutputStream [^File file] +(def ^:private ^:dynamic *store-buffer*) + +(defrecord StorageAdapter [storage ^Settings settings] + me.tonsky.persistent_sorted_set.IStorage + (store [_ ^ANode node] + (let [addr (squuid) + keys (map (fn [^Datom d] + [(.-e d) (.-a d) (.-v d) (.-tx d)]) + (.keys node)) + data (cond-> {:level (.level node) + :keys keys} + (instance? Branch node) + (assoc :addresses (.addresses ^Branch node)))] + (vswap! *store-buffer* conj! [addr data]) + addr)) + (restore [_ addr] + (let [{:keys [level keys addresses]} (-restore storage addr) + keys' ^List (map (fn [[e a v tx]] (db/datom e a v tx)) keys)] + (if addresses + (Branch. (int level) keys' ^List addresses settings) + (Leaf. keys' settings))))) + +(defn- make-adapter [storage opts] + (let [settings (@#'set/map->settings opts)] + (->StorageAdapter storage settings))) + +(defn- adapter ^StorageAdapter [db] + (.-_storage ^PersistentSortedSet (:eavt db))) + +(defn storage + "Returns IStorage used by current DB instance" + [db] + (when-some [adapter (adapter db)] + (:storage adapter))) + +(defn- store-impl! [db adapter opts] + (binding [*store-buffer* (volatile! (transient []))] + (let [eavt ^PersistentSortedSet (:eavt db) + settings (.-_settings eavt) + meta (merge + {:schema (:schema db) + :max-eid (:max-eid db) + :max-tx (:max-tx db) + :eavt (set/store (:eavt db) adapter) + :aevt (set/store (:aevt db) adapter) + :avet (set/store (:avet db) adapter)} + (@#'set/settings->map settings))] + (vswap! *store-buffer* conj! [root-addr meta]) + (-store (:storage adapter) (persistent! @*store-buffer*)) + db))) + +(defn store! + ([db] + (if-some [adapter (adapter db)] + (store-impl! db adapter {}) + (throw (ex-info "Database has no associated storage" {})))) + ([db storage] + (if-some [adapter (adapter db)] + (let [current-storage (:storage adapter)] + (if (identical? current-storage storage) + (store-impl! db adapter {}) + (throw (ex-info "Database is already stored with another IStorage" {:storage current-storage})))) + (let [settings (.-_settings ^PersistentSortedSet (:eavt db)) + adapter (StorageAdapter. storage settings)] + (store-impl! db adapter {}))))) + +(defn restore + ([storage] + (restore storage {})) + ([storage opts] + (let [root (-restore storage root-addr) + {:keys [schema eavt aevt avet max-eid max-tx]} root + opts (merge root opts) + adapter (make-adapter storage opts)] + (db/restore-db + {:schema schema + :eavt (set/restore-by db/cmp-datoms-eavt eavt adapter opts) + :aevt (set/restore-by db/cmp-datoms-aevt aevt adapter opts) + :avet (set/restore-by db/cmp-datoms-avet avet adapter opts) + :max-eid max-eid + :max-tx max-tx})))) + +(defn- addresses-impl [db *set] + (let [visit-fn #(vswap! *set conj! %)] + (visit-fn root-addr) + (.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn) + (.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn) + (.walkAddresses ^PersistentSortedSet (:avet db) visit-fn))) + +(defn addresses + "Returns all addresses in use by current db. Anything that is not in + the return set is safe to be deleted" + [& dbs] + (let [*set (volatile! (transient #{}))] + (doseq [db dbs] + (addresses-impl db *set)) + (persistent! @*set))) + +(defn collect-garbage! + "Deletes all keys from storage that are not referenced by any of the provided dbs. + Careful! If you have a lazy-loaded database and do GC on a newer version of it, + old version might stop working. Make sure to always pass all alive references + to DBs you are using" + [& dbs] + (let [used (apply addresses dbs)] + (doseq [db dbs + :let [storage (storage db)] + :when storage + :let [unused (->> (-list-addresses storage) + (remove used) + (vec))]] + (-delete storage unused)))) + +(defn- output-stream + "OutputStream that ignores flushes. Workaround for slow transit-clj JSON writer. + See https://github.com/cognitect/transit-clj/issues/43" + ^OutputStream [^File file] (let [os (FileOutputStream. file)] (proxy [BufferedOutputStream] [os] (flush []) @@ -46,6 +162,18 @@ (proxy-super close)))))) (defn file-storage + "Default implementation that stores data in files in a dir. + + Options are: + + :freeze-fn :: (data) -> String. A serialization function + :thaw-fn :: (String) -> data. A deserialization function + :write-fn :: (OutputStream data) -> void. Implement your own writer to FileOutputStream + :read-fn :: (InputStream) -> Object. Implement your own reader from FileInputStream + :addr->filename-fn :: (UUID) -> String. Construct file name from address + :filename->addr-fn :: (String) -> UUID. Reconstruct address from file name + + All options are optional." ([dir] (file-storage dir {})) ([dir opts] @@ -86,88 +214,3 @@ (-delete [_ addrs-seq] (doseq [addr addrs-seq] (.delete (io/file dir (addr->filename-fn addr))))))))) - -(deftype StorageAdapter [istorage ^Settings settings *buffer] - me.tonsky.persistent_sorted_set.IStorage - (store [_ ^ANode node] - (let [addr (squuid) - keys (map (fn [^Datom d] - [(.-e d) (.-a d) (.-v d) (.-tx d)]) - (.keys node)) - data (cond-> {:level (.level node) - :keys keys} - (instance? Branch node) - (assoc :addresses (.addresses ^Branch node)))] - ; (-store istorage addr data) - (vswap! *buffer conj! [addr data]) - addr)) - (restore [_ addr] - (let [{:keys [level keys addresses]} (-restore istorage addr) - keys' ^List (map (fn [[e a v tx]] (db/datom e a v tx)) keys)] - (if addresses - (Branch. (int level) keys' ^List addresses settings) - (Leaf. keys' settings))))) - -(defn validate-storage [^PersistentSortedSet set istorage] - (when-some [^StorageAdapter adapter (.-_storage set)] - (let [istorage' (.-istorage adapter)] - (when-not (identical? istorage' istorage) - (throw (ex-info "Database is already stored with another IStorage" - {:istorage istorage'})))))) - -(defn store - ([istorage db] - (store istorage db {})) - ([istorage db opts] - (validate-storage (:eavt db) istorage) - (validate-storage (:aevt db) istorage) - (validate-storage (:avet db) istorage) - (let [eavt ^PersistentSortedSet (:eavt db) - settings (.-_settings eavt) - *buffer (volatile! (transient [])) - adapter (StorageAdapter. istorage settings *buffer) - meta (merge - {:schema (:schema db) - :max-eid (:max-eid db) - :max-tx (:max-tx db) - :eavt (set/store (:eavt db) adapter) - :aevt (set/store (:aevt db) adapter) - :avet (set/store (:avet db) adapter)} - (@#'set/settings->map settings))] - (vswap! *buffer conj! [root-addr meta]) - (-store istorage (persistent! @*buffer))))) - -(defn restore - ([istorage] - (restore istorage {})) - ([istorage opts] - (let [root (-restore istorage root-addr) - {:keys [schema eavt aevt avet max-eid max-tx]} root - settings (@#'set/map->settings root) - adapter (StorageAdapter. istorage settings nil)] - (db/restore-db - {:schema schema - :eavt (set/restore-by db/cmp-datoms-eavt eavt adapter root) - :aevt (set/restore-by db/cmp-datoms-aevt aevt adapter root) - :avet (set/restore-by db/cmp-datoms-avet avet adapter root) - :max-eid max-eid - :max-tx max-tx})))) - -(defn addresses - "Returns all addresses in use by current db. Anything that is not in - the return set is safe to be deleted" - [db] - (let [set (HashSet.) - visit-fn #(.add set %)] - (.add set root-addr) - (.walkAddresses ^PersistentSortedSet (:eavt db) visit-fn) - (.walkAddresses ^PersistentSortedSet (:aevt db) visit-fn) - (.walkAddresses ^PersistentSortedSet (:avet db) visit-fn) - set)) - -(defn collect-garbage [istorage db] - (let [used (addresses db) - unused (->> (-list-addresses istorage) - (remove #(contains? used %)) - (vec))] - (-delete istorage unused))) diff --git a/src/datascript/db.cljc b/src/datascript/db.cljc index e27f7010..fc60dbf1 100644 --- a/src/datascript/db.cljc +++ b/src/datascript/db.cljc @@ -964,9 +964,9 @@ (map->DB {:schema schema :rschema (rschema (merge implicit-schema schema)) - :eavt (set/sorted-set-opts (assoc opts :cmp cmp-datoms-eavt)) - :aevt (set/sorted-set-opts (assoc opts :cmp cmp-datoms-aevt)) - :avet (set/sorted-set-opts (assoc opts :cmp cmp-datoms-avet)) + :eavt (set/sorted-set* (assoc opts :cmp cmp-datoms-eavt)) + :aevt (set/sorted-set* (assoc opts :cmp cmp-datoms-aevt)) + :avet (set/sorted-set* (assoc opts :cmp cmp-datoms-avet)) :max-eid e0 :max-tx tx0 :pull-patterns (lru/cache 100) diff --git a/test_storage/datascript/test_storage.clj b/test_storage/datascript/test_storage.clj index 343c56ac..64891d19 100644 --- a/test_storage/datascript/test_storage.clj +++ b/test_storage/datascript/test_storage.clj @@ -80,19 +80,19 @@ (def db (d/from-serializable json {:branching-factor 512})) (count db)) - (d/store streaming-edn-storage (d/empty-db)) + (d/store! (d/empty-db) streaming-edn-storage) - (d/store streaming-edn-storage db) ;; 10 sec - (d/store inmemory-edn-storage db) ;; 10 sec - (d/store streaming-transit-json-storage db) ;; 7.5 sec - (d/store inmemory-transit-json-storage db) ;; 6.4 sec - (d/store streaming-transit-msgpack-storage db) ;; 6.3 sec + (d/store! db streaming-edn-storage) ;; 10 sec + (d/store! db inmemory-edn-storage) ;; 10 sec + (d/store! db streaming-transit-json-storage) ;; 7.5 sec + (d/store! db inmemory-transit-json-storage) ;; 6.4 sec + (d/store! db streaming-transit-msgpack-storage) ;; 6.3 sec (def db' (d/restore streaming-edn-storage)) (count (d/addresses db')) (count (d/-list-addresses streaming-edn-storage)) - (d/collect-garbage streaming-edn-storage db') + (d/collect-garbage! db') (first (:eavt db'))