Skip to content

Commit 7794e8a

Browse files
committed
Improve MemoryMonitor implementation.
1 parent 9129b9a commit 7794e8a

File tree

3 files changed

+55
-17
lines changed

3 files changed

+55
-17
lines changed

async-container-supervisor.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ Gem::Specification.new do |spec|
2727
spec.add_dependency "async-container", "~> 0.22"
2828
spec.add_dependency "async-service"
2929
spec.add_dependency "io-endpoint"
30-
spec.add_dependency "memory-leak", "~> 0.3"
30+
spec.add_dependency "memory-leak", "~> 0.5"
3131
end

lib/async/container/supervisor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,13 @@
1313

1414
require_relative "supervisor/environment"
1515
require_relative "supervisor/supervised"
16+
17+
# @namespace
18+
module Async
19+
# @namespace
20+
module Container
21+
# @namespace
22+
module Supervisor
23+
end
24+
end
25+
end

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,46 @@ module Async
1010
module Container
1111
module Supervisor
1212
class MemoryMonitor
13-
def initialize(interval: 10, limit: nil, &block)
13+
# Create a new memory monitor.
14+
#
15+
# @parameter interval [Integer] The interval at which to check for memory leaks.
16+
# @parameter limit [Integer] The limit of memory that a process can consume before being killed.
17+
def initialize(interval: 10)
1418
@interval = interval
1519
@cluster = Memory::Leak::Cluster.new(limit: limit)
1620
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
1721
end
1822

23+
# Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
24+
#
25+
# @parameter process_id [Integer] The process ID to add.
26+
def add(process_id)
27+
@cluster.add(process_id)
28+
end
29+
30+
# Remove a process from the memory monitor.
31+
#
32+
# @parameter process_id [Integer] The process ID to remove.
33+
def remove(process_id)
34+
@cluster.remove(process_id)
35+
end
36+
37+
# Register the connection (worker) with the memory monitor.
1938
def register(connection)
39+
Console.info(self, "Registering connection:", connection: connection, state: connection.state)
2040
if process_id = connection.state[:process_id]
2141
connections = @processes[process_id]
2242

2343
if connections.empty?
2444
Console.info(self, "Registering process:", process_id: process_id)
25-
@cluster.add(process_id)
45+
self.add(process_id)
2646
end
2747

2848
connections.add(connection)
2949
end
3050
end
3151

52+
# Remove the connection (worker) from the memory monitor.
3253
def remove(connection)
3354
if process_id = connection.state[:process_id]
3455
connections = @processes[process_id]
@@ -37,33 +58,40 @@ def remove(connection)
3758

3859
if connections.empty?
3960
Console.info(self, "Removing process:", process_id: process_id)
40-
@cluster.remove(process_id)
61+
self.remove(process_id)
4162
end
4263
end
4364
end
4465

66+
# Dump the current status of the memory monitor.
67+
#
68+
# @parameter call [Connection::Call] The call to respond to.
4569
def status(call)
4670
call.push(memory_monitor: @cluster)
4771
end
4872

73+
# Invoked when a memory leak is detected.
74+
#
75+
# @parameter process_id [Integer] The process ID of the process that has a memory leak.
76+
# @parameter monitor [Memory::Leak::Monitor] The monitor that detected the memory leak.
77+
# @returns [Boolean] True if the process was killed.
78+
def memory_leak_detected(process_id, monitor)
79+
Console.info(self, "Killing process:", process_id: process_id)
80+
Process.kill(:INT, process_id)
81+
82+
true
83+
end
84+
85+
# Run the memory monitor.
86+
#
87+
# @returns [Async::Task] The task that is running the memory monitor.
4988
def run
5089
Async do
5190
while true
91+
# This block must return true if the process was killed.
5292
@cluster.check! do |process_id, monitor|
5393
Console.error(self, "Memory leak detected in process:", process_id: process_id, monitor: monitor)
54-
connections = @processes[process_id]
55-
56-
connections.each do |connection|
57-
path = "/tmp/memory_dump_#{process_id}.json"
58-
59-
response = connection.call(do: :memory_dump, path: path, timeout: 30)
60-
Console.info(self, "Memory dump saved to:", path, response: response)
61-
@block.call(response) if @block
62-
end
63-
64-
# Kill the process:
65-
Console.info(self, "Killing process:", process_id: process_id)
66-
Process.kill(:INT, process_id)
94+
memory_leak_detected(process_id, monitor)
6795
end
6896

6997
sleep(@interval)

0 commit comments

Comments
 (0)