Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run parameters prototype #1121

Open
wants to merge 61 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
38fb15a
wip
0xbase12 Jul 12, 2017
5c349ea
wip
0xbase12 Jul 12, 2017
90be0aa
wip
0xbase12 Jul 12, 2017
0b71e24
Merge branch 'master' into issue#1112-run-paramtetes-prototype
0xbase12 Jul 12, 2017
30e8445
working sse prototype
0xbase12 Jul 13, 2017
ae09522
sse and zookeeper working
0xbase12 Jul 14, 2017
f42e004
Merge branch 'master' into issue#1112-run-paramtetes-prototype
0xbase12 Jul 18, 2017
c816f8a
run with action prototype working and some unit test
0xbase12 Jul 18, 2017
6cb9db0
wip
0xbase12 Jul 19, 2017
8f0ed09
wip run parameters working with zookeeper for creation
0xbase12 Jul 20, 2017
28ae980
wip run parameter retrieve value from zookeeper
0xbase12 Jul 20, 2017
5a447b0
wip issue with sse
0xbase12 Jul 21, 2017
56d2acb
fix issue with nginx buffering and accept header logic implemented
0xbase12 Jul 22, 2017
e6c5280
run param edit set value in zk and fix issue with get-version from zo…
0xbase12 Jul 23, 2017
622f147
implement query for run param with sse and json
0xbase12 Jul 24, 2017
cbf8d6d
add unit tests and dependencies
0xbase12 Jul 24, 2017
aa6e37e
simple lifecycle test for run with embded zookeeper server working
0xbase12 Jul 25, 2017
63a813d
Merge branch 'issue#1112-run-paramtetes-prototype' into issue#1111-ne…
0xbase12 Jul 25, 2017
cbfbc7e
minor enhancement and more unit test for start run action
0xbase12 Jul 25, 2017
69d7f19
Merge branch 'master' into issue#1111-new-run-ssclj-resource
0xbase12 Jul 27, 2017
22a4ed8
run parameter life cycle tests and run id should be passed at creatio…
0xbase12 Jul 27, 2017
d83a4bf
minor, vmstate should be set by the client
0xbase12 Jul 27, 2017
5866979
add acl for un parameters
0xbase12 Jul 28, 2017
199d006
rename run-id to run-href
0xbase12 Jul 28, 2017
dbc3d37
SSE data is now the full json of run parameter and little more other …
0xbase12 Jul 28, 2017
585052c
Merge branch 'master' into issue#1111-new-run-ssclj-resource
konstan Aug 28, 2017
39a5dee
names of es fixtures changed
konstan Aug 28, 2017
d3ac0d4
Merge branch 'master' into issue#1111-new-run-ssclj-resource
konstan Aug 28, 2017
65e23f6
Merge branch 'master' into issue#1111-new-run-ssclj-resource
konstan Aug 29, 2017
50916db
add values to run parameter query
0xbase12 Sep 1, 2017
023cc36
add type to run parameter spec
0xbase12 Sep 1, 2017
52075aa
store values in elasticsearch
0xbase12 Sep 1, 2017
65baf4c
Merge branch 'master' into issue#1111-new-run-ssclj-resource
0xbase12 Sep 1, 2017
c12bd33
Merge branch 'master' into issue#1111-new-run-ssclj-resource
0xbase12 Sep 4, 2017
39b7d6b
Merge branch 'issue#1111-new-run-ssclj-resource' of ssh://github.com/…
0xbase12 Sep 4, 2017
266984b
renaming run resource to deployment
0xbase12 Sep 5, 2017
a63d623
update deployment attribute state when run parameter is updated
0xbase12 Sep 5, 2017
591feab
in lifecycle test clean all zk nodes after each test
0xbase12 Sep 6, 2017
afb1b85
FIXME: temporarily adding explicit dependency on clojure/tools.reader
konstan Sep 6, 2017
54a7d0d
Merge branch 'issue#1111-new-run-ssclj-resource' of github.com:slipst…
konstan Sep 6, 2017
d953909
removed unused requires
konstan Sep 6, 2017
cf9faf6
WIP update of state-complete for node instance have special behavior …
0xbase12 Sep 6, 2017
bbf5162
WIP for lock and deployment state move
0xbase12 Sep 8, 2017
f0c8ffd
next-state working and idempotent
0xbase12 Sep 11, 2017
5d509e5
SSE should answer in json
0xbase12 Sep 11, 2017
f7d363f
enhance http accept header interpretation
0xbase12 Sep 11, 2017
2984d4f
enhance code and fix deployment-href in deployment parameter
0xbase12 Sep 11, 2017
24dfaf1
rename deployment-href to deployment in deployment parameter and minor
0xbase12 Sep 11, 2017
870325b
Merge branch 'master' into issue#1111-new-run-ssclj-resource
0xbase12 Sep 12, 2017
86a1d2a
abort deployment possible and creation of deployment from java code
0xbase12 Sep 14, 2017
bead98d
fix deployment spec, add abort
0xbase12 Sep 14, 2017
54d863c
new header auth for java, ssclj deployment template
0xbase12 Sep 15, 2017
d923418
fix authentification regression
0xbase12 Sep 18, 2017
ce5973e
Merge branch 'master' into issue#1111-new-run-ssclj-resource
konstan Sep 18, 2017
0578af4
add conversion from java run to deployment and fake http server for t…
0xbase12 Sep 18, 2017
af9e526
Merge branch 'master' into issue#1111-new-run-ssclj-resource
0xbase12 Sep 18, 2017
d3d881a
Merge branch 'issue#1111-new-run-ssclj-resource' of ssh://github.com/…
0xbase12 Sep 18, 2017
41a1327
fix lifecycle tests for deployment
0xbase12 Sep 19, 2017
a683faf
set java runtime parameter are propagated to ssclj and dyn creation o…
0xbase12 Sep 19, 2017
daaa39f
terminate action on deployment to be continued
0xbase12 Sep 20, 2017
95a3763
minor
0xbase12 Sep 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
working sse prototype
0xbase12 committed Jul 13, 2017
commit 30e8445a6da9721671f29b07e2f052dc598c685c
2 changes: 1 addition & 1 deletion ssclj/jar/build.boot
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
[metrics-clojure-jvm]
[metrics-clojure-graphite]
[me.raynes/fs]
[org.clojure/core.async :exclusions []]
[org.clojure/data.json]
[org.clojure/java.classpath]
[org.clojure/tools.cli]
@@ -45,7 +46,6 @@
[ring/ring-core]
[ring/ring-json]
[superstring]
[ring-sse/ring-sse]

