Skip to content

Commit 76b5838

Browse files
committed
Merge 15.0.x into pr_merge_from_14_1_x_to_15_0_x - resolved automatically
2 parents 9ba2b1a + bb95851 commit 76b5838

15 files changed

+1346
-260
lines changed

pom.xml

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
<parent>
66
<groupId>io.confluent</groupId>
77
<artifactId>common</artifactId>
8-
<version>7.0.12</version>
8+
<version>[7.9.0, 7.9.1)</version>
99
</parent>
1010

1111
<artifactId>kafka-connect-elasticsearch</artifactId>
12-
<version>14.1.4-SNAPSHOT</version>
12+
<version>15.0.2-SNAPSHOT</version>
1313
<packaging>jar</packaging>
1414
<name>kafka-connect-elasticsearch</name>
1515
<organization>
@@ -33,13 +33,12 @@
3333
<connection>scm:git:git://github.com/confluentinc/kafka-connect-elasticsearch.git</connection>
3434
<developerConnection>scm:git:[email protected]:confluentinc/kafka-connect-elasticsearch.git</developerConnection>
3535
<url>https://github.com/confluentinc/kafka-connect-elasticsearch</url>
36-
<tag>14.1.x</tag>
36+
<tag>15.0.x</tag>
3737
</scm>
3838

3939
<properties>
4040
<es.version>7.17.24</es.version>
4141
<hamcrest.version>1.3</hamcrest.version>
42-
<mockito.version>2.28.2</mockito.version>
4342
<gson.version>2.9.0</gson.version>
4443
<test.containers.version>1.16.3</test.containers.version>
4544
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
@@ -54,7 +53,7 @@
5453
<dependency.check.version>6.1.6</dependency.check.version>
5554
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
5655
<commons.codec.version>1.15</commons.codec.version>
57-
<confluent.version>${io.confluent.common.version}</confluent.version>
56+
<confluent.version>[7.9.0,7.9.1)</confluent.version>
5857
<jackson.version>2.16.0</jackson.version>
5958
<dependency.check.skip>true</dependency.check.skip>
6059
</properties>
@@ -182,7 +181,15 @@
182181
<dependency>
183182
<groupId>org.apache.kafka</groupId>
184183
<artifactId>kafka-clients</artifactId>
185-
<version>${kafka.version}</version>
184+
<version>7.8.0-ccs</version>
185+
<classifier>test</classifier>
186+
<type>test-jar</type>
187+
<scope>test</scope>
188+
</dependency>
189+
<dependency>
190+
<groupId>org.apache.kafka</groupId>
191+
<artifactId>kafka-server-common</artifactId>
192+
<version>7.8.0-ccs</version>
186193
<classifier>test</classifier>
187194
<type>test-jar</type>
188195
<scope>test</scope>
@@ -312,6 +319,19 @@
312319
</dependencies>
313320
</dependencyManagement>
314321

322+
<distributionManagement>
323+
<repository>
324+
<id>aws-release</id>
325+
<name>AWS Release Repository</name>
326+
<url>${confluent.release.repo}</url>
327+
</repository>
328+
<snapshotRepository>
329+
<id>aws-snapshot</id>
330+
<name>AWS Snapshot Repository</name>
331+
<url>${confluent.snapshot.repo}</url>
332+
</snapshotRepository>
333+
</distributionManagement>
334+
315335
<build>
316336
<plugins>
317337
<plugin>

src/main/java/io/confluent/connect/elasticsearch/ConfigCallbackHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
5555
import org.apache.http.nio.reactor.ConnectingIOReactor;
5656
import org.apache.http.nio.reactor.IOReactorException;
57-
import org.apache.kafka.common.network.Mode;
57+
import org.apache.kafka.common.network.ConnectionMode;
5858
import org.apache.kafka.common.security.ssl.SslFactory;
5959
import org.apache.kafka.connect.errors.ConnectException;
6060
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
@@ -258,7 +258,7 @@ private void configureSslContext(HttpAsyncClientBuilder builder) {
258258
* Gets the SslContext for the client.
259259
*/
260260
private SSLContext sslContext() {
261-
SslFactory sslFactory = new SslFactory(Mode.CLIENT, null, false);
261+
SslFactory sslFactory = new SslFactory(ConnectionMode.CLIENT, null, false);
262262
sslFactory.configure(config.sslConfigs());
263263

264264
try {

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private String convertKey(Schema keySchema, Object key) {
121121
}
122122
}
123123

124-
public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
124+
public DocWriteRequest<?> convertRecord(SinkRecord record, String resourceName) {
125125
if (record.value() == null) {
126126
switch (config.behaviorOnNullValues()) {
127127
case IGNORE:
@@ -166,7 +166,7 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
166166

167167
// delete
168168
if (record.value() == null) {
169-
return maybeAddExternalVersioning(new DeleteRequest(index).id(id), record);
169+
return maybeAddExternalVersioning(new DeleteRequest(resourceName).id(id), record);
170170
}
171171

172172
String payload = getPayload(record);
@@ -175,14 +175,14 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
175175
// index
176176
switch (config.writeMethod()) {
177177
case UPSERT:
178-
return new UpdateRequest(index, id)
178+
return new UpdateRequest(resourceName, id)
179179
.doc(payload, XContentType.JSON)
180180
.upsert(payload, XContentType.JSON)
181181
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5));
182182
case INSERT:
183183
OpType opType = config.isDataStream() ? OpType.CREATE : OpType.INDEX;
184184
IndexRequest req =
185-
new IndexRequest(index).source(payload, XContentType.JSON).opType(opType);
185+
new IndexRequest(resourceName).source(payload, XContentType.JSON).opType(opType);
186186
if (config.useAutogeneratedIds()) {
187187
return req;
188188
}

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,13 +280,14 @@ public boolean createIndexOrDataStream(String name) {
280280
/**
281281
* Creates a mapping for the given index and schema.
282282
*
283-
* @param index the index to create the mapping for
283+
* @param resourceName the resource to create the mapping for
284284
* @param schema the schema to map
285285
*/
286-
public void createMapping(String index, Schema schema) {
287-
PutMappingRequest request = new PutMappingRequest(index).source(Mapping.buildMapping(schema));
286+
public void createMapping(String resourceName, Schema schema) {
287+
PutMappingRequest request = new PutMappingRequest(resourceName)
288+
.source(Mapping.buildMapping(schema));
288289
callWithRetries(
289-
String.format("create mapping for index %s with schema %s", index, schema),
290+
String.format("create mapping for resource %s with schema %s", resourceName, schema),
290291
() -> client.indices().putMapping(request, RequestOptions.DEFAULT)
291292
);
292293
}
@@ -318,11 +319,11 @@ public void waitForInFlightRequests() {
318319

319320
/**
320321
* Checks whether the index already has a mapping or not.
321-
* @param index the index to check
322+
* @param resourceName the resource to check
322323
* @return true if a mapping exists, false if it does not
323324
*/
324-
public boolean hasMapping(String index) {
325-
MappingMetadata mapping = mapping(index);
325+
public boolean hasMapping(String resourceName) {
326+
MappingMetadata mapping = mapping(resourceName);
326327
return mapping != null && mapping.sourceAsMap() != null && !mapping.sourceAsMap().isEmpty();
327328
}
328329

0 commit comments

Comments
 (0)