Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions .github/workflows/issue-auto-closer.yml

This file was deleted.

1 change: 0 additions & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
name: Testing on Ubuntu
on:
- push
- pull_request
permissions:
contents: read
Expand Down
29 changes: 0 additions & 29 deletions .github/workflows/macos.yml

This file was deleted.

29 changes: 0 additions & 29 deletions .github/workflows/windows.yml

This file was deleted.

77 changes: 77 additions & 0 deletions README.compat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Compat

## Mixed Elasticsearch Version Environments

### Compatibility Matrix

| Elasticsearch Gem | ES 7.x Server | ES 8.x Server | ES 9.x Server |
|-------------------|---------------|---------------|---------------|
| gem 7.17.x | ✅ Works | ✅ Works | ❌ No |
| gem 8.15.x | ✅ Works | ✅ Works | ❌ No |
| gem 9.1.x + patch | ✅ With config| ✅ With config| ✅ Works |

### Configuration Options

#### force_content_type

Type: String
Default: nil (auto-detect)
Valid values: "application/json", "application/x-ndjson"
Manually override the Content-Type header. Required when using elasticsearch gem 9.x with ES 7.x or 8.x servers.

```ruby
<match **>
@type elasticsearch
force_content_type application/json
</match>
```

#### ignore_version_content_type_mismatch

Type: Bool
Default: false
Automatically fallback to application/json if Content-Type version mismatch occurs. Enables seamless operation across mixed ES 7/8/9 environments.

Example:

```ruby
<match **>
@type elasticsearch
force_content_type application/json
ignore_version_content_type_mismath true
</match>
```

### Recommended Configuration

#### For ES 7/8 environments (Recommended)

Use elasticsearch gem 8.x - works with both versions, no configuration needed:

```ruby
# Gemfile
gem 'elasticsearch', '~> 8.15.0'

# fluent.conf
<match **>
@type elasticsearch
hosts es7:9200,es8:9200
# No special config needed!
</match>
```

#### For gem 9.x with ES 7/8 (Not recommended, but supported)

```ruby
# Gemfile
gem 'elasticsearch', '~> 9.1.0'

# fluent.conf
<match **>
@type elasticsearch
hosts es7:9200,es8:9200

# REQUIRED: Force compatible content type
force_content_type application/json
</match>
```
42 changes: 42 additions & 0 deletions lib/fluent/plugin/elasticsearch_api_bugfix.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Elasticsearch API 9.x Compatibility Patch
#
# Fixes crashes in elasticsearch-api gem 9.1.2 when connecting to ES 7.x/8.x servers.
#
# Bug: The gem expects ES 9 headers and crashes with NoMethodError when they're nil
#
# This patch is only needed if using elasticsearch gem 9.x
# Not needed if using elasticsearch gem 7.x or 8.x

require 'elasticsearch/api'

module Elasticsearch
module API
module Utils
class << self
if method_defined?(:update_ndjson_headers!)
alias_method :original_update_ndjson_headers!, :update_ndjson_headers!

def update_ndjson_headers!(headers, client_headers)
return headers unless client_headers.is_a?(Hash)

current_content = client_headers.keys.find { |c| c.to_s.match?(/content[-_]?type/i) }
return headers unless current_content

content_value = client_headers[current_content]
return headers unless content_value

# ES 7/8 compatibility: Only process ES9-specific headers
# If no "compatible-with" present, this is ES 7/8 format
return headers unless content_value.to_s.include?('compatible-with')

# ES 9 detected, safe to call original
original_update_ndjson_headers!(headers, client_headers)
rescue StandardError => e
warn "[elasticsearch-api-compat] Failed to update headers: #{e.class} - #{e.message}"
headers
end
end
end
end
end
end
36 changes: 33 additions & 3 deletions lib/fluent/plugin/out_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require_relative 'elasticsearch_index_lifecycle_management'
require_relative 'elasticsearch_tls'
require_relative 'elasticsearch_fallback_selector'
require_relative 'elasticsearch_api_bugfix'
begin
require_relative 'oj_serializer'
rescue LoadError
Expand Down Expand Up @@ -177,6 +178,12 @@ def initialize(retry_stream)
config_param :use_legacy_template, :bool, :default => true
config_param :catch_transport_exception_on_retry, :bool, :default => true
config_param :target_index_affinity, :bool, :default => false
config_param :force_content_type, :string, :default => nil,
:desc => "Force specific Content-Type header (e.g., 'application/json' or 'application/x-ndjson'). " \
"Overrides automatic version detection. Useful for mixed ES environments."
config_param :ignore_version_content_type_mismatch, :bool, :default => false,
:desc => "Automatically fallback to application/json if Content-Type version mismatch occurs. " \
"Enables seamless operation across mixed ES 7/8/9 environments."

config_section :metadata, param_name: :metainfo, multi: false do
config_param :include_chunk_id, :bool, :default => false
Expand Down Expand Up @@ -361,14 +368,31 @@ class << self
@type_name = nil
end
@accept_type = nil
if @content_type != ES9_CONTENT_TYPE

# Only set ES9 content type if not overridden and mismatch handling is not enabled
if @content_type.to_s != ES9_CONTENT_TYPE && !@ignore_version_content_type_mismatch
log.trace "Detected ES 9.x or above: Content-Type will be adjusted."
@content_type = ES9_CONTENT_TYPE
@accept_type = ES9_CONTENT_TYPE
elsif @ignore_version_content_type_mismatch
log.info "Ignoring ES version for Content-Type, using application/json for compatibility"
@content_type = :'application/json'
@accept_type = nil
end
end
end

if @content_type.nil?
log.warn "content_type was nil, defaulting to application/json"
@content_type = :'application/json'
end

if @force_content_type
log.info "Forcing Content-Type to: #{@force_content_type}"
@content_type = @force_content_type
@accept_type = nil
end

if @validate_client_version && !dry_run?
if @last_seen_major_version != client_library_version.to_i
raise Fluent::ConfigError, <<-EOC
Expand Down Expand Up @@ -623,11 +647,17 @@ def client(host = nil, compress_connection = false)
else
{}
end
headers = { 'Content-Type' => @content_type.to_s }

content_type_value = @content_type ? @content_type.to_s : 'application/json'
accept_type_value = @accept_type ? @accept_type.to_s : nil
content_type_value = 'application/json' if content_type_value.strip.empty?

headers = { 'Content-Type' => content_type_value }
.merge(@custom_headers)
.merge(@api_key_header)
.merge(gzip_headers)
headers.merge!('Accept' => @accept_type) if @accept_type

headers.merge!('Accept' => accept_type_value) if accept_type_value && !accept_type_value.strip.empty?

ssl_options = { verify: @ssl_verify, ca_file: @ca_file}.merge(@ssl_version_options)

Expand Down
40 changes: 40 additions & 0 deletions test/es-compat/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
services:
elasticsearch7:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.16
container_name: es7
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
ports:
- "9207:9200"
networks:
- elastic

elasticsearch8:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
container_name: es8
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
ports:
- "9208:9200"
networks:
- elastic

elasticsearch9:
image: docker.elastic.co/elasticsearch/elasticsearch:9.1.5
container_name: es9
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
ports:
- "9209:9200"
networks:
- elastic

networks:
elastic:
driver: bridge
Loading
Loading