Skip to content

Commit

Permalink
Sync: initial
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Nov 21, 2023
1 parent 8cc2d13 commit 71deb40
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 1 deletion.
1 change: 1 addition & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
cheshire/cheshire {:mvn/version "5.10.0"}
com.cognitect/transit-clj {:mvn/version "1.0.324"}
com.cognitect/transit-cljs {:mvn/version "0.8.269"}
org.clojure/core.async {:mvn/version "1.6.681"}
}
}

Expand Down
51 changes: 51 additions & 0 deletions docs/sync.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Server assigns id:

- Changes when client reconnects

Client assigns id:

- What to do with client before :catch-up message?
- Where to store send-fn?
- Can track how far each client is. Why?


```
tx :: {:tx-data [...]
:tx-id <any>
:server-idx <long>}
```

# Client connects to a server

```
SND {:message :catching-up
:patterns [<pattern> ...]
:server-idx <long>?}
RCV {:message :catched-up
:snapshot <serializable db>
:server-idx <long>}
or
RCV {:message :catched-up
:txs [<tx> ...]}
```

# Client makes a transaction

```
SND {:message :transacting
:server-idx server-idx
:txs [{:tx-data ...
:tx-id ...} ...]}
```

# Server broadcasts a transaction

```
RCV {:message :transacted
:tx-data ...
:tx-id ...
:server-idx ...} ...]}
```
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@
:test {:dependencies [[metosin/jsonista "0.3.3"]
[cheshire "5.10.0"]
[com.cognitect/transit-clj "1.0.324"]
[com.cognitect/transit-cljs "0.8.269"]]}
[com.cognitect/transit-cljs "0.8.269"]
[org.clojure/core.async "1.6.681"]]}
:bench {:dependencies [[criterium "0.4.6"]
[metosin/jsonista "0.3.3"]
[com.clojure-goes-fast/clj-async-profiler "0.5.1"]]}
Expand Down
4 changes: 4 additions & 0 deletions src/datascript/db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -1779,3 +1779,7 @@
:else
(raise "Bad entity type at " entity ", expected map or vector"
{:error :transact/syntax, :tx-data entity})))))

(defn tx-from-datoms [datoms]
(mapv #(vector (if (:added %) :db/add :db/retract) (:e %) (:a %) (:v %)) datoms))

73 changes: 73 additions & 0 deletions src/datascript/sync/client.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
(ns datascript.sync.client
(:require
[datascript.conn :as conn]
[datascript.db :as db]
[datascript.serialize :as serialize]))

(defn client-id []
(long (* (rand) 9007199254740991)))

(def *last-tx-id
(atom 0))

(defn new-tx-id [client-id]
[client-id (swap! *last-tx-id inc)])

(defn on-tx [conn report]
(when-not (:server? (:tx-meta report))
(let [{:keys [client-id send-fn server-idx pending]} (meta conn)
tx {:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (new-tx-id client-id)}]
(send-fn
{:message :transacting
:server-idx @server-idx
:txs [tx]})
(swap! pending conj tx))))

(defn create-conn [patterns send-fn]
(let [res (atom nil :meta
{:client-id (client-id)
:server-db (atom nil)
:pending (atom #?(:clj clojure.lang.PersistentQueue/EMPTY
:cljs cljs.core.PersistentQueue.EMPTY))
:server-idx (atom nil)
:send-fn send-fn
:listeners (atom {})})]
(send-fn {:message :catching-up})
res))

(defn server-message [conn body]
(case (:message body)
:catched-up
(let [{:keys [snapshot server-idx]} body
db (serialize/from-serializable snapshot)]
(reset! conn db)
(reset! (:server-db (meta conn)) db)
(reset! (:server-idx (meta conn)) server-idx)
(conn/listen! conn :sync #(on-tx conn %)))

:transacted
(let [{:keys [tx-data tx-id server-idx]} body
{*server-db :server-db
*server-idx :server-idx
*pending :pending
*listeners :listeners} (meta conn)
report (conn/with @*server-db tx-data {:server? true})
server-db' (:db-after report)]
(reset! *server-db server-db')
(reset! *server-idx server-idx)
(if (= tx-id (:tx-id (peek @*pending)))
(swap! *pending pop)
(do
(reset! conn (reduce conn/db-with server-db' @*pending))
(doseq [[_ callback] @*listeners]
(callback report))))))
nil)

(defn server-disconnected [conn]
;; TODO impl me
)

(defn server-connected [conn]
;; TODO impl me
)
54 changes: 54 additions & 0 deletions src/datascript/sync/server.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
(ns datascript.sync.server
(:require
[datascript.conn :as conn]
[datascript.db :as db]
[datascript.serialize :as serialize]))

(defn- client [conn channel]
(get @(:clients (meta conn)) channel))

(defn on-tx [conn report]
;; TODO filter what to send where
(let [msg {:message :transacted
:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (:tx-id (:tx-meta report))
:server-idx (:db/current-tx (:tempids report))}]
(doseq [[channel {:keys [status send-fn]}] @(:clients (meta conn))
; :let [_ (prn "broadcasting to" channel status)]
:when (= :active status)]
(send-fn channel msg))))

(defn client-connected [conn channel send-fn]
(let [*clients (:clients (meta conn))
clients' (swap! *clients assoc channel
{:status :connected
:send-fn send-fn})]
(when (= 1 (count clients'))
(conn/listen! conn :sync #(on-tx conn %)))
nil))

(defn client-message [conn channel body]
(case (:message body)
:catching-up
(let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx
{:keys [send-fn]} (client conn channel)
db @conn]
(send-fn channel
{:message :catched-up
:snapshot (serialize/serializable db) ;; TODO patterns
:server-idx (:max-tx db)})
;; TODO race - external txs between (:max-tx db) and after :status :active
(swap! (:clients (meta conn)) update channel assoc :status :active))

:transacting
(doseq [{:keys [tx-data tx-id]} (:txs body)]
;; TODO handle exception here
(conn/transact! conn tx-data {:tx-id tx-id})))
nil)

(defn client-disconnected [conn channel]
(let [*clients (:clients (meta conn))
clients' (swap! *clients dissoc channel)]
(when (= 0 (count clients'))
(conn/unlisten! conn :sync))
nil))
1 change: 1 addition & 0 deletions test/datascript/test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
datascript.test.query-v3
datascript.test.serialize
#?(:clj datascript.test.storage)
#?(:clj datascript.test.sync)
datascript.test.transact
datascript.test.tuples
datascript.test.validation
Expand Down
8 changes: 8 additions & 0 deletions test/datascript/test/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@
#?(:clj (transit-read (.getBytes ^String s "UTF-8") :json)
:cljs (transit-read s :json)))

#?(:clj
(def lock (Object.)))

(defn log [& args]
#?(:clj (locking lock
(apply println args))
:cljs (apply println args)))

;; Core tests

(deftest test-protocols
Expand Down
87 changes: 87 additions & 0 deletions test/datascript/test/sync.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
(ns datascript.test.sync
(:require
[clojure.core.async :as async :refer [>! <! go go-loop]]
[clojure.edn :as edn]
[clojure.test :as t :refer [is are deftest testing]]
[datascript.core :as d]
[datascript.sync.server :as server]
[datascript.sync.client :as client]
[datascript.test.core :as tdc]))

(defn wait-on [*atom cond]
(let [*p (promise)]
(add-watch *atom :wait
(fn [_ _ _ new]
(when (cond new)
(deliver *p true))))
(when (cond @*atom)
(deliver *p true))
(let [p (deref *p 100 :timeout)]
(remove-watch *atom :wait)
(when (= :timeout p)
(throw (ex-info "Timeout" {}))))))

(def freeze
; identity
tdc/transit-write-str
)

(def thaw
; identity
tdc/transit-read-str
)

(defn setup []
(let [server (d/create-conn)
ch (async/chan 10)
ch1 (async/chan 10)
ch2 (async/chan 10)
c1 (client/create-conn nil #(go (>! ch [:c1 (freeze %)])))
c2 (client/create-conn nil #(go (>! ch [:c2 (freeze %)])))]
(server/client-connected server :c1 (fn [_ msg] (go (>! ch1 (freeze msg)))))
(server/client-connected server :c2 (fn [_ msg] (go (>! ch2 (freeze msg)))))
(go-loop []
(when-some [msg (<! ch1)]
(let [msg (thaw msg)]
(tdc/log "C1 RCV" msg)
(client/server-message c1 msg)
(recur))))
(go-loop []
(when-some [msg (<! ch2)]
(let [msg (thaw msg)]
(tdc/log "C2 RCV" msg)
(client/server-message c2 msg)
(recur))))
(go-loop []
(when-some [[id msg] (<! ch)]
(let [msg (thaw msg)]
(tdc/log "SRV RCV" id msg)
(server/client-message server id msg)
(recur))))
(wait-on c1 some?)
(wait-on c2 some?)
{:server server
:c1 c1
:c2 c2}))

(defn wait-all [{:keys [server c1 c2]}]
(wait-on (:pending (meta c1)) empty?)
(wait-on (:pending (meta c2)) empty?)
(wait-on (:server-idx (meta c1)) #(= % (:max-tx @server)))
(wait-on (:server-idx (meta c2)) #(= % (:max-tx @server))))

(deftest test-sync
(let [{:keys [server c1 c2] :as setup} (setup)]
(d/transact! c1 [[:db/add 1 :name "Ivan"]])
(wait-all setup)
(is (= #{[1 :name "Ivan"]}
(tdc/all-datoms @c1)))
(is (= #{[1 :name "Ivan"]}
(tdc/all-datoms @c2)))
(is (= #{[1 :name "Ivan"]}
(tdc/all-datoms @server)))))


; (t/test-ns *ns*)
; (t/run-test-var #'test-conn)

0 comments on commit 71deb40

Please sign in to comment.