Skip to content

Commit bc6a2c1

Browse files
committed
Improved separation between client and worker.
1 parent 1b7cf70 commit bc6a2c1

File tree

13 files changed

+254
-109
lines changed

13 files changed

+254
-109
lines changed

async-container-supervisor.gemspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,5 @@ Gem::Specification.new do |spec|
2727
spec.add_dependency "async-container", "~> 0.22"
2828
spec.add_dependency "async-service"
2929
spec.add_dependency "io-endpoint"
30-
spec.add_dependency "io-stream"
3130
spec.add_dependency "memory-leak", "~> 0.3"
3231
end

bake/async/container/supervisor.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
2+
def initialize(...)
3+
super
4+
5+
require "async/container/supervisor"
6+
end
7+
8+
def restart
9+
client do |connection|
10+
connection.call(do: :restart)
11+
end
12+
end
13+
14+
def status
15+
client do |connection|
16+
connection.call(do: :status)
17+
end
18+
end
19+
20+
private
21+
22+
def endpoint
23+
Async::Container::Supervisor.endpoint
24+
end
25+
26+
def client(&block)
27+
Sync do
28+
Async::Container::Supervisor::Client.new(endpoint: self.endpoint).connect(&block)
29+
end
30+
end

example/simple/simple.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ def setup(container)
1212

1313
container.run(name: self.class.name, count: 4, restart: true, health_check_timeout: 2) do |instance|
1414
Async do
15-
client = Async::Container::Supervisor::Client.new(instance, @evaluator.supervisor_endpoint)
16-
client.run
15+
Async::Container::Supervisor::Worker.new(instance, endpoint: @evaluator.supervisor_endpoint).run
1716

1817
start_time = Time.now
1918

@@ -45,6 +44,6 @@ def setup(container)
4544
include Async::Container::Supervisor::Environment
4645

4746
monitors do
48-
[Async::Container::Supervisor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 100)]
47+
[Async::Container::Supervisor::MemoryMonitor.new(interval: 1, limit: 1024 * 1024 * 400)]
4948
end
5049
end

fixtures/async/container/supervisor/a_server.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def around(&block)
4646

4747
let(:registration_monitor) {RegistrationMonitor.new}
4848
let(:monitors) {[registration_monitor]}
49-
let(:server) {Async::Container::Supervisor::Server.new(@bound_endpoint, monitors: monitors)}
49+
let(:server) {Async::Container::Supervisor::Server.new(endpoint: @bound_endpoint, monitors: monitors)}
5050

5151
before do
5252
@bound_endpoint = endpoint.bound

lib/async/container/supervisor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require_relative "supervisor/version"
77

88
require_relative "supervisor/server"
9+
require_relative "supervisor/worker"
910
require_relative "supervisor/client"
1011

1112
require_relative "supervisor/memory_monitor"

lib/async/container/supervisor/client.rb

Lines changed: 29 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -3,120 +3,62 @@
33
# Released under the MIT License.
44
# Copyright, 2025, by Samuel Williams.
55

6-
require "io/stream"
76
require_relative "connection"
7+
require_relative "dispatchable"
88

99
module Async
1010
module Container
1111
module Supervisor
1212
class Client
13-
def self.run(...)
14-
self.new(...).run
13+
def initialize(endpoint: Supervisor.endpoint)
14+
@endpoint = endpoint
1515
end
1616

17-
def initialize(instance, endpoint = Supervisor.endpoint)
18-
@instance = instance
19-
@endpoint = endpoint
17+
include Dispatchable
18+
19+
protected def connect!
20+
peer = @endpoint.connect
21+
return Connection.new(peer, 0)
2022
end
2123

22-
def dispatch(call)
23-
method_name = "do_#{call.message[:do]}"
24-
self.public_send(method_name, call)
24+
# Called when a connection is established.
25+
protected def connected!(connection)
26+
# Do nothing by default.
2527
end
2628

29+
# Connect to the server.
2730
def connect
28-
unless @connection
29-
peer = @endpoint.connect
30-
stream = IO::Stream(peer)
31-
@connection = Connection.new(stream, 0, instance: @instance)
32-
33-
# Register the instance with the server:
34-
Async do
35-
@connection.call(do: :register, state: @instance)
36-
end
37-
end
31+
connection = connect!
32+
connection.run_in_background(self)
3833

39-
return @connection unless block_given?
34+
connected!(connection)
35+
36+
return connection unless block_given?
4037

4138
begin
42-
yield @connection
39+
yield connection
4340
ensure
44-
@connection.close
45-
end
46-
end
47-
48-
def close
49-
if connection = @connection
50-
@connection = nil
5141
connection.close
5242
end
5343
end
5444

