Skip to content

Commit 91bc9ce

Browse files
andseljsvdmashhurs
authored
Adds schema registry's truststore and keystore settings (#137)
This commit mainly exposes location password and type settings for schema registry's secret and key stores. It brings those configuration options, if available, and directly forward down to the Kafka's SerDes library and Manticore client. Introduces a script named setup_keystore_and_truststore.sh to setup keystore and truststore used in integration tests. Furthermore it reworks a little bit the bash scripts to setup Kafka and Schema Registry in integration test to avoid download of artifacts if they are already locally downloaded. Co-authored-by: João Duarte <[email protected]> Co-authored-by: Mashhur <[email protected]>
1 parent 0462434 commit 91bc9ce

11 files changed

+241
-26
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ build/
1111
.idea/
1212
vendor
1313
*.jar
14+
tls_repository/

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.2.0
2+
- Added TLS truststore and keystore settings specifically to access the schema registry [#137](https://github.com/logstash-plugins/logstash-integration-kafka/pull/137)
3+
14
## 11.1.0
25
- Added config `group_instance_id` to use the Kafka's consumer static membership feature [#135](https://github.com/logstash-plugins/logstash-integration-kafka/pull/135)
36

docs/input-kafka.asciidoc

+54
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
135135
| <<plugins-{type}s-{plugin}-schema_registry_key>> |<<string,string>>|No
136136
| <<plugins-{type}s-{plugin}-schema_registry_proxy>> |<<uri,uri>>|No
137137
| <<plugins-{type}s-{plugin}-schema_registry_secret>> |<<string,string>>|No
138+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_location>> |a valid filesystem path|No
139+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_password>> |<<password,password>>|No
140+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_keystore_type>> |<<string,string>>, one of `["jks", "PKCS12"]`|No
141+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_location>> |a valid filesystem path|No
142+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_password>> |<<password,password>>|No
143+
| <<plugins-{type}s-{plugin}-schema_registry_ssl_truststore_type>> |<<string,string>>, one of `["jks", "PKCS12"]`|No
138144
| <<plugins-{type}s-{plugin}-schema_registry_url>> |<<uri,uri>>|No
139145
| <<plugins-{type}s-{plugin}-schema_registry_validation>> |<<string,string>>|No
140146
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
@@ -598,6 +604,54 @@ Set the address of a forward HTTP proxy. An empty string is treated as if proxy
598604

599605
Set the password for basic authorization to access remote Schema Registry.
600606

607+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_location"]
608+
===== `schema_registry_ssl_keystore_location`
609+
610+
* Value type is <<path,path>>
611+
* There is no default value for this setting.
612+
613+
If schema registry client authentication is required, this setting stores the keystore path.
614+
615+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_password"]
616+
===== `schema_registry_ssl_keystore_password`
617+
618+
* Value type is <<password,password>>
619+
* There is no default value for this setting.
620+
621+
If schema registry authentication is required, this setting stores the keystore password.
622+
623+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_keystore_type"]
624+
===== `schema_registry_ssl_keystore_type`
625+
626+
* Value type is <<string,string>>
627+
* There is no default value for this setting.
628+
629+
The format of the keystore file. It must be either `jks` or `PKCS12`.
630+
631+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_location"]
632+
===== `schema_registry_ssl_truststore_location`
633+
634+
* Value type is <<path,path>>
635+
* There is no default value for this setting.
636+
637+
The truststore path to validate the schema registry's certificate.
638+
639+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_password"]
640+
===== `schema_registry_ssl_truststore_password`
641+
642+
* Value type is <<password,password>>
643+
* There is no default value for this setting.
644+
645+
The schema registry truststore password.
646+
647+
[id="plugins-{type}s-{plugin}-schema_registry_ssl_truststore_type"]
648+
===== `schema_registry_ssl_truststore_type`
649+
650+
* Value type is <<string,string>>
651+
* There is no default value for this setting.
652+
653+
The format of the schema registry's truststore file. It must be either `jks` or `PKCS12`.
654+
601655
[id="plugins-{type}s-{plugin}-schema_registry_url"]
602656
===== `schema_registry_url`
603657

kafka_test_setup.sh

+36-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Setup Kafka and create test topics
33

44
set -ex
5+
# check if KAFKA_VERSION env var is set
56
if [ -n "${KAFKA_VERSION+1}" ]; then
67
echo "KAFKA_VERSION is $KAFKA_VERSION"
78
else
@@ -13,8 +14,12 @@ export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
1314
rm -rf build
1415
mkdir build
1516

16-
echo "Downloading Kafka version $KAFKA_VERSION"
17-
curl -s -o build/kafka.tgz "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
17+
echo "Setup Kafka version $KAFKA_VERSION"
18+
if [ ! -e "kafka_2.12-$KAFKA_VERSION.tgz" ]; then
19+
echo "Kafka not present locally, downloading"
20+
curl -s -o "kafka_2.12-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.12-$KAFKA_VERSION.tgz"
21+
fi
22+
cp kafka_2.12-$KAFKA_VERSION.tgz build/kafka.tgz
1823
mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1
1924

2025
echo "Starting ZooKeeper"
@@ -24,9 +29,36 @@ echo "Starting Kafka broker"
2429
build/kafka/bin/kafka-server-start.sh -daemon build/kafka/config/server.properties --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
2530
sleep 10
2631

27-
echo "Downloading Confluent Platform"
28-
curl -s -o build/confluent_platform.tar.gz http://packages.confluent.io/archive/5.5/confluent-community-5.5.1-2.12.tar.gz
32+
echo "Setup Confluent Platform"
33+
# check if CONFLUENT_VERSION env var is set
34+
if [ -n "${CONFLUENT_VERSION+1}" ]; then
35+
echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
36+
else
37+
CONFLUENT_VERSION=5.5.1
38+
fi
39+
if [ ! -e confluent-community-$CONFLUENT_VERSION-2.12.tar.gz ]; then
40+
echo "Confluent Platform not present locally, downloading"
41+
CONFLUENT_MINOR=$(echo "$CONFLUENT_VERSION" | sed -n 's/^\([[:digit:]]*\.[[:digit:]]*\)\.[[:digit:]]*$/\1/p')
42+
echo "CONFLUENT_MINOR is $CONFLUENT_MINOR"
43+
curl -s -o confluent-community-$CONFLUENT_VERSION-2.12.tar.gz http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION-2.12.tar.gz
44+
fi
45+
cp confluent-community-$CONFLUENT_VERSION-2.12.tar.gz build/confluent_platform.tar.gz
2946
mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1
47+
48+
echo "Configuring TLS on Schema registry"
49+
rm -Rf tls_repository
50+
mkdir tls_repository
51+
./setup_keystore_and_truststore.sh
52+
# configure schema-registry to handle https on 8083 port
53+
if [[ "$OSTYPE" == "darwin"* ]]; then
54+
sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
55+
else
56+
sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
57+
fi
58+
echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
59+
echo "ssl.keystore.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
60+
echo "ssl.key.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
61+
3062
cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
3163
echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
3264
echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties

kafka_test_teardown.sh

+3
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ echo "Stopping Kafka broker"
1010
build/kafka/bin/kafka-server-stop.sh
1111
echo "Stopping zookeeper"
1212
build/kafka/bin/zookeeper-server-stop.sh
13+
14+
echo "Clean TLS folder"
15+
rm -Rf tls_repository

lib/logstash/inputs/kafka.rb

+11
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,17 @@ def create_consumer(client_id, group_instance_id)
460460
set_trustore_keystore_config(props)
461461
set_sasl_config(props)
462462
end
463+
if schema_registry_ssl_truststore_location
464+
props.put('schema.registry.ssl.truststore.location', schema_registry_ssl_truststore_location)
465+
props.put('schema.registry.ssl.truststore.password', schema_registry_ssl_truststore_password.value)
466+
props.put('schema.registry.ssl.truststore.type', schema_registry_ssl_truststore_type)
467+
end
468+
469+
if schema_registry_ssl_keystore_location
470+
props.put('schema.registry.ssl.keystore.location', schema_registry_ssl_keystore_location)
471+
props.put('schema.registry.ssl.keystore.password', schema_registry_ssl_keystore_password.value)
472+
props.put('schema.registry.ssl.keystore.type', schema_registry_ssl_keystore_type)
473+
end
463474

464475
org.apache.kafka.clients.consumer.KafkaConsumer.new(props)
465476
rescue => e

lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb

+31
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,24 @@ def setup_schema_registry_config
2424
# This option permits to define a proxy to be used to reach the schema registry service instance.
2525
config :schema_registry_proxy, :validate => :uri
2626

27+
# If schema registry client authentication is required, this setting stores the keystore path.
28+
config :schema_registry_ssl_keystore_location, :validate => :string
29+
30+
# The keystore password.
31+
config :schema_registry_ssl_keystore_password, :validate => :password
32+
33+
# The keystore type
34+
config :schema_registry_ssl_keystore_type, :validate => ['jks', 'PKCS12'], :default => "jks"
35+
36+
# The JKS truststore path to validate the Schema Registry's certificate.
37+
config :schema_registry_ssl_truststore_location, :validate => :string
38+
39+
# The truststore password.
40+
config :schema_registry_ssl_truststore_password, :validate => :password
41+
42+
# The truststore type
43+
config :schema_registry_ssl_truststore_type, :validate => ['jks', 'PKCS12'], :default => "jks"
44+
2745
# Option to skip validating the schema registry during registration. This can be useful when using
2846
# certificate based auth
2947
config :schema_registry_validation, :validate => ['auto', 'skip'], :default => 'auto'
@@ -68,6 +86,19 @@ def check_for_schema_registry_connectivity_and_subjects
6886
if schema_registry_key and !schema_registry_key.empty?
6987
options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
7088
end
89+
if schema_registry_ssl_truststore_location and !schema_registry_ssl_truststore_location.empty?
90+
options[:ssl] = {} unless options.key?(:ssl)
91+
options[:ssl][:truststore] = schema_registry_ssl_truststore_location unless schema_registry_ssl_truststore_location.nil?
92+
options[:ssl][:truststore_password] = schema_registry_ssl_truststore_password.value unless schema_registry_ssl_truststore_password.nil?
93+
options[:ssl][:truststore_type] = schema_registry_ssl_truststore_type unless schema_registry_ssl_truststore_type.nil?
94+
end
95+
if schema_registry_ssl_keystore_location and !schema_registry_ssl_keystore_location.empty?
96+
options[:ssl] = {} unless options.key? :ssl
97+
options[:ssl][:keystore] = schema_registry_ssl_keystore_location unless schema_registry_ssl_keystore_location.nil?
98+
options[:ssl][:keystore_password] = schema_registry_ssl_keystore_password.value unless schema_registry_ssl_keystore_password.nil?
99+
options[:ssl][:keystore_type] = schema_registry_ssl_keystore_type unless schema_registry_ssl_keystore_type.nil?
100+
end
101+
71102
client = Manticore::Client.new(options)
72103
begin
73104
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body

logstash-integration-kafka.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-kafka'
3-
s.version = '11.1.0'
3+
s.version = '11.2.0'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Integration with Kafka - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

setup_keystore_and_truststore.sh

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
# Setup Schema Registry keystore and Kafka's schema registry client's truststore
3+
set -ex
4+
5+
echo "Generating schema registry key store"
6+
keytool -genkey -alias schema_reg -keyalg RSA -keystore tls_repository/schema_reg.jks -keypass changeit -storepass changeit -validity 365 -keysize 2048 -dname "CN=localhost, OU=John Doe, O=Acme Inc, L=Unknown, ST=Unknown, C=IT"
7+
8+
echo "Exporting schema registry certificate"
9+
keytool -exportcert -rfc -keystore tls_repository/schema_reg.jks -storepass changeit -alias schema_reg -file tls_repository/schema_reg_certificate.pem
10+
11+
echo "Creating client's truststore and importing schema registry's certificate"
12+
keytool -import -trustcacerts -file tls_repository/schema_reg_certificate.pem -keypass changeit -storepass changeit -keystore tls_repository/clienttruststore.jks -noprompt

0 commit comments

Comments
 (0)