Skip to content

Commit 567da82

Browse files
author
Ilanchezhian Rajamanickam
authored
Log server version on sink task startup (confluentinc#561)
* Log server version on sink task startup. Revert client upgrade * Fix spacing * Fix build by replacing Metadata packages * Catch all exceptions when getting ES server version. Close Client * set high level client to null on close * Unused Imports * Move comments. change log level to warn * Close high level client after use` * Wrap client close in try * Change log level to warn * Add error to log`
1 parent 2d1882f commit 567da82

File tree

3 files changed

+71
-27
lines changed

3 files changed

+71
-27
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,6 @@
1515

1616
package io.confluent.connect.elasticsearch;
1717

18-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
19-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
20-
21-
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;
22-
import java.io.IOException;
23-
import java.util.ArrayList;
24-
import java.util.Arrays;
25-
import java.util.HashSet;
26-
import java.util.List;
27-
import java.util.Set;
28-
import java.util.concurrent.Callable;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentMap;
31-
import java.util.concurrent.Executors;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.TimeUnit;
34-
import java.util.concurrent.atomic.AtomicInteger;
35-
import java.util.concurrent.atomic.AtomicReference;
36-
import java.util.stream.Collectors;
3718
import org.apache.http.HttpHost;
3819
import org.apache.http.nio.conn.NHttpClientConnectionManager;
3920
import org.apache.kafka.common.utils.Time;
@@ -62,6 +43,27 @@
6243
import org.slf4j.Logger;
6344
import org.slf4j.LoggerFactory;
6445

46+
import java.io.IOException;
47+
import java.util.ArrayList;
48+
import java.util.Arrays;
49+
import java.util.HashSet;
50+
import java.util.List;
51+
import java.util.Set;
52+
import java.util.concurrent.Callable;
53+
import java.util.concurrent.ConcurrentHashMap;
54+
import java.util.concurrent.ConcurrentMap;
55+
import java.util.concurrent.Executors;
56+
import java.util.concurrent.ScheduledExecutorService;
57+
import java.util.concurrent.TimeUnit;
58+
import java.util.concurrent.atomic.AtomicInteger;
59+
import java.util.concurrent.atomic.AtomicReference;
60+
import java.util.stream.Collectors;
61+
62+
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;
63+
64+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
65+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
66+
6567
public class ElasticsearchClient {
6668

6769
private static final Logger log = LoggerFactory.getLogger(ElasticsearchClient.class);

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package io.confluent.connect.elasticsearch;
1717

18-
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
18+
import org.apache.http.HttpHost;
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2020
import org.apache.kafka.common.TopicPartition;
2121
import org.apache.kafka.connect.errors.ConnectException;
@@ -24,13 +24,20 @@
2424
import org.apache.kafka.connect.sink.SinkRecord;
2525
import org.apache.kafka.connect.sink.SinkTask;
2626
import org.elasticsearch.action.DocWriteRequest;
27+
import org.elasticsearch.action.main.MainResponse;
28+
import org.elasticsearch.client.RequestOptions;
29+
import org.elasticsearch.client.RestClient;
30+
import org.elasticsearch.client.RestHighLevelClient;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
2933

3034
import java.util.Collection;
3135
import java.util.HashSet;
3236
import java.util.Map;
3337
import java.util.Set;
38+
import java.util.stream.Collectors;
39+
40+
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
3441

3542
public class ElasticsearchSinkTask extends SinkTask {
3643

@@ -62,7 +69,6 @@ protected void start(Map<String, String> props, ElasticsearchClient client) {
6269
if (context.errantRecordReporter() == null) {
6370
log.info("Errant record reporter not configured.");
6471
}
65-
6672
// may be null if DLQ not enabled
6773
reporter = context.errantRecordReporter();
6874
} catch (NoClassDefFoundError | NoSuchMethodError e) {
@@ -72,7 +78,8 @@ protected void start(Map<String, String> props, ElasticsearchClient client) {
7278

7379
this.client = client != null ? client : new ElasticsearchClient(config, reporter);
7480

75-
log.info("Started ElasticsearchSinkTask.");
81+
log.info("Started ElasticsearchSinkTask. Connecting to ES server version: {}",
82+
getServerVersion());
7683
}
7784

7885
@Override
@@ -125,6 +132,40 @@ private void checkMapping(SinkRecord record) {
125132
}
126133
}
127134

135+
private String getServerVersion() {
136+
ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config);
137+
RestHighLevelClient highLevelClient = new RestHighLevelClient(
138+
RestClient
139+
.builder(
140+
config.connectionUrls()
141+
.stream()
142+
.map(HttpHost::create)
143+
.collect(Collectors.toList())
144+
.toArray(new HttpHost[config.connectionUrls().size()])
145+
)
146+
.setHttpClientConfigCallback(configCallbackHandler)
147+
.setRequestConfigCallback(configCallbackHandler)
148+
);
149+
MainResponse response;
150+
String esVersionNumber = "Unknown";
151+
try {
152+
response = highLevelClient.info(RequestOptions.DEFAULT);
153+
esVersionNumber = response.getVersion().toString();
154+
} catch (Exception e) {
155+
// Same error messages as from validating the connection for IOException.
156+
// Insufficient privileges to validate the version number if caught
157+
// ElasticsearchStatusException.
158+
log.warn("Failed to get ES server version", e);
159+
} finally {
160+
try {
161+
highLevelClient.close();
162+
} catch (Exception e) {
163+
log.warn("Failed to close high level client", e);
164+
}
165+
}
166+
return esVersionNumber;
167+
}
168+
128169
/**
129170
* Returns the converted index name from a given topic name. Elasticsearch accepts:
130171
* <ul>

src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchHelperClient.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515

1616
package io.confluent.connect.elasticsearch.helper;
1717

18-
import io.confluent.connect.elasticsearch.ConfigCallbackHandler;
19-
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
20-
import java.io.IOException;
21-
import java.util.Map.Entry;
22-
2318
import org.apache.http.HttpHost;
2419
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
2520
import org.elasticsearch.action.search.SearchRequest;
@@ -42,6 +37,12 @@
4237
import org.slf4j.Logger;
4338
import org.slf4j.LoggerFactory;
4439

40+
import java.io.IOException;
41+
import java.util.Map.Entry;
42+
43+
import io.confluent.connect.elasticsearch.ConfigCallbackHandler;
44+
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
45+
4546
public class ElasticsearchHelperClient {
4647

4748
private static final Logger log = LoggerFactory.getLogger(ElasticsearchHelperClient.class);

0 commit comments

Comments
 (0)