Skip to content

feat: Add thread reader to client #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

Exterm1nate
Copy link
Contributor

When stopping the client its thread continues working for some time. This is not a problem in production, but in tests it is. Webmock removes its stub after the test, while the thread is still working, so we sometimes receive such errors:

#<Thread:0x0000761ef6935090@LD/SSEClient /usr/local/bundle/ruby/3.1.0/gems/ld-eventsource-2.2.3/lib/ld-eventsource/client.rb:144 run> terminated with exception (report_on_exception is true):
/usr/local/bundle/ruby/3.1.0/gems/webmock-3.25.1/lib/webmock/http_lib_adapters/http_rb/webmock.rb:63:in `halt': Real HTTP connections are disabled. Unregistered request: GET http://localhost:3003/admin/v1/emails with headers {'Accept'=>'text/event-stream', 'Authorization'=>'Bearer test_key', 'Cache-Control'=>'no-cache', 'Connection'=>'close', 'Host'=>'localhost:3003', 'Last-Event-Id'=>'0', 'User-Agent'=>'ruby-eventsource'} (WebMock::NetConnectNotAllowedError)

This PR adds a reader for thread, so it can be killed after the test example.

subject(:call_method) do
  processor.start
  sleep 0.1
end

after do
  processor.connection.thread&.kill
end

@Exterm1nate Exterm1nate requested a review from a team as a code owner April 18, 2025 08:01
@keelerm84
Copy link
Member

@Exterm1nate thank you for raising the issue and working on this contribution.

I would prefer to not expose the thread as that's an implementation detail, and it enables a lot of misuse. I've updated the PR to add a close_and_wait method. This will block until that thread has shutdown, or some timeout period has elapsed.

Would this work for you?

@Exterm1nate
Copy link
Contributor Author

This solution appears to work, but it has a noticeable disadvantage: tests will take longer if we wait for the last run_stream cycle to complete. It would be better if we could terminate the thread immediately without waiting.

@keelerm84
Copy link
Member

tests will take longer if we wait for the last run_stream cycle to complete.

I wouldn't expect this to take long at all. That method should early exit when it sees we have called close. This makes me think something else is preventing the thread from exiting immediately, like a hung IO reader.

Do you have a small reproduction case I could reference to dig into this further?

@Exterm1nate
Copy link
Contributor Author

This code emulates my test suite:

RSpec.describe "stream processor" do
  context "with stream" do
    subject(:call_method) do
      stream

      # Wait for SSE client preparations
      sleep 0.1
    end

    let!(:obj) { instance_double("object", success: nil, failure: nil) }
    let(:stream) do
      stream = SSE::Client.new("http://localhost:3000")

      stream.on_event do |event|
        # Success logic
        obj.success
      end

      stream.on_error do |e|
        # Fail logic
        obj.failure
      end

      stream
    end

    before do
      stub_request(:get, "http://localhost:3000").to_return({
        body: "event: created\ndata: my_data\n\n",
        headers: { "Content-Type" => "text/event-stream" },
      })
    end

    after do
      # Uncomment one of:
      # stream.close
      # stream.close_and_wait
    end

    10.times do |i|
      it "executes success callback [#{i}]" do
        call_method

        expect(obj).to have_received(:success)
      end
    end
  end

  context "without stream" do
    10.times do |i|
      it "does nothing [#{i}]" do
        sleep 0.3

        expect(true).to be(true)
      end
    end
  end
end

Context with stream runs streams 10 times and checks that an event occurred, whereas without stream just sleeps for 300 ms 10 times to emulate other application specs.

When using stream.close (line 36) test suite takes 4.169 sec to complete (1.169 without non-stream specs) and there are such errors in console:

#<Thread:0x00007e67e4d15978@LD/SSEClient /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:145 run> terminated with exception (report_on_exception is true):
/home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/gems/webmock-3.25.1/lib/webmock/http_lib_adapters/http_rb/webmock.rb:63:in `halt': Real HTTP connections are disabled. Unregistered request: GET http://localhost:3000/ with headers {'Accept'=>'text/event-stream', 'Cache-Control'=>'no-cache', 'Connection'=>'close', 'Host'=>'localhost:3000', 'User-Agent'=>'ruby-eventsource'} (WebMock::NetConnectNotAllowedError)

# ...

============================================================
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/gems/webmock-3.25.1/lib/webmock/http_lib_adapters/http_rb/webmock.rb:13:in `exec'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/gems/webmock-3.25.1/lib/webmock/http_lib_adapters/http_rb/client.rb:10:in `perform'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/gems/http-5.2.0/lib/http/client.rb:31:in `request'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:283:in `block in connect'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:272:in `loop'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:272:in `connect'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:241:in `run_stream'
	from /home/kirill/.rbenv/versions/3.1.7/lib/ruby/gems/3.1.0/bundler/gems/ruby-eventsource-dc3622061685/lib/ld-eventsource/client.rb:145:in `block in initialize'

When using stream.close_and_wait (line 37) test suite finishes in 9.649 sec (6.649 without non-stream specs) and there are no errors in console.

When using thread.kill directly test suite takes the same time to complete as stream.close and there are no errors in console too.

The performance difference between stream.close_and_wait and thread.kill is over 5x, which is critical.

If you don't want to expose the thread (and I agree with it, it's an implementation detail that should be private) maybe we can add a method, that immediately stops the stream? Like client#close!?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants