-
-
Notifications
You must be signed in to change notification settings - Fork 311
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
281 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
Server assigns id: | ||
|
||
- Changes when client reconnects | ||
|
||
Client assigns id: | ||
|
||
- What to do with client before :catch-up message? | ||
- Where to store send-fn? | ||
- Can track how far each client is. Why? | ||
|
||
|
||
``` | ||
tx :: {:tx-data [...] | ||
:tx-id <any> | ||
:server-idx <long>} | ||
``` | ||
|
||
# Client connects to a server | ||
|
||
``` | ||
SND {:message :catching-up | ||
:patterns [<pattern> ...] | ||
:server-idx <long>?} | ||
RCV {:message :catched-up | ||
:snapshot <serializable db> | ||
:server-idx <long>} | ||
or | ||
RCV {:message :catched-up | ||
:txs [<tx> ...]} | ||
``` | ||
|
||
# Client makes a transaction | ||
|
||
``` | ||
SND {:message :transacting | ||
:server-idx server-idx | ||
:txs [{:tx-data ... | ||
:tx-id ...} ...]} | ||
``` | ||
|
||
# Server broadcasts a transaction | ||
|
||
``` | ||
RCV {:message :transacted | ||
:tx-data ... | ||
:tx-id ... | ||
:server-idx ...} ...]} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
(ns datascript.sync.client | ||
(:require | ||
[datascript.conn :as conn] | ||
[datascript.db :as db] | ||
[datascript.serialize :as serialize])) | ||
|
||
(defn client-id [] | ||
(long (* (rand) 9007199254740991))) | ||
|
||
(def *last-tx-id | ||
(atom 0)) | ||
|
||
(defn new-tx-id [client-id] | ||
[client-id (swap! *last-tx-id inc)]) | ||
|
||
(defn on-tx [conn report] | ||
(when-not (:server? (:tx-meta report)) | ||
(let [{:keys [client-id send-fn server-idx pending]} (meta conn) | ||
tx {:tx-data (db/tx-from-datoms (:tx-data report)) | ||
:tx-id (new-tx-id client-id)}] | ||
(send-fn | ||
{:message :transacting | ||
:server-idx @server-idx | ||
:txs [tx]}) | ||
(swap! pending conj tx)))) | ||
|
||
(defn create-conn [patterns send-fn] | ||
(let [res (atom nil :meta | ||
{:client-id (client-id) | ||
:server-db (atom nil) | ||
:pending (atom #?(:clj clojure.lang.PersistentQueue/EMPTY | ||
:cljs cljs.core.PersistentQueue.EMPTY)) | ||
:server-idx (atom nil) | ||
:send-fn send-fn | ||
:listeners (atom {})})] | ||
(send-fn {:message :catching-up}) | ||
res)) | ||
|
||
(defn server-message [conn body] | ||
(case (:message body) | ||
:catched-up | ||
(let [{:keys [snapshot server-idx]} body | ||
db (serialize/from-serializable snapshot)] | ||
(reset! conn db) | ||
(reset! (:server-db (meta conn)) db) | ||
(reset! (:server-idx (meta conn)) server-idx) | ||
(conn/listen! conn :sync #(on-tx conn %))) | ||
|
||
:transacted | ||
(let [{:keys [tx-data tx-id server-idx]} body | ||
{*server-db :server-db | ||
*server-idx :server-idx | ||
*pending :pending | ||
*listeners :listeners} (meta conn) | ||
report (conn/with @*server-db tx-data {:server? true}) | ||
server-db' (:db-after report)] | ||
(reset! *server-db server-db') | ||
(reset! *server-idx server-idx) | ||
(if (= tx-id (:tx-id (peek @*pending))) | ||
(swap! *pending pop) | ||
(do | ||
(reset! conn (reduce conn/db-with server-db' @*pending)) | ||
(doseq [[_ callback] @*listeners] | ||
(callback report)))))) | ||
nil) | ||
|
||
(defn server-disconnected [conn] | ||
;; TODO impl me | ||
) | ||
|
||
(defn server-connected [conn] | ||
;; TODO impl me | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
(ns datascript.sync.server | ||
(:require | ||
[datascript.conn :as conn] | ||
[datascript.db :as db] | ||
[datascript.serialize :as serialize])) | ||
|
||
(defn- client [conn channel] | ||
(get @(:clients (meta conn)) channel)) | ||
|
||
(defn on-tx [conn report] | ||
;; TODO filter what to send where | ||
(let [msg {:message :transacted | ||
:tx-data (db/tx-from-datoms (:tx-data report)) | ||
:tx-id (:tx-id (:tx-meta report)) | ||
:server-idx (:db/current-tx (:tempids report))}] | ||
(doseq [[channel {:keys [status send-fn]}] @(:clients (meta conn)) | ||
; :let [_ (prn "broadcasting to" channel status)] | ||
:when (= :active status)] | ||
(send-fn channel msg)))) | ||
|
||
(defn client-connected [conn channel send-fn] | ||
(let [*clients (:clients (meta conn)) | ||
clients' (swap! *clients assoc channel | ||
{:status :connected | ||
:send-fn send-fn})] | ||
(when (= 1 (count clients')) | ||
(conn/listen! conn :sync #(on-tx conn %))) | ||
nil)) | ||
|
||
(defn client-message [conn channel body] | ||
(case (:message body) | ||
:catching-up | ||
(let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx | ||
{:keys [send-fn]} (client conn channel) | ||
db @conn] | ||
(send-fn channel | ||
{:message :catched-up | ||
:snapshot (serialize/serializable db) ;; TODO patterns | ||
:server-idx (:max-tx db)}) | ||
;; TODO race - external txs between (:max-tx db) and after :status :active | ||
(swap! (:clients (meta conn)) update channel assoc :status :active)) | ||
|
||
:transacting | ||
(doseq [{:keys [tx-data tx-id]} (:txs body)] | ||
;; TODO handle exception here | ||
(conn/transact! conn tx-data {:tx-id tx-id}))) | ||
nil) | ||
|
||
(defn client-disconnected [conn channel] | ||
(let [*clients (:clients (meta conn)) | ||
clients' (swap! *clients dissoc channel)] | ||
(when (= 0 (count clients')) | ||
(conn/unlisten! conn :sync)) | ||
nil)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
(ns datascript.test.sync | ||
(:require | ||
[clojure.core.async :as async :refer [>! <! go go-loop]] | ||
[clojure.edn :as edn] | ||
[clojure.test :as t :refer [is are deftest testing]] | ||
[datascript.core :as d] | ||
[datascript.sync.server :as server] | ||
[datascript.sync.client :as client] | ||
[datascript.test.core :as tdc])) | ||
|
||
(defn wait-on [*atom cond] | ||
(let [*p (promise)] | ||
(add-watch *atom :wait | ||
(fn [_ _ _ new] | ||
(when (cond new) | ||
(deliver *p true)))) | ||
(when (cond @*atom) | ||
(deliver *p true)) | ||
(let [p (deref *p 100 :timeout)] | ||
(remove-watch *atom :wait) | ||
(when (= :timeout p) | ||
(throw (ex-info "Timeout" {})))))) | ||
|
||
(def freeze | ||
; identity | ||
tdc/transit-write-str | ||
) | ||
|
||
(def thaw | ||
; identity | ||
tdc/transit-read-str | ||
) | ||
|
||
(defn setup [] | ||
(let [server (d/create-conn) | ||
ch (async/chan 10) | ||
ch1 (async/chan 10) | ||
ch2 (async/chan 10) | ||
c1 (client/create-conn nil #(go (>! ch [:c1 (freeze %)]))) | ||
c2 (client/create-conn nil #(go (>! ch [:c2 (freeze %)])))] | ||
(server/client-connected server :c1 (fn [_ msg] (go (>! ch1 (freeze msg))))) | ||
(server/client-connected server :c2 (fn [_ msg] (go (>! ch2 (freeze msg))))) | ||
(go-loop [] | ||
(when-some [msg (<! ch1)] | ||
(let [msg (thaw msg)] | ||
(tdc/log "C1 RCV" msg) | ||
(client/server-message c1 msg) | ||
(recur)))) | ||
(go-loop [] | ||
(when-some [msg (<! ch2)] | ||
(let [msg (thaw msg)] | ||
(tdc/log "C2 RCV" msg) | ||
(client/server-message c2 msg) | ||
(recur)))) | ||
(go-loop [] | ||
(when-some [[id msg] (<! ch)] | ||
(let [msg (thaw msg)] | ||
(tdc/log "SRV RCV" id msg) | ||
(server/client-message server id msg) | ||
(recur)))) | ||
(wait-on c1 some?) | ||
(wait-on c2 some?) | ||
{:server server | ||
:c1 c1 | ||
:c2 c2})) | ||
|
||
(defn wait-all [{:keys [server c1 c2]}] | ||
(wait-on (:pending (meta c1)) empty?) | ||
(wait-on (:pending (meta c2)) empty?) | ||
(wait-on (:server-idx (meta c1)) #(= % (:max-tx @server))) | ||
(wait-on (:server-idx (meta c2)) #(= % (:max-tx @server)))) | ||
|
||
(deftest test-sync | ||
(let [{:keys [server c1 c2] :as setup} (setup)] | ||
(d/transact! c1 [[:db/add 1 :name "Ivan"]]) | ||
(wait-all setup) | ||
(is (= #{[1 :name "Ivan"]} | ||
(tdc/all-datoms @c1))) | ||
(is (= #{[1 :name "Ivan"]} | ||
(tdc/all-datoms @c2))) | ||
(is (= #{[1 :name "Ivan"]} | ||
(tdc/all-datoms @server))))) | ||
|
||
|
||
; (t/test-ns *ns*) | ||
; (t/run-test-var #'test-conn) | ||
|