From 7abe9527b4df198266f4d155dc44ab7bca469fa6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 23 Apr 2023 12:47:26 +0800 Subject: [PATCH] [Schema Registry] Force authorization when kafkaEnableMultiTenantMetadata is false ### Motivation Currently when the schema registry and authorization are both enabled, the authorization is only performed when `kafkaEnableMultiTenantMetadata` is true. It's not reasonable because the role of the token must have the write permission to the schema registry topic. ### Modifications - In `performAuthorizationValidation`, do not check `kafkaEnableMultiTenantMetadata`. - Add the `testSchemaWrongAuth` to verify that a wrong role cannot be used to create an Avro producer. - Separate the default namespace and the default schema namespace in `KafkaAuthorizationTestBase` so that the authorization requirements will be clear. --- .../handlers/kop/SchemaRegistryManager.java | 4 +- .../kop/KafkaAuthorizationTestBase.java | 58 ++++++++++++++----- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java index fac8615d85..77937224e8 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java @@ -130,7 +130,7 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio private void performAuthorizationValidation(String username, String role, String tenant) throws SchemaStorageException { - if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) { + if (kafkaConfig.isAuthorizationEnabled()) { KafkaPrincipal kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null); String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, 
kafkaConfig); try { @@ -138,7 +138,7 @@ private void performAuthorizationValidation(String username, String role, String authorizer.canAccessTenantAsync(kafkaPrincipal, Resource.of(ResourceType.TENANT, tenant)) .get(); if (tenantExists == null || !tenantExists) { - log.debug("SchemaRegistry username {} role {} tenant {} does not exist", + log.debug("SchemaRegistry username {} role {} tenant {} does not exist {}", username, role, tenant, topicName); throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName + " " + "tenant " + tenant + " does not exist (wrong username?)", diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java index bd0d8a6ee9..7f422996be 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java @@ -21,10 +21,12 @@ import static org.testng.AssertJUnit.fail; import com.google.common.collect.Sets; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import io.jsonwebtoken.SignatureAlgorithm; +import io.netty.handler.codec.http.HttpResponseStatus; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -81,6 +83,7 @@ public abstract class KafkaAuthorizationTestBase extends KopProtocolHandlerTestB protected static final String TENANT = "KafkaAuthorizationTest"; protected static final String NAMESPACE = "ns1"; + private static final String SCHEMA_NAMESPACE = "ns2"; private static final String SHORT_TOPIC = "topic1"; private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC; 
private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); @@ -117,7 +120,7 @@ protected void setup() throws Exception { conf.setKafkaMetadataNamespace("__kafka"); conf.setKafkaTenant(TENANT); conf.setKafkaNamespace(NAMESPACE); - conf.setKopSchemaRegistryNamespace(NAMESPACE); + conf.setKopSchemaRegistryNamespace(SCHEMA_NAMESPACE); conf.setClusterName(super.configClusterName); conf.setAuthorizationEnabled(true); @@ -709,18 +712,20 @@ public static Object[][] tokenPrefix() { // this test creates the schema registry topic, and this may interfere with other tests @Test(timeOut = 30000, priority = 1000, dataProvider = "tokenPrefix") public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Exception { - - if (conf.isKafkaEnableMultiTenantMetadata()) { - // ensure that the KOP metadata namespace exists and that the user can write to it - // because we require "produce" permissions on the Schema Registry Topic - // while working in Multi Tenant mode - if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/" + conf.getKafkaMetadataNamespace())) { - admin.namespaces().createNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace()); - } - admin.namespaces() - .grantPermissionOnNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace(), SIMPLE_USER, - Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + final String tenant = (conf.isKafkaEnableMultiTenantMetadata() ? 
TENANT : conf.getKafkaMetadataTenant()); + final String namespace = tenant + "/" + conf.getKafkaMetadataNamespace(); + final var namespaces = admin.namespaces().getNamespaces(tenant); + if (!namespaces.contains(namespace)) { + admin.namespaces().createNamespace(namespace); + } + admin.namespaces().grantPermissionOnNamespace(namespace, SIMPLE_USER, + Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + final String schemaNamespace = tenant + "/" + conf.getKopSchemaRegistryNamespace(); + if (!namespaces.contains(schemaNamespace)) { + admin.namespaces().createNamespace(schemaNamespace); } + admin.namespaces().grantPermissionOnNamespace(schemaNamespace, SIMPLE_USER, + Sets.newHashSet(AuthAction.produce)); String topic = "SchemaRegistryTest-testAvroProduceAndConsumeWithAuth" + withTokenPrefix; IndexedRecord avroRecord = createAvroRecord(); @@ -754,6 +759,22 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex consumer.close(); } + @Test(timeOut = 30000) + public void testSchemaWrongAuth() { + final var wrongToken = AuthTokenUtils.createToken(secretKey, "wrong-user", Optional.empty()); + final KafkaProducer producer = createAvroProducer(false, wrongToken); + try { + producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get(); + fail(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof RestClientException); + var restException = (RestClientException) e.getCause(); + assertEquals(restException.getErrorCode(), HttpResponseStatus.FORBIDDEN.code()); + assertTrue(restException.getMessage().contains("cannot access topic")); + } + producer.close(); + } + private IndexedRecord createAvroRecord() { String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", " @@ -766,6 +787,10 @@ private IndexedRecord createAvroRecord() { } private KafkaProducer createAvroProducer(boolean withTokenPrefix) { + return createAvroProducer(withTokenPrefix, userToken); + } + + private KafkaProducer 
createAvroProducer(boolean withTokenPrefix, String schemaToken) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); @@ -783,10 +808,11 @@ private KafkaProducer createAvroProducer(boolean withTokenPrefi props.put("sasl.mechanism", "PLAIN"); - props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - - props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, - username + ":" + (withTokenPrefix ? password : userToken)); + if (schemaToken != null) { + props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, + username + ":" + (withTokenPrefix ? password : schemaToken)); + } return new KafkaProducer<>(props); }