Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,24 @@ public class AvroConfluentFormatOptions {
.noDefaultValue()
.withDescription("Bearer auth token for Schema Registry");

public static final ConfigOption<String> TOKEN_ENDPOINT_URL =
ConfigOptions.key("bearer-auth.token.endpoint.url")
.stringType()
.noDefaultValue()
.withDescription("OAuth Bearer token endpoint URL for Schema Registry");

public static final ConfigOption<String> SASL_JAAS_CONFIG =
ConfigOptions.key("bearer-auth.jaas.config")
.stringType()
.noDefaultValue()
.withDescription("SASL JAAS configuration for Schema Registry");

public static final ConfigOption<String> LOGICAL_CLUSTER =
ConfigOptions.key("bearer-auth.logical.cluster")
.stringType()
.noDefaultValue()
.withDescription("Logical cluster identifier for Schema Registry");

// --------------------------------------------------------------------------------------------
// Fallback properties
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.LOGICAL_CLUSTER;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SASL_JAAS_CONFIG;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SCHEMA;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SUBJECT;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.TOKEN_ENDPOINT_URL;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL;

/**
Expand Down Expand Up @@ -186,6 +189,9 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(BASIC_AUTH_USER_INFO);
options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
options.add(BEARER_AUTH_TOKEN);
options.add(TOKEN_ENDPOINT_URL);
options.add(SASL_JAAS_CONFIG);
options.add(LOGICAL_CLUSTER);
return options;
}

Expand All @@ -203,7 +209,10 @@ public Set<ConfigOption<?>> forwardOptions() {
BASIC_AUTH_CREDENTIALS_SOURCE,
BASIC_AUTH_USER_INFO,
BEARER_AUTH_CREDENTIALS_SOURCE,
BEARER_AUTH_TOKEN)
BEARER_AUTH_TOKEN,
TOKEN_ENDPOINT_URL,
SASL_JAAS_CONFIG,
LOGICAL_CLUSTER)
.collect(Collectors.toSet());
}

Expand Down Expand Up @@ -237,6 +246,15 @@ public Set<ConfigOption<?>> forwardOptions() {
formatOptions
.getOptional(BEARER_AUTH_TOKEN)
.ifPresent(v -> properties.put("bearer.auth.token", v));
formatOptions
.getOptional(TOKEN_ENDPOINT_URL)
.ifPresent(v -> properties.put("sasl.oauthbearer.token.endpoint.url", v));
formatOptions
.getOptional(SASL_JAAS_CONFIG)
.ifPresent(v -> properties.put("sasl.jaas.config", v));
formatOptions
.getOptional(LOGICAL_CLUSTER)
.ifPresent(v -> properties.put("bearer.auth.logical.cluster", v));

if (properties.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BEARER_AUTH_TOKEN;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.LOGICAL_CLUSTER;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.PROPERTIES;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SASL_JAAS_CONFIG;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SCHEMA;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_KEYSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_LOCATION;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.SUBJECT;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.TOKEN_ENDPOINT_URL;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.URL;
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory.buildOptionalPropertiesMap;

Expand Down Expand Up @@ -178,6 +181,9 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(BASIC_AUTH_USER_INFO);
options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
options.add(BEARER_AUTH_TOKEN);
options.add(TOKEN_ENDPOINT_URL);
options.add(SASL_JAAS_CONFIG);
options.add(LOGICAL_CLUSTER);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class RegistryAvroFormatFactoryTest {
+ " }\n"
+ " ]\n"
+ "}";
private static final String TOKEN_ENDPOINT_URL = "http://localhost:8080/token";

private static final Map<String, String> EXPECTED_OPTIONAL_PROPERTIES = new HashMap<>();

Expand All @@ -109,6 +110,9 @@ class RegistryAvroFormatFactoryTest {
EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.credentials.source", "USER_INFO");
EXPECTED_OPTIONAL_PROPERTIES.put("basic.auth.user.info", "user:pwd");
EXPECTED_OPTIONAL_PROPERTIES.put("bearer.auth.token", "CUSTOM");
EXPECTED_OPTIONAL_PROPERTIES.put("sasl.oauthbearer.token.endpoint.url", TOKEN_ENDPOINT_URL);
EXPECTED_OPTIONAL_PROPERTIES.put("sasl.jaas.config", "custom.jaas.config");
EXPECTED_OPTIONAL_PROPERTIES.put("bearer.auth.logical.cluster", "test-cluster");
}

@Test
Expand Down Expand Up @@ -271,6 +275,9 @@ private Map<String, String> getOptionalProperties() {
properties.put(AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO.key(), "user:pwd");
// defined via general property map
properties.put("properties.bearer.auth.token", "CUSTOM");
properties.put(AvroConfluentFormatOptions.TOKEN_ENDPOINT_URL.key(), TOKEN_ENDPOINT_URL);
properties.put(AvroConfluentFormatOptions.SASL_JAAS_CONFIG.key(), "custom.jaas.config");
properties.put(AvroConfluentFormatOptions.LOGICAL_CLUSTER.key(), "test-cluster");
properties.put("schema", SCHEMA_STRING);

return getModifiedOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class DebeziumAvroFormatFactoryTest {

private static final String SUBJECT = "test-debezium-avro";
private static final String REGISTRY_URL = "http://localhost:8081";
private static final String TOKEN_ENDPOINT_URL = "http://localhost:8080/token";

@Test
void testSeDeSchema() {
Expand Down Expand Up @@ -209,6 +210,9 @@ private Map<String, String> getRegistryConfigs() {
final Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("basic.auth.user.info", "something1");
registryConfigs.put("basic.auth.credentials.source", "something2");
registryConfigs.put("sasl.oauthbearer.token.endpoint.url", TOKEN_ENDPOINT_URL);
registryConfigs.put("sasl.jaas.config", "something3");
registryConfigs.put("bearer.auth.logical.cluster", "something4");
return registryConfigs;
}

Expand All @@ -223,6 +227,9 @@ private Map<String, String> getAllOptions() {
options.put("debezium-avro-confluent.subject", SUBJECT);
options.put("debezium-avro-confluent.basic-auth.user-info", "something1");
options.put("debezium-avro-confluent.basic-auth.credentials-source", "something2");
options.put("debezium-avro-confluent.bearer-auth.token.endpoint.url", TOKEN_ENDPOINT_URL);
options.put("debezium-avro-confluent.bearer-auth.jaas.config", "something3");
options.put("debezium-avro-confluent.bearer-auth.logical.cluster", "something4");
return options;
}

Expand Down