Skip to content

Commit 9ba2b1a

Browse files
authored
Merge pull request #880 from confluentinc/backport-pr870-to-14.1.x
CC-32762: Set threadName with taskId prefix
2 parents fd219fa + 40f261d commit 9ba2b1a

File tree

9 files changed

+172
-40
lines changed

9 files changed

+172
-40
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.locks.Condition;
3333
import java.util.concurrent.locks.Lock;
3434
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.concurrent.ThreadFactory;
3536
import java.util.function.BiConsumer;
3637

3738
import org.apache.http.HttpHost;
@@ -122,9 +123,21 @@ public class ElasticsearchClient {
122123
public ElasticsearchClient(
123124
ElasticsearchSinkConnectorConfig config,
124125
ErrantRecordReporter reporter,
125-
Runnable afterBulkCallback
126+
Runnable afterBulkCallback,
127+
int taskId,
128+
String connectorName
126129
) {
127-
this.bulkExecutorService = Executors.newFixedThreadPool(config.maxInFlightRequests());
130+
this.bulkExecutorService = Executors.newFixedThreadPool(config.maxInFlightRequests(),
131+
new ThreadFactory() {
132+
private final AtomicInteger threadNumber = new AtomicInteger(1);
133+
@Override
134+
public Thread newThread(Runnable r) {
135+
Thread thread = Executors.defaultThreadFactory().newThread(r);
136+
thread.setName(connectorName + "-" + taskId + "-elasticsearch-bulk-executor-"
137+
+ threadNumber.getAndIncrement());
138+
return thread;
139+
}
140+
});
128141
this.numBufferedRecords = new AtomicInteger(0);
129142
this.error = new AtomicReference<>();
130143
this.requestToSinkRecord = new ConcurrentHashMap<>();

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ public Class<? extends Task> taskClass() {
5858
@Override
5959
public List<Map<String, String>> taskConfigs(int maxTasks) {
6060
List<Map<String, String>> taskConfigs = new ArrayList<>();
61-
Map<String, String> taskProps = new HashMap<>();
62-
taskProps.putAll(configProperties);
6361
for (int i = 0; i < maxTasks; i++) {
64-
taskConfigs.add(taskProps);
62+
HashMap<String, String> taskConfig = new HashMap<>(configProperties);
63+
taskConfig.put(ElasticsearchSinkTaskConfig.TASK_ID_CONFIG, Integer.toString(i));
64+
taskConfigs.add(taskConfig);
6565
}
6666
return taskConfigs;
6767
}

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,10 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
891891

892892
public static final ConfigDef CONFIG = baseConfigDef();
893893

894+
protected ElasticsearchSinkConnectorConfig(ConfigDef config, Map<String, String> properties) {
895+
super(config, properties);
896+
}
897+
894898
public ElasticsearchSinkConnectorConfig(Map<String, String> props) {
895899
super(CONFIG, props);
896900
}

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ElasticsearchSinkTask extends SinkTask {
4242

4343
private DataConverter converter;
4444
private ElasticsearchClient client;
45-
private ElasticsearchSinkConnectorConfig config;
45+
private ElasticsearchSinkTaskConfig config;
4646
private ErrantRecordReporter reporter;
4747
private Set<String> existingMappings;
4848
private Set<String> indexCache;
@@ -58,7 +58,7 @@ public void start(Map<String, String> props) {
5858
protected void start(Map<String, String> props, ElasticsearchClient client) {
5959
log.info("Starting ElasticsearchSinkTask.");
6060

61-
this.config = new ElasticsearchSinkConnectorConfig(props);
61+
this.config = new ElasticsearchSinkTaskConfig(props);
6262
this.converter = new DataConverter(config);
6363
this.existingMappings = new HashSet<>();
6464
this.indexCache = new HashSet<>();
@@ -80,7 +80,8 @@ protected void start(Map<String, String> props, ElasticsearchClient client) {
8080
}
8181
Runnable afterBulkCallback = () -> offsetTracker.updateOffsets();
8282
this.client = client != null ? client
83-
: new ElasticsearchClient(config, reporter, afterBulkCallback);
83+
: new ElasticsearchClient(config, reporter, afterBulkCallback,
84+
config.getTaskId(), config.getConnectorName());
8485

8586
if (!config.flushSynchronously()) {
8687
this.offsetTracker = new AsyncOffsetTracker(context);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.connect.elasticsearch;
17+
18+
import org.apache.kafka.common.config.ConfigDef;
19+
import java.util.Map;
20+
21+
public class ElasticsearchSinkTaskConfig extends ElasticsearchSinkConnectorConfig {
22+
23+
private final int taskId;
24+
private final String connectorName;
25+
26+
public static final String TASK_ID_CONFIG = "taskId";
27+
private static final ConfigDef.Type TASK_ID_TYPE = ConfigDef.Type.INT;
28+
public static final ConfigDef.Importance TASK_ID_IMPORTANCE = ConfigDef.Importance.LOW;
29+
public static final int TASK_ID_DEFAULT = 0;
30+
31+
/**
32+
* Return a ConfigDef object used to define this config's fields.
33+
*
34+
* @return A ConfigDef object used to define this config's fields.
35+
*/
36+
public static ConfigDef config() {
37+
return ElasticsearchSinkConnectorConfig.baseConfigDef()
38+
.defineInternal(
39+
TASK_ID_CONFIG,
40+
TASK_ID_TYPE,
41+
TASK_ID_DEFAULT,
42+
TASK_ID_IMPORTANCE
43+
);
44+
}
45+
46+
/**
47+
* @param properties A Map detailing configuration properties and their respective values.
48+
*/
49+
public ElasticsearchSinkTaskConfig(Map<String, String> properties) {
50+
super(config(), properties);
51+
taskId = properties.containsKey(TASK_ID_CONFIG) ? getInt(TASK_ID_CONFIG) : 0;
52+
connectorName = originalsStrings().get("name");
53+
}
54+
55+
public int getTaskId() {
56+
return taskId;
57+
}
58+
59+
public String getConnectorName() {
60+
return connectorName;
61+
}
62+
}

0 commit comments

Comments
 (0)