diff --git a/src/datascript/sync/client.cljc b/src/datascript/sync/client.cljc index 9369b07b..f01981ca 100644 --- a/src/datascript/sync/client.cljc +++ b/src/datascript/sync/client.cljc @@ -2,7 +2,8 @@ (:require [datascript.conn :as conn] [datascript.db :as db] - [datascript.serialize :as serialize])) + [datascript.serialize :as serialize] + [datascript.util :as util])) (defn client-id [] (long (* (rand) 0x1FFFFFFFFFFFFF))) @@ -15,24 +16,25 @@ (defn on-tx [conn report] (when-not (:server? (:tx-meta report)) - (let [{:keys [client-id send-fn server-idx pending]} (meta conn) + (let [{:keys [client-id send-fn server-idx connected?]} @(:atom conn) tx {:tx-data (db/tx-from-datoms (:tx-data report)) :tx-id (new-tx-id client-id)}] - (send-fn - {:message :transacting - :server-idx @server-idx - :txs [tx]}) - (swap! pending conj tx)))) + (when connected? + (send-fn + {:message :transacting + :server-idx server-idx + :txs [tx]})) + (swap! (:atom conn) update :pending conj tx)))) (defn create-conn [patterns send-fn] - (let [res (atom nil :meta - {:client-id (client-id) - :server-db (atom nil) - :pending (atom #?(:clj clojure.lang.PersistentQueue/EMPTY - :cljs cljs.core.PersistentQueue.EMPTY)) - :server-idx (atom nil) - :send-fn send-fn - :listeners (atom {})})] + (let [res (@#'conn/make-conn + {:db nil + :client-id (client-id) + :server-db nil + :pending util/empty-queue + :server-idx nil + :connected? true + :send-fn send-fn})] (send-fn {:message :catching-up}) res)) @@ -41,33 +43,40 @@ :catched-up (let [{:keys [snapshot server-idx]} body db (serialize/from-serializable snapshot)] - (reset! conn db) - (reset! (:server-db (meta conn)) db) - (reset! (:server-idx (meta conn)) server-idx) - (conn/listen! conn :sync #(on-tx conn %))) + (conn/reset-conn! conn db {:server? true}) + (swap! (:atom conn) + (fn [atom] + (-> atom + (assoc :server-db db) + (assoc :server-idx server-idx) + (update :listeners assoc :sync #(on-tx conn %)))))) :transacted (let [{:keys [tx-data tx-id server-idx]} body - {*server-db :server-db - *server-idx :server-idx - *pending :pending - *listeners :listeners} (meta conn) - report (conn/with @*server-db tx-data {:server? true}) + {server-db :server-db + pending :pending + listeners :listeners} @(:atom conn) + report (conn/with server-db tx-data {:server? true}) server-db' (:db-after report)] - (reset! *server-db server-db') - (reset! *server-idx server-idx) - (if (= tx-id (:tx-id (peek @*pending))) - (swap! *pending pop) + (swap! (:atom conn) assoc + :server-db server-db' + :server-idx server-idx) + (if (= tx-id (:tx-id (peek pending))) + (swap! (:atom conn) update :pending pop) (do - (reset! conn (reduce conn/db-with server-db' @*pending)) - (doseq [[_ callback] @*listeners] + (reset! conn (reduce conn/db-with server-db' pending)) + (doseq [[_ callback] listeners] (callback report)))))) nil) (defn server-disconnected [conn] - ;; TODO impl me - ) + (swap! (:atom conn) + #(-> % + (assoc :connected? false) + (update :listeners dissoc :sync)))) (defn server-connected [conn] - ;; TODO impl me - ) + (swap! (:atom conn) assoc :connected? true) + (let [{:keys [send-fn server-idx]} @(:atom conn)] + (send-fn {:message :catching-up + :server-idx server-idx}))) diff --git a/src/datascript/sync/server.cljc b/src/datascript/sync/server.cljc index 7fbfee87..7319c429 100644 --- a/src/datascript/sync/server.cljc +++ b/src/datascript/sync/server.cljc @@ -5,29 +5,29 @@ [datascript.serialize :as serialize])) (defn- client [conn channel] - (get @(:clients (meta conn)) channel)) + (-> conn :atom deref :clients (get channel))) (defn on-tx [conn report] - (let [*clients (:clients (meta conn)) + (let [clients (:clients @(:atom conn)) msg {:message :transacted :tx-data (db/tx-from-datoms (:tx-data report)) :tx-id (:tx-id (:tx-meta report)) :server-idx (:db/current-tx (:tempids report))}] - (doseq [[channel {:keys [status send-fn pending]}] @*clients] + (doseq [[channel {:keys [status send-fn pending]}] clients] (if (= :active status) (do (when pending (doseq [msg pending] (send-fn channel msg)) - (swap! *clients update client dissoc :pending)) + (swap! (:atom conn) update :clients update client dissoc :pending)) (send-fn channel msg)) - (swap! *clients update client update :pending (fnil conj []) msg))))) + (swap! (:atom conn) update :clients update client update :pending (fnil conj []) msg))))) (defn client-connected [conn channel send-fn] - (let [*clients (:clients (meta conn)) - clients' (swap! *clients assoc channel - {:status :connected - :send-fn send-fn})] + (let [clients' (:clients + (swap! (:atom conn) update :clients assoc channel + {:status :connected + :send-fn send-fn}))] (when (= 1 (count clients')) (conn/listen! conn :sync #(on-tx conn %))) nil)) @@ -47,7 +47,7 @@ {:message :catched-up :snapshot (serialize/serializable db) ;; TODO patterns :server-idx server-idx}) - (swap! (:clients (meta conn)) update channel + (swap! (:atom conn) update :clients update channel (fn [client] (-> client (assoc :status :active) @@ -60,8 +60,7 @@ nil) (defn client-disconnected [conn channel] - (let [*clients (:clients (meta conn)) - clients' (swap! *clients dissoc channel)] + (let [clients' (:clients (swap! (:atom conn) update :clients dissoc channel))] (when (= 0 (count clients')) (conn/unlisten! conn :sync)) nil)) diff --git a/src/datascript/util.cljc b/src/datascript/util.cljc index 8cf19e0d..d3bc00a5 100644 --- a/src/datascript/util.cljc +++ b/src/datascript/util.cljc @@ -10,6 +10,10 @@ `(when *debug* (println ~@body))) +(def empty-queue + #?(:clj clojure.lang.PersistentQueue/EMPTY + :cljs cljs.core.PersistentQueue.EMPTY)) + (defn- rand-bits [pow] (rand-int (bit-shift-left 1 pow))) diff --git a/test/datascript/test/sync.clj b/test/datascript/test/sync.clj index 0c405c91..1a3f1196 100644 --- a/test/datascript/test/sync.clj +++ b/test/datascript/test/sync.clj @@ -61,15 +61,19 @@ :c2 c2})) (defn wait-all [{:keys [server c1 c2]}] - (wait-on (:pending (meta c1)) empty?) - (wait-on (:pending (meta c2)) empty?) - (wait-on (:server-idx (meta c1)) #(= % (:max-tx @server))) - (wait-on (:server-idx (meta c2)) #(= % (:max-tx @server)))) + (wait-on (:atom c1) + #(and + (empty? (:pending %)) + (= (:server-idx %) (:max-tx @server)))) + (wait-on (:atom c2) + #(and + (empty? (:pending %)) + (= (:server-idx %) (:max-tx @server))))) (deftest test-sync (let [{:keys [server c1 c2] :as setup} (setup)] (d/transact! c1 [[:db/add 1 :name "Ivan"]]) - (wait-all setup) + (wait-all setup) (is (= #{[1 :name "Ivan"]} (tdc/all-datoms @c1))) (is (= #{[1 :name "Ivan"]} @@ -78,6 +82,6 @@ (tdc/all-datoms @server))))) -; (t/test-ns *ns*) -; (t/run-test-var #'test-conn) - +(comment + (t/test-ns *ns*) + (t/run-test-var #'test-conn))