From 4514313da3bcbf1f230651bc7939cec5e846a227 Mon Sep 17 00:00:00 2001
From: Cas Donoghue <cas.donoghue@gmail.com>
Date: Thu, 23 Jan 2025 13:29:43 -0800
Subject: [PATCH] Properly handle 413 Payload Too Large errors (#1199)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Properly handle `413` Payload Too Large errors

Previously when Elasticsearch responds with a 413 (Payload Too Large) status,
the manticore adapter raises an error before the response can be processed
by the bulk_send error handling. This commit refactors the way
`BadErrorResponse` codes are handled. Previously we had logic in the manticore
adaptor which special cased raising errors on some codes. This commit refactors
such that the adaptor raises on any error status and the caller is now
responsible for special case handling the code.

* 12.0.2 release prep

* Use `error_code` instead of `code` when handling BadResponseCodeError

Previously a few bugs spotted in code review were being obfuscated by the
combinations of tests not running in CI and the incorrect method for retrieving
a code from a BadResponseCodeError. This commit updates the method names and
addresses the feedback from code review.

---------

Co-authored-by: João Duarte <jsvduarte@gmail.com>
---
 CHANGELOG.md                                  |  2 +
 docs/index.asciidoc                           | 17 ++++++-
 .../outputs/elasticsearch/http_client.rb      | 49 ++++++++++---------
 .../http_client/manticore_adapter.rb          |  5 +-
 .../outputs/elasticsearch/http_client/pool.rb | 24 ++++-----
 logstash-output-elasticsearch.gemspec         |  2 +-
 spec/unit/outputs/elasticsearch_spec.rb       |  7 ++-
 7 files changed, 61 insertions(+), 45 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c9d9ae6f..1a46b00a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 11.22.12
+ - Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
 ## 11.22.11
  - Remove irrelevant log warning about elastic stack version [#1202](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1202)
 
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 514722a3..07f7efdc 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
 either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
 request are handled differently than error codes for individual documents.
 
-HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.
+
+HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
+including 413 (Payload Too Large) responses.
+
+If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:
+
+[source,ruby]
+-----
+output {
+  elasticsearch {
+    hosts => ["localhost:9200"]
+    dlq_custom_codes => [413]  # Send 413 errors to DLQ instead of retrying
+  }
+-----
+
+This will capture oversized payloads in the DLQ for analysis rather than retrying them.
 
 The following document errors are handled as follows:
 
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index e0b70e36..120d3e67 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
     def bulk_send(body_stream, batch_actions)
       params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
 
-      response = @pool.post(@bulk_path, params, body_stream.string)
-
-      @bulk_response_metrics.increment(response.code.to_s)
-
-      case response.code
-      when 200 # OK
-        LogStash::Json.load(response.body)
-      when 413 # Payload Too Large
+      begin
+        response = @pool.post(@bulk_path, params, body_stream.string)
+        @bulk_response_metrics.increment(response.code.to_s)
+      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+        @bulk_response_metrics.increment(e.response_code.to_s)
+        raise e unless e.response_code == 413
+        # special handling for 413, treat it as a document level issue
         logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
-        emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
-      else
-        url = ::LogStash::Util::SafeURI.new(response.final_url)
-        raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-          response.code, url, body_stream.to_s, response.body
-        )
+        return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
+      rescue => e # it may be a network issue instead, re-raise
+        raise e
       end
+
+      LogStash::Json.load(response.body)
     end
 
     def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
     def exists?(path, use_get=false)
       response = use_get ? @pool.get(path) : @pool.head(path)
       response.code >= 200 && response.code <= 299
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      return false if e.response_code == 404
+      raise e
     end
 
     def template_exists?(template_endpoint, name)
@@ -421,6 +422,8 @@ def template_put(template_endpoint, name, template)
       path = "#{template_endpoint}/#{name}"
       logger.info("Installing Elasticsearch template", name: name)
       @pool.put(path, nil, LogStash::Json.dump(template))
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      raise e unless e.response_code == 404
     end
 
     # ILM methods
@@ -432,17 +435,15 @@ def rollover_alias_exists?(name)
 
     # Create a new rollover alias
     def rollover_alias_put(alias_name, alias_definition)
-      begin
-        @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
-        logger.info("Created rollover alias", name: alias_name)
-        # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        if e.response_code == 400
-            logger.info("Rollover alias already exists, skipping", name: alias_name)
-            return
-        end
-        raise e
+      @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
+      logger.info("Created rollover alias", name: alias_name)
+      # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      if e.response_code == 400
+        logger.info("Rollover alias already exists, skipping", name: alias_name)
+        return
       end
+      raise e
     end
 
     def get_xpack_info
diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
index c9e49ec7..11f85b53 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
       end
 
-      # 404s are excluded because they are valid codes in the case of
-      # template installation. We might need a better story around this later
-      # but for our current purposes this is correct
       code = resp.code
-      if code < 200 || code > 299 && code != 404
+      if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
       end
 
diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
index 68715066..1ef9d0f9 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -253,13 +253,11 @@ def get_license(url)
     def health_check_request(url)
       logger.debug("Running health check to see if an Elasticsearch connection is working",
                    :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
-      begin
-        response = perform_request_to_url(url, :head, @healthcheck_path)
-        return response, nil
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
-        return nil, e
-      end
+      response = perform_request_to_url(url, :head, @healthcheck_path)
+      return response, nil
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
+      return nil, e
     end
 
     def healthcheck!(register_phase = true)
@@ -312,13 +310,11 @@ def healthcheck!(register_phase = true)
     end
 
     def get_root_path(url, params={})
-      begin
-        resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
-        return resp, nil
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
-        return nil, e
-      end
+      resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
+      return resp, nil
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
+      return nil, e
     end
 
     def test_serverless_connection(url, root_response)
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index e8d5f858..1909085c 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-elasticsearch'
-  s.version         = '11.22.11'
+  s.version         = '11.22.12'
   s.licenses        = ['apache-2.0']
   s.summary         = "Stores logs in Elasticsearch"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb
index b71e8720..35c90804 100644
--- a/spec/unit/outputs/elasticsearch_spec.rb
+++ b/spec/unit/outputs/elasticsearch_spec.rb
@@ -915,7 +915,12 @@
       allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
         if body.length > max_bytes
           max_bytes *= 2 # ensure a successful retry
-          double("Response", :code => 413, :body => "")
+          raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
+            413,
+            "test-url",
+            body,
+            ""
+          )
         else
           double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
         end