Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"ring-jetty-adapter/src"
"ring-servlet/src"
"ring-jakarta-servlet/src"
"ring-websocket-protocols/src"]}
"ring-websocket-protocols/src"
"ring-sse-protocols/src"]}
:aliases {"test" ["sub" "test"]
"test-all" ["sub" "test-all"]
"bench" ["sub" "-s" "ring-bench" "run"]
Expand Down
1 change: 1 addition & 0 deletions ring-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:dependencies [[org.clojure/clojure "1.9.0"]
[org.ring-clojure/ring-core-protocols "1.15.2"]
[org.ring-clojure/ring-websocket-protocols "1.15.2"]
[org.ring-clojure/ring-sse-protocols "1.15.2"]
[ring/ring-codec "1.3.0"]
[commons-io "2.20.0"]
[org.apache.commons/commons-fileupload2-core "2.0.0-M4"]
Expand Down
23 changes: 23 additions & 0 deletions ring-core/src/ring/sse.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
(ns ring.sse
"Protocols and utility functions for SSE support."
(:refer-clojure :exclude [send])
(:require [ring.sse.protocols :as p]))


(extend-type clojure.lang.IPersistentMap
p/Listener
(on-open [m sender]
(when-let [kv (find m :on-open)] ((val kv) sender))))


(defn send
"Sends a SSE message"
[sender sso-message]
(p/-send sender sso-message))


(defn sse-response?
"Returns true if the response contains a SSE emitter."
[response]
(contains? response ::listener))

4 changes: 2 additions & 2 deletions ring-jakarta-servlet/src/ring/util/jakarta/servlet.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
:servlet-context (.getServletContext servlet)
:servlet-context-path (.getContextPath request)}))

(defn- set-headers [^HttpServletResponse response, headers]
(defn set-headers [^HttpServletResponse response, headers]
(doseq [[key val-or-vals] headers]
(if (string? val-or-vals)
(.setHeader response key val-or-vals)
Expand All @@ -69,7 +69,7 @@
(when-let [content-type (get headers "Content-Type")]
(.setContentType response content-type)))

(defn- make-output-stream
(defn make-output-stream
[^HttpServletResponse response ^AsyncContext context]
(let [os (.getOutputStream response)]
(if (nil? context)
Expand Down
55 changes: 52 additions & 3 deletions ring-jetty-adapter/src/ring/adapter/jetty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
(:require [clojure.java.io :as io]
[ring.util.jakarta.servlet :as servlet]
[ring.websocket :as ws]
[ring.websocket.protocols :as wsp])
[ring.websocket.protocols :as wsp]
[ring.sse :as sse]
[ring.sse.protocols :as ssep])
(:import [java.nio ByteBuffer]
[java.time Duration]
[org.eclipse.jetty.server
Expand Down Expand Up @@ -108,14 +110,61 @@
(.setMaxBinaryMessageSize (:ws-max-binary-size options 65536)))
(.upgrade container creator request response)))

(defn- sse-write [^java.io.Writer out k v]
(when v
(doto out
(.write (name k))
(.write ": ")
(.write (str v))
(.write "\r\n"))))

(defn- make-sse-sender [^java.io.OutputStream resp-stream ^java.util.concurrent.CountDownLatch close-latch]
(let [out (io/writer resp-stream)]
(reify ssep/Sender
(-send [_ {:keys [id event data]}]
(try
(doto out
(sse-write :id id)
(sse-write :event event)
(sse-write :data data)
(.write "\r\n")
(.flush))
(catch java.io.IOException _
(.countDown close-latch)))))))

(defn- upgrade-to-sse
[^Request request ^HttpServletResponse response response-map _options]
(let [context (.startAsync request)
output (servlet/make-output-stream response context)]
(try
(let [close-latch (java.util.concurrent.CountDownLatch. 1)
on-open (-> response-map :ring.sse/listener :on-open)
sse-sender (make-sse-sender output close-latch)]
(doto response
(.setStatus (:status response-map 200))
(servlet/set-headers (assoc (:headers response-map) "Content-Type" "text/event-stream")))
(.start context (fn [] (on-open sse-sender)))
(.await close-latch))
;; Client terminates the connection:
(catch java.io.IOException _)
(catch java.lang.InterruptedException _)
(finally
(.close output)))))

(defn- proxy-handler ^ServletHandler [handler options]
(proxy [ServletHandler] []
(doHandle [_ ^Request base-request request response]
(doHandle [_ ^Request base-request ^Request request response]
(let [request-map (servlet/build-request-map request)
response-map (handler request-map)]
(try
(if (ws/websocket-response? response-map)
(cond
(ws/websocket-response? response-map)
(upgrade-to-websocket request response response-map options)

(sse/sse-response? response-map)
(upgrade-to-sse request response response-map options)

:else
(servlet/update-servlet-response response response-map))
(finally
(.setHandled base-request true)
Expand Down
54 changes: 53 additions & 1 deletion ring-jetty-adapter/test/ring/adapter/test/jetty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
[less.awful.ssl :as less-ssl]
[ring.core.protocols :as p]
[ring.websocket :as ws]
[ring.websocket.protocols :as wsp])
[ring.websocket.protocols :as wsp]
[ring.sse :as sse]
[ring.sse.protocols :as ssep])
(:import [java.io File]
[java.nio ByteBuffer]
[java.nio.file Paths]
Expand Down Expand Up @@ -940,6 +942,56 @@
(is (realized? closer))
(is (= @closer [1009 "Binary message too large: 6 > 5"])))))))

;; All browsers have SSE support but I could not find any simple SSE client libs
;; that I could use for testing. This is rather crude way to test SSE, but...

(defn sse-get [test-port]
(with-open [socket (java.net.Socket. "localhost" test-port)
out (-> (.getOutputStream socket) (io/writer))
in (-> (.getInputStream socket) (io/reader))]
(let [read-kv (fn [key-fn]
(loop [data {}]
(let [line (.readLine in)]
(if (str/blank? line)
data
(let [[_ k v] (re-matches #"([^:]+):\s*(.*)" line)]
(recur (assoc data (key-fn k) v)))))))]
(doto out
(.write "GET / HTTP/1.1\r\n")
(.write "host: localhost\r\n")
(.write "accept: text/event-stream\r\n")
(.write "connection: close\r\n")
(.write "\r\n")
(.flush))
(let [status-line (.readLine in)
[_ status _] (str/split status-line #"\s+")
headers (read-kv str/lower-case)]
{:status (parse-long status)
:headers headers
:body (->> (repeatedly (fn [] (read-kv keyword)))
(take-while (fn [message] (-> message :event (not= "close"))))
(into []))}))))

(deftest run-jetty-sse-test
(let [messages [{:id "1"
:event "test"
:data "message 1"}
{:id "2"
:data "message 2"}]
handler (constantly
{::sse/listener {:on-open (fn [sse-sender]
;; Send the test messages:
(doseq [message messages]
(sse/send sse-sender message))
;; Send close message so that the `sse-get` knows when to stop
;; reading response:
(sse/send sse-sender {:event "close"}))}})]
(with-server handler {:port test-port}
(let [resp (sse-get test-port)]
(is (= 200 (:status resp)))
(is (= "text/event-stream" (get-in resp [:headers "content-type"])))
(is (= messages (:body resp)))))))

(deftest run-jetty-async-websocket-test
(testing "ping/pong"
(let [log (atom [])
Expand Down
9 changes: 9 additions & 0 deletions ring-sse-protocols/project.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(defproject org.ring-clojure/ring-sse-protocols "1.15.2"
:description "Ring protocols for SSE."
:url "https://github.com/ring-clojure/ring"
:scm {:dir ".."}
:license {:name "The MIT License"
:url "http://opensource.org/licenses/MIT"}
:dependencies []
:profiles
{:dev {:dependencies [[org.clojure/clojure "1.9.0"]]}})
12 changes: 12 additions & 0 deletions ring-sse-protocols/src/ring/sse/protocols.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns ring.sse.protocols)


(defprotocol Listener
"A protocol for handling SSE responses. The second argument is an object that
satisfies the SSESender protocol."
(on-open [listener sse-sender] "Called when the SSE response is opened and ready."))


(defprotocol Sender
"A protocol for sending SSE responses."
(-send [sender message] "Sends a SSE message"))
Loading