Skip to content

Commit c50e2d7

Browse files
committed
Merge remote-tracking branch 'origin/11.0.x' into HEAD
2 parents d2ac7a8 + 9b8ecbc commit c50e2d7

File tree

6 files changed

+54
-29
lines changed

6 files changed

+54
-29
lines changed

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ public void testSsl() throws Exception {
573573
container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
574574
container.start();
575575

576-
String address = container.getConnectionUrl().replace(container.getContainerIpAddress(), container.hostMachineIpAddress());
576+
String address = container.getConnectionUrl(false);
577577
props.put(CONNECTION_URL_CONFIG, address);
578578
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
579579
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);

src/test/java/io/confluent/connect/elasticsearch/helper/ElasticsearchContainer.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package io.confluent.connect.elasticsearch.helper;
1717

18+
import org.apache.kafka.common.config.SslConfigs;
1819
import org.elasticsearch.client.security.user.User;
1920
import org.elasticsearch.client.security.user.privileges.Role;
2021
import org.slf4j.Logger;
@@ -41,10 +42,13 @@
4142
import java.util.Map;
4243

4344
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
45+
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
4446

4547
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
4648
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
4749
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
50+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
51+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
4852

4953
/**
5054
* A specialized TestContainer container for testing Elasticsearch, optionally with SSL support.
@@ -154,16 +158,32 @@ public ElasticsearchContainer(String imageName) {
154158
@Override
155159
public void start() {
156160
super.start();
161+
String address;
157162
if (isBasicAuthEnabled()) {
158163
Map<String, String> props = new HashMap<>();
159164
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
160165
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
161-
props.put(CONNECTION_URL_CONFIG, getConnectionUrl());
166+
if (isSslEnabled()) {
167+
addSslProps(props);
168+
address = this.getConnectionUrl(false);
169+
} else {
170+
address = this.getConnectionUrl();
171+
}
172+
props.put(CONNECTION_URL_CONFIG, address);
162173
ElasticsearchHelperClient helperClient = getHelperClient(props);
163174
createUsersAndRoles(helperClient);
164175
}
165176
}
166177

178+
public void addSslProps(Map<String, String> props) {
179+
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
180+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.getKeystorePath());
181+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.getKeystorePassword());
182+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.getTruststorePath());
183+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.getTruststorePassword());
184+
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.getKeyPassword());
185+
}
186+
167187
private void createUsersAndRoles(ElasticsearchHelperClient helperClient ) {
168188
try {
169189
for (Role role: this.rolesToCreate) {
@@ -371,19 +391,28 @@ public String hostMachineIpAddress() {
371391
}
372392
}
373393

394+
/**
395+
* @see ElasticsearchContainer#getConnectionUrl(boolean)
396+
*/
397+
public String getConnectionUrl() {
398+
return getConnectionUrl(true);
399+
}
400+
374401
/**
375402
* Get the Elasticsearch connection URL.
376403
*
377404
* <p>This can only be called once the container is started.
378405
*
406+
* @param useContainerIpAddress use container IP if true, host machine's IP otherwise
407+
*
379408
* @return the connection URL; never null
380409
*/
381-
public String getConnectionUrl() {
410+
public String getConnectionUrl(boolean useContainerIpAddress) {
382411
String protocol = isSslEnabled() ? "https" : "http";
383412
return String.format(
384413
"%s://%s:%d",
385414
protocol,
386-
getContainerIpAddress(),
415+
useContainerIpAddress ? getContainerIpAddress() : hostMachineIpAddress(),
387416
getMappedPort(ELASTICSEARCH_DEFAULT_PORT)
388417
);
389418
}
@@ -524,7 +553,7 @@ public ElasticsearchHelperClient getHelperClient(Map<String, String> props) {
524553
superUserProps.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
525554
superUserProps.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
526555
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(superUserProps);
527-
ElasticsearchHelperClient client = new ElasticsearchHelperClient(this.getConnectionUrl(), config);
556+
ElasticsearchHelperClient client = new ElasticsearchHelperClient(props.get(CONNECTION_URL_CONFIG), config);
528557
return client;
529558
}
530559
}

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
3232

3333
import org.apache.kafka.connect.storage.StringConverter;
34-
import org.apache.kafka.test.IntegrationTest;
34+
import io.confluent.common.utils.IntegrationTest;
3535
import org.elasticsearch.client.security.user.User;
3636
import org.elasticsearch.client.security.user.privileges.Role;
3737
import org.elasticsearch.search.SearchHit;

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorKerberosIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
import java.util.Properties;
1414
import org.apache.hadoop.minikdc.MiniKdc;
1515
import org.apache.kafka.connect.errors.ConnectException;
16+
import io.confluent.common.utils.IntegrationTest;
1617
import org.junit.AfterClass;
1718
import org.junit.BeforeClass;
1819
import org.junit.Test;
20+
import org.junit.experimental.categories.Category;
1921

22+
@Category(IntegrationTest.class)
2023
public class ElasticsearchConnectorKerberosIT extends ElasticsearchConnectorBaseIT {
2124

2225
private static File baseDir;

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorKerberosWithSslIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
88
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
99
import org.apache.kafka.common.config.SslConfigs;
10+
import io.confluent.common.utils.IntegrationTest;
1011
import org.junit.BeforeClass;
1112
import org.junit.Test;
13+
import org.junit.experimental.categories.Category;
1214

15+
@Category(IntegrationTest.class)
1316
public class ElasticsearchConnectorKerberosWithSslIT extends ElasticsearchConnectorKerberosIT{
1417

1518
@BeforeClass
@@ -33,10 +36,7 @@ public void testKerberos() {
3336
@Test
3437
public void testKerberosWithSsl() throws Exception {
3538
// Use IP address here because that's what the certificates allow
36-
String address = container.getConnectionUrl().replace(
37-
container.getContainerIpAddress(),
38-
container.hostMachineIpAddress()
39-
);
39+
String address = container.getConnectionUrl(false);
4040

4141
props.put(CONNECTION_URL_CONFIG, address);
4242
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorSslIT.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.confluent.connect.elasticsearch.integration;
1717

1818
import io.confluent.common.utils.IntegrationTest;
19-
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
2019
import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
2120

2221
import org.apache.kafka.common.config.SslConfigs;
@@ -28,14 +27,12 @@
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

31-
import java.util.Collections;
3230
import java.util.List;
3331
import java.util.Map;
3432

3533
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
3634
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
3735
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
38-
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
3936
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
4037

4138

@@ -58,20 +55,27 @@ public static void setupBeforeAll() {
5855
*/
5956
@Test
6057
public void testSecureConnectionVerifiedHostname() throws Throwable {
61-
// Use IP address here because that's what the certificates allow
62-
String address = container.getConnectionUrl();
63-
address = address.replace(container.getContainerIpAddress(), container.hostMachineIpAddress());
58+
// Use container IP address here because that's what the certificates allow
59+
String address = container.getConnectionUrl(false);
6460
log.info("Creating connector for {}.", address);
6561

6662
props.put(CONNECTION_URL_CONFIG, address);
67-
addSslProps();
63+
container.addSslProps(props);
6864

6965
helperClient = container.getHelperClient(props);
7066

7167
// Start connector
7268
runSimpleTest(props);
7369
}
7470

71+
@Override
72+
protected Map<String, String> createProps() {
73+
props = super.createProps();
74+
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_MINIMAL_PRIVILEGES_NAME);
75+
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_MINIMAL_PRIVILEGES_PASSWORD);
76+
return props;
77+
}
78+
7579
@Test
7680
public void testSecureConnectionHostnameVerificationDisabled() throws Throwable {
7781
// Use 'localhost' here that is not in self-signed cert
@@ -80,7 +84,7 @@ public void testSecureConnectionHostnameVerificationDisabled() throws Throwable
8084
log.info("Creating connector for {}", address);
8185

8286
props.put(CONNECTION_URL_CONFIG, address);
83-
addSslProps();
87+
container.addSslProps(props);
8488

8589
// disable hostname verification
8690
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
@@ -90,15 +94,4 @@ public void testSecureConnectionHostnameVerificationDisabled() throws Throwable
9094
// Start connector
9195
runSimpleTest(props);
9296
}
93-
94-
private void addSslProps() {
95-
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
96-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, container.getKeystorePath());
97-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, container.getKeystorePassword());
98-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, container.getTruststorePath());
99-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword());
100-
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword());
101-
props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_MINIMAL_PRIVILEGES_NAME);
102-
props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_MINIMAL_PRIVILEGES_PASSWORD);
103-
}
10497
}

0 commit comments

Comments
 (0)