diff --git a/src/datascript/conn.cljc b/src/datascript/conn.cljc index bb1c3e30..d45a7472 100644 --- a/src/datascript/conn.cljc +++ b/src/datascript/conn.cljc @@ -96,10 +96,11 @@ (transact! conn tx-data nil)) ([conn tx-data tx-meta] {:pre [(conn? conn)]} - (let [report (-transact! conn tx-data tx-meta)] - (doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))] - (callback report)) - report))) + (locking conn + (let [report (-transact! conn tx-data tx-meta)] + (doseq [[_ callback] (some-> (:listeners (meta conn)) (deref))] + (callback report)) + report)))) (defn reset-conn! ([conn db] diff --git a/src/datascript/sync/client.cljc b/src/datascript/sync/client.cljc index 9b895fc3..9369b07b 100644 --- a/src/datascript/sync/client.cljc +++ b/src/datascript/sync/client.cljc @@ -5,7 +5,7 @@ [datascript.serialize :as serialize])) (defn client-id [] - (long (* (rand) 9007199254740991))) + (long (* (rand) 0x1FFFFFFFFFFFFF))) (def *last-tx-id (atom 0)) diff --git a/src/datascript/sync/server.cljc b/src/datascript/sync/server.cljc index 14c2bab7..7fbfee87 100644 --- a/src/datascript/sync/server.cljc +++ b/src/datascript/sync/server.cljc @@ -8,15 +8,20 @@ (get @(:clients (meta conn)) channel)) (defn on-tx [conn report] - ;; TODO filter what to send where - (let [msg {:message :transacted - :tx-data (db/tx-from-datoms (:tx-data report)) - :tx-id (:tx-id (:tx-meta report)) - :server-idx (:db/current-tx (:tempids report))}] - (doseq [[channel {:keys [status send-fn]}] @(:clients (meta conn)) - ; :let [_ (prn "broadcasting to" channel status)] - :when (= :active status)] - (send-fn channel msg)))) + (let [*clients (:clients (meta conn)) + msg {:message :transacted + :tx-data (db/tx-from-datoms (:tx-data report)) + :tx-id (:tx-id (:tx-meta report)) + :server-idx (:db/current-tx (:tempids report))}] + (doseq [[channel {:keys [status send-fn pending]}] @*clients] + (if (= :active status) + (do + (when pending + (doseq [msg pending] + (send-fn channel msg)) + (swap! *clients update client dissoc :pending)) + (send-fn channel msg)) + (swap! *clients update client update :pending (fnil conj []) msg))))) (defn client-connected [conn channel send-fn] (let [*clients (:clients (meta conn)) @@ -27,18 +32,26 @@ (conn/listen! conn :sync #(on-tx conn %))) nil)) +(defn drop-before [txs server-idx] + (vec + (drop-while #(<= (:server-idx %) server-idx) txs))) + (defn client-message [conn channel body] (case (:message body) :catching-up (let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx {:keys [send-fn]} (client conn channel) - db @conn] + db @conn + server-idx (:max-tx db)] (send-fn channel {:message :catched-up :snapshot (serialize/serializable db) ;; TODO patterns - :server-idx (:max-tx db)}) - ;; TODO race - external txs between (:max-tx db) and after :status :active - (swap! (:clients (meta conn)) update channel assoc :status :active)) + :server-idx server-idx}) + (swap! (:clients (meta conn)) update channel + (fn [client] + (-> client + (assoc :status :active) + (update :pending drop-before server-idx))))) :transacting (doseq [{:keys [tx-data tx-id]} (:txs body)] diff --git a/test/datascript/test/sync.clj b/test/datascript/test/sync.clj index f9b7c6f5..0c405c91 100644 --- a/test/datascript/test/sync.clj +++ b/test/datascript/test/sync.clj @@ -22,14 +22,10 @@ (throw (ex-info "Timeout" {})))))) (def freeze - ; identity - tdc/transit-write-str - ) + tdc/transit-write-str) (def thaw - ; identity - tdc/transit-read-str - ) + tdc/transit-read-str) (defn setup [] (let [server (d/create-conn) @@ -80,7 +76,7 @@ (tdc/all-datoms @c2))) (is (= #{[1 :name "Ivan"]} (tdc/all-datoms @server))))) - + ; (t/test-ns *ns*) ; (t/run-test-var #'test-conn)