From fcde386e579a276000e6e27b74875c7c87b70c6d Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Fri, 14 Jul 2023 18:01:26 +0800
Subject: [PATCH] [Schema Registry] Force authorization when kafkaEnableMultiTenantMetadata is false

### Motivation

Currently, when the schema registry and authorization are both enabled, the
authorization check is skipped if `kafkaEnableMultiTenantMetadata` is false.
This is not reasonable, because the role of the token should always be
required to have write permission on the schema registry topic (see the
example grant below).

### Modifications

- In `performAuthorizationValidation`, do not check `kafkaEnableMultiTenantMetadata`.
- Add the `testSchemaWrongAuth` test to verify that a producer configured with
  a wrong role cannot produce Avro messages.
- Separate the default namespace and the default schema namespace in
  `KafkaAuthorizationTestBase` so that the permission requirements are clear.
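
Note that with this change, a broker with authorization enabled requires the
client role to hold produce permission on the schema registry namespace even
when `kafkaEnableMultiTenantMetadata` is false; in that case the schema
registry topic lives under `<kafkaMetadataTenant>/<kopSchemaRegistryNamespace>`.
Below is a minimal sketch of such a grant using the Pulsar admin client; the
admin URL, the `public/__kafka` namespace and the `my-role` role are
placeholders rather than values taken from this patch.

```java
import com.google.common.collect.Sets;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.AuthAction;

public class GrantSchemaRegistryPermission {
    public static void main(String[] args) throws Exception {
        // Placeholder values (admin URL, namespace, role): adjust them to the target cluster.
        // With kafkaEnableMultiTenantMetadata=false the schema registry topic lives under
        // <kafkaMetadataTenant>/<kopSchemaRegistryNamespace>; "public/__kafka" is only a placeholder.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Grant produce permission so the authenticated role passes the schema registry
            // authorization check that is now always enforced.
            admin.namespaces().grantPermissionOnNamespace("public/__kafka", "my-role",
                    Sets.newHashSet(AuthAction.produce));
        }
    }
}
```

This mirrors the `grantPermissionOnNamespace(schemaNamespace, SIMPLE_USER, ...)`
call that `testAvroProduceAndConsumeWithAuth` now performs in the test changes
below.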
---
 .../handlers/kop/SchemaRegistryManager.java |  8 +--
 .../auth/KafkaAuthorizationTestBase.java    | 49 ++++++++++++-------
 2 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java
index c218719d28..395bbb77f2 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java
@@ -46,6 +46,7 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -135,16 +136,17 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio
 
     private void performAuthorizationValidation(String username, String role, String tenant)
             throws SchemaStorageException {
-        if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
+        if (kafkaConfig.isAuthorizationEnabled()) {
             KafkaPrincipal kafkaPrincipal =
-                    new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null, null);
+                    new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null,
+                            new AuthenticationDataSource() {});
             String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, kafkaConfig);
             try {
                 Boolean tenantExists =
                         authorizer.canAccessTenantAsync(kafkaPrincipal, Resource.of(ResourceType.TENANT, tenant))
                                 .get();
                 if (tenantExists == null || !tenantExists) {
-                    log.debug("SchemaRegistry username {} role {} tenant {} does not exist",
+                    log.debug("SchemaRegistry username {} role {} tenant {} does not exist {}",
                             username, role, tenant, topicName);
                     throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName + " "
                             + "tenant " + tenant + " does not exist (wrong username?)",
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationTestBase.java
index 325edf8b8c..d4f3177f9a 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationTestBase.java
@@ -84,6 +84,7 @@ public abstract class KafkaAuthorizationTestBase extends KopProtocolHandlerTestB
 
     protected static final String TENANT = "KafkaAuthorizationTest";
     protected static final String NAMESPACE = "ns1";
+    private static final String SCHEMA_NAMESPACE = "ns2";
     private static final String SHORT_TOPIC = "topic1";
     private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC;
     private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
@@ -120,7 +121,7 @@ protected void setup() throws Exception {
         conf.setKafkaMetadataNamespace("__kafka");
         conf.setKafkaTenant(TENANT);
         conf.setKafkaNamespace(NAMESPACE);
-        conf.setKopSchemaRegistryNamespace(NAMESPACE);
+        conf.setKopSchemaRegistryNamespace(SCHEMA_NAMESPACE);
         conf.setClusterName(super.configClusterName);
         conf.setAuthorizationEnabled(true);
 
@@ -712,18 +713,16 @@ public static Object[][] tokenPrefix() {
     // this test creates the schema registry topic, and this may interfere with other tests
     @Test(timeOut = 30000, priority = 1000, dataProvider = "tokenPrefix")
     public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Exception {
-
-        if (conf.isKafkaEnableMultiTenantMetadata()) {
-            // ensure that the KOP metadata namespace exists and that the user can write to it
-            // because we require "produce" permissions on the Schema Registry Topic
-            // while working in Multi Tenant mode
-            if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/" + conf.getKafkaMetadataNamespace())) {
-                admin.namespaces().createNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace());
-            }
-            admin.namespaces()
-                    .grantPermissionOnNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace(), SIMPLE_USER,
-                            Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+        admin.namespaces().grantPermissionOnNamespace(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace(),
+                SIMPLE_USER, Sets.newHashSet(AuthAction.produce, AuthAction.consume));
+        final String tenant = (conf.isKafkaEnableMultiTenantMetadata() ? TENANT : conf.getKafkaMetadataTenant());
+        final var namespaces = admin.namespaces().getNamespaces(tenant);
+        final String schemaNamespace = tenant + "/" + conf.getKopSchemaRegistryNamespace();
+        if (!namespaces.contains(schemaNamespace)) {
+            admin.namespaces().createNamespace(schemaNamespace);
         }
+        admin.namespaces().grantPermissionOnNamespace(schemaNamespace, SIMPLE_USER,
+                Sets.newHashSet(AuthAction.produce));
         String topic = "SchemaRegistryTest-testAvroProduceAndConsumeWithAuth" + withTokenPrefix;
         IndexedRecord avroRecord = createAvroRecord();
 
@@ -759,7 +758,7 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex
 
     @Test(timeOut = 30000)
     public void testSchemaNoAuth() {
-        final KafkaProducer<Integer, Object> producer = createAvroProducer(false, false);
+        final KafkaProducer<Integer, Object> producer = createAvroProducer(false, null);
         try {
             producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
             fail();
@@ -772,6 +771,22 @@ public void testSchemaNoAuth() {
         producer.close();
     }
 
+    @Test(timeOut = 30000)
+    public void testSchemaWrongAuth() {
+        final var wrongToken = AuthTokenUtils.createToken(secretKey, "wrong-user", Optional.empty());
+        final KafkaProducer<Integer, Object> producer = createAvroProducer(false, wrongToken);
+        try {
+            producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof RestClientException);
+            var restException = (RestClientException) e.getCause();
+            assertEquals(restException.getErrorCode(), HttpResponseStatus.FORBIDDEN.code());
+            assertTrue(restException.getMessage().contains("cannot access topic"));
+        }
+        producer.close();
+    }
+
     private IndexedRecord createAvroRecord() {
         String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
                 + "\"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
@@ -783,10 +798,10 @@ private IndexedRecord createAvroRecord() {
     }
 
     private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
-        return createAvroProducer(withTokenPrefix, true);
+        return createAvroProducer(withTokenPrefix, userToken);
     }
 
-    private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
+    private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, String schemaToken) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -803,10 +818,10 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism", "PLAIN");
 
-        if (withSchemaToken) {
+        if (schemaToken != null) {
             props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
             props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
-                    username + ":" + (withTokenPrefix ? password : userToken));
+                    username + ":" + (withTokenPrefix ? password : schemaToken));
         }
 
         return new KafkaProducer<>(props);