Commit caaaf9a

Add ability to customize bulk batch size
1 parent d5f1782

6 files changed: +115 -65

Diff for: lib/logstash/outputs/elasticsearch.rb (+19 -4)

@@ -75,12 +75,12 @@
 #
 # ==== HTTP Compression
 #
-# This plugin supports request and response compression. Response compression is enabled by default and
-# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
-# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
+# This plugin supports request and response compression. Response compression is enabled by default and
+# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
+# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
 # Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
 #
-# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
+# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
 # setting in their Logstash config file.
 #
 class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
@@ -103,6 +103,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
 
   config_name "elasticsearch"
 
+  DEFAULT_BATCH_SIZE = 20 * 1024 * 1024 # 20MiB
+
   # The Elasticsearch action to perform. Valid actions are:
   #
   # - index: indexes a document (an event from Logstash).
@@ -242,6 +244,19 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # Custom Headers to send on each request to elasticsearch nodes
   config :custom_headers, :validate => :hash, :default => {}
 
+  # Bulk batch size is used to determine at what point to send the bulk requests.
+  # The criteria used for default value is:
+  # 1. We need a number that's less than 100MiB because ES
+  #    won't accept bulks larger than that.
+  # 2. It must be large enough to amortize the connection constant
+  #    across multiple requests.
+  # 3. It must be small enough that even if multiple threads hit this size
+  #    we won't use a lot of heap.
+  #
+  # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
+  # made sense. We picked one on the lowish side to not use too much heap.
+  config :bulk_batch_size, :validate => :number, :default => DEFAULT_BATCH_SIZE
+
   # @override to handle proxy => '' as if none was set
   def config_init(params)
     proxy = params['proxy']
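
The net effect of this file's changes: the former hard-coded 20MiB flush threshold (previously TARGET_BULK_BYTES in http_client.rb, below) becomes a per-output `bulk_batch_size` setting with the same default. A minimal usage sketch in the style of the integration spec further down; the host, index, and 10MiB values are placeholders for illustration, not part of this commit:

    # Hypothetical example; only "bulk_batch_size" is new in this commit.
    config = {
      "hosts"            => "localhost:9200",  # placeholder host
      "index"            => "my-index",        # placeholder index
      "http_compression" => true,              # optional: gzip request bodies, per the doc comment above
      "bulk_batch_size"  => 10 * 1024 * 1024   # flush bulk requests at ~10MiB instead of the 20MiB default
    }
    output = LogStash::Outputs::ElasticSearch.new(config)

When overriding the default, the documented criteria still apply: stay large enough to amortize connection overhead, and well under the 100MiB bulk cap that Elasticsearch enforces.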

Diff for: lib/logstash/outputs/elasticsearch/common_configs.rb (+3 -4)

@@ -19,8 +19,8 @@ def self.included(mod)
     # Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
     mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME
 
-    mod.config :document_type,
-      :validate => :string,
+    mod.config :document_type,
+      :validate => :string,
       :deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature"
 
     # From Logstash 1.3 onwards, a template is applied to Elasticsearch during
@@ -69,7 +69,7 @@ def self.included(mod)
     # The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here.
     # See https://www.elastic.co/blog/elasticsearch-versioning-support.
     mod.config :version, :validate => :string
-
+
     # The version_type to use for indexing.
     # See https://www.elastic.co/blog/elasticsearch-versioning-support.
     # See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
@@ -145,7 +145,6 @@ def self.included(mod)
     # here like `pipeline => "%{INGEST_PIPELINE}"`
     mod.config :pipeline, :validate => :string, :default => nil
 
-
     # -----
     # ILM configurations (beta)
     # -----

Diff for: lib/logstash/outputs/elasticsearch/http_client.rb (+8 -22)

@@ -8,21 +8,6 @@
 require 'stringio'
 
 module LogStash; module Outputs; class ElasticSearch;
-  # This is a constant instead of a config option because
-  # there really isn't a good reason to configure it.
-  #
-  # The criteria used are:
-  # 1. We need a number that's less than 100MiB because ES
-  #    won't accept bulks larger than that.
-  # 2. It must be large enough to amortize the connection constant
-  #    across multiple requests.
-  # 3. It must be small enough that even if multiple threads hit this size
-  #    we won't use a lot of heap.
-  #
-  # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
-  # made sense. We picked one on the lowish side to not use too much heap.
-  TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
-
   class HttpClient
     attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
     # This is here in case we use DEFAULT_OPTIONS in the future
@@ -52,6 +37,7 @@ class HttpClient
     def initialize(options={})
       @logger = options[:logger]
       @metric = options[:metric]
+      @bulk_batch_size = options[:bulk_batch_size]
       @bulk_request_metrics = @metric.namespace(:bulk_requests)
       @bulk_response_metrics = @bulk_request_metrics.namespace(:responses)
 
@@ -110,7 +96,7 @@ def bulk(actions)
       if http_compression
         body_stream.set_encoding "BINARY"
         stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
-      else
+      else
         stream_writer = body_stream
       end
       bulk_responses = []
@@ -119,7 +105,7 @@ def bulk(actions)
           action.map {|line| LogStash::Json.dump(line)}.join("\n") :
           LogStash::Json.dump(action)
         as_json << "\n"
-        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES
+        if (body_stream.size + as_json.bytesize) > @bulk_batch_size
           bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
         end
         stream_writer.write(as_json)
@@ -215,7 +201,7 @@ def scheme
       else
         nil
       end
-
+
       calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing)
 
       if calculated_scheme && calculated_scheme !~ /https?/
@@ -235,7 +221,7 @@ def port
       # Enter things like foo:123, bar and wind up with foo:123, bar:9200
       calculate_property(uris, :port, nil, sniffing) || 9200
     end
-
+
     def uris
       @options[:hosts]
     end
@@ -254,7 +240,7 @@ def http_compression
 
     def build_adapter(options)
       timeout = options[:timeout] || 0
-
+
       adapter_options = {
        :socket_timeout => timeout,
        :request_timeout => timeout,
@@ -281,7 +267,7 @@ def build_adapter(options)
       adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
       adapter = adapter_class.new(@logger, adapter_options)
     end
-
+
     def build_pool(options)
       adapter = build_adapter(options)
 
@@ -331,7 +317,7 @@ def host_to_url(h)
         h.query
       end
       prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil
-
+
       raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}"
 
       ::LogStash::Util::SafeURI.new(raw_url)
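
Condensed, the only behavioral change in this file is that `bulk` now flushes against the configurable `@bulk_batch_size` instead of the removed constant. A simplified sketch of that flush-on-threshold loop, assuming `bulk_send` drains `body_stream` after each send, and ignoring the gzip writer, the multi-line "update" action case, and error handling:

    # Simplified sketch of HttpClient#bulk after this change, not the full method.
    def bulk(actions)
      body_stream = StringIO.new
      bulk_responses = []
      actions.each do |action|
        as_json = LogStash::Json.dump(action) << "\n"
        # Flush before the buffered payload would grow past the configured size,
        # keeping each request at or under @bulk_batch_size.
        if (body_stream.size + as_json.bytesize) > @bulk_batch_size
          bulk_responses << bulk_send(body_stream) unless body_stream.size == 0
        end
        body_stream.write(as_json)
      end
      bulk_responses << bulk_send(body_stream) if body_stream.size > 0  # final partial batch
      bulk_responses
    end

Because the flush happens before the write rather than after, a single action larger than the threshold still ships alone in its own request rather than being dropped client-side.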

Diff for: lib/logstash/outputs/elasticsearch/http_client_builder.rb (+6 -5)

@@ -11,13 +11,14 @@ def self.build(logger, hosts, params)
       :http_compression => params["http_compression"],
       :headers => params["custom_headers"] || {}
     }
-
+
     client_settings[:proxy] = params["proxy"] if params["proxy"]
-
+
     common_options = {
       :client_settings => client_settings,
       :metric => params["metric"],
-      :resurrect_delay => params["resurrect_delay"]
+      :resurrect_delay => params["resurrect_delay"],
+      :bulk_batch_size => params["bulk_batch_size"]
     }
 
     if params["sniffing"]
@@ -65,7 +66,7 @@ def self.build(logger, hosts, params)
       LogStash::ConfigurationError,
       "External versioning requires the presence of a version number."
     ) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil
-
+
 
     # Create API setup
     raise(
@@ -144,7 +145,7 @@ def self.setup_ssl(logger, params)
 
   def self.setup_basic_auth(logger, params)
     user, password = params["user"], params["password"]
-
+
    return {} unless user && password && password.value
 
    {

Diff for: spec/integration/outputs/index_spec.rb (+48 -15)

@@ -1,8 +1,8 @@
 require_relative "../../../spec/es_spec_helper"
 require "logstash/outputs/elasticsearch"
 
-describe "TARGET_BULK_BYTES", :integration => true do
-  let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+describe "BATCH_BULK_SIZE", :integration => true do
+  let(:batch_bulk_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE }
   let(:event_count) { 1000 }
   let(:events) { event_count.times.map { event }.to_a }
   let(:config) {
@@ -23,11 +23,11 @@
   end
 
   describe "batches that are too large for one" do
-    let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
+    let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }
 
     it "should send in two batches" do
       expect(subject.client).to have_received(:bulk_send).twice do |payload|
-        expect(payload.size).to be <= target_bulk_bytes
+        expect(payload.size).to be <= batch_bulk_size
       end
     end
 
@@ -38,7 +38,40 @@
 
     it "should send in one batch" do
       expect(subject.client).to have_received(:bulk_send).once do |payload|
-        expect(payload.size).to be <= target_bulk_bytes
+        expect(payload.size).to be <= batch_bulk_size
+      end
+    end
+  end
+end
+
+describe "custom bulk size set" do
+  let(:batch_bulk_size) { 5 * 1024 * 1024 }
+  let(:config) {
+    {
+      "hosts" => get_host_port,
+      "index" => index,
+      "bulk_batch_size" => batch_bulk_size
+    }
+  }
+
+  describe "batches that are too large for one" do
+    let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) }
+
+    it "should send in two batches" do
+      expect(subject.client).to have_received(:bulk_send).twice do |payload|
+        expect(payload.size).to be <= batch_bulk_size
+      end
+    end
+
+    describe "batches that fit in one" do
+      # Normally you'd want to generate a request that's just 1 byte below the limit, but it's
+      # impossible to know how many bytes an event will serialize as with bulk proto overhead
+      let(:event) { LogStash::Event.new("message" => "a") }
+
+      it "should send in one batch" do
+        expect(subject.client).to have_received(:bulk_send).once do |payload|
+          expect(payload.size).to be <= batch_bulk_size
+        end
       end
     end
   end
@@ -53,7 +86,7 @@
   let(:config) { "not implemented" }
   let(:events) { event_count.times.map { event }.to_a }
   subject { LogStash::Outputs::ElasticSearch.new(config) }
-
+
   let(:es_url) { "http://#{get_host_port}" }
   let(:index_url) {"#{es_url}/#{index}"}
   let(:http_client_options) { {} }
@@ -65,7 +98,7 @@
     subject.register
     subject.multi_receive([])
   end
-
+
   shared_examples "an indexer" do |secure|
     it "ships events" do
       subject.multi_receive(events)
@@ -85,13 +118,13 @@
         expect(doc["_index"]).to eq(index)
       end
     end
-
+
     it "sets the correct content-type header" do
       expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything}
       if secure
         expected_manticore_opts = {
-          :headers => {"Content-Type" => "application/json"},
-          :body => anything,
+          :headers => {"Content-Type" => "application/json"},
+          :body => anything,
           :auth => {
             :user => user,
             :password => password,
@@ -146,22 +179,22 @@
       :auth => {
         :user => user,
        :password => password
-      },
+      },
       :ssl => {
         :enabled => true,
         :ca_file => cacert
       }
     }
   end
   it_behaves_like("an indexer", true)
-
+
   describe "with a password requiring escaping" do
     let(:user) { "f@ncyuser" }
     let(:password) { "ab%12#" }
-
+
    include_examples("an indexer", true)
   end
-
+
   describe "with a user/password requiring escaping in the URL" do
     let(:config) do
       {
@@ -171,7 +204,7 @@
         "index" => index
       }
     end
-
+
     include_examples("an indexer", true)
   end
 end