[com.sixsq.slipstream/utils]
[com.sixsq.slipstream/auth]
Original file line number Diff line number Diff line change
@@ -10,8 +10,12 @@
[superstring.core :as str]
[ring.util.response :as r]
[com.sixsq.slipstream.util.response :as sr]
[ring.sse :as sse]
))
[com.sixsq.slipstream.db.impl :as db]
[clojure.tools.logging :as log]
[com.sixsq.slipstream.ssclj.util.sse :as sse]
[clojure.core.async :as async]
)
(:import (clojure.lang ExceptionInfo)))

(def ^:const resource-name "RunParameter")

@@ -52,38 +56,35 @@
[request]
(add-impl request))

;(def handler
; (sse/event-channel-handler
; (fn [request response raise event-ch]
; (async/go
; (dotimes [i 20]
; (let [event {:id (java.util.UUID/randomUUID)
; :name "foo"
; :data (json/generate-string {:foo "bar"})}]
; (async/>! event-ch event)
; (async/<! (async/timeout 1000))))
; (async/close! event-ch)))
; {:on-client-disconnect #(log/debug "sse/on-client-disconnect: " %)}))

;(defn retrieve-fn
; [resource-name]
; (fn [{{uuid :uuid} :params :as request}]
; (try
; (-> (str (u/de-camelcase resource-name) "/" uuid)
; (db/retrieve request)
; (a/can-view? request)
; (crud/set-operations request)
; (r/json-response))
; (catch ExceptionInfo ei
; (ex-data ei)))))
(def handler
(sse/event-channel-handler
(fn [request response raise event-ch]
(async/go
(dotimes [i 20]
(let [event {:id (java.util.UUID/randomUUID)
:name "foo"
:data "bar"}]
(async/>! event-ch event)
(async/<! (async/timeout 1000))))
(async/close! event-ch)))
{:on-client-disconnect #(log/debug "sse/on-client-disconnect: " %)}))

(defn retrieve-fn
[resource-name]
(fn [{{uuid :uuid} :params :as request}]
(try
(-> (str (u/de-camelcase resource-name) "/" uuid)
(db/retrieve request)
(a/can-view? request)
(crud/set-operations request))
(catch ExceptionInfo ei
(ex-data ei)))))

(def retrieve-impl (std-crud/retrieve-fn resource-name))

(defmethod crud/retrieve resource-name
[request]
;(handler request (partial retrieve-fn resource-name) (fn[e] (log/error e)))
(std-crud/retrieve-fn resource-name)
)
(handler request))

(def edit-impl (std-crud/edit-fn resource-name))

139 changes: 139 additions & 0 deletions ssclj/jar/src/com/sixsq/slipstream/ssclj/util/sse.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
(ns com.sixsq.slipstream.ssclj.util.sse
(:require [clojure.core.async :as async]
[clojure.java.io :as io]
[clojure.string :as string]
[ring.core.protocols :as ring]
[ring.util.response :as ring-response]
[manifold.stream :as s]
[manifold.deferred :as d])
)

(def CRLF "\r\n")
(def EVENT_FIELD "event: ")
(def DATA_FIELD "data: ")
(def ID_FIELD "id: ")

(defn mk-data
([name data]
(mk-data name data nil))
([name data id]
(let [sb (StringBuilder.)]
(when name
(.append sb EVENT_FIELD)
(.append sb name)
(.append sb CRLF))

(doseq [part (string/split data #"\r?\n")]
(.append sb DATA_FIELD)
(.append sb part)
(.append sb CRLF))

(when (not-empty id)
(.append sb ID_FIELD)
(.append sb id)
(.append sb CRLF))

(.append sb CRLF)
(str sb))))

(defn send-event
[channel name data id put-fn raise]
(try
(put-fn channel (mk-data name data id))
(catch Throwable t
(async/close! channel)
(raise t)
nil)))

(defn- start-dispatch-loop
"Kicks off the loop that transfers data provided by the application
on `event-channel` to the HTTP infrastructure via
`response-channel`."
[{:keys [event-channel response-channel heartbeat-delay on-client-disconnect raise] :as opts}]
(async/go
(loop []
(let [hb-timeout (async/timeout (* 1000 heartbeat-delay))
[event port] (async/alts! [event-channel hb-timeout])]
(cond
(= port hb-timeout)
(when (async/>! response-channel CRLF)
(recur))

(and (some? event) (= port event-channel))
(let [{event-name :name
event-data :data
event-id :id}
(if (map? event)
(reduce (fn [agg [k v]] (assoc agg k (str v))) {} event)
{:data (str event)})]
(when (send-event response-channel event-name event-data event-id async/put! raise)
(recur))))))
(async/close! event-channel)
(async/close! response-channel)
(when on-client-disconnect (on-client-disconnect))
:done))

(defn- start-stream
"Starts an SSE event stream and initiates a heartbeat to keep the
connection alive. `stream-ready-fn` will be called with a core.async
channel and the initial response map. The application can then put
maps with keys :id, :name, and :data on that channel to cause SSE
events to be sent to the client. Either the client or the
application may close the channel to terminate and clean up the
event stream; the client closes it by closing the connection. The
SSE's core.async buffer can either be a fixed buffer (n) or a
0-arity function that returns a buffer."
[{:keys [stream-ready-fn request respond raise heartbeat-delay bufferfn-or-n on-client-disconnect]}]
(let [heartbeat-delay (or heartbeat-delay 10)
bufferfn-or-n (or bufferfn-or-n 10)
response-channel (async/chan (if (fn? bufferfn-or-n) (bufferfn-or-n) bufferfn-or-n))
response (-> (ring-response/response (s/->source response-channel))
(ring-response/content-type "text/event-stream") ;; TODO: content negotiation? "text/event-stream+json"?
(ring-response/charset "UTF-8")
(ring-response/header "Connection" "close")
(ring-response/header "Cache-Control" "no-cache"))
;; TODO: re-create CORS support as per original: (update-in [:headers] merge (:cors-headers context))
event-channel (async/chan (if (fn? bufferfn-or-n) (bufferfn-or-n) bufferfn-or-n))]
(respond response)
(async/thread
(stream-ready-fn request response raise event-channel)
:done)
(start-dispatch-loop (merge {:event-channel event-channel
:response-channel response-channel
:heartbeat-delay heartbeat-delay
:raise raise}
(when on-client-disconnect
{:on-client-disconnect #(on-client-disconnect response)})))))

(defn ring->aleph [handler]
(fn [request]
(let [response (d/deferred)]
(handler request #(d/success! response %) #(d/error! response %))
response)))

(defn event-channel-handler
"Returns a Ring async handler which will start a Server Sent Event
stream with the requesting client. `stream-ready-fn` will be called
in a future, and will be passed the original request, the initial
response, the raise fn, and the event channel.

Options:

:buffer - either an integer buffer size, or a 0-arity function that
returns a buffer.
:heartbeat-delay - An integer number of seconds between heartbeat
messages
:on-client-disconnect - A function of one argument (the initial response)
which will be called when the client permanently disconnects."
([stream-ready-fn]
(event-channel-handler stream-ready-fn {}))
([stream-ready-fn {:keys [buffer heartbeat-delay on-client-disconnect]}]
(ring->aleph
(fn [request respond raise]
(start-stream {:stream-ready-fn stream-ready-fn
:request request
:respond respond
:raise raise
:heartbeat-delay heartbeat-delay
:bufferfn-or-n buffer
:on-client-disconnect on-client-disconnect})))))