Skip to content

Commit

Permalink
Implement Conn as single atom with extend-clj
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Nov 3, 2023
1 parent 16af9ad commit bfc673a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 55 deletions.
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
:deps {
persistent-sorted-set/persistent-sorted-set {:mvn/version "0.3.0"}
io.github.tonsky/extend-clj {:mvn/version "0.1.0"}
}

:aliases {
Expand Down
7 changes: 4 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
:url "https://github.com/tonsky/datascript"

:dependencies [
[org.clojure/clojure "1.10.2" :scope "provided"]
[org.clojure/clojurescript "1.10.844" :scope "provided"]
[persistent-sorted-set "0.3.0"]
[org.clojure/clojure "1.10.2" :scope "provided"]
[org.clojure/clojurescript "1.10.844" :scope "provided"]
[persistent-sorted-set "0.3.0"]
[io.github.tonsky/extend-clj "0.1.0"]
]

:plugins [
Expand Down
105 changes: 59 additions & 46 deletions src/datascript/conn.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,24 @@
(:require
[datascript.db :as db #?@(:cljs [:refer [DB FilteredDB]])]
[datascript.storage :as storage]
[extend-clj.core :as extend]
[me.tonsky.persistent-sorted-set :as set])
#?(:clj
(:import
[datascript.db DB FilteredDB])))

(extend/deftype-atom Conn [atom]
(deref-impl [this]
(:db @atom))
(compare-and-set-impl [this oldv newv]
(compare-and-set!
atom
(assoc @atom :db oldv)
(assoc @atom :db newv))))

(defn- make-conn [opts]
(->Conn (atom opts)))

(defn with
([db tx-data] (with db tx-data nil))
([db tx-data tx-meta]
Expand All @@ -25,19 +38,20 @@
(and
#?(:clj (instance? clojure.lang.IDeref conn)
:cljs (satisfies? cljs.core/IDeref conn))
(db/db? @conn)))
(if-some [db @conn]
(db/db? db)
true)))

(defn conn-from-db [db]
{:pre [(db/db? db)]}
(let [storage (storage/storage db)
meta (cond-> {:listeners (atom {})
:clients (atom {})}
storage
(merge {:tx-tail (atom [])
:db-last-stored (atom db)}))]
(when storage
(storage/store db))
(atom db :meta meta)))
(if-some [storage (storage/storage db)]
(do
(storage/store db)
(make-conn
{:db db
:tx-tail []
:db-last-stored db}))
(make-conn {:db db})))

(defn conn-from-datoms
([datoms]
Expand All @@ -61,32 +75,33 @@
(restore-conn storage {}))
([storage opts]
(when-some [[db tail] (storage/restore-impl storage opts)]
(atom (storage/db-with-tail db tail)
:meta {:listeners (atom {})
:tx-tail (atom tail)
:db-last-stored (atom db)})))))
(make-conn
{:db (storage/db-with-tail db tail)
:tx-tail tail
:db-last-stored db})))))

(defn ^:no-doc -transact! [conn tx-data tx-meta]
{:pre [(conn? conn)]}
(let [*report (atom nil)]
(let [*report (volatile! nil)]
(swap! conn
(fn [db]
(let [r (with db tx-data tx-meta)]
(reset! *report r)
(vreset! *report r)
(:db-after r))))
#?(:clj
(when-some [storage (storage/storage @conn)]
(let [{db :db-after
datoms :tx-data} @*report
settings (set/settings (:eavt db))
*tx-tail (:tx-tail (meta conn))
tx-tail' (swap! *tx-tail conj datoms)]
*atom (:atom conn)
tx-tail' (:tx-tail (swap! *atom update :tx-tail conj datoms))]
(if (> (transduce (map count) + 0 tx-tail') (:branching-factor settings))
;; overflow tail
(do
(storage/store-impl! db (storage/storage-adapter db) false)
(reset! *tx-tail [])
(reset! (:db-last-stored (meta conn)) db))
(swap! *atom assoc
:tx-tail []
:db-last-stored db))
;; just update tail
(storage/store-tail db tx-tail')))))
@*report))
Expand All @@ -96,10 +111,11 @@
(transact! conn tx-data nil))
([conn tx-data tx-meta]
{:pre [(conn? conn)]}
(let [report (-transact! conn tx-data tx-meta)]
(doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))]
(callback report))
report)))
(locking conn
(let [report (-transact! conn tx-data tx-meta)]
(doseq [[_ callback] (:listeners @(:atom conn))]
(callback report))
report))))

