[indexer_autoscaler_lambda] pause indexing if the available storage drops below a threshold #14

Merged (8 commits) on Nov 12, 2024
Changes from 1 commit
incorporate free storage metrics into autoscaler tuning
akumar1214 committed Nov 6, 2024
commit 9464bd89035d27ca6c6019de82711f51c24b1e53
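
In short, this commit threads a CloudWatch free-storage reading into the autoscaler: if the smallest FreeStorageSpace value across the cluster's data nodes drops below the new `minimum_free_storage` setting, indexing concurrency is driven down to its floor regardless of CPU utilization. A condensed, self-contained sketch of the resulting decision follows; it mirrors the `ConcurrencyScaler` diff below rather than reproducing it literally, and it leaves out the empty-queue path (which, per the specs, also drops concurrency to the minimum).

    # Hedged sketch of the tuning decision after this commit; names follow the diff below.
    def sketch_target_concurrency(current:, cpu:, free_storage:, min_cpu_target:, max_cpu_target:, minimum_free_storage:)
      cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

      return nil if current.nil?                        # concurrency currently unset: leave it alone
      return 0 if free_storage < minimum_free_storage   # new behavior: pause indexing

      if cpu < min_cpu_target
        (current * (cpu_midpoint / cpu).clamp(0.0, 1.5)).round   # scale up, capped at 1.5x
      elsif cpu > max_cpu_target
        (current - current * (cpu / cpu_midpoint - 1)).round     # scale down proportionally
      else
        current                                                  # within the CPU target range
      end
    end

    sketch_target_concurrency(current: 500, cpu: 50.0, free_storage: 9_999,
      min_cpu_target: 70, max_cpu_target: 80, minimum_free_storage: 10_000)
    # => 0: paused despite low CPU; the specs expect the requested concurrency to then be
    #      clamped up to MINIMUM_CONCURRENCY (2) before it is sent to Lambda.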
==== changed files ====
@@ -9,6 +9,7 @@ module ElasticGraph

def get_cluster_health: () -> ::Hash[::String, untyped]
def get_node_os_stats: () -> ::Hash[::String, untyped]
def get_node_roles: () -> ::Hash[::String, untyped]
def get_flat_cluster_settings: () -> ::Hash[::String, untyped]
def put_persistent_cluster_settings: (::Hash[::Symbol | ::String, untyped]) -> void

==== next changed file ====
@@ -72,6 +72,10 @@ def get_cluster_health
def get_node_os_stats
transform_errors { |c| c.nodes.stats(metric: "os").body }
end

def get_node_roles
transform_errors { |c| c.nodes.stats(metric: "roles").body }
end

def get_flat_cluster_settings
transform_errors { |c| c.cluster.get_settings(flat_settings: true).body }
==== next changed file ====
@@ -24,6 +24,8 @@ def define_stubs(stub, requested_stubs)
stub.get("/_cluster/health") { |env| response_for(body, env) }
in :get_node_os_stats
stub.get("/_nodes/stats/os") { |env| response_for(body, env) }
in :get_node_roles
stub.get("/_nodes/stats/roles") { |env| response_for(body, env) }
in :get_flat_cluster_settings
stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) }
in :put_persistent_cluster_settings
==== next changed file ====
@@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego

spec.add_dependency "aws-sdk-lambda", "~> 1.125"
spec.add_dependency "aws-sdk-sqs", "~> 1.80"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.10"

# aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+
# we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the
==== next changed file ====
@@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
def initialize(
datastore_core:,
sqs_client: nil,
lambda_client: nil
lambda_client: nil,
cloudwatch_client: nil
)
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

def sqs_client
@@ -53,13 +55,21 @@ def lambda_client
end
end

def cloudwatch_client
@cloudwatch_client ||= begin
require "aws-sdk-cloudwatch"
Aws::CloudWatch::Client.new
end
end

def concurrency_scaler
@concurrency_scaler ||= begin
require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler"
ConcurrencyScaler.new(
datastore_core: @datastore_core,
sqs_client: sqs_client,
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)
end
end
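
The new client follows the same inject-or-lazily-build pattern as `sqs_client` and `lambda_client`. A minimal usage sketch (the `datastore_core` value is assumed to already exist in the caller; it is not built here):

    # Injecting a stubbed client, as the specs below do; omit cloudwatch_client and the
    # accessor above will lazily require "aws-sdk-cloudwatch" and build a real one.
    cloudwatch_client = ::Aws::CloudWatch::Client.new(stub_responses: true)

    autoscaler = ElasticGraph::IndexerAutoscalerLambda.new(
      datastore_core: datastore_core,
      cloudwatch_client: cloudwatch_client
    )

    autoscaler.concurrency_scaler # receives the injected client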
==== next changed file ====
@@ -12,16 +12,17 @@ module ElasticGraph
class IndexerAutoscalerLambda
# @private
class ConcurrencyScaler
def initialize(datastore_core:, sqs_client:, lambda_client:)
def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:)
@logger = datastore_core.logger
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:)
def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, minimum_free_storage:, indexer_function_name:)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
@@ -37,6 +38,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi

new_target_concurrency =
if num_messages.positive?
free_storage = get_min_free_storage

cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

@@ -45,27 +48,33 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
if current_concurrency.nil?
details_logger.log_unset
nil
elsif free_storage < minimum_free_storage
details_logger.log_pause(free_storage)
0
elsif cpu_utilization < min_cpu_target
increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5)
(current_concurrency * increase_factor).round.tap do |new_concurrency|
details_logger.log_increase(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
new_concurrency: new_concurrency,
)
end
elsif cpu_utilization > max_cpu_target
decrease_factor = cpu_utilization / cpu_midpoint - 1
(current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency|
details_logger.log_decrease(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
end
else
details_logger.log_no_change(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency
)
current_concurrency
@@ -94,6 +103,43 @@ def get_max_cpu_utilization
end.max.to_f
end

def get_min_free_storage
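# `get_data_node_ids_by_cluster_name` (defined below) returns an array of single-entry hashes,
# e.g. [{ "main" => "node1" }, { "main" => "node2" }]; `.map(&:first)` turns each hash into a
# [cluster_name, node_id] pair for the block to destructure.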
metric_data_queries = get_data_node_ids_by_cluster_name.map(&:first).map do |cluster_name, node_id|
{
id: node_id,
metric_stat: {
metric: {
namespace: 'AWS/ES',
metric_name: 'FreeStorageSpace',
dimensions: [
{ name: 'DomainName', value: cluster_name },
{ name: 'NodeId', value: node_id }
]
},
period: 30, # seconds
stat: 'Minimum'
},
return_data: true
}
end

metric_response = @cloudwatch_client.get_metric_data({
start_time: ::Time.now - 900, # past 15 minutes
end_time: ::Time.now,
metric_data_queries: metric_data_queries
})

metric_response.metric_data_results.map { |result| result.values.first }.min / (1024 * 1024) # result is in bytes
end

def get_data_node_ids_by_cluster_name
@datastore_core.clients_by_name.flat_map do |name, client|
client.get_node_roles.map do |id, roles|
roles["roles"].include?("data") ? { name => id } : nil
end
end.compact
end

def get_queue_attributes(queue_urls)
attributes_per_queue = queue_urls.map do |queue_url|
@sqs_client.get_queue_attributes(
==== next changed file ====
@@ -30,29 +30,39 @@ def initialize(
}
end

def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_increase(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
log_result({
"action" => "increase",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_decrease(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
log_result({
"action" => "decrease",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_no_change(cpu_utilization:, current_concurrency:)
def log_no_change(cpu_utilization:, min_free_storage:, current_concurrency:)
log_result({
"action" => "no_change",
"cpu_utilization" => cpu_utilization,
"current_concurrency" => current_concurrency
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
})
end

def log_pause(min_free_storage)
log_result({
"action" => "pause",
"min_free_storage" => min_free_storage
})
end
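
For reference, a pause logs only the action and the observed free-storage value; any shared fields added by `log_result` are outside this diff. A rough, illustrative call (the 9_500.0 value is made up):

    details_logger.log_pause(9_500.0)
    # logs an entry containing {"action" => "pause", "min_free_storage" => 9500.0}
    # plus whatever common fields log_result merges in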

==== next changed file ====
@@ -26,6 +26,7 @@ def handle_request(event:, context:)
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
maximum_concurrency: event.fetch("maximum_concurrency"),
minimum_free_storage: event.fetch("minimum_free_storage"),
indexer_function_name: event.fetch("indexer_function_name")
)
end
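
Because the threshold comes straight from the invocation event, callers of this Lambda need to add the new key to their payload. An illustrative event, mirroring the spec near the end of this diff (the queue URL is made up, and the "queue_urls" key is assumed from `tune_indexer_concurrency`'s signature since that fetch line sits outside the hunk above):

    event = {
      "queue_urls" => ["https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"],
      "min_cpu_target" => 70,
      "max_cpu_target" => 80,
      "maximum_concurrency" => 1000,
      "minimum_free_storage" => 100, # new in this commit
      "indexer_function_name" => "some-eg-app-indexer"
    }
    lambda_function.handle_request(event: event, context: {})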
==== next changed file ====
@@ -4,7 +4,8 @@ module ElasticGraph
def initialize: (
datastore_core: DatastoreCore,
sqs_client: Aws::SQS::Client,
lambda_client: Aws::Lambda::Client
lambda_client: Aws::Lambda::Client,
cloudwatch_client: Aws::CloudWatch::Client
) -> void

MINIMUM_CONCURRENCY: ::Integer
@@ -14,6 +15,7 @@ module ElasticGraph
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
maximum_concurrency: ::Integer,
minimum_free_storage: ::Integer,
indexer_function_name: ::String
) -> void

@@ -23,8 +25,11 @@ module ElasticGraph
@datastore_core: DatastoreCore
@sqs_client: Aws::SQS::Client
@lambda_client: Aws::Lambda::Client
@cloudwatch_client: Aws::CloudWatch::Client

def get_max_cpu_utilization: () -> ::Float
def get_min_free_storage: () -> ::Float
def get_data_node_ids_by_cluster_name: () -> ::Array[::Hash[::String, ::String]]
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

==== next changed file ====
@@ -12,21 +12,26 @@ module ElasticGraph

def log_increase: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_decrease: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_no_change: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer
) -> void

def log_pause: (::Float) -> void

def log_reset: () -> void

def log_unset: () -> void
==== next changed file ====
@@ -11,6 +11,7 @@ module ElasticGraph
datastore_core: DatastoreCore,
?sqs_client: Aws::SQS::Client?,
?lambda_client: Aws::Lambda::Client?,
?cloudwatch_client: Aws::CloudWatch::Client?,
) -> void

@sqs_client: Aws::SQS::Client?
@@ -19,6 +20,9 @@ module ElasticGraph
@lambda_client: Aws::Lambda::Client?
def lambda_client: () -> Aws::Lambda::Client

@cloudwatch_client: Aws::CloudWatch::Client?
def cloudwatch_client: () -> Aws::CloudWatch::Client

@concurrency_scaler: ConcurrencyScaler?
def concurrency_scaler: () -> ConcurrencyScaler
end
==== next changed file ====
@@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda
def build_indexer_autoscaler(
sqs_client: nil,
lambda_client: nil,
cloudwatch_client: nil,
**datastore_core_options,
&customize_datastore_config
)
@@ -28,6 +29,7 @@ def build_indexer_autoscaler(
IndexerAutoscalerLambda.new(
sqs_client: sqs_client,
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client,
datastore_core: datastore_core
)
end
==== next changed file ====
@@ -8,6 +8,7 @@

require "aws-sdk-lambda"
require "aws-sdk-sqs"
require "aws-sdk-cloudwatch"
require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler"
require "support/builds_indexer_autoscaler"

@@ -22,13 +23,16 @@ class IndexerAutoscalerLambda
let(:max_cpu_target) { 80 }
let(:cpu_midpoint) { 75 }
let(:maximum_concurrency) { 1000 }
let(:minimum_free_storage) { 10000 }

it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do
lambda_client = lambda_client_with_concurrency(200)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(10.0),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -39,10 +43,12 @@ class IndexerAutoscalerLambda
it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do
# CPU is at 50% and our target range is 70-80. 75 / 50 = 1.5, so increase it by 50%.
lambda_client = lambda_client_with_concurrency(200)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(50.0),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -53,10 +59,12 @@ class IndexerAutoscalerLambda
it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do
current_concurrency = maximum_concurrency - 1
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(10),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -67,10 +75,12 @@ class IndexerAutoscalerLambda
it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do
# CPU is at 90% and our target range is 70-80. 90 / 75 = 1.2, so decrease it by 20%.
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(90.0),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -81,10 +91,12 @@ class IndexerAutoscalerLambda
it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do
current_concurrency = 0
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(100),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -94,11 +106,13 @@ class IndexerAutoscalerLambda

it "does not adjust concurrency when the CPU is within the target range" do
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
[min_cpu_target, cpu_midpoint, max_cpu_target].each do |cpu_usage|
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(cpu_usage),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -113,10 +127,12 @@ class IndexerAutoscalerLambda
expect(high_cpu_usage).to be > max_cpu_target

lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target, high_cpu_usage),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -125,13 +141,30 @@ class IndexerAutoscalerLambda
expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08
end

it "resets the concurrency when free storage space drops below the minimum regardless of cpu" do
lambda_client = lambda_client_with_concurrency(500)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1, minimum_free_storage - 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)

expect(updated_concurrency_requested_from(lambda_client)).to eq [2] # 2 is the minimum
end

it "sets concurrency to the min when there are no messages in the queue" do
current_concurrency = 500
lambda_client = lambda_client_with_concurrency(current_concurrency)
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1),
sqs_client: sqs_client_with_number_of_messages(0),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -141,12 +174,14 @@ class IndexerAutoscalerLambda

it "leaves concurrency unset if it is currently unset" do
lambda_client = lambda_client_without_concurrency
cloudwatch_client = cloudwatch_client_with_storage_metrics(minimum_free_storage + 1)

# CPU is at 50% and our target range is 70-80.
concurrency_scaler = build_concurrency_scaler(
datastore_client: datastore_client_with_cpu_usage(50),
sqs_client: sqs_client_with_number_of_messages(1),
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)

tune_indexer_concurrency(concurrency_scaler)
@@ -165,24 +200,34 @@ def updated_concurrency_requested_from(lambda_client)
end

def datastore_client_with_cpu_usage(percent, percent2 = percent)
-  stubbed_datastore_client(get_node_os_stats: {
-    "nodes" => {
-      "node1" => {
-        "os" => {
-          "cpu" => {
-            "percent" => percent
-          }
-        }
-      },
-      "node2" => {
-        "os" => {
-          "cpu" => {
-            "percent" => percent2
-          }
-        }
-      }
-    }
-  })
+  stubbed_datastore_client(
+    get_node_os_stats: {
+      "nodes" => {
+        "node1" => {
+          "os" => {
+            "cpu" => {
+              "percent" => percent
+            }
+          }
+        },
+        "node2" => {
+          "os" => {
+            "cpu" => {
+              "percent" => percent2
+            }
+          }
+        }
+      }
+    },
+    get_node_roles: {
+      "node1" => {
+        "roles" => ["data"]
+      },
+      "node2" => {
+        "roles" => ["data"]
+      }
+    }
+  )
end

def sqs_client_with_number_of_messages(num_messages)
@@ -204,6 +249,26 @@ def lambda_client_with_concurrency(concurrency)
end
end

def cloudwatch_client_with_storage_metrics(free_storage, free_storage2 = free_storage)
::Aws::CloudWatch::Client.new(stub_responses: true).tap do |cloudwatch_client|
cloudwatch_client.stub_responses(:get_metric_data, {
# return values are in bytes
metric_data_results: [
{
id: "node1",
values: [(free_storage * 1024 * 1024).to_f],
timestamps: [::Time.parse("2024-10-30T12:00:00Z")]
},
{
id: "node2",
values: [(free_storage2 * 1024 * 1024).to_f],
timestamps: [::Time.parse("2024-10-30T12:00:00Z")]
}
]
})
end
end

# If the lambda is using unreserved concurrency, reserved_concurrent_executions on the Lambda client will be nil.
def lambda_client_without_concurrency
::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client|
@@ -213,11 +278,12 @@ def lambda_client_without_concurrency
end
end

def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:)
def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:, cloudwatch_client:)
build_indexer_autoscaler(
clients_by_name: {"main" => datastore_client},
sqs_client: sqs_client,
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
).concurrency_scaler
end

@@ -227,6 +293,7 @@ def tune_indexer_concurrency(concurrency_scaler)
min_cpu_target: min_cpu_target,
max_cpu_target: max_cpu_target,
maximum_concurrency: maximum_concurrency,
minimum_free_storage: minimum_free_storage,
indexer_function_name: indexer_function_name
)
end
==== next changed file ====
@@ -37,6 +37,7 @@
"min_cpu_target" => 70,
"max_cpu_target" => 80,
"maximum_concurrency" => 1000,
"minimum_free_storage" => 100,
"indexer_function_name" => "some-eg-app-indexer"
}
lambda_function.handle_request(event: event, context: {})
==== next changed file ====
@@ -75,6 +75,10 @@ def get_node_os_stats
transform_errors { |c| c.nodes.stats(metric: "os") }
end

def get_node_roles
transform_errors { |c| c.nodes.stats(metric: "roles") }
end

def get_flat_cluster_settings
transform_errors { |c| c.cluster.get_settings(flat_settings: true) }
end
==== next changed file ====
@@ -47,6 +47,8 @@ def define_stubs(stub, requested_stubs)
stub.get("/_cluster/health") { |env| response_for(body, env) }
in :get_node_os_stats
stub.get("/_nodes/stats/os") { |env| response_for(body, env) }
in :get_node_roles
stub.get("/_nodes/stats/roles") { |env| response_for(body, env) }
in :get_flat_cluster_settings
stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) }
in :put_persistent_cluster_settings
==== next changed file ====
@@ -41,6 +41,12 @@ module ElasticGraph
expect(client.get_node_os_stats).to eq "Node stats"
end

it "supports `get_node_roles`" do
client = build_client({get_node_roles: "Node roles"})

expect(client.get_node_roles).to eq "Node roles"
end

it "supports `get_flat_cluster_settings`" do
client = build_client({get_flat_cluster_settings: "Flat cluster settings!"})