From 79100753422edf7caa0d6cb5a81931d49715b2e1 Mon Sep 17 00:00:00 2001 From: vchag Date: Sun, 2 Nov 2025 21:44:58 -0800 Subject: [PATCH 1/4] Introducing changes to enable forwarding of Polaris events to AWS CloudWatch --- .../src/main/resources/application.properties | 1 + .../PolarisIcebergObjectMapperCustomizer.java | 9 + .../events/json/mixins/IcebergMixins.java | 48 + .../json/mixins/PolarisEventBaseMixin.java | 28 + .../PropertyMapEventListener.java | 43 - .../AllEventsForwardingListener.java | 854 ++++++++++++++++++ .../AwsCloudWatchConfiguration.java | 9 +- .../AwsCloudWatchEventListener.java | 91 +- .../AwsCloudWatchEventListenerTest.java | 66 +- .../cloudwatch/Dockerfile-localstack-version | 0 10 files changed, 1080 insertions(+), 69 deletions(-) create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java rename runtime/service/src/main/java/org/apache/polaris/service/events/{jsonEventListener => listeners}/aws/cloudwatch/AwsCloudWatchConfiguration.java (91%) rename runtime/service/src/main/java/org/apache/polaris/service/events/{jsonEventListener => listeners}/aws/cloudwatch/AwsCloudWatchEventListener.java (70%) rename runtime/service/src/test/java/org/apache/polaris/service/events/{jsonEventListener => listeners}/aws/cloudwatch/AwsCloudWatchEventListenerTest.java (82%) rename runtime/service/src/test/resources/org/apache/polaris/service/events/{jsonEventListener => listeners}/aws/cloudwatch/Dockerfile-localstack-version (100%) diff --git a/runtime/defaults/src/main/resources/application.properties b/runtime/defaults/src/main/resources/application.properties index 91d41f9256..f8470bdee1 100644 --- a/runtime/defaults/src/main/resources/application.properties +++ b/runtime/defaults/src/main/resources/application.properties @@ -144,6 +144,7 @@ polaris.event-listener.type=no-op # polaris.event-listener.aws-cloudwatch.log-stream=polaris-cloudwatch-default-stream # polaris.event-listener.aws-cloudwatch.region=us-east-1 # polaris.event-listener.aws-cloudwatch.synchronous-mode=false +# polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result if processing all Polaris event types. polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java b/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java index fded73d704..b36d81a30f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java @@ -30,7 +30,12 @@ import io.smallrye.config.WithConverter; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.RESTSerializers; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.json.mixins.IcebergMixins; +import org.apache.polaris.service.events.json.mixins.PolarisEventBaseMixin; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +61,10 @@ public void customize(ObjectMapper objectMapper) { objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy()); + objectMapper.addMixIn(PolarisEvent.class, PolarisEventBaseMixin.class); + objectMapper.addMixIn(TableIdentifier.class, IcebergMixins.TableIdentifierMixin.class); + objectMapper.addMixIn(Namespace.class, IcebergMixins.NamespaceMixin.class); + RESTSerializers.registerAll(objectMapper); Serializers.registerSerializers(objectMapper); objectMapper diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java new file mode 100644 index 0000000000..7f9dcd1f52 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.iceberg.catalog.Namespace; + +/** Mixins for Iceberg classes we don't control, to keep JSON concise. */ +public final class IcebergMixins { + + // Private constructor to prevent instantiation + private IcebergMixins() {} + + /** Serializes Namespace as an object like: "namespace": ["sales", "north.america"] */ + public abstract static class NamespaceMixin { + @JsonProperty("namespace") + public abstract String[] levels(); + } + + /** + * Serializes TableIdentifier as a scalar string like: {"namespace": ["sales", "north.america"], + * "name": "transactions"} + */ + public abstract static class TableIdentifierMixin { + @JsonProperty("namespace") + public abstract Namespace namespace(); + + @JsonProperty("name") + public abstract String name(); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java new file mode 100644 index 0000000000..7ea183e3c1 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public abstract class PolarisEventBaseMixin {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java deleted file mode 100644 index 63311fc035..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.service.events.jsonEventListener; - -import java.util.HashMap; -import org.apache.polaris.service.events.IcebergRestCatalogEvents; -import org.apache.polaris.service.events.listeners.PolarisEventListener; - -/** - * This class provides a common framework for transforming Polaris events into a HashMap, which can - * be used to transform the event further, such as transforming into a JSON string, and send them to - * various destinations. Concrete implementations should override the - * {{@code @link#transformAndSendEvent(HashMap)}} method to define how the event data should be - * transformed into a JSON string, transmitted, and/or stored. - */ -public abstract class PropertyMapEventListener implements PolarisEventListener { - protected abstract void transformAndSendEvent(HashMap properties); - - @Override - public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { - HashMap properties = new HashMap<>(); - properties.put("event_type", event.getClass().getSimpleName()); - properties.put("table_identifier", event.tableIdentifier().toString()); - transformAndSendEvent(properties); - } -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java new file mode 100644 index 0000000000..ddb1e763e7 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; +import org.apache.polaris.service.events.CatalogGenericTableServiceEvents; +import org.apache.polaris.service.events.CatalogPolicyServiceEvents; +import org.apache.polaris.service.events.CatalogsServiceEvents; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.PrincipalRolesServiceEvents; +import org.apache.polaris.service.events.PrincipalsServiceEvents; + +/** + * Base class for event listeners that with to generically forward all {@link PolarisEvent + * PolarisEvents} to an external sink. + * + *

