Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer_autoscaler_lambda] pause indexing if the available storage drops below a threshold #14

Merged
merged 8 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ These libraries wrap the the core ElasticGraph libraries so that they can be dep
graph LR;
elasticgraph-admin_lambda --> rake & elasticgraph-admin & elasticgraph-lambda_support
elasticgraph-graphql_lambda --> elasticgraph-graphql & elasticgraph-lambda_support
elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & ox
elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & aws-sdk-cloudwatch & ox
elasticgraph-indexer_lambda --> elasticgraph-indexer & elasticgraph-lambda_support & aws-sdk-s3 & ox
elasticgraph-lambda_support --> elasticgraph-opensearch & faraday_middleware-aws-sigv4
style elasticgraph-admin_lambda color: DodgerBlue;
Expand All @@ -112,11 +112,13 @@ graph LR;
style elasticgraph-datastore_core color: Green;
style aws-sdk-lambda color: Red;
style aws-sdk-sqs color: Red;
style aws-sdk-cloudwatch color: Red;
style ox color: Red;
style elasticgraph-indexer color: Green;
style aws-sdk-s3 color: Red;
style elasticgraph-opensearch color: Green;
style faraday_middleware-aws-sigv4 color: Red;
click aws-sdk-cloudwatch href "https://rubygems.org/gems/aws-sdk-cloudwatch"
click aws-sdk-lambda href "https://rubygems.org/gems/aws-sdk-lambda"
click aws-sdk-s3 href "https://rubygems.org/gems/aws-sdk-s3"
click aws-sdk-sqs href "https://rubygems.org/gems/aws-sdk-sqs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego

spec.add_dependency "aws-sdk-lambda", "~> 1.125"
spec.add_dependency "aws-sdk-sqs", "~> 1.80"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.104"

# aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+
# we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
def initialize(
datastore_core:,
sqs_client: nil,
lambda_client: nil
lambda_client: nil,
cloudwatch_client: nil
)
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

def sqs_client
Expand All @@ -53,13 +55,21 @@ def lambda_client
end
end

def cloudwatch_client
@cloudwatch_client ||= begin
require "aws-sdk-cloudwatch"
Aws::CloudWatch::Client.new
end
end

def concurrency_scaler
@concurrency_scaler ||= begin
require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler"
ConcurrencyScaler.new(
datastore_core: @datastore_core,
sqs_client: sqs_client,
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@ module ElasticGraph
class IndexerAutoscalerLambda
# @private
class ConcurrencyScaler
def initialize(datastore_core:, sqs_client:, lambda_client:)
def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:)
@logger = datastore_core.logger
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:)
def tune_indexer_concurrency(
queue_urls:,
min_cpu_target:,
max_cpu_target:,
maximum_concurrency:,
required_free_storage_in_mb:,
indexer_function_name:,
cluster_name:
)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
Expand All @@ -37,6 +46,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi

new_target_concurrency =
if num_messages.positive?
lowest_node_free_storage_in_mb = get_lowest_node_free_storage_in_mb(cluster_name)

cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

