Skip to content

feat: Possibility to immediately kill worker thread #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def initialize(uri,
socket_factory: nil)
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)
@shutdown = Concurrent::Event.new

@headers = headers.clone
@connect_timeout = connect_timeout
Expand Down Expand Up @@ -141,7 +142,8 @@ def initialize(uri,

yield self if block_given?

Thread.new { run_stream }.name = 'LD/SSEClient'
@thread = Thread.new { run_stream }
@thread.name = 'LD/SSEClient'
end

#
Expand Down Expand Up @@ -185,9 +187,31 @@ def on_error(&action)
# has no effect if called a second time.
#
def close
if @stopped.make_true
reset_http
end
reset_http if @stopped.make_true
end

#
# Permanently shuts down the client and its connection, and kills the background worker thread. No further events will be dispatched. This
# has no effect if called a second time.
#
def kill
close
@thread&.kill
end

#
# Permanently shuts down the client and its connection, and waits until shutdown is complete. No further events will be dispatched. This
# has no effect if called a second time.
#
# If a timeout is specified, the method will return after that amount of time has passed, even if the shutdown is
# not complete.
#
# @param timeout [Float] (nil) maximum time to wait for shutdown, in seconds
# @return [Boolean] true if the shutdown completed successfully, false if it timed out
#
def close_and_wait(timeout = nil)
close
@shutdown.wait(timeout)
end

#
Expand Down Expand Up @@ -230,7 +254,8 @@ def run_stream
end
# There's a potential race if close was called in the middle of the previous line, i.e. after we
# connected but before @cxn was set. Checking the variable again is a bit clunky but avoids that.
return if @stopped.value
break if @stopped.value

read_stream(resp) if !resp.nil?
rescue => e
# When we deliberately close the connection, it will usually trigger an exception. The exact type
Expand All @@ -241,12 +266,15 @@ def run_stream
log_and_dispatch_error(e, "Unexpected error from event source")
end
end

begin
reset_http
rescue StandardError => e
log_and_dispatch_error(e, "Unexpected error while closing stream")
end
end

@shutdown.set
end

# Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful.
Expand Down
9 changes: 5 additions & 4 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ def with_client(client)
begin
yield client
ensure
client.close
raise 'Failed to close within timeout' unless client.close_and_wait(1)
end
end


def send_stream_content(res, content, keep_open:)
res.content_type = "text/event-stream"
res.status = 200
Expand All @@ -57,7 +58,7 @@ def send_stream_content(res, content, keep_open:)
requests << req
send_stream_content(res, "", keep_open: true)
end

headers = { "Authorization" => "secret" }

with_client(subject.new(server.base_uri, headers: headers)) do |client|
Expand All @@ -82,7 +83,7 @@ def send_stream_content(res, content, keep_open:)
requests << req
send_stream_content(res, "", keep_open: true)
end

headers = { "Authorization" => "secret" }

with_client(subject.new(server.base_uri, headers: headers, last_event_id: id)) do |client|
Expand Down Expand Up @@ -438,7 +439,7 @@ def send_stream_content(res, content, keep_open:)
server.setup_response("/") do |req,res|
send_stream_content(res, "", keep_open: true)
end

with_client(subject.new(server.base_uri)) do |client|
expect(client.closed?).to be(false)

Expand Down