Skip to content

Commit 506c59c

Browse files
committed
Added setting 'pipeline.batch.metrics' to enable/disable the collection of batch size related metrics into histrograms
1 parent 66cd2b3 commit 506c59c

File tree

9 files changed

+22
-14
lines changed

9 files changed

+22
-14
lines changed

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ module Environment
8282
Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable,
8383
Setting::Password.new("api.ssl.keystore.password", nil, false).nullable,
8484
Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]),
85+
Setting::SettingString.new("pipeline.batch.metrics", "false", true, ["false", "true"]),
8586
Setting::SettingString.new("queue.type", "memory", true, ["persisted", "memory"]),
8687
Setting::Boolean.new("queue.drain", false),
8788
Setting::Bytes.new("queue.page_capacity", "64mb"),

logstash-core/lib/logstash/settings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def self.included(base)
5858
"path.dead_letter_queue",
5959
"path.queue",
6060
"pipeline.batch.delay",
61+
"pipeline.batch.metrics",
6162
"pipeline.batch.size",
6263
"pipeline.id",
6364
"pipeline.reloadable",

logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def threaded_read_client
113113
end
114114

115115
context "WrappedSynchronousQueue" do
116-
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
116+
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024, "false") }
117117

118118
before do
119119
read_client.set_events_metric(metric.namespace([:stats, :events]))

logstash-core/spec/logstash/queue_factory_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
3232
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
3333
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
34+
LogStash::Setting::SettingString.new("pipeline.batch.metrics", "false", true, ["false", "true"]),
3435
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
3536
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
3637
]

logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
require "logstash/instrument/collector"
2020

2121
describe LogStash::WrappedSynchronousQueue do
22-
subject {LogStash::WrappedSynchronousQueue.new(5)}
22+
subject {LogStash::WrappedSynchronousQueue.new(5, "false")}
2323

2424
describe "queue clients" do
2525
context "when requesting a write client" do

logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
6868
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
6969
final IRubyObject settings) throws IOException {
7070
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();
71+
72+
final String histogramFlag = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_METRICS)
73+
.asJavaString();
74+
7175
if (PERSISTED_TYPE.equals(type)) {
7276
final Path queuePath = Paths.get(
7377
getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(),
@@ -93,15 +97,14 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
9397
}
9498
);
9599
} else if (MEMORY_TYPE.equals(type)) {
100+
final int batchSize = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
101+
.convertToInteger().getIntValue();
102+
final int workers = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
103+
.convertToInteger().getIntValue();
96104
return new JrubyWrappedSynchronousQueueExt(
97105
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
98106
).initialize(
99-
context, context.runtime.newFixnum(
100-
getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
101-
.convertToInteger().getIntValue()
102-
* getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
103-
.convertToInteger().getIntValue()
104-
)
107+
context, context.runtime.newFixnum(batchSize * workers), context.runtime.newString(histogramFlag)
105108
);
106109
} else {
107110
throw context.runtime.newRaiseException(

logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class SettingKeyDefinitions {
3030

3131
public static final String PIPELINE_BATCH_SIZE = "pipeline.batch.size";
3232

33+
public static final String PIPELINE_BATCH_METRICS = "pipeline.batch.metrics";
34+
3335
public static final String PATH_QUEUE = "path.queue";
3436

3537
public static final String QUEUE_PAGE_CAPACITY = "queue.page_capacity";

logstash-core/src/main/java/org/logstash/execution/QueueReadClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public interface QueueReadClient {
3535
void addFilteredMetrics(int filteredSize);
3636
void closeBatch(QueueBatch batch) throws IOException;
3737

38-
public <V, E extends Exception> V executeWithTimers(final TimerMetric.ExceptionalSupplier<V,E> supplier) throws E;
38+
<V, E extends Exception> V executeWithTimers(final TimerMetric.ExceptionalSupplier<V,E> supplier) throws E;
3939

40-
public <E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;
40+
<E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;
4141

4242
boolean isEmpty();
4343
}

logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import java.util.concurrent.ArrayBlockingQueue;
2424
import java.util.concurrent.BlockingQueue;
2525

26-
import org.jruby.Ruby;
27-
import org.jruby.RubyClass;
28-
import org.jruby.RubyNumeric;
26+
import org.jruby.*;
2927
import org.jruby.anno.JRubyClass;
3028
import org.jruby.anno.JRubyMethod;
3129
import org.jruby.runtime.ThreadContext;
@@ -50,8 +48,10 @@ public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaC
5048
@JRubyMethod
5149
@SuppressWarnings("unchecked")
5250
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
53-
IRubyObject size) {
51+
IRubyObject size,
52+
IRubyObject batchMetricsSampling) {
5453
int typedSize = ((RubyNumeric)size).getIntValue();
54+
String batchMetricsSamplingType = ((RubyString) batchMetricsSampling).asJavaString();
5555
this.queue = new ArrayBlockingQueue<>(typedSize);
5656
return this;
5757
}

0 commit comments

Comments
 (0)