Expand All @@ -45,11 +56,19 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
if current_concurrency.nil?
details_logger.log_unset
nil
elsif lowest_node_free_storage_in_mb < required_free_storage_in_mb
details_logger.log_pause(
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb
)
MINIMUM_CONCURRENCY
elsif cpu_utilization < min_cpu_target
increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5)
(current_concurrency * increase_factor).round.tap do |new_concurrency|
details_logger.log_increase(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
Expand All @@ -59,20 +78,24 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
(current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency|
details_logger.log_decrease(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
end
else
details_logger.log_no_change(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency
)
current_concurrency
end
else
details_logger.log_reset
0
MINIMUM_CONCURRENCY
myronmarston marked this conversation as resolved.
Show resolved Hide resolved
end

if new_target_concurrency && new_target_concurrency != current_concurrency
Expand All @@ -94,6 +117,22 @@ def get_max_cpu_utilization
end.max.to_f
end

def get_lowest_node_free_storage_in_mb(cluster_name)
metric_response = @cloudwatch_client.get_metric_data({
start_time: ::Time.now - 1200, # past 20 minutes
end_time: ::Time.now,
metric_data_queries: [
{
id: "minFreeStorageAcrossNodes",
expression: "SEARCH('{AWS/ES,ClientId,DomainName} MetricName=\"FreeStorageSpace\" AND DomainName=\"#{cluster_name}\"', 'Minimum', 60)",
return_data: true
}
]
})

metric_response.metric_data_results.first.values.first
myronmarston marked this conversation as resolved.
Show resolved Hide resolved
end

def get_queue_attributes(queue_urls)
attributes_per_queue = queue_urls.map do |queue_url|
@sqs_client.get_queue_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,46 @@ def initialize(
}
end

def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_increase(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
log_result({
"action" => "increase",
"cpu_utilization" => cpu_utilization,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_decrease(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
log_result({
"action" => "decrease",
"cpu_utilization" => cpu_utilization,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_no_change(cpu_utilization:, current_concurrency:)
def log_no_change(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:)
log_result({
"action" => "no_change",
"cpu_utilization" => cpu_utilization,
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb,
"current_concurrency" => current_concurrency
})
end

def log_pause(lowest_node_free_storage_in_mb:, required_free_storage_in_mb:)
log_result({
"action" => "pause",
"lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
"required_free_storage_in_mb" => required_free_storage_in_mb
})
end

def log_reset
log_result({"action" => "reset"})
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def handle_request(event:, context:)
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
maximum_concurrency: event.fetch("maximum_concurrency"),
indexer_function_name: event.fetch("indexer_function_name")
required_free_storage_in_mb: event.fetch("required_free_storage_in_mb"),
indexer_function_name: event.fetch("indexer_function_name"),
cluster_name: event.fetch("cluster_name")
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module ElasticGraph
def initialize: (
datastore_core: DatastoreCore,
sqs_client: Aws::SQS::Client,
lambda_client: Aws::Lambda::Client
lambda_client: Aws::Lambda::Client,
cloudwatch_client: Aws::CloudWatch::Client
) -> void

MINIMUM_CONCURRENCY: ::Integer
Expand All @@ -14,7 +15,9 @@ module ElasticGraph
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
maximum_concurrency: ::Integer,
indexer_function_name: ::String
required_free_storage_in_mb: ::Integer,
indexer_function_name: ::String,
cluster_name: ::String
) -> void

private
Expand All @@ -23,8 +26,10 @@ module ElasticGraph
@datastore_core: DatastoreCore
@sqs_client: Aws::SQS::Client
@lambda_client: Aws::Lambda::Client
@cloudwatch_client: Aws::CloudWatch::Client

def get_max_cpu_utilization: () -> ::Float
def get_lowest_node_free_storage_in_mb: (::String) -> ::Float
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,37 @@ module ElasticGraph
queue_urls: ::Array[::String],
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
num_messages: ::Integer,
num_messages: ::Integer
) -> void

def log_increase: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_decrease: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_no_change: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer
) -> void

def log_pause: (
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer
) -> void

def log_reset: () -> void

def log_unset: () -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module ElasticGraph
datastore_core: DatastoreCore,
?sqs_client: Aws::SQS::Client?,
?lambda_client: Aws::Lambda::Client?,
?cloudwatch_client: Aws::CloudWatch::Client?,
) -> void

@sqs_client: Aws::SQS::Client?
Expand All @@ -19,6 +20,9 @@ module ElasticGraph
@lambda_client: Aws::Lambda::Client?
def lambda_client: () -> Aws::Lambda::Client

@cloudwatch_client: Aws::CloudWatch::Client?
def cloudwatch_client: () -> Aws::CloudWatch::Client

@concurrency_scaler: ConcurrencyScaler?
def concurrency_scaler: () -> ConcurrencyScaler
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda
def build_indexer_autoscaler(
sqs_client: nil,
lambda_client: nil,
cloudwatch_client: nil,
**datastore_core_options,
&customize_datastore_config
)
Expand All @@ -28,6 +29,7 @@ def build_indexer_autoscaler(
IndexerAutoscalerLambda.new(
sqs_client: sqs_client,
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client,
datastore_core: datastore_core
)
end
Expand Down
Loading