(defn reset-conn!
([conn db]
Expand All @@ -112,16 +128,19 @@
{:db-before db-before
:db-after db
:tx-data (concat
(map #(assoc % :added false) (db/-datoms db-before :eavt nil nil nil nil))
(when db-before
(map #(assoc % :added false) (db/-datoms db-before :eavt nil nil nil nil)))
(db/-datoms db :eavt nil nil nil nil))
:tx-meta tx-meta})]
#?(:clj
(when-some [storage (storage/storage db-before)]
(storage/store db)
(reset! (:tx-tail (meta conn)) [])
(reset! (:db-last-stored (meta conn)) db)))
(reset! conn db)
(doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))]
(if-some [storage (storage/storage db-before)]
(do
(storage/store db)
(swap! (:atom conn) assoc
:db db
:tx-tail []
:db-last-stored db))
(reset! conn db))
(doseq [[_ callback] (:listeners @(:atom conn))]
(callback report))
db)))

Expand All @@ -131,25 +150,19 @@
#?(:clj
(when-some [storage (storage/storage @conn)]
(storage/store-impl! db (storage/storage-adapter db) true)
(reset! (:tx-tail (meta conn)) [])
(reset! (:db-last-stored (meta conn)) db)))
(swap! (:atom conn) assoc
:tx-tail []
:db-last-stored db)))
db))

(defn- atom? [a]
#?(:cljs (instance? Atom a)
:clj (instance? clojure.lang.IAtom a)))

(defn listen!
([conn callback]
(listen! conn (rand) callback))
([conn key callback]
{:pre [(conn? conn)
(atom? (:listeners (meta conn)))]}
(swap! (:listeners (meta conn)) assoc key callback)
{:pre [(conn? conn)]}
(swap! (:atom conn) update :listeners assoc key callback)
key))

(defn unlisten!
[conn key]
{:pre [(conn? conn)
(atom? (:listeners (meta conn)))]}
(swap! (:listeners (meta conn)) dissoc key))
(defn unlisten! [conn key]
{:pre [(conn? conn)]}
(swap! (:atom conn) update :listeners dissoc key))
12 changes: 6 additions & 6 deletions test/datascript/test/storage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@
(d/transact! conn [[:db/add 2 :name "Oleg"]])
(is (= 7 (count @(:*writes storage))))
(is (= @#'storage/tail-addr (last @(:*writes storage))))
(is (= 2 (count @(:tx-tail (meta conn)))))
(is (= 2 (count (apply concat @(:tx-tail (meta conn))))))
(is (= 2 (count (:tx-tail @(:atom conn)))))
(is (= 2 (count (apply concat (:tx-tail @(:atom conn))))))

;; bigger tx, still writing tail
(d/transact! conn (mapv #(vector :db/add % :name (str %)) (range 3 33)))
(is (= 8 (count @(:*writes storage))))
(is (= @#'storage/tail-addr (last @(:*writes storage))))
(is (= 3 (count @(:tx-tail (meta conn)))))
(is (= 32 (count (apply concat @(:tx-tail (meta conn))))))
(is (= 3 (count (:tx-tail @(:atom conn)))))
(is (= 32 (count (apply concat (:tx-tail @(:atom conn))))))

;; tail overflows, flush db
(d/transact! conn [[:db/add 33 :name "Petr"]])
Expand Down Expand Up @@ -285,11 +285,11 @@

;; gc on conn
(is (> (count (storage/-list-addresses storage))
(count (d/addresses @(:db-last-stored (meta conn''))))))
(count (d/addresses (:db-last-stored @(:atom conn''))))))

(d/collect-garbage storage)
(is (= (count (storage/-list-addresses storage))
(count (d/addresses @(:db-last-stored (meta conn''))))))
(count (d/addresses (:db-last-stored @(:atom conn''))))))

(let [conn''' (d/restore-conn storage)]
(is (= @conn'' @conn''')))))))
Expand Down

0 comments on commit bfc673a

Please sign in to comment.