This design follows the Template Method pattern, centralizing shared control flow in the base + * class while allowing subclasses to supply the event-specific behavior. + */ +public abstract class AllEventsForwardingListener implements PolarisEventListener { + + /** Subclasses implement the actual logic once, generically. */ + protected abstract void handle(PolarisEvent event); + + /** Optional filter (config-based). Default: handle all. */ + protected boolean shouldHandle(Object event) { + return true; + } + + @Override + public void onAfterGetCatalog(CatalogsServiceEvents.AfterGetCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateCatalog(CatalogsServiceEvents.BeforeCreateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateCatalog(CatalogsServiceEvents.AfterCreateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeleteCatalog(CatalogsServiceEvents.BeforeDeleteCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeleteCatalog(CatalogsServiceEvents.AfterDeleteCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetCatalog(CatalogsServiceEvents.BeforeGetCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateCatalog(CatalogsServiceEvents.BeforeUpdateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateCatalog(CatalogsServiceEvents.AfterUpdateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogs(CatalogsServiceEvents.BeforeListCatalogsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogs(CatalogsServiceEvents.AfterListCatalogsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePrincipal(PrincipalsServiceEvents.BeforeCreatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePrincipal(PrincipalsServiceEvents.AfterCreatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeletePrincipal(PrincipalsServiceEvents.BeforeDeletePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeletePrincipal(PrincipalsServiceEvents.AfterDeletePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetPrincipal(PrincipalsServiceEvents.BeforeGetPrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetPrincipal(PrincipalsServiceEvents.AfterGetPrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePrincipal(PrincipalsServiceEvents.BeforeUpdatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePrincipal(PrincipalsServiceEvents.AfterUpdatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRotateCredentials( + PrincipalsServiceEvents.BeforeRotateCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRotateCredentials(PrincipalsServiceEvents.AfterRotateCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPrincipals(PrincipalsServiceEvents.BeforeListPrincipalsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPrincipals(PrincipalsServiceEvents.AfterListPrincipalsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeResetCredentials(PrincipalsServiceEvents.BeforeResetCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterResetCredentials(PrincipalsServiceEvents.AfterResetCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAssignPrincipalRole( + PrincipalsServiceEvents.BeforeAssignPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAssignPrincipalRole( + PrincipalsServiceEvents.AfterAssignPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokePrincipalRole( + PrincipalsServiceEvents.BeforeRevokePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokePrincipalRole( + PrincipalsServiceEvents.AfterRevokePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssignedPrincipalRoles( + PrincipalsServiceEvents.BeforeListAssignedPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssignedPrincipalRoles( + PrincipalsServiceEvents.AfterListAssignedPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePrincipalRole( + PrincipalRolesServiceEvents.BeforeCreatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePrincipalRole( + PrincipalRolesServiceEvents.AfterCreatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeletePrincipalRole( + PrincipalRolesServiceEvents.BeforeDeletePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeletePrincipalRole( + PrincipalRolesServiceEvents.AfterDeletePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetPrincipalRole( + PrincipalRolesServiceEvents.BeforeGetPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetPrincipalRole( + PrincipalRolesServiceEvents.AfterGetPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePrincipalRole( + PrincipalRolesServiceEvents.BeforeUpdatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePrincipalRole( + PrincipalRolesServiceEvents.AfterUpdatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPrincipalRoles( + PrincipalRolesServiceEvents.BeforeListPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPrincipalRoles( + PrincipalRolesServiceEvents.AfterListPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateCatalogRole(CatalogsServiceEvents.BeforeCreateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateCatalogRole(CatalogsServiceEvents.AfterCreateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeleteCatalogRole(CatalogsServiceEvents.BeforeDeleteCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeleteCatalogRole(CatalogsServiceEvents.AfterDeleteCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetCatalogRole(CatalogsServiceEvents.BeforeGetCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetCatalogRole(CatalogsServiceEvents.AfterGetCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateCatalogRole(CatalogsServiceEvents.BeforeUpdateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateCatalogRole(CatalogsServiceEvents.AfterUpdateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogRoles(CatalogsServiceEvents.BeforeListCatalogRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogRoles(CatalogsServiceEvents.AfterListCatalogRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAssignCatalogRoleToPrincipalRole( + PrincipalRolesServiceEvents.BeforeAssignCatalogRoleToPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAssignCatalogRoleToPrincipalRole( + PrincipalRolesServiceEvents.AfterAssignCatalogRoleToPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokeCatalogRoleFromPrincipalRole( + PrincipalRolesServiceEvents.BeforeRevokeCatalogRoleFromPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokeCatalogRoleFromPrincipalRole( + PrincipalRolesServiceEvents.AfterRevokeCatalogRoleFromPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssigneePrincipalsForPrincipalRole( + PrincipalRolesServiceEvents.BeforeListAssigneePrincipalsForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssigneePrincipalsForPrincipalRole( + PrincipalRolesServiceEvents.AfterListAssigneePrincipalsForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogRolesForPrincipalRole( + PrincipalRolesServiceEvents.BeforeListCatalogRolesForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogRolesForPrincipalRole( + PrincipalRolesServiceEvents.AfterListCatalogRolesForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAddGrantToCatalogRole( + CatalogsServiceEvents.BeforeAddGrantToCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAddGrantToCatalogRole( + CatalogsServiceEvents.AfterAddGrantToCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokeGrantFromCatalogRole( + CatalogsServiceEvents.BeforeRevokeGrantFromCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokeGrantFromCatalogRole( + CatalogsServiceEvents.AfterRevokeGrantFromCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssigneePrincipalRolesForCatalogRole( + CatalogsServiceEvents.BeforeListAssigneePrincipalRolesForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssigneePrincipalRolesForCatalogRole( + CatalogsServiceEvents.AfterListAssigneePrincipalRolesForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListGrantsForCatalogRole( + CatalogsServiceEvents.BeforeListGrantsForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListGrantsForCatalogRole( + CatalogsServiceEvents.AfterListGrantsForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadNamespaceMetadata( + IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadNamespaceMetadata( + IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsNamespace( + IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsNamespace( + IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateNamespaceProperties( + IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateNamespaceProperties( + IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsTable(IcebergRestCatalogEvents.BeforeCheckExistsTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsTable(IcebergRestCatalogEvents.AfterCheckExistsTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsView(IcebergRestCatalogEvents.BeforeCheckExistsViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsView(IcebergRestCatalogEvents.AfterCheckExistsViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitTransaction( + IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitTransaction(IcebergRestCatalogEvents.AfterCommitTransactionEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeSendNotification(IcebergRestCatalogEvents.BeforeSendNotificationEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePolicy(CatalogPolicyServiceEvents.BeforeCreatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePolicy(CatalogPolicyServiceEvents.AfterCreatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPolicies(CatalogPolicyServiceEvents.BeforeListPoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPolicies(CatalogPolicyServiceEvents.AfterListPoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadPolicy(CatalogPolicyServiceEvents.BeforeLoadPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadPolicy(CatalogPolicyServiceEvents.AfterLoadPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePolicy(CatalogPolicyServiceEvents.BeforeUpdatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePolicy(CatalogPolicyServiceEvents.AfterUpdatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropPolicy(CatalogPolicyServiceEvents.BeforeDropPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropPolicy(CatalogPolicyServiceEvents.AfterDropPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAttachPolicy(CatalogPolicyServiceEvents.BeforeAttachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAttachPolicy(CatalogPolicyServiceEvents.AfterAttachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDetachPolicy(CatalogPolicyServiceEvents.BeforeDetachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDetachPolicy(CatalogPolicyServiceEvents.AfterDetachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetApplicablePolicies( + CatalogPolicyServiceEvents.BeforeGetApplicablePoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetApplicablePolicies( + CatalogPolicyServiceEvents.AfterGetApplicablePoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateGenericTable( + CatalogGenericTableServiceEvents.BeforeCreateGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateGenericTable( + CatalogGenericTableServiceEvents.AfterCreateGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropGenericTable( + CatalogGenericTableServiceEvents.BeforeDropGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropGenericTable( + CatalogGenericTableServiceEvents.AfterDropGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListGenericTables( + CatalogGenericTableServiceEvents.BeforeListGenericTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListGenericTables( + CatalogGenericTableServiceEvents.AfterListGenericTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadGenericTable( + CatalogGenericTableServiceEvents.BeforeLoadGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadGenericTable( + CatalogGenericTableServiceEvents.AfterLoadGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAttemptTask(AfterAttemptTaskEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) { + if (shouldHandle(event)) handle(event); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java similarity index 91% rename from runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java index e511f13a0c..57f9ae98c7 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java @@ -16,13 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; import io.quarkus.runtime.annotations.StaticInitSafe; import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Optional; +import java.util.Set; +import org.apache.polaris.service.events.PolarisEvent; /** Configuration interface for AWS CloudWatch event listener integration. */ @StaticInitSafe @@ -86,4 +89,8 @@ public interface AwsCloudWatchConfiguration { @WithName("synchronous-mode") @WithDefault("false") boolean synchronousMode(); + + @WithName("event-types") + Optional>> + eventTypes(); // defaults to empty option i.e. process all events } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java similarity index 70% rename from runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java index 87cf70a2f1..887b28a631 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -17,10 +17,13 @@ * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; +import com.fasterxml.jackson.annotation.JsonUnwrapped; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import io.smallrye.common.annotation.Identifier; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -29,14 +32,16 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.SecurityContext; import java.time.Clock; -import java.util.HashMap; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; -import org.apache.polaris.service.events.jsonEventListener.PropertyMapEventListener; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.listeners.AllEventsForwardingListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; @@ -51,10 +56,10 @@ @ApplicationScoped @Identifier("aws-cloudwatch") -public class AwsCloudWatchEventListener extends PropertyMapEventListener { +public class AwsCloudWatchEventListener extends AllEventsForwardingListener { private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class); - final ObjectMapper objectMapper = new ObjectMapper(); + final ObjectMapper objectMapper; private CloudWatchLogsAsyncClient client; private final String logGroup; @@ -62,6 +67,8 @@ public class AwsCloudWatchEventListener extends PropertyMapEventListener { private final Region region; private final boolean synchronousMode; private final Clock clock; + private final Set> allowedEventTypes; + private final boolean listenToAllEvents; @Inject CallContext callContext; @@ -71,13 +78,38 @@ public class AwsCloudWatchEventListener extends PropertyMapEventListener { public AwsCloudWatchEventListener( AwsCloudWatchConfiguration config, Clock clock, - PolarisIcebergObjectMapperCustomizer customizer) { + PolarisIcebergObjectMapperCustomizer customizer, + ObjectMapper mapper) { this.logStream = config.awsCloudWatchLogStream(); this.logGroup = config.awsCloudWatchLogGroup(); this.region = Region.of(config.awsCloudWatchRegion()); this.synchronousMode = config.synchronousMode(); this.clock = clock; + this.objectMapper = mapper; customizer.customize(this.objectMapper); + this.listenToAllEvents = + config.eventTypes().isEmpty() + || config.eventTypes().map(Set::isEmpty).orElse(true) + || config.eventTypes().get().stream().anyMatch(e -> e == PolarisEvent.class); + this.allowedEventTypes = listenToAllEvents ? Set.of() : Set.copyOf(config.eventTypes().get()); + } + + @Override + protected boolean shouldHandle(Object event) { + if (!(event instanceof PolarisEvent polarisEvent)) { + return false; + } + + if (this.listenToAllEvents) { + return true; + } + Class actualType = polarisEvent.getClass(); + return allowedEventTypes.stream().anyMatch(cfg -> cfg.isAssignableFrom(actualType)); + } + + @Override + protected void handle(PolarisEvent event) { + transformAndSendEvent(event); } @PostConstruct @@ -86,6 +118,14 @@ void start() { ensureLogGroupAndStream(); } + @PostConstruct + void verifyMapper() { + LOGGER.info( + "ObjectMapper hash={}, mixins={}", + System.identityHashCode(objectMapper), + objectMapper.mixInCount()); + } + protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { return CloudWatchLogsAsyncClient.builder().region(region).build(); } @@ -151,28 +191,44 @@ void shutdown() { } } - @Override - protected void transformAndSendEvent(HashMap properties) { - properties.put("realm_id", callContext.getRealmContext().getRealmIdentifier()); - properties.put("principal", securityContext.getUserPrincipal().getName()); - properties.put( - "activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles()); - // TODO: Add request ID when it is available + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record CloudWatchEvent( + String principal, + String realmId, + Collection activatedRoles, + String eventType, + @JsonUnwrapped PolarisEvent event // flatten + ) {} + + protected void transformAndSendEvent(PolarisEvent event) { + + CloudWatchEvent payload = + new CloudWatchEvent( + securityContext.getUserPrincipal().getName(), + callContext.getRealmContext().getRealmIdentifier(), + ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles(), + event.getClass().getSimpleName(), + event); + String eventAsJson; + try { - eventAsJson = objectMapper.writeValueAsString(properties); - } catch (JsonProcessingException e) { - LOGGER.error("Error processing event into JSON string: ", e); - LOGGER.debug("Failed to convert the following object into JSON string: {}", properties); + eventAsJson = objectMapper.writeValueAsString(payload); + } catch (JsonProcessingException ex) { + LOGGER.error("Error serializing CloudWatch payload: ", ex); + LOGGER.debug("Failed to convert the following object into JSON string: {}", payload); return; } + InputLogEvent inputLogEvent = InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build(); + PutLogEventsRequest.Builder requestBuilder = PutLogEventsRequest.builder() .logGroupName(logGroup) .logStreamName(logStream) .logEvents(List.of(inputLogEvent)); + CompletableFuture future = client .putLogEvents(requestBuilder.build()) @@ -183,6 +239,7 @@ protected void transformAndSendEvent(HashMap properties) { "Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err); } }); + if (synchronousMode) { future.join(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java similarity index 82% rename from runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 3aac097b17..1f794ab49f 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -17,12 +17,14 @@ * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import io.quarkus.runtime.configuration.MemorySize; import jakarta.ws.rs.core.SecurityContext; @@ -33,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.auth.PolarisPrincipal; @@ -40,6 +43,9 @@ import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.json.mixins.IcebergMixins; +import org.apache.polaris.service.events.json.mixins.PolarisEventBaseMixin; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -86,17 +92,22 @@ class AwsCloudWatchEventListenerTest { private ExecutorService executorService; private AutoCloseable mockitoContext; + private ObjectMapper objectMapper; @BeforeEach void setUp() { mockitoContext = MockitoAnnotations.openMocks(this); executorService = Executors.newSingleThreadExecutor(); + // Build the test mapper and apply the same customizations Quarkus would + objectMapper = new ObjectMapper(); + // Configure the mocks when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP); when(config.awsCloudWatchLogStream()).thenReturn(LOG_STREAM); when(config.awsCloudWatchRegion()).thenReturn("us-east-1"); - when(config.synchronousMode()).thenReturn(false); // Default to async mode + when(config.synchronousMode()).thenReturn(false); // default async + when(config.eventTypes()).thenReturn(java.util.Optional.empty()); // handle all events } @AfterEach @@ -130,7 +141,7 @@ private CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { private AwsCloudWatchEventListener createListener(CloudWatchLogsAsyncClient client) { AwsCloudWatchEventListener listener = - new AwsCloudWatchEventListener(config, clock, customizer) { + new AwsCloudWatchEventListener(config, clock, customizer, objectMapper) { @Override protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { return client; @@ -201,7 +212,8 @@ void shouldSendEventToCloudWatch() { listener.start(); try { // Create and send a test event - TableIdentifier testTable = TableIdentifier.of("test_namespace", "test_table"); + Namespace namespaceTest = Namespace.of("test_namespace.test1", "test1a"); + TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table"); listener.onAfterRefreshTable( new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable)); @@ -236,12 +248,18 @@ void shouldSendEventToCloudWatch() { .satisfies( logEvent -> { String message = logEvent.message(); + JsonNode root = objectMapper.readTree(message); + JsonNode event = root.path("event").isMissingNode() ? root : root.path("event"); assertThat(message).contains(REALM); assertThat(message) .contains( IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName()); assertThat(message).contains(TEST_USER); - assertThat(message).contains(testTable.toString()); + // table_identifier object + JsonNode tableId = event.path("table_identifier"); + assertThat(tableId.isObject()).isTrue(); + assertThat(tableId.path("name").asText()).isEqualTo("test_table"); + assertThat(tableId.path("namespace").isArray()).isTrue(); }); } finally { // Clean up @@ -309,17 +327,49 @@ void shouldSendEventInSynchronousMode() { @Test void ensureObjectMapperCustomizerIsApplied() { - AwsCloudWatchEventListener listener = createListener(createCloudWatchAsyncClient()); - listener.start(); + + AwsCloudWatchEventListener listener = + new AwsCloudWatchEventListener(config, clock, customizer, objectMapper); assertThat(listener.objectMapper.getPropertyNamingStrategy()) .isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class); assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength()) .isEqualTo(MAX_BODY_SIZE.longValue()); + + assertThat(objectMapper.findMixInClassFor(Namespace.class)) + .as("Namespace mixin should be registered") + .isEqualTo(IcebergMixins.NamespaceMixin.class); + + assertThat(objectMapper.findMixInClassFor(TableIdentifier.class)) + .as("TableIdentifier mixin should be registered") + .isEqualTo(IcebergMixins.TableIdentifierMixin.class); + + assertThat(objectMapper.findMixInClassFor(PolarisEvent.class)) + .as("Namespace mixin should be registered") + .isEqualTo(PolarisEventBaseMixin.class); + } + + @Test + void shouldListenToAllEventTypesWhenConfigNotProvided() { + // given: config.eventTypes() is empty → listen to all events + when(config.eventTypes()).thenReturn(java.util.Optional.empty()); + + AwsCloudWatchEventListener listener = + new AwsCloudWatchEventListener(config, clock, customizer, objectMapper); + + // This is any random PolarisEvent — if the listener listens to all types, + // shouldHandle(event) should return true + PolarisEvent randomEvent = + new IcebergRestCatalogEvents.AfterRefreshTableEvent( + "test_catalog", TableIdentifier.of("db", "table")); + + boolean shouldHandle = listener.shouldHandle(randomEvent); + assertThat(shouldHandle) + .as("Listener should handle all events when no eventTypes are configured") + .isTrue(); } private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) { - // Verify log group exists DescribeLogGroupsResponse groups = client .describeLogGroups( diff --git a/runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version b/runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version similarity index 100% rename from runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version rename to runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version From 9c14f6a892b2ca834b2abd3f52a16c288f032579 Mon Sep 17 00:00:00 2001 From: vchag Date: Tue, 4 Nov 2025 21:26:52 -0800 Subject: [PATCH 2/4] Include following changes: 1. Typo correction 2. Removing ObjectMapper customizer from AwsCloudWatchEventListener and AwsCloudWatchEventListenerTest --- .../src/main/resources/application.properties | 2 +- .../AllEventsForwardingListener.java | 2 +- .../AwsCloudWatchEventListener.java | 22 ++++--------------- .../AwsCloudWatchEventListenerTest.java | 12 +++++----- 4 files changed, 12 insertions(+), 26 deletions(-) diff --git a/runtime/defaults/src/main/resources/application.properties b/runtime/defaults/src/main/resources/application.properties index f8470bdee1..d00101894c 100644 --- a/runtime/defaults/src/main/resources/application.properties +++ b/runtime/defaults/src/main/resources/application.properties @@ -144,7 +144,7 @@ polaris.event-listener.type=no-op # polaris.event-listener.aws-cloudwatch.log-stream=polaris-cloudwatch-default-stream # polaris.event-listener.aws-cloudwatch.region=us-east-1 # polaris.event-listener.aws-cloudwatch.synchronous-mode=false -# polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result if processing all Polaris event types. +# polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result in processing all Polaris event types. polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java index ddb1e763e7..a709998797 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java @@ -31,7 +31,7 @@ import org.apache.polaris.service.events.PrincipalsServiceEvents; /** - * Base class for event listeners that with to generically forward all {@link PolarisEvent + * Base class for event listeners that wish to generically forward all {@link PolarisEvent * PolarisEvents} to an external sink. * *

This design follows the Template Method pattern, centralizing shared control flow in the base diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java index 887b28a631..96e1846b71 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -39,7 +39,6 @@ import java.util.function.Supplier; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; import org.apache.polaris.service.events.PolarisEvent; import org.apache.polaris.service.events.listeners.AllEventsForwardingListener; import org.slf4j.Logger; @@ -76,22 +75,17 @@ public class AwsCloudWatchEventListener extends AllEventsForwardingListener { @Inject public AwsCloudWatchEventListener( - AwsCloudWatchConfiguration config, - Clock clock, - PolarisIcebergObjectMapperCustomizer customizer, - ObjectMapper mapper) { + AwsCloudWatchConfiguration config, Clock clock, ObjectMapper mapper) { this.logStream = config.awsCloudWatchLogStream(); this.logGroup = config.awsCloudWatchLogGroup(); this.region = Region.of(config.awsCloudWatchRegion()); this.synchronousMode = config.synchronousMode(); this.clock = clock; this.objectMapper = mapper; - customizer.customize(this.objectMapper); + this.allowedEventTypes = config.eventTypes().orElse(Set.of()); this.listenToAllEvents = - config.eventTypes().isEmpty() - || config.eventTypes().map(Set::isEmpty).orElse(true) - || config.eventTypes().get().stream().anyMatch(e -> e == PolarisEvent.class); - this.allowedEventTypes = listenToAllEvents ? Set.of() : Set.copyOf(config.eventTypes().get()); + allowedEventTypes.isEmpty() + || allowedEventTypes.stream().anyMatch(c -> c == PolarisEvent.class); } @Override @@ -118,14 +112,6 @@ void start() { ensureLogGroupAndStream(); } - @PostConstruct - void verifyMapper() { - LOGGER.info( - "ObjectMapper hash={}, mixins={}", - System.identityHashCode(objectMapper), - objectMapper.mixInCount()); - } - protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { return CloudWatchLogsAsyncClient.builder().region(region).build(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 1f794ab49f..708e88bd4e 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -92,15 +92,15 @@ class AwsCloudWatchEventListenerTest { private ExecutorService executorService; private AutoCloseable mockitoContext; - private ObjectMapper objectMapper; + private static final ObjectMapper objectMapper = new ObjectMapper(); + ; @BeforeEach void setUp() { mockitoContext = MockitoAnnotations.openMocks(this); executorService = Executors.newSingleThreadExecutor(); - // Build the test mapper and apply the same customizations Quarkus would - objectMapper = new ObjectMapper(); + customizer.customize(objectMapper); // Configure the mocks when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP); @@ -141,7 +141,7 @@ private CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { private AwsCloudWatchEventListener createListener(CloudWatchLogsAsyncClient client) { AwsCloudWatchEventListener listener = - new AwsCloudWatchEventListener(config, clock, customizer, objectMapper) { + new AwsCloudWatchEventListener(config, clock, objectMapper) { @Override protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { return client; @@ -329,7 +329,7 @@ void shouldSendEventInSynchronousMode() { void ensureObjectMapperCustomizerIsApplied() { AwsCloudWatchEventListener listener = - new AwsCloudWatchEventListener(config, clock, customizer, objectMapper); + new AwsCloudWatchEventListener(config, clock, objectMapper); assertThat(listener.objectMapper.getPropertyNamingStrategy()) .isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class); @@ -355,7 +355,7 @@ void shouldListenToAllEventTypesWhenConfigNotProvided() { when(config.eventTypes()).thenReturn(java.util.Optional.empty()); AwsCloudWatchEventListener listener = - new AwsCloudWatchEventListener(config, clock, customizer, objectMapper); + new AwsCloudWatchEventListener(config, clock, objectMapper); // This is any random PolarisEvent — if the listener listens to all types, // shouldHandle(event) should return true From 4c0aef587e63d5e60d657561a0a53701417f324b Mon Sep 17 00:00:00 2001 From: vchag Date: Tue, 4 Nov 2025 21:58:22 -0800 Subject: [PATCH 3/4] Enforce AllEventsForwardingListener.shouldHandle to accept only PolarisEvent types --- .../events/listeners/AllEventsForwardingListener.java | 2 +- .../aws/cloudwatch/AwsCloudWatchEventListener.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java index a709998797..7aaf7604b1 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java @@ -43,7 +43,7 @@ public abstract class AllEventsForwardingListener implements PolarisEventListene protected abstract void handle(PolarisEvent event); /** Optional filter (config-based). Default: handle all. */ - protected boolean shouldHandle(Object event) { + protected boolean shouldHandle(PolarisEvent event) { return true; } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java index 96e1846b71..21e3975fcf 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -89,15 +89,15 @@ public AwsCloudWatchEventListener( } @Override - protected boolean shouldHandle(Object event) { - if (!(event instanceof PolarisEvent polarisEvent)) { + protected boolean shouldHandle(PolarisEvent event) { + if (event == null) { return false; } if (this.listenToAllEvents) { return true; } - Class actualType = polarisEvent.getClass(); + Class actualType = event.getClass(); return allowedEventTypes.stream().anyMatch(cfg -> cfg.isAssignableFrom(actualType)); } From 456c0e5fb2b2218fa31f18b074f61236d9fbacd3 Mon Sep 17 00:00:00 2001 From: Adam Christian Date: Fri, 5 Dec 2025 06:52:41 -0800 Subject: [PATCH 4/4] DO NOT MERGE: Idea for Redacting Events --- .../FieldRedactionSerializerModifier.java | 119 ++++++ .../events/json/RedactingSerializer.java | 51 +++ .../events/json/mixins/RedactionMixins.java | 160 ++++++++ .../AwsCloudWatchConfiguration.java | 46 +++ .../AwsCloudWatchEventListener.java | 77 +++- .../FieldRedactionSerializerModifierTest.java | 152 ++++++++ .../events/json/RedactingSerializerTest.java | 141 +++++++ .../json/mixins/RedactionMixinsTest.java | 70 ++++ .../AwsCloudWatchEventListenerTest.java | 351 ++++++++++++++++++ 9 files changed, 1166 insertions(+), 1 deletion(-) create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifier.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifier.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifier.java new file mode 100644 index 0000000000..f323332489 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifier.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializationConfig; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.BeanPropertyWriter; +import com.fasterxml.jackson.databind.ser.BeanSerializerModifier; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * A Jackson BeanSerializerModifier that redacts fields based on configured patterns. + * + *

This modifier intercepts serialization of bean properties and replaces values with a redaction + * marker for fields that match the configured patterns. It supports: + * + *

    + *
  • Simple field names (e.g., "password", "secret") + *
  • Nested field paths using dot notation (e.g., "properties.api-key") + *
  • Wildcard patterns (e.g., "*.secret", "credentials.*") + *
+ */ +public class FieldRedactionSerializerModifier extends BeanSerializerModifier { + + private final List fieldPatterns; + + /** + * Creates a new FieldRedactionSerializerModifier with the specified field patterns. + * + * @param fieldNames set of field names or patterns to redact + */ + public FieldRedactionSerializerModifier(Set fieldNames) { + this.fieldPatterns = + fieldNames.stream() + .map(FieldRedactionSerializerModifier::convertToRegexPattern) + .map(Pattern::compile) + .collect(Collectors.toList()); + } + + /** + * Converts a field pattern (which may contain wildcards) to a regex pattern. + * + * @param pattern the field pattern (e.g., "*.secret", "credentials.*", "exact-field") + * @return a regex pattern string + */ + private static String convertToRegexPattern(String pattern) { + // Escape special regex characters except for our wildcard (*) + String escaped = pattern.replace(".", "\\.").replace("*", ".*"); + // Match the entire field path + return "^" + escaped + "$"; + } + + @Override + public List changeProperties( + SerializationConfig config, + BeanDescription beanDesc, + List beanProperties) { + + if (fieldPatterns.isEmpty()) { + return beanProperties; + } + + return beanProperties.stream() + .map(writer -> shouldRedactField(writer.getName()) ? createRedactingWriter(writer) : writer) + .collect(Collectors.toList()); + } + + /** + * Checks if a field name matches any of the configured redaction patterns. + * + * @param fieldName the field name to check + * @return true if the field should be redacted + */ + private boolean shouldRedactField(String fieldName) { + return fieldPatterns.stream().anyMatch(pattern -> pattern.matcher(fieldName).matches()); + } + + /** + * Creates a new BeanPropertyWriter that redacts the field value. + * + * @param original the original BeanPropertyWriter + * @return a new BeanPropertyWriter that redacts the value + */ + private BeanPropertyWriter createRedactingWriter(BeanPropertyWriter original) { + return new BeanPropertyWriter(original) { + @Override + public void serializeAsField(Object bean, JsonGenerator gen, SerializerProvider prov) + throws Exception { + gen.writeFieldName(_name); + gen.writeString(RedactingSerializer.getRedactedMarker()); + } + }; + } +} + diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java new file mode 100644 index 0000000000..3011e4d3f6 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/RedactingSerializer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import java.io.IOException; + +/** + * A Jackson serializer that redacts string values by replacing them with a redaction marker. + * + *

This serializer is used to prevent sensitive information from being included in serialized + * JSON output, particularly for event logging to external systems like CloudWatch. + */ +public class RedactingSerializer extends JsonSerializer { + private static final String REDACTED_MARKER = "***REDACTED***"; + + @Override + public void serialize(Object value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeString(REDACTED_MARKER); + } + + /** + * Returns the marker string used to indicate redacted values. + * + * @return the redaction marker string + */ + public static String getRedactedMarker() { + return REDACTED_MARKER; + } +} + diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java new file mode 100644 index 0000000000..4e4c2e8a3a --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/RedactionMixins.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.Map; +import org.apache.polaris.service.events.json.RedactingSerializer; + +/** + * Jackson mixins for redacting sensitive information from event payloads. + * + *

These mixins are applied to various Iceberg and Polaris types to prevent sensitive data from + * being included in serialized events sent to external systems like CloudWatch. + * + *

The mixins support different redaction modes: + * + *

    + *
  • PARTIAL - Redact credentials and secrets, but keep metadata like locations + *
  • FULL - Redact all potentially sensitive fields including locations and properties + *
+ */ +public class RedactionMixins { + + /** + * Mixin for partial redaction of TableMetadata. + * + *

In PARTIAL mode, we keep the location but redact properties that may contain secrets. + */ + public abstract static class TableMetadataPartialRedactionMixin { + @JsonIgnore + public abstract Map properties(); + } + + /** + * Mixin for full redaction of TableMetadata. + * + *

In FULL mode, we redact both location and properties. + */ + public abstract static class TableMetadataFullRedactionMixin { + @JsonSerialize(using = RedactingSerializer.class) + public abstract String location(); + + @JsonIgnore + public abstract Map properties(); + } + + /** + * Mixin for partial redaction of ViewMetadata. + * + *

In PARTIAL mode, we keep the location but redact properties that may contain secrets. + */ + public abstract static class ViewMetadataPartialRedactionMixin { + @JsonIgnore + public abstract Map properties(); + } + + /** + * Mixin for full redaction of ViewMetadata. + * + *

In FULL mode, we redact both location and properties. + */ + public abstract static class ViewMetadataFullRedactionMixin { + @JsonSerialize(using = RedactingSerializer.class) + public abstract String location(); + + @JsonIgnore + public abstract Map properties(); + } + + /** + * Mixin for redacting LoadTableResponse/LoadTableResult. + * + *

Redacts the config map and storage-credentials array which contain sensitive credential + * information. + */ + public abstract static class LoadTableResponseRedactionMixin { + @JsonIgnore + public abstract Map config(); + + @JsonIgnore + public abstract Object storageCredentials(); + } + + /** + * Mixin for redacting LoadViewResponse/LoadViewResult. + * + *

Redacts the config map which may contain sensitive configuration. + */ + public abstract static class LoadViewResponseRedactionMixin { + @JsonIgnore + public abstract Map config(); + } + + /** + * Mixin for redacting ConfigResponse. + * + *

Redacts both defaults and overrides maps which may contain sensitive configuration like + * credentials, tokens, or internal endpoints. + * + *

Note: Since ConfigResponse only has defaults and overrides, ignoring both would result in + * an empty object that Jackson can't serialize. Instead, we ignore the entire configResponse + * field in AfterGetConfigEvent. + */ + public abstract static class ConfigResponseRedactionMixin { + @JsonIgnore + public abstract Map defaults(); + + @JsonIgnore + public abstract Map overrides(); + } + + /** + * Mixin for redacting AfterGetConfigEvent. + * + *

Completely omits the configResponse field which contains sensitive configuration. + */ + public abstract static class AfterGetConfigEventRedactionMixin { + @JsonIgnore + public abstract Object configResponse(); + } + + /** + * Mixin for redacting CreateNamespaceRequest. + * + *

Redacts properties that may contain secrets or sensitive configuration. + */ + public abstract static class CreateNamespaceRequestRedactionMixin { + @JsonIgnore + public abstract Map properties(); + } + + /** + * Mixin for redacting UpdateNamespacePropertiesRequest. + * + *

Redacts property updates and removals that may contain or reference secrets. + */ + public abstract static class UpdateNamespacePropertiesRequestRedactionMixin { + @JsonIgnore + public abstract Map updates(); + } +} + diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java index 57f9ae98c7..771f78af03 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java @@ -93,4 +93,50 @@ public interface AwsCloudWatchConfiguration { @WithName("event-types") Optional>> eventTypes(); // defaults to empty option i.e. process all events + + /** + * Returns the redaction mode for sensitive data in events. + * + *

Controls how sensitive information is handled when serializing events to CloudWatch: + * + *

    + *
  • NONE - No redaction, all data is sent as-is (use with caution) + *
  • PARTIAL - Redact highly sensitive fields like credentials, but keep metadata locations + * and properties + *
  • FULL - Redact all potentially sensitive fields including locations, properties, and + * credentials + *
+ * + *

Configuration property: {@code polaris.event-listener.aws-cloudwatch.redaction-mode} + * + * @return the redaction mode, defaults to PARTIAL for security + */ + @WithName("redaction-mode") + @WithDefault("PARTIAL") + RedactionMode redactionMode(); + + /** + * Returns additional field names to redact beyond the defaults. + * + *

Allows customization of which fields should be redacted. Field names can use dot notation + * for nested fields (e.g., "properties.custom-secret"). These fields are redacted in addition to + * the default redacted fields based on the redaction mode. + * + *

Configuration property: {@code + * polaris.event-listener.aws-cloudwatch.additional-redacted-fields} + * + * @return a set of additional field names to redact, empty by default + */ + @WithName("additional-redacted-fields") + Optional> additionalRedactedFields(); + + /** Enum defining redaction modes for sensitive data. */ + enum RedactionMode { + /** No redaction - all data sent as-is. Use with extreme caution. */ + NONE, + /** Partial redaction - redact credentials and secrets, keep metadata. */ + PARTIAL, + /** Full redaction - redact all potentially sensitive fields. */ + FULL + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java index 21e3975fcf..3979838d40 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.annotation.JsonNaming; +import com.fasterxml.jackson.databind.module.SimpleModule; import io.smallrye.common.annotation.Identifier; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -37,10 +38,21 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.view.ViewMetadata; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.json.FieldRedactionSerializerModifier; +import org.apache.polaris.service.events.json.mixins.RedactionMixins; import org.apache.polaris.service.events.listeners.AllEventsForwardingListener; +import org.apache.polaris.service.events.listeners.aws.cloudwatch.AwsCloudWatchConfiguration.RedactionMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; @@ -81,13 +93,76 @@ public AwsCloudWatchEventListener( this.region = Region.of(config.awsCloudWatchRegion()); this.synchronousMode = config.synchronousMode(); this.clock = clock; - this.objectMapper = mapper; + this.objectMapper = + configureRedaction( + mapper.copy(), + config.redactionMode(), + config.additionalRedactedFields().orElse(Set.of())); this.allowedEventTypes = config.eventTypes().orElse(Set.of()); this.listenToAllEvents = allowedEventTypes.isEmpty() || allowedEventTypes.stream().anyMatch(c -> c == PolarisEvent.class); } + /** + * Configures the ObjectMapper with redaction mixins based on the specified redaction mode. + * + * @param mapper the ObjectMapper to configure (should be a copy) + * @param redactionMode the redaction mode to apply + * @param additionalRedactedFields additional field names to redact beyond the defaults + * @return the configured ObjectMapper + */ + private static ObjectMapper configureRedaction( + ObjectMapper mapper, RedactionMode redactionMode, Set additionalRedactedFields) { + LOGGER.debug("Configuring redaction with mode: {}", redactionMode); + LOGGER.debug("Additional redacted fields: {}", additionalRedactedFields); + + if (redactionMode == RedactionMode.NONE && additionalRedactedFields.isEmpty()) { + // No redaction - return mapper as-is + return mapper; + } + + // Apply field-level redaction patterns if configured + if (!additionalRedactedFields.isEmpty()) { + SimpleModule module = new SimpleModule(); + module.setSerializerModifier(new FieldRedactionSerializerModifier(additionalRedactedFields)); + mapper.registerModule(module); + LOGGER.debug("Registered field-level redaction for {} patterns", additionalRedactedFields.size()); + } + + // Skip mixin-based redaction if mode is NONE + if (redactionMode == RedactionMode.NONE) { + return mapper; + } + + // Always redact these highly sensitive types regardless of mode + mapper.addMixIn(LoadTableResponse.class, RedactionMixins.LoadTableResponseRedactionMixin.class); + mapper.addMixIn(LoadViewResponse.class, RedactionMixins.LoadViewResponseRedactionMixin.class); + // Redact the entire AfterGetConfigEvent.configResponse field + mapper.addMixIn( + IcebergRestCatalogEvents.AfterGetConfigEvent.class, + RedactionMixins.AfterGetConfigEventRedactionMixin.class); + LOGGER.debug("Registered AfterGetConfigEvent mixin"); + mapper.addMixIn( + CreateNamespaceRequest.class, RedactionMixins.CreateNamespaceRequestRedactionMixin.class); + mapper.addMixIn( + UpdateNamespacePropertiesRequest.class, + RedactionMixins.UpdateNamespacePropertiesRequestRedactionMixin.class); + + if (redactionMode == RedactionMode.PARTIAL) { + // Partial redaction - keep locations, redact properties + mapper.addMixIn( + TableMetadata.class, RedactionMixins.TableMetadataPartialRedactionMixin.class); + mapper.addMixIn(ViewMetadata.class, RedactionMixins.ViewMetadataPartialRedactionMixin.class); + } else if (redactionMode == RedactionMode.FULL) { + // Full redaction - redact both locations and properties + mapper.addMixIn(TableMetadata.class, RedactionMixins.TableMetadataFullRedactionMixin.class); + mapper.addMixIn(ViewMetadata.class, RedactionMixins.ViewMetadataFullRedactionMixin.class); + } + + return mapper; + } + @Override protected boolean shouldHandle(PolarisEvent event) { if (event == null) { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java new file mode 100644 index 0000000000..cee236a043 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/FieldRedactionSerializerModifierTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class FieldRedactionSerializerModifierTest { + + private static final String REDACTED_MARKER = RedactingSerializer.getRedactedMarker(); + + static class TestBean { + public String password; + public String username; + public String apiKey; + public String publicData; + + public TestBean(String password, String username, String apiKey, String publicData) { + this.password = password; + this.username = username; + this.apiKey = apiKey; + this.publicData = publicData; + } + } + + @Test + void shouldRedactExactFieldNames() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier( + new FieldRedactionSerializerModifier(Set.of("password", "apiKey"))); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("password").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("username").asText()).isEqualTo("john_doe"); + assertThat(node.get("publicData").asText()).isEqualTo("public info"); + } + + @Test + void shouldRedactFieldsWithWildcardPrefix() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("*Key"))); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("password").asText()).isEqualTo("secret123"); + assertThat(node.get("username").asText()).isEqualTo("john_doe"); + assertThat(node.get("publicData").asText()).isEqualTo("public info"); + } + + @Test + void shouldRedactFieldsWithWildcardSuffix() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("api*"))); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("password").asText()).isEqualTo("secret123"); + assertThat(node.get("username").asText()).isEqualTo("john_doe"); + assertThat(node.get("publicData").asText()).isEqualTo("public info"); + } + + @Test + void shouldRedactFieldsWithMultiplePatterns() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier( + new FieldRedactionSerializerModifier(Set.of("*word", "api*", "username"))); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("password").asText()).isEqualTo(REDACTED_MARKER); // matches *word + assertThat(node.get("apiKey").asText()).isEqualTo(REDACTED_MARKER); // matches api* + assertThat(node.get("username").asText()).isEqualTo(REDACTED_MARKER); // exact match + assertThat(node.get("publicData").asText()).isEqualTo("public info"); + } + + @Test + void shouldNotRedactWhenNoPatterns() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of())); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("password").asText()).isEqualTo("secret123"); + assertThat(node.get("apiKey").asText()).isEqualTo("key-12345"); + assertThat(node.get("username").asText()).isEqualTo("john_doe"); + assertThat(node.get("publicData").asText()).isEqualTo("public info"); + } + + @Test + void shouldHandleWildcardInMiddle() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.setSerializerModifier(new FieldRedactionSerializerModifier(Set.of("*Data"))); + mapper.registerModule(module); + + TestBean bean = new TestBean("secret123", "john_doe", "key-12345", "public info"); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("publicData").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("password").asText()).isEqualTo("secret123"); + assertThat(node.get("apiKey").asText()).isEqualTo("key-12345"); + assertThat(node.get("username").asText()).isEqualTo("john_doe"); + } +} + diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java new file mode 100644 index 0000000000..ead921a12d --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/RedactingSerializerTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.junit.jupiter.api.Test; + +class RedactingSerializerTest { + + private static final String REDACTED_MARKER = RedactingSerializer.getRedactedMarker(); + + static class TestBean { + @JsonSerialize(using = RedactingSerializer.class) + public String sensitiveField; + + public String normalField; + + public TestBean(String sensitiveField, String normalField) { + this.sensitiveField = sensitiveField; + this.normalField = normalField; + } + } + + @Test + void shouldRedactAnnotatedField() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TestBean bean = new TestBean("secret-value", "public-value"); + + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("normalField").asText()).isEqualTo("public-value"); + } + + @Test + void shouldRedactNullValues() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TestBean bean = new TestBean(null, "public-value"); + + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + // Note: Jackson doesn't call custom serializers for null values by default + // The field will be serialized as null unless we configure the mapper differently + assertThat(node.get("sensitiveField").isNull()).isTrue(); + assertThat(node.get("normalField").asText()).isEqualTo("public-value"); + } + + @Test + void shouldRedactNumericValues() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + class NumericBean { + @JsonSerialize(using = RedactingSerializer.class) + public Integer sensitiveNumber; + + public NumericBean(Integer sensitiveNumber) { + this.sensitiveNumber = sensitiveNumber; + } + } + + NumericBean bean = new NumericBean(12345); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("sensitiveNumber").asText()).isEqualTo(REDACTED_MARKER); + } + + @Test + void shouldRedactComplexObjects() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + class ComplexBean { + @JsonSerialize(using = RedactingSerializer.class) + public Object complexObject; + + public ComplexBean(Object complexObject) { + this.complexObject = complexObject; + } + } + + ComplexBean bean = new ComplexBean(new TestBean("nested-secret", "nested-public")); + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("complexObject").asText()).isEqualTo(REDACTED_MARKER); + } + + @Test + void shouldReturnCorrectRedactedMarker() { + assertThat(RedactingSerializer.getRedactedMarker()).isEqualTo("***REDACTED***"); + } + + @Test + void shouldRedactEmptyStrings() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TestBean bean = new TestBean("", "public-value"); + + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER); + assertThat(node.get("normalField").asText()).isEqualTo("public-value"); + } + + @Test + void shouldRedactLongStrings() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + String longSecret = "a".repeat(10000); + TestBean bean = new TestBean(longSecret, "public-value"); + + String json = mapper.writeValueAsString(bean); + JsonNode node = mapper.readTree(json); + + assertThat(node.get("sensitiveField").asText()).isEqualTo(REDACTED_MARKER); + assertThat(json.length()).isLessThan(longSecret.length()); + } +} + diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java new file mode 100644 index 0000000000..241b2e78e7 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/json/mixins/RedactionMixinsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +/** + * Unit tests for RedactionMixins. + * + *

This test verifies that all the mixin classes exist and can be referenced. The actual + * redaction behavior is tested in the integration tests (AwsCloudWatchEventListenerTest) where the + * mixins are applied to real objects and serialized. + */ +class RedactionMixinsTest { + + @Test + void shouldVerifyTableMetadataMixinsExist() { + // Verify that the TableMetadata mixins exist + assertThat(RedactionMixins.TableMetadataPartialRedactionMixin.class).isNotNull(); + assertThat(RedactionMixins.TableMetadataFullRedactionMixin.class).isNotNull(); + } + + @Test + void shouldVerifyViewMetadataMixinsExist() { + // Verify that the ViewMetadata mixins exist + assertThat(RedactionMixins.ViewMetadataPartialRedactionMixin.class).isNotNull(); + assertThat(RedactionMixins.ViewMetadataFullRedactionMixin.class).isNotNull(); + } + + @Test + void shouldVerifyResponseMixinsExist() { + // Verify that the response mixins exist + assertThat(RedactionMixins.LoadTableResponseRedactionMixin.class).isNotNull(); + assertThat(RedactionMixins.LoadViewResponseRedactionMixin.class).isNotNull(); + assertThat(RedactionMixins.ConfigResponseRedactionMixin.class).isNotNull(); + } + + @Test + void shouldVerifyEventMixinsExist() { + // Verify that the event mixins exist + assertThat(RedactionMixins.AfterGetConfigEventRedactionMixin.class).isNotNull(); + } + + @Test + void shouldVerifyRequestMixinsExist() { + // Verify that the request mixins exist + assertThat(RedactionMixins.CreateNamespaceRequestRedactionMixin.class).isNotNull(); + assertThat(RedactionMixins.UpdateNamespacePropertiesRequestRedactionMixin.class).isNotNull(); + } +} + diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 708e88bd4e..4b47f203e9 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -394,4 +394,355 @@ private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) { .first() .satisfies(stream -> assertThat(stream.logStreamName()).isEqualTo(LOG_STREAM)); } + + @Test + void shouldRedactConfigResponseDirectly() throws Exception { + // Test that the mixin works when serializing ConfigResponse directly + ObjectMapper testMapper = new ObjectMapper(); + testMapper.addMixIn( + org.apache.iceberg.rest.responses.ConfigResponse.class, + org.apache.polaris.service.events.json.mixins.RedactionMixins + .ConfigResponseRedactionMixin.class); + + org.apache.iceberg.rest.responses.ConfigResponse configResponse = + org.apache.iceberg.rest.responses.ConfigResponse.builder() + .withDefault("s3.access-key-id", "AKIAIOSFODNN7EXAMPLE") + .withDefault("s3.secret-access-key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") + .withDefault("token", "some-bearer-token") + .withOverride("prefix", "test-catalog") + .build(); + + String json = testMapper.writeValueAsString(configResponse); + System.out.println("Direct ConfigResponse JSON: " + json); + + // Verify that defaults and overrides are not in the JSON + assertThat(json).doesNotContain("defaults"); + assertThat(json).doesNotContain("overrides"); + assertThat(json).doesNotContain("AKIAIOSFODNN7EXAMPLE"); + } + + @Test + void shouldRedactSensitiveFieldsInPartialMode() throws Exception { + // Configure for PARTIAL redaction mode + when(config.redactionMode()) + .thenReturn(AwsCloudWatchConfiguration.RedactionMode.PARTIAL); + + CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient(); + AwsCloudWatchEventListener listener = createListener(client); + listener.start(); + + try { + // Create an event with sensitive data + org.apache.iceberg.rest.responses.ConfigResponse configResponse = + org.apache.iceberg.rest.responses.ConfigResponse.builder() + .withDefault("s3.access-key-id", "AKIAIOSFODNN7EXAMPLE") + .withDefault("s3.secret-access-key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") + .withDefault("token", "some-bearer-token") + .withOverride("prefix", "test-catalog") + .build(); + + listener.onAfterGetConfig( + new IcebergRestCatalogEvents.AfterGetConfigEvent(configResponse)); + + // Wait for event to be sent + Awaitility.await("event should be sent to CloudWatch") + .atMost(Duration.ofSeconds(30)) + .pollDelay(Duration.ofMillis(100)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted( + () -> { + GetLogEventsResponse resp = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + assertThat(resp.events().size()).isGreaterThan(0); + }); + + // Retrieve and verify the logged event + GetLogEventsResponse logEvents = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + + assertThat(logEvents.events()).hasSize(1); + String message = logEvents.events().get(0).message(); + System.out.println("REDACTED MESSAGE: " + message); + JsonNode eventJson = objectMapper.readTree(message); + + // Verify that the entire configResponse field is redacted (omitted from JSON) + assertThat(eventJson.has("config_response")) + .as("config_response field should be redacted") + .isFalse(); + + // Verify that the original values are NOT present + assertThat(message).doesNotContain("AKIAIOSFODNN7EXAMPLE"); + assertThat(message).doesNotContain("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"); + assertThat(message).doesNotContain("some-bearer-token"); + } finally { + client.close(); + listener.shutdown(); + } + } + + @Test + void shouldNotRedactInNoneMode() throws Exception { + // Configure for NONE redaction mode + when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE); + + CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient(); + AwsCloudWatchEventListener listener = createListener(client); + listener.start(); + + try { + // Create an event with data that would normally be redacted + org.apache.iceberg.rest.responses.ConfigResponse configResponse = + org.apache.iceberg.rest.responses.ConfigResponse.builder() + .withDefault("test-key", "test-value") + .withOverride("prefix", "test-catalog") + .build(); + + listener.onAfterGetConfig( + new IcebergRestCatalogEvents.AfterGetConfigEvent(configResponse)); + + // Wait for event to be sent + Awaitility.await("event should be sent to CloudWatch") + .atMost(Duration.ofSeconds(30)) + .pollDelay(Duration.ofMillis(100)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted( + () -> { + GetLogEventsResponse resp = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + assertThat(resp.events().size()).isGreaterThan(0); + }); + + // Retrieve and verify the logged event + GetLogEventsResponse logEvents = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + + assertThat(logEvents.events()).hasSize(1); + String message = logEvents.events().get(0).message(); + System.out.println("NON-REDACTED MESSAGE: " + message); + JsonNode eventJson = objectMapper.readTree(message); + + // Verify that values are NOT redacted (config_response field is present) + assertThat(eventJson.has("config_response")) + .as("config_response field should be present") + .isTrue(); + assertThat(message).contains("test-key"); + assertThat(message).contains("test-value"); + } finally { + client.close(); + listener.shutdown(); + } + } + + @Test + void shouldRedactAdditionalFieldsWhenConfigured() throws Exception { + // Configure additional fields to redact + when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE); + when(config.additionalRedactedFields()) + .thenReturn(java.util.Optional.of(Set.of("catalogName", "namespace"))); + + CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient(); + AwsCloudWatchEventListener listener = createListener(client); + listener.start(); + + try { + // Create and send a test event + Namespace namespaceTest = Namespace.of("sensitive_namespace"); + TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table"); + listener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent("sensitive_catalog", testTable)); + + // Wait for the event to be sent + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .until( + () -> { + GetLogEventsResponse response = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + return !response.events().isEmpty(); + }); + + GetLogEventsResponse logEvents = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + + assertThat(logEvents.events()).hasSize(1); + String message = logEvents.events().get(0).message(); + System.out.println("FIELD-REDACTED MESSAGE: " + message); + JsonNode eventJson = objectMapper.readTree(message); + + // Verify that configured fields are redacted + assertThat(eventJson.get("catalog_name").asText()) + .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker()); + assertThat(eventJson.get("namespace").asText()) + .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker()); + + // Verify that other fields are NOT redacted + assertThat(eventJson.has("table_identifier")).isTrue(); + assertThat(message).doesNotContain("sensitive_catalog"); + assertThat(message).doesNotContain("sensitive_namespace"); + } finally { + client.close(); + listener.shutdown(); + } + } + + @Test + void shouldRedactFieldsWithWildcardPatterns() throws Exception { + // Configure wildcard patterns to redact + when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.NONE); + when(config.additionalRedactedFields()) + .thenReturn(java.util.Optional.of(Set.of("*Name", "table*"))); + + CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient(); + AwsCloudWatchEventListener listener = createListener(client); + listener.start(); + + try { + // Create and send a test event + Namespace namespaceTest = Namespace.of("test_namespace"); + TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table"); + listener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable)); + + // Wait for the event to be sent + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .until( + () -> { + GetLogEventsResponse response = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + return !response.events().isEmpty(); + }); + + GetLogEventsResponse logEvents = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + + assertThat(logEvents.events()).hasSize(1); + String message = logEvents.events().get(0).message(); + System.out.println("WILDCARD-REDACTED MESSAGE: " + message); + JsonNode eventJson = objectMapper.readTree(message); + + // Verify that fields matching wildcard patterns are redacted + // *Name should match catalogName, tableName + assertThat(eventJson.get("catalog_name").asText()) + .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker()); + + // table* should match tableIdentifier + assertThat(eventJson.get("table_identifier").asText()) + .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker()); + } finally { + client.close(); + listener.shutdown(); + } + } + + @Test + void shouldCombineRedactionModeWithAdditionalFields() throws Exception { + // Configure PARTIAL mode with additional fields + when(config.redactionMode()).thenReturn(AwsCloudWatchConfiguration.RedactionMode.PARTIAL); + when(config.additionalRedactedFields()) + .thenReturn(java.util.Optional.of(Set.of("catalogName"))); + + CloudWatchLogsAsyncClient client = createCloudWatchAsyncClient(); + AwsCloudWatchEventListener listener = createListener(client); + listener.start(); + + try { + // Create and send a test event + Namespace namespaceTest = Namespace.of("test_namespace"); + TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table"); + listener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable)); + + // Wait for the event to be sent + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .until( + () -> { + GetLogEventsResponse response = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + return !response.events().isEmpty(); + }); + + GetLogEventsResponse logEvents = + client + .getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(LOG_GROUP) + .logStreamName(LOG_STREAM) + .build()) + .join(); + + assertThat(logEvents.events()).hasSize(1); + String message = logEvents.events().get(0).message(); + System.out.println("COMBINED-REDACTED MESSAGE: " + message); + JsonNode eventJson = objectMapper.readTree(message); + + // Verify that catalogName is redacted (from additional fields) + assertThat(eventJson.get("catalog_name").asText()) + .isEqualTo(org.apache.polaris.service.events.json.RedactingSerializer.getRedactedMarker()); + + // Verify that default PARTIAL mode redactions are also applied + // (This would be tested with events that have properties/config fields) + } finally { + client.close(); + listener.shutdown(); + } + } }