55-
private def dump(call)
56-
if path = call[:path]
57-
File.open(path, "w") do |file|
58-
yield file
59-
end
60-
61-
call.finish(path: path)
62-
else
63-
buffer = StringIO.new
64-
yield buffer
65-
66-
call.finish(data: buffer.string)
67-
end
68-
end
69-
70-
def do_scheduler_dump(call)
71-
dump(call) do |file|
72-
Fiber.scheduler.print_hierarchy(file)
73-
end
74-
end
75-
76-
def do_memory_dump(call)
77-
require "objspace"
78-
79-
dump(call) do |file|
80-
ObjectSpace.dump_all(output: file)
81-
end
82-
end
83-
84-
def do_thread_dump(call)
85-
dump(call) do |file|
86-
Thread.list.each do |thread|
87-
file.puts(thread.inspect)
88-
file.puts(thread.backtrace)
89-
end
90-
end
91-
end
92-
93-
def do_garbage_profile_start(call)
94-
GC::Profiler.enable
95-
call.finish(started: true)
96-
end
97-
98-
def do_garbage_profile_stop(call)
99-
GC::Profiler.disable
100-
101-
dump(connection, message) do |file|
102-
file.puts GC::Profiler.result
103-
end
104-
end
105-
45+
# Run the client in a loop, reconnecting if necessary.
10646
def run
107-
Async do |task|
47+
Async do
10848
loop do
109-
connect do |connection|
110-
connection.run(self)
49+
connection = connect!
50+
51+
Async do
52+
connected!(connection)
11153
end
112-
rescue => error
113-
Console.error(self, "Unexpected error while running client!", exception: error)
11454

115-
# Retry after a small delay:
55+
connection.run(self)
56+
rescue => error
57+
Console.error(self, "Connection failed:", exception: error)
11658
sleep(rand)
59+
ensure
60+
connection.close
11761
end
118-
ensure
119-
task.stop
12062
end
12163
end
12264
end

lib/async/container/supervisor/connection.rb

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ def initialize(connection, id, message)
1818
@queue = ::Thread::Queue.new
1919
end
2020

21+
def as_json(...)
22+
@message
23+
end
24+
25+
def to_json(...)
26+
as_json.to_json(...)
27+
end
28+
2129
# @attribute [Connection] The connection that initiated the call.
2230
attr :connection
2331

@@ -36,6 +44,11 @@ def pop(...)
3644
@queue.pop(...)
3745
end
3846

47+
# The call was never completed and the connection itself was closed.
48+
def close
49+
@queue.close
50+
end
51+
3952
def each(&block)
4053
while response = self.pop
4154
yield response
@@ -47,6 +60,10 @@ def finish(**response)
4760
@queue.close
4861
end
4962

63+
def fail(**response)
64+
self.finish(failed: true, **response)
65+
end
66+
5067
def closed?
5168
@queue.closed?
5269
end
@@ -86,13 +103,13 @@ def self.call(connection, **message, &block)
86103
end
87104
end
88105

89-
def initialize(stream, id, **state)
106+
def initialize(stream, id = 0, **state)
90107
@stream = stream
108+
@id = id
91109
@state = state
92110

111+
@reader = nil
93112
@calls = {}
94-
95-
@id = id
96113
end
97114

98115
# @attribute [Hash(Integer, Call)] Calls in progress.
@@ -153,11 +170,30 @@ def run(target)
153170
end
154171
end
155172

173+
def run_in_background(target, parent: Task.current)
174+
@reader ||= parent.async do
175+
self.run(target)
176+
end
177+
end
178+
156179
def close
180+
if @reader
181+
@reader.stop
182+
@reader = nil
183+
end
184+
157185
if stream = @stream
158186
@stream = nil
159187
stream.close
160188
end
189+
190+
if @calls
191+
@calls.each do |id, call|
192+
call.close
193+
end
194+
195+
@calls.clear
196+
end
161197
end
162198
end
163199
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require_relative "connection"
7+
require_relative "endpoint"
8+
9+
require "io/stream"
10+
11+
module Async
12+
module Container
13+
module Supervisor
14+
module Dispatchable
15+
def dispatch(call)
16+
method_name = "do_#{call.message[:do]}"
17+
self.public_send(method_name, call)
18+
rescue => error
19+
Console.error(self, "Error while dispatching call.", exception: error, call: call)
20+
21+
call.fail(error: {
22+
class: error.class,
23+
message: error.message,
24+
backtrace: error.backtrace,
25+
})
26+
end
27+
end
28+
end
29+
end
30+
end

lib/async/container/supervisor/environment.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def monitors
4343
end
4444

4545
def make_server(endpoint)
46-
Server.new(endpoint, monitors: self.monitors)
46+
Server.new(endpoint: endpoint, monitors: self.monitors)
4747
end
4848
end
4949
end

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ module Async
1010
module Container
1111
module Supervisor
1212
class MemoryMonitor
13-
def initialize(interval: 10, limit: nil)
13+
def initialize(interval: 10, limit: nil, &block)
1414
@interval = interval
1515
@cluster = Memory::Leak::Cluster.new(limit: limit)
1616
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
@@ -42,6 +42,12 @@ def remove(connection)
4242
end
4343
end
4444

45+
def status(call)
46+
@processes.each do
47+
call.push(memory_monitor: @cluster)
48+
end
49+
end
50+
4551
def run
4652
Async do
4753
while true
@@ -54,6 +60,7 @@ def run
5460

5561
response = connection.call(do: :memory_dump, path: path, timeout: 30)
5662
Console.info(self, "Memory dump saved to:", path, response: response)
63+
@block.call(response) if @block
5764
end
5865

5966
# Kill the process:

0 commit comments

Comments
 (0)