diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 15077d215f52..d3a7555f768e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -44,6 +44,7 @@ import io.cdap.cdap.app.mapreduce.LocalMRJobInfoFetcher; import io.cdap.cdap.app.mapreduce.MRJobInfoFetcher; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeModule; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.Constants.AppFabric; @@ -158,7 +159,9 @@ import io.cdap.cdap.messaging.server.FetchHandler; import io.cdap.cdap.messaging.server.MetadataHandler; import io.cdap.cdap.messaging.server.StoreHandler; +import io.cdap.cdap.metadata.DefaultMetadataAdmin; import io.cdap.cdap.metadata.LocalPreferencesFetcherInternal; +import io.cdap.cdap.metadata.MetadataAdmin; import io.cdap.cdap.metadata.PreferencesFetcher; import io.cdap.cdap.pipeline.PipelineFactory; import io.cdap.cdap.scheduler.CoreSchedulerService; @@ -239,6 +242,7 @@ public Module getInMemoryModules() { new MasterCredentialProviderModule(), new OperationModule(), new DataStorageAeadEncryptionModule(), + new UpgradeModule(), BootstrapModules.getInMemoryModule(), new AbstractModule() { @Override @@ -299,6 +303,7 @@ public Module getStandaloneModules() { new MasterCredentialProviderModule(), new OperationModule(), new DataStorageAeadEncryptionModule(), + new UpgradeModule(), serviceTypes.contains(ServiceType.PROCESSOR) ? BootstrapModules.getFileBasedModule() : BootstrapModules.getNoOpModule(), new AbstractModule() { @@ -360,6 +365,7 @@ public Module getDistributedModules() { new MasterCredentialProviderModule(), new OperationModule(), new DataStorageAeadEncryptionModule(), + new UpgradeModule(), serviceTypes.contains(ServiceType.PROCESSOR) ? BootstrapModules.getFileBasedModule() : BootstrapModules.getNoOpModule(), new AbstractModule() { @@ -374,6 +380,7 @@ protected void configure() { bind(StorageProviderNamespaceAdmin.class) .to(DistributedStorageProviderNamespaceAdmin.class); bind(UGIProvider.class).toProvider(UgiProviderProvider.class); + bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class); bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class) .in(Scopes.SINGLETON); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java index 70111a1a794c..fa926c3cfd50 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/preview/PreviewRunnerModule.java @@ -28,6 +28,7 @@ import io.cdap.cdap.app.deploy.ManagerFactory; import io.cdap.cdap.app.guice.AppFabricServiceRuntimeModule; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeModule; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.guice.LocalLocationModule; import io.cdap.cdap.common.namespace.NamespaceAdmin; @@ -204,6 +205,7 @@ protected void configure() { expose(OwnerAdmin.class); bind(CapabilityReader.class).to(CapabilityStatusStore.class); + install(new UpgradeModule()); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeManager.java new file mode 100644 index 000000000000..3f74cddb5947 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeManager.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.upgrade; + +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.upgrade.ApplicationUpgradeDetail; +import java.util.List; + +/** + * Manager for all upgrade related operations. + */ +public interface UpgradeManager { + + /** + * Lists upgrade details for applications in a namespace. + * + * @param namespace the namespace in which applications are searched. + * @return A list of {@link ApplicationUpgradeDetail} containing application and current/latest + * version details for the application artifact and the plugins. + */ + List listUpgrades(NamespaceId namespace) throws Exception; +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeModule.java new file mode 100644 index 000000000000..490beb7ee1d8 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/upgrade/UpgradeModule.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.app.upgrade; + +import com.google.inject.AbstractModule; +import com.google.inject.Scopes; +import io.cdap.cdap.internal.app.upgrade.ApplicationPluginMappingFetcher; +import io.cdap.cdap.internal.app.upgrade.DefaultUpgradeManager; +import io.cdap.cdap.internal.app.upgrade.MetadataApplicationPluginMappingFetcher; + +/** + * Module that configures Upgrade related classes. + */ +public class UpgradeModule extends AbstractModule { + + @Override + protected void configure() { + bind(UpgradeManager.class).to(DefaultUpgradeManager.class).in(Scopes.SINGLETON); + bind(ApplicationPluginMappingFetcher.class).to(MetadataApplicationPluginMappingFetcher.class) + .in(Scopes.SINGLETON); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java index 2f17e4c6b640..ff8f88a7a5b0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java @@ -69,6 +69,8 @@ import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.security.StandardPermission; +import io.cdap.cdap.proto.upgrade.ListUpgradeRequest; +import io.cdap.cdap.proto.upgrade.ListUpgradeResponse; import io.cdap.cdap.security.spi.authentication.AuthenticationContext; import io.cdap.cdap.security.spi.authorization.AccessEnforcer; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; @@ -641,6 +643,30 @@ public void upgradeApplications(FullHttpRequest request, HttpResponder responder } } + /** + * Lists upgrade details for all pipelines in the namespace. + * + *

+ * The response will be of type {@link ListUpgradeResponse} which contains a list of + * {@link io.cdap.cdap.proto.upgrade.ApplicationUpgradeDetail} containing pipeline and plugin + * upgrade details. + *

+ */ + @POST + @Path("/upgrade/list") + @AuditPolicy(AuditDetail.REQUEST_BODY) + public void listApplicationUpgrades(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespace) throws Exception { + NamespaceId namespaceId = validateNamespace(namespace); + ListUpgradeRequest upgradeRequest = + DECODE_GSON.fromJson(request.content().toString(StandardCharsets.UTF_8), + new TypeToken() { + }.getType()); + LOG.info("received list upgrade request {}", upgradeRequest); + responder.sendJson(HttpResponseStatus.OK, + GSON.toJson(applicationLifecycleService.listUpgradeResponse(namespaceId))); + } + /** * Gets {@link ApplicationDetail} for a set of applications. It expects a post body as a array of * object, with each object specifying the applciation id and an optional version. E.g. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java index 22eef1f32806..c180a76757dd 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleService.java @@ -52,6 +52,7 @@ import io.cdap.cdap.app.store.ApplicationFilter; import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeManager; import io.cdap.cdap.common.ApplicationNotFoundException; import io.cdap.cdap.common.ArtifactAlreadyExistsException; import io.cdap.cdap.common.ArtifactNotFoundException; @@ -113,6 +114,7 @@ import io.cdap.cdap.proto.security.Principal; import io.cdap.cdap.proto.security.StandardPermission; import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.proto.upgrade.ListUpgradeResponse; import io.cdap.cdap.security.impersonation.EntityImpersonator; import io.cdap.cdap.security.impersonation.Impersonator; import io.cdap.cdap.security.impersonation.OwnerAdmin; @@ -182,6 +184,7 @@ public class ApplicationLifecycleService extends AbstractIdleService { private final int batchSize; private final MetricsCollectionService metricsCollectionService; private final FeatureFlagsProvider featureFlagsProvider; + private final UpgradeManager upgradeManager; /** * Construct the ApplicationLifeCycleService with service factory and cConf coming from guice @@ -197,7 +200,7 @@ public ApplicationLifecycleService(CConfiguration cConf, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, CapabilityReader capabilityReader, - MetricsCollectionService metricsCollectionService) { + MetricsCollectionService metricsCollectionService, UpgradeManager upgradeManager) { this.cConf = cConf; this.appUpdateSchedules = cConf.getBoolean(Constants.AppFabric.APP_UPDATE_SCHEDULES, Constants.AppFabric.DEFAULT_APP_UPDATE_SCHEDULES); @@ -215,6 +218,7 @@ public ApplicationLifecycleService(CConfiguration cConf, this.authenticationContext = authenticationContext; this.impersonator = impersonator; this.capabilityReader = capabilityReader; + this.upgradeManager = upgradeManager; this.adminEventPublisher = new AdminEventPublisher(cConf, new MultiThreadMessagingContext(messagingService)); this.metricsCollectionService = metricsCollectionService; @@ -665,6 +669,17 @@ public ApplicationId upgradeApplication(ApplicationId appId, return updateApplicationByArtifact(appId, currentSpec, allowedArtifactScopes, allowSnapshot); } + /** + * Lists the upgrade details for all applications in the namespace. + * + * @param namespace namespace for which upgrades are computed. + * @return A {@link ListUpgradeResponse} object that will be returned from the HTTP client. + * @throws Exception if there was any exception while listing upgrades. + */ + public ListUpgradeResponse listUpgradeResponse(NamespaceId namespace) throws Exception { + return new ListUpgradeResponse(upgradeManager.listUpgrades(namespace)); + } + /** * Upgrades the latest version of an existing application by upgrading application artifact * versions and plugin artifact versions. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMapping.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMapping.java new file mode 100644 index 000000000000..ffd912ae695d --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMapping.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.PluginId; +import java.util.Objects; + +/** + * Mapping for the latest application version and its plugin. + */ +public class ApplicationPluginMapping { + + // Version less application Id. + private final ApplicationId applicationId; + private final PluginId pluginId; + + public ApplicationPluginMapping(ApplicationId applicationId, PluginId pluginId) { + this.applicationId = applicationId; + this.pluginId = pluginId; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public PluginId getPluginId() { + return pluginId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ApplicationPluginMapping)) { + return false; + } + ApplicationPluginMapping that = (ApplicationPluginMapping) o; + return Objects.equals(applicationId, that.applicationId) && Objects.equals( + pluginId, that.pluginId); + } + + @Override + public int hashCode() { + return Objects.hash(applicationId, pluginId); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMappingFetcher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMappingFetcher.java new file mode 100644 index 000000000000..a195ce12a5af --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/ApplicationPluginMappingFetcher.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import io.cdap.cdap.proto.id.NamespaceId; +import java.util.List; + +/** + * Fetcher for application to plugin mapping. + */ +public interface ApplicationPluginMappingFetcher { + + /** + * Fetches application to plugin mapping for a namespace. + * + * @param namespace the namespace for which mapping will be fetched. + * @return a list of type {@link ApplicationPluginMapping}. An empty list will be returned if no + * applications are found. + * @throws Exception the exception during fetching of application plugin mapping. + */ + List fetchApplicationPluginMapping(NamespaceId namespace) + throws Exception; + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManager.java new file mode 100644 index 000000000000..30bda0f0131a --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManager.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import io.cdap.cdap.api.artifact.ArtifactId; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.artifact.ArtifactVersion; +import io.cdap.cdap.app.store.ScanApplicationsRequest; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeManager; +import io.cdap.cdap.common.ApplicationNotFoundException; +import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.PluginId; +import io.cdap.cdap.proto.upgrade.ApplicationUpgradeDetail; +import io.cdap.cdap.proto.upgrade.ArtifactUpgradeDetail; +import io.cdap.cdap.proto.upgrade.PluginUpgradeDetail; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.inject.Inject; + +/** + * Default implementation of {@link UpgradeManager}. + */ +public class DefaultUpgradeManager implements UpgradeManager { + + private final ApplicationPluginMappingFetcher mappingFetcher; + private final ArtifactRepository artifactRepository; + private final Store store; + + @Inject + public DefaultUpgradeManager(ApplicationPluginMappingFetcher mappingFetcher, + ArtifactRepository artifactRepository, Store store) { + this.mappingFetcher = mappingFetcher; + this.artifactRepository = artifactRepository; + this.store = store; + } + + @Override + public List listUpgrades(NamespaceId namespace) throws Exception { + List applicationPluginMappings = + mappingFetcher.fetchApplicationPluginMapping(namespace); + Map> applicationToPluginListMap = applicationPluginMappings.stream() + .collect(Collectors.groupingBy(x -> x.getApplicationId().getApplication(), + Collectors.mapping(ApplicationPluginMapping::getPluginId, Collectors.toList()))); + Map applicationIdArtifactIdMap = fetchApplicationArtifactMap( + namespace); + Map> artifactToLatestVersionMap = artifactToLatestVersionMap( + namespace); + return toApplicationUpgradeDetails(namespace, applicationToPluginListMap, + artifactToLatestVersionMap, applicationIdArtifactIdMap); + } + + private Map fetchApplicationArtifactMap(NamespaceId namespace) { + ScanApplicationsRequest scanAppRequest = ScanApplicationsRequest.builder() + .setNamespaceId(namespace).setLatestOnly(true).build(); + Map results = new HashMap<>(); + store.scanApplications(scanAppRequest, Integer.MAX_VALUE, (app, meta) -> { + results.put(app.getApplication(), meta.getSpec().getArtifactId()); + }); + return results; + } + + private Map> artifactToLatestVersionMap(NamespaceId namespace) + throws Exception { + List artifactSummaries = artifactRepository.getArtifactSummaries( + namespace, true); + // Get max version for each artifactID. + return artifactSummaries.stream(). + map(x -> new ArtifactId(x.getName(), new ArtifactVersion(x.getVersion()), x.getScope())). + collect(Collectors.groupingBy(ArtifactId::getName, + Collectors.maxBy(Comparator.comparing(ArtifactId::getVersion)))); + } + + private List toApplicationUpgradeDetails(NamespaceId namespaceId, + Map> appToPluginsMap, + Map> artifactToLatestVersionMap, + Map applicationIdArtifactIdMap) throws ApplicationNotFoundException { + List results = new ArrayList<>(); + for (Entry> entry : appToPluginsMap.entrySet()) { + String appName = entry.getKey(); + List pluginUpgradeDetails = toPluginUpgradeDetails(entry.getValue(), + artifactToLatestVersionMap); + ArtifactId currentApplicationArtifact = applicationIdArtifactIdMap.get(appName); + if (currentApplicationArtifact == null) { + throw new ApplicationNotFoundException( + new ApplicationId(namespaceId.getNamespace(), appName)); + } + results.add( + new ApplicationUpgradeDetail(appName, + toArtifactUpgradeDetails(currentApplicationArtifact, artifactToLatestVersionMap), + pluginUpgradeDetails)); + } + return results; + } + + private ArtifactUpgradeDetail toArtifactUpgradeDetails(ArtifactId current, + Map> artifactToLatestVersionMap) { + ArtifactId latest = artifactToLatestVersionMap.getOrDefault(current.getName(), Optional.empty()) + .orElse(current); + return new ArtifactUpgradeDetail(current.getName(), current.getVersion().getVersion(), + latest.getVersion().getVersion()); + } + + private List toPluginUpgradeDetails(List pluginIdList, + Map> artifactToLatestVersionMap) { + return pluginIdList.stream().map( + x -> { + ArtifactUpgradeDetail artifactUpgradeDetail = toArtifactUpgradeDetails( + x.getParent().toApiArtifactId(), + artifactToLatestVersionMap); + return new PluginUpgradeDetail(artifactUpgradeDetail, x.getPlugin(), x.getType()); + }).collect( + Collectors.toList()); + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcher.java new file mode 100644 index 000000000000..01d1876e247c --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcher.java @@ -0,0 +1,109 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import io.cdap.cdap.api.metadata.MetadataEntity; +import io.cdap.cdap.api.metadata.MetadataScope; +import io.cdap.cdap.metadata.MetadataAdmin; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.PluginId; +import io.cdap.cdap.spi.metadata.MetadataRecord; +import io.cdap.cdap.spi.metadata.SearchRequest; +import io.cdap.cdap.spi.metadata.SearchResponse; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; + +/** + * Fetcher for application to plugin mapping from the stored metadata. + */ +public class MetadataApplicationPluginMappingFetcher implements ApplicationPluginMappingFetcher { + + private static final String CONNECTOR_PROPERTY = "connector"; + private static final String PROPERTY_SEPARATOR = ":"; + + private final MetadataAdmin metadataAdmin; + + @Inject + public MetadataApplicationPluginMappingFetcher(MetadataAdmin metadataAdmin) { + this.metadataAdmin = metadataAdmin; + } + + @Override + public List fetchApplicationPluginMapping(NamespaceId namespaceId) + throws Exception { + String namespace = namespaceId.getNamespace(); + List results = new ArrayList<>(); + results.addAll(fromSearchResponse( + metadataAdmin.search(buildSearchRequest(MetadataScope.USER, namespaceId)), + namespace)); + results.addAll( + fromSearchResponse( + metadataAdmin.search(buildSearchRequest(MetadataScope.SYSTEM, namespaceId)), + namespace)); + return results; + } + + private List fromSearchResponse(SearchResponse response, + String namespace) { + List results = new ArrayList<>(); + for (MetadataRecord result : response.getResults()) { + MetadataEntity pluginEntity = result.getEntity(); + if (!MetadataEntity.PLUGIN.equalsIgnoreCase(pluginEntity.getType())) { + continue; + } + PluginId pluginDetail = toPluginId(pluginEntity); + String namespacePrefix = namespace + PROPERTY_SEPARATOR; + // Pipelines are present as part of system scope. + Map pluginProperties = result.getMetadata() + .getProperties(MetadataScope.SYSTEM); + for (String propertyKey : pluginProperties.keySet()) { + if (CONNECTOR_PROPERTY.equalsIgnoreCase(propertyKey) || !propertyKey + .startsWith(namespacePrefix)) { + continue; + } + String[] split = propertyKey.split(PROPERTY_SEPARATOR); + results.add( + new ApplicationPluginMapping(new ApplicationId(split[0], split[1]), pluginDetail)); + } + } + return results; + } + + private PluginId toPluginId(MetadataEntity pluginEntity) { + return new PluginId(pluginEntity.getValue(MetadataEntity.NAMESPACE), + pluginEntity.getValue(MetadataEntity.ARTIFACT), + pluginEntity.getValue(MetadataEntity.VERSION), + pluginEntity.getValue(MetadataEntity.PLUGIN), + pluginEntity.getValue(MetadataEntity.TYPE)); + } + + private SearchRequest buildSearchRequest(MetadataScope scope, NamespaceId namespaceId) { + SearchRequest.Builder builder = SearchRequest.of("*"). + addType(MetadataEntity.PLUGIN). + // Fetching all records is ok here since it will be equal to number of plugins. + setLimit(Integer.MAX_VALUE); + if (MetadataScope.SYSTEM == scope) { + builder.addNamespace(scope.toString().toLowerCase()); + } else { + builder.addNamespace(namespaceId.getNamespace()); + } + return builder.build(); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/WorkflowAppWithPlugins.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/WorkflowAppWithPlugins.java new file mode 100644 index 000000000000..693381350827 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/WorkflowAppWithPlugins.java @@ -0,0 +1,70 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.app.AbstractApplication; +import io.cdap.cdap.api.plugin.PluginProperties; +import io.cdap.cdap.api.workflow.AbstractWorkflow; + +/** + * Application consisting of a plugin and a workflow. + */ +public class WorkflowAppWithPlugins extends AbstractApplication { + + public static final String NAME = "WorkflowAppWithPlugins"; + public static final String DESC = "Application which has a workflow and plugins"; + + public static final String PLUGIN_DESCRIPTION = "test plugin"; + public static final String PLUGIN_NAME = "testplugin-1"; + public static final String PLUGIN_TYPE = "testplugin"; + + @Override + public void configure() { + setName(NAME); + setDescription(DESC); + usePlugin(PLUGIN_TYPE, PLUGIN_NAME, "id", + PluginProperties.builder().build()); + addWorkflow(new NoOpWorkflow()); + } + + /** + * Dummy no-op workflow. + */ + public static class NoOpWorkflow extends AbstractWorkflow { + + public static final String NAME = "NoOpWorkflow"; + + @Override + public void configure() { + setName(NAME); + setDescription("NoOp Workflow description"); + } + } + + /** + * Dummy no-op plugin. + */ + @Plugin(type = PLUGIN_TYPE) + @Name(PLUGIN_NAME) + @Description(PLUGIN_DESCRIPTION) + public static class AppPlugin { + + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java index e74c4b10b691..a24c99423291 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java @@ -117,6 +117,7 @@ import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.PushAppRequest; import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; +import io.cdap.cdap.proto.upgrade.ListUpgradeResponse; import io.cdap.cdap.runtime.spi.profile.ProfileStatus; import io.cdap.cdap.scheduler.CoreSchedulerService; import io.cdap.cdap.scheduler.Scheduler; @@ -791,6 +792,13 @@ protected Set getAppVersions(String namespace, String appName) throws Ex return readResponse(response, SET_STRING_TYPE); } + protected ListUpgradeResponse listApplicationUpgrade(String namespace) throws Exception{ + HttpResponse response = doPost(getVersionedApiPath("upgrade/list", + Constants.Gateway.API_VERSION_3_TOKEN, namespace), "{}"); + assertResponseCode(200, response); + return readResponse(response, ListUpgradeResponse.class); + } + /** * Checks the given schedule states. */ diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java index 4e69de6b8491..c74968ea1b2c 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/AppLifecycleHttpHandlerTest.java @@ -16,7 +16,9 @@ package io.cdap.cdap.internal.app.services.http.handlers; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import com.google.inject.AbstractModule; @@ -29,20 +31,28 @@ import io.cdap.cdap.AppWithNoServices; import io.cdap.cdap.AppWithSchedule; import io.cdap.cdap.ConfigTestApp; +import io.cdap.cdap.WorkflowAppWithPlugins; import io.cdap.cdap.api.Config; import io.cdap.cdap.api.app.ApplicationSpecification; +import io.cdap.cdap.api.artifact.ArtifactRange; import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.metadata.MetadataScope; import io.cdap.cdap.api.metrics.MetricsSystemClient; import io.cdap.cdap.app.deploy.ManagerFactory; +import io.cdap.cdap.app.program.ManifestFields; import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeManager; import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.common.conf.Constants.Gateway; import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.id.Id.Artifact; +import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; import io.cdap.cdap.common.utils.ImmutablePair; +import io.cdap.cdap.common.utils.Tasks; import io.cdap.cdap.config.PreferencesService; import io.cdap.cdap.data2.metadata.writer.MetadataServiceClient; import io.cdap.cdap.data2.registry.UsageRegistry; @@ -72,10 +82,15 @@ import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.ArtifactId; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.PluginId; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.profile.Profile; +import io.cdap.cdap.proto.upgrade.ApplicationUpgradeDetail; +import io.cdap.cdap.proto.upgrade.ArtifactUpgradeDetail; +import io.cdap.cdap.proto.upgrade.ListUpgradeResponse; +import io.cdap.cdap.proto.upgrade.PluginUpgradeDetail; import io.cdap.cdap.security.impersonation.CurrentUGIProvider; import io.cdap.cdap.security.impersonation.Impersonator; import io.cdap.cdap.security.impersonation.OwnerAdmin; @@ -83,6 +98,7 @@ import io.cdap.cdap.security.spi.authentication.AuthenticationContext; import io.cdap.cdap.security.spi.authorization.AccessEnforcer; import io.cdap.common.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -93,6 +109,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.jar.Manifest; import java.util.stream.Collectors; import org.jboss.resteasy.util.HttpResponseCodes; import org.junit.After; @@ -143,12 +161,13 @@ public ApplicationLifecycleService createLifeCycleService(CConfiguration cConf, MetadataServiceClient metadataServiceClient, AccessEnforcer accessEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService, Impersonator impersonator, - CapabilityReader capabilityReader) { + CapabilityReader capabilityReader, UpgradeManager upgradeManager) { return Mockito.spy(new ApplicationLifecycleService(cConf, store, scheduleManager, usageRegistry, preferencesService, metricsSystemClient, ownerAdmin, artifactRepository, managerFactory, metadataServiceClient, accessEnforcer, authenticationContext, - messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService())); + messagingService, impersonator, capabilityReader, new NoOpMetricsCollectionService(), + upgradeManager)); } }); } @@ -1621,6 +1640,117 @@ public void testGetApplicationCount() throws Exception { Assert.assertEquals(0, result); } + @Test + public void testListUpgradeSuccess() throws Exception { + String ns1AppName = WorkflowAppWithPlugins.NAME; + Id.Namespace ns1 = Id.Namespace.from(TEST_NAMESPACE1); + Id.Artifact ns1ArtifactId = Id.Artifact.from(ns1, WorkflowAppWithPlugins.class.getSimpleName(), + "1.0.0-SNAPSHOT"); + HttpResponse response = addAppArtifact(ns1ArtifactId, WorkflowAppWithPlugins.class); + Assert.assertEquals(200, response.getResponseCode()); + + Id.Artifact pluginArtifact = deployPluginArtifact(ns1, ns1ArtifactId, + WorkflowAppWithPlugins.class, + "app-plugin", "1.0.0"); + Id.Application appId = Id.Application.from(ns1, ns1AppName); + response = deploy(appId, + new AppRequest<>(ArtifactSummary.from(ns1ArtifactId.toArtifactId()))); + Assert.assertEquals(200, response.getResponseCode()); + // Wait for metadata to propagate. + Tasks.waitFor(false, + () -> getMetadataProperties( + new PluginId(TEST_NAMESPACE1, "app-plugin", "1.0.0", "testplugin-1", + "testplugin").toMetadataEntity(), MetadataScope.SYSTEM).isEmpty(), + 10, TimeUnit.SECONDS); + ListUpgradeResponse expected = new ListUpgradeResponse(ImmutableList.of( + new ApplicationUpgradeDetail("WorkflowAppWithPlugins", + new ArtifactUpgradeDetail("WorkflowAppWithPlugins", "1.0.0-SNAPSHOT", "1.0.0-SNAPSHOT"), + ImmutableList.of( + new PluginUpgradeDetail(new ArtifactUpgradeDetail("app-plugin", "1.0.0", "1.0.0"), + "testplugin-1", "testplugin"))))); + + ListUpgradeResponse actual = listApplicationUpgrade(TEST_NAMESPACE1); + + Assert.assertEquals(new HashSet<>(expected.getApplicationUpgradeDetails()), + new HashSet<>(actual.getApplicationUpgradeDetails())); + + deleteArtifact(ns1ArtifactId, 200); + deleteArtifact(pluginArtifact, 200); + deleteApp(appId, 200); + } + + @Test + public void testListUpgradeWithArtifactAndPluginUpgradable() throws Exception { + String ns1AppName = WorkflowAppWithPlugins.NAME; + Id.Namespace ns1 = Id.Namespace.from(TEST_NAMESPACE1); + Id.Artifact ns1ArtifactId = Id.Artifact.from(ns1, WorkflowAppWithPlugins.class.getSimpleName(), + "1.0.0-SNAPSHOT"); + HttpResponse response = addAppArtifact(ns1ArtifactId, WorkflowAppWithPlugins.class); + Assert.assertEquals(200, response.getResponseCode()); + + // Deploying a newer version of application artifact. + Id.Artifact latestAppArtifact = Id.Artifact.from(ns1, + WorkflowAppWithPlugins.class.getSimpleName(), + "1.0.1"); + response = addAppArtifact(latestAppArtifact, WorkflowAppWithPlugins.class); + Assert.assertEquals(200, response.getResponseCode()); + + Id.Artifact pluginArtifact = deployPluginArtifact(ns1, ns1ArtifactId, + WorkflowAppWithPlugins.class, "app-plugin", "1.0.0"); + Id.Application appId = Id.Application.from(ns1, ns1AppName); + response = deploy(appId, + new AppRequest<>(ArtifactSummary.from(ns1ArtifactId.toArtifactId()))); + Assert.assertEquals(200, response.getResponseCode()); + // Deploy a newer version of the plugin artifact. + Id.Artifact latestPluginArtifact = deployPluginArtifact(ns1, ns1ArtifactId, + WorkflowAppWithPlugins.class, "app-plugin", "1.0.1"); + + // Wait for metadata to propagate since metadata writes are async. + Tasks.waitFor(false, + () -> getMetadataProperties( + new PluginId(TEST_NAMESPACE1, "app-plugin", "1.0.0", "testplugin-1", + "testplugin").toMetadataEntity(), MetadataScope.SYSTEM).isEmpty(), + 10, TimeUnit.SECONDS); + ListUpgradeResponse expected = new ListUpgradeResponse(ImmutableList.of( + new ApplicationUpgradeDetail("WorkflowAppWithPlugins", + new ArtifactUpgradeDetail("WorkflowAppWithPlugins", "1.0.0-SNAPSHOT", "1.0.1"), + ImmutableList.of( + new PluginUpgradeDetail(new ArtifactUpgradeDetail("app-plugin", "1.0.0", "1.0.1"), + "testplugin-1", "testplugin"))))); + + ListUpgradeResponse actual = listApplicationUpgrade(TEST_NAMESPACE1); + + Assert.assertEquals( + String.format("Assertion failed. Expected : %s, Actual: %s", GSON.toJson(expected), + GSON.toJson(actual)), new HashSet<>(expected.getApplicationUpgradeDetails()), + new HashSet<>(actual.getApplicationUpgradeDetails())); + + deleteArtifact(ns1ArtifactId, 200); + deleteArtifact(latestAppArtifact, 200); + deleteArtifact(pluginArtifact, 200); + deleteArtifact(latestPluginArtifact, 200); + deleteApp(appId, 200); + } + + private Artifact deployPluginArtifact(Namespace namespace, Artifact artifact, Class claszz, + String artifactName, String version) + throws Exception { + Set parents = Sets.newHashSet(new ArtifactRange( + namespace.getId(), artifact.getName(), + artifact.getVersion(), true, + artifact.getVersion(), true)); + + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put( + ManifestFields.EXPORT_PACKAGE, claszz.getPackage().getName()); + Id.Artifact pluginArtifact = Id.Artifact.fromEntityId( + namespace.toEntityId().artifact(artifactName, version)); + Assert.assertEquals(HttpResponseStatus.OK.code(), + addPluginArtifact(pluginArtifact, claszz, manifest, + parents).getResponseCode()); + return pluginArtifact; + } + @After public void cleanup() throws Exception { setLCMFlag(false); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManagerTest.java new file mode 100644 index 000000000000..c6921ad0c493 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/DefaultUpgradeManagerTest.java @@ -0,0 +1,227 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.app.ApplicationSpecification; +import io.cdap.cdap.api.artifact.ArtifactId; +import io.cdap.cdap.api.artifact.ArtifactScope; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.artifact.ArtifactVersion; +import io.cdap.cdap.app.store.ScanApplicationsRequest; +import io.cdap.cdap.app.store.Store; +import io.cdap.cdap.app.upgrade.UpgradeManager; +import io.cdap.cdap.common.ApplicationNotFoundException; +import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository; +import io.cdap.cdap.internal.app.store.ApplicationMeta; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.PluginId; +import io.cdap.cdap.proto.upgrade.ApplicationUpgradeDetail; +import io.cdap.cdap.proto.upgrade.ArtifactUpgradeDetail; +import io.cdap.cdap.proto.upgrade.PluginUpgradeDetail; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + + +@RunWith(MockitoJUnitRunner.class) +public class DefaultUpgradeManagerTest { + + @Mock + private ApplicationPluginMappingFetcher mappingFetcher; + @Mock + private ArtifactRepository artifactRepository; + @Mock + private Store store; + + private UpgradeManager upgradeManager; + + @Before + public void setUp() { + this.upgradeManager = new DefaultUpgradeManager(mappingFetcher, artifactRepository, store); + } + + /** + * The following is the test data setup. + *

+ * The test creates the following pipelines(including artifact and plugin details) in the format + * pipeline_name -> artifact_id(version) -> + * plugin1(artifact:version,type), plugin2(artifact,version,type) + * Following pipelines are created: + * pipeline_1 -> cdap-data-pipeline(6.11.0) -> GCS(google-cloud,0.24.0,batchsource) + * ,trash(trash-plugin,1.2.0,batchsink) + * pipeline_2 -> cdap-data-pipeline(6.11.0) -> GCS(google-cloud,0.23.0,batchsource) + * pipeline_3 -> cdap-data-pipeline(6.10.0) -> sap(sap-plugin,1.10.0,batchsink) + *

+ *

+ * Following application artifacts are present in the system: + * cdap-data-pipeline -> 6.11.0(latest) and 6.10.0 + *

+ *

+ * Following plugins are present in the system + * gcs -> google-cloud -> 0.23.0 and 0.24.0(latest) + * trash -> trash-plugin -> 1.1.0 and 1.2.0(latest) + * sap -> sap-plugin ->1.10.0(latest) + *

+ */ + @Test + public void testListUpgrades() throws Exception { + when(mappingFetcher.fetchApplicationPluginMapping(NamespaceId.DEFAULT)).thenReturn( + createAppPluginMappings()); + when(artifactRepository.getArtifactSummaries(NamespaceId.DEFAULT, true)).thenReturn( + createArtifactSummaries()); + + ApplicationSpecification appSpec1 = mock(ApplicationSpecification.class); + when(appSpec1.getArtifactId()).thenReturn(createArtifactId("6.11.0")); + ApplicationSpecification appSpec2 = mock(ApplicationSpecification.class); + when(appSpec2.getArtifactId()).thenReturn(createArtifactId("6.11.0")); + ApplicationSpecification appSpec3 = mock(ApplicationSpecification.class); + when(appSpec3.getArtifactId()).thenReturn(createArtifactId("6.10.0")); + + Map appsInStore = ImmutableMap.of( + new ApplicationId("default", "pipeline_1"), + new ApplicationMeta("pipeline_1", appSpec1, null), + new ApplicationId("default", "pipeline_2"), + new ApplicationMeta("pipeline_2", appSpec2, null), + new ApplicationId("default", "pipeline_3"), + new ApplicationMeta("pipeline_3", appSpec3, null) + ); + doAnswer(invocation -> { + BiConsumer consumer = invocation.getArgument(2); + appsInStore.forEach(consumer); + return null; + }).when(store) + .scanApplications(any(ScanApplicationsRequest.class), anyInt(), any(BiConsumer.class)); + List expected = createExpectedUpgradeDetails(); + + List actual = upgradeManager.listUpgrades(NamespaceId.DEFAULT); + + Assert.assertEquals(new HashSet<>(expected), new HashSet<>(actual)); + verify(store, times(1)).scanApplications(any(ScanApplicationsRequest.class), anyInt(), + any(BiConsumer.class)); + verify(mappingFetcher, times(1)).fetchApplicationPluginMapping(NamespaceId.DEFAULT); + verify(artifactRepository, times(1)).getArtifactSummaries(NamespaceId.DEFAULT, true); + } + + @Test(expected = ApplicationNotFoundException.class) + public void testListUpgradesThrowsException() throws Exception { + when(mappingFetcher.fetchApplicationPluginMapping(NamespaceId.DEFAULT)).thenReturn( + createAppPluginMappings()); + when(artifactRepository.getArtifactSummaries(NamespaceId.DEFAULT, true)).thenReturn( + createArtifactSummaries()); + + ApplicationSpecification appSpec1 = mock(ApplicationSpecification.class); + when(appSpec1.getArtifactId()).thenReturn(createArtifactId("6.11.0")); + ApplicationSpecification appSpec2 = mock(ApplicationSpecification.class); + when(appSpec2.getArtifactId()).thenReturn(createArtifactId("6.11.0")); + + // No application artifact mapping is present for pipeline_3 and should hence throw an + // exception. + Map appsInStore = ImmutableMap.of( + new ApplicationId("default", "pipeline_1"), + new ApplicationMeta("pipeline_1", appSpec1, null), + new ApplicationId("default", "pipeline_2"), + new ApplicationMeta("pipeline_2", appSpec2, null) + ); + doAnswer(invocation -> { + BiConsumer consumer = invocation.getArgument(2); + appsInStore.forEach(consumer); + return null; + }).when(store) + .scanApplications(any(ScanApplicationsRequest.class), anyInt(), any(BiConsumer.class)); + + upgradeManager.listUpgrades(NamespaceId.DEFAULT); + } + + + private ArtifactId createArtifactId(String version) { + return new ArtifactId("cdap-data-pipeline", new ArtifactVersion(version), ArtifactScope.SYSTEM); + } + + private ApplicationPluginMapping getAppPluginMapping(String appName, String pluginNamespace, + String artifactId, String version, String pluginName, String type) { + return new ApplicationPluginMapping(new ApplicationId("default", appName), + new PluginId(pluginNamespace, artifactId, version, pluginName, type)); + } + + private List createAppPluginMappings() { + return ImmutableList.of( + getAppPluginMapping("pipeline_1", "system", "google-cloud", "0.24.0", "GCS", "batchsource"), + getAppPluginMapping("pipeline_1", "default", "trash-plugin", "1.2.0", "trash", "batchsink"), + getAppPluginMapping("pipeline_2", "system", "google-cloud", "0.23.0", "GCS", "batchsource"), + getAppPluginMapping("pipeline_3", "default", "sap-plugin", "1.10.0", "sap", "batchsink") + ); + } + + private List createArtifactSummaries() { + return ImmutableList.of( + new ArtifactSummary("google-cloud", "0.23.0", ArtifactScope.SYSTEM), + new ArtifactSummary("google-cloud", "0.24.0", ArtifactScope.SYSTEM), + new ArtifactSummary("trash-plugin", "1.2.0", ArtifactScope.USER), + new ArtifactSummary("trash-plugin", "1.1.0", ArtifactScope.USER), + new ArtifactSummary("sap-plugin", "1.10.0", ArtifactScope.USER), + new ArtifactSummary("cdap-data-pipeline", "6.10.0", ArtifactScope.SYSTEM), + new ArtifactSummary("cdap-data-pipeline", "6.11.0", ArtifactScope.SYSTEM) + ); + } + + private List createExpectedUpgradeDetails() { + return ImmutableList.of( + // Pipeline 1 has all the latest versions and is does not need to be upgraded. + new ApplicationUpgradeDetail("pipeline_1", + new ArtifactUpgradeDetail("cdap-data-pipeline", "6.11.0", "6.11.0"), + ImmutableList.of( + new PluginUpgradeDetail( + new ArtifactUpgradeDetail("google-cloud", "0.24.0", "0.24.0"), + "GCS", "batchsource"), + new PluginUpgradeDetail(new ArtifactUpgradeDetail("trash-plugin", "1.2.0", "1.2.0"), + "trash", "batchsink"))), + // Pipeline 2 has all the latest application artifact but an older gcs version and is hence + // upgrade eligible. + new ApplicationUpgradeDetail("pipeline_2", + new ArtifactUpgradeDetail("cdap-data-pipeline", "6.11.0", "6.11.0"), + ImmutableList.of( + new PluginUpgradeDetail( + new ArtifactUpgradeDetail("google-cloud", "0.23.0", "0.24.0"), + "GCS", "batchsource"))), + // Pipeline 3 has the latest plugin version but older application artifact and is hence + // upgrade eligible. + new ApplicationUpgradeDetail("pipeline_3", + new ArtifactUpgradeDetail("cdap-data-pipeline", "6.10.0", "6.11.0"), + ImmutableList.of( + new PluginUpgradeDetail(new ArtifactUpgradeDetail("sap-plugin", "1.10.0", "1.10.0"), + "sap", "batchsink"))) + ); + } +} \ No newline at end of file diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcherTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcherTest.java new file mode 100644 index 000000000000..d94713a12130 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/upgrade/MetadataApplicationPluginMappingFetcherTest.java @@ -0,0 +1,160 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.upgrade; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.metadata.MetadataEntity; +import io.cdap.cdap.api.metadata.MetadataScope; +import io.cdap.cdap.metadata.MetadataAdmin; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.PluginId; +import io.cdap.cdap.spi.metadata.Metadata; +import io.cdap.cdap.spi.metadata.MetadataRecord; +import io.cdap.cdap.spi.metadata.SearchRequest; +import io.cdap.cdap.spi.metadata.SearchResponse; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Parameterized tests for {@link MetadataApplicationPluginMappingFetcher} using a parameter class. + */ +@RunWith(Parameterized.class) +public class MetadataApplicationPluginMappingFetcherTest { + + // A static inner class to hold the parameters for each test case + static class TestCaseParams { + + final String name; + final SearchResponse userResponse; + final SearchResponse systemResponse; + final List expectedMappings; + + TestCaseParams(String name, SearchResponse userResponse, SearchResponse systemResponse, + List expectedMappings) { + this.name = name; + this.userResponse = userResponse; + this.systemResponse = systemResponse; + this.expectedMappings = expectedMappings; + } + } + + @Parameters(name = "{index}: {0}") + public static Collection data() { + SearchResponse emptyResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0, + Integer.MAX_VALUE, 0, Collections.emptyList()); + + SearchResponse userPluginResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0, + Integer.MAX_VALUE, 0, ImmutableList.of(new MetadataRecord( + MetadataEntity.builder().append("namespace", "default") + .append("artifact", "trash-plugin") + .append("version", "1.2.0") + .append(MetadataEntity.TYPE, "batchsink") + .appendAsType(MetadataEntity.PLUGIN, "trash") + .build(), + new Metadata( + MetadataScope.SYSTEM, ImmutableMap.of("default:pipeline_1", "1"))))); + + SearchResponse systemPluginResponse = new SearchResponse(SearchRequest.of("*").build(), null, 0, + Integer.MAX_VALUE, 0, ImmutableList.of(new MetadataRecord( + MetadataEntity.builder().append("namespace", "system") + .append("artifact", "google-cloud") + .append("version", "0.24.0") + .append(MetadataEntity.TYPE, "batchsource") + .appendAsType(MetadataEntity.PLUGIN, "GCS") + .build(), + new Metadata( + MetadataScope.SYSTEM, ImmutableMap.of("default:pipeline_1", "1"))))); + + ApplicationPluginMapping applicationSystemPluginMapping = + new ApplicationPluginMapping(new ApplicationId("default", "pipeline_1"), + new PluginId("system", "google-cloud", "0.24.0", "GCS", "batchsource")); + ApplicationPluginMapping applicationUserPluginMapping = + new ApplicationPluginMapping(new ApplicationId("default", "pipeline_1"), + new PluginId("default", "trash-plugin", "1.2.0", "trash", "batchsink")); + + return Arrays.asList(new Object[][]{ + {new TestCaseParams("No Plugin Mappings", emptyResponse, emptyResponse, + Collections.emptyList())}, + {new TestCaseParams("Only System Plugin Mappings", emptyResponse, systemPluginResponse, + ImmutableList.of(applicationSystemPluginMapping))}, + {new TestCaseParams("Only User Plugin Mappings", userPluginResponse, emptyResponse, + ImmutableList.of(applicationUserPluginMapping))}, + {new TestCaseParams("Both Plugin Mappings present", userPluginResponse, + systemPluginResponse, + ImmutableList.of(applicationUserPluginMapping, applicationSystemPluginMapping))} + }); + } + + @Mock + private MetadataAdmin metadataAdmin; + + private MetadataApplicationPluginMappingFetcher mappingFetcher; + + private final TestCaseParams params; + + public MetadataApplicationPluginMappingFetcherTest(TestCaseParams params) { + this.params = params; + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + mappingFetcher = new MetadataApplicationPluginMappingFetcher(metadataAdmin); + } + + @Test + public void testFetchApplicationPluginMapping() throws Exception { + // Arrange + SearchRequest userPluginRequest = SearchRequest.of("*").addType("plugin") + .setLimit(Integer.MAX_VALUE) + .addNamespace(NamespaceId.DEFAULT.getNamespace()) + .build(); + SearchRequest systemPluginRequest = SearchRequest.of("*").addType("plugin") + .setLimit(Integer.MAX_VALUE) + .addNamespace(NamespaceId.SYSTEM.getNamespace()).build(); + + when(metadataAdmin.search(eq(userPluginRequest))).thenReturn(params.userResponse); + when(metadataAdmin.search(eq(systemPluginRequest))).thenReturn(params.systemResponse); + + // Act + List actual = mappingFetcher.fetchApplicationPluginMapping( + NamespaceId.DEFAULT); + + // Assert + Assert.assertEquals(String.format("Test case '%s' failed.", params.name), + params.expectedMappings, actual); + verify(metadataAdmin, times(1)).search(userPluginRequest); + verify(metadataAdmin, times(1)).search(systemPluginRequest); + } +} \ No newline at end of file diff --git a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/ExpectedNumberOfAuditPolicyPaths.java b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/ExpectedNumberOfAuditPolicyPaths.java index e8458d4018b2..9ba467e60682 100644 --- a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/ExpectedNumberOfAuditPolicyPaths.java +++ b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/ExpectedNumberOfAuditPolicyPaths.java @@ -22,5 +22,5 @@ */ public final class ExpectedNumberOfAuditPolicyPaths { - public static final int EXPECTED_PATH_NUMBER = 49; + public static final int EXPECTED_PATH_NUMBER = 50; } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ApplicationUpgradeDetail.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ApplicationUpgradeDetail.java new file mode 100644 index 000000000000..dd526e7ddcb8 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ApplicationUpgradeDetail.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.upgrade; + +import java.util.List; +import java.util.Objects; + +/** + * Upgrade details for an application consisting of application artifact and plugin upgrade details. + */ +public class ApplicationUpgradeDetail { + + private final String name; + private final ArtifactUpgradeDetail applicationArtifactUpgradeDetail; + private final List pluginUpgradeDetails; + private final boolean upgradable; + + + public ApplicationUpgradeDetail(String name, ArtifactUpgradeDetail applicationArtifactUpgradeDetail, + List pluginUpgradeDetails) { + this.name = name; + this.applicationArtifactUpgradeDetail = applicationArtifactUpgradeDetail; + this.pluginUpgradeDetails = pluginUpgradeDetails; + this.upgradable = + applicationArtifactUpgradeDetail.isUpgradable() || pluginUpgradeDetails.stream() + .anyMatch(ArtifactUpgradeDetail::isUpgradable); + } + + public String getName() { + return name; + } + + public ArtifactUpgradeDetail getApplicationArtifactUpgradeDetail() { + return applicationArtifactUpgradeDetail; + } + + public List getPluginUpgradeDetails() { + return pluginUpgradeDetails; + } + + public boolean isUpgradable() { + return upgradable; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ApplicationUpgradeDetail)) { + return false; + } + ApplicationUpgradeDetail that = (ApplicationUpgradeDetail) o; + return upgradable == that.upgradable && Objects.equals(name, that.name) + && Objects.equals(applicationArtifactUpgradeDetail, + that.applicationArtifactUpgradeDetail) && Objects.equals(pluginUpgradeDetails, + that.pluginUpgradeDetails); + } + + @Override + public int hashCode() { + return Objects.hash(name, applicationArtifactUpgradeDetail, pluginUpgradeDetails, upgradable); + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ArtifactUpgradeDetail.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ArtifactUpgradeDetail.java new file mode 100644 index 000000000000..eaca0984dab9 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ArtifactUpgradeDetail.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.upgrade; + +import java.util.Objects; + +/** + * Upgrade details for the artifact consisting of artifact name and version details. + */ +public class ArtifactUpgradeDetail { + + private final String artifactName; + private final String currentVersion; + private final String latestVersion; + private final boolean upgradable; + + public ArtifactUpgradeDetail(String artifactName, String currentVersion, + String latestVersion) { + this.artifactName = artifactName; + this.currentVersion = currentVersion; + this.latestVersion = latestVersion; + this.upgradable = !latestVersion.equalsIgnoreCase(currentVersion); + } + + public String getArtifactName() { + return artifactName; + } + + public String getCurrentVersion() { + return currentVersion; + } + + public String getLatestVersion() { + return latestVersion; + } + + public boolean isUpgradable() { + return upgradable; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ArtifactUpgradeDetail)) { + return false; + } + ArtifactUpgradeDetail that = (ArtifactUpgradeDetail) o; + return upgradable == that.upgradable && Objects.equals(artifactName, that.artifactName) + && Objects.equals(currentVersion, that.currentVersion) && Objects.equals( + latestVersion, that.latestVersion); + } + + @Override + public int hashCode() { + return Objects.hash(artifactName, currentVersion, latestVersion, upgradable); + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeRequest.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeRequest.java new file mode 100644 index 000000000000..4ba3521e322c --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeRequest.java @@ -0,0 +1,24 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.upgrade; + +/** + * Request for list upgrades HTTP call. + */ +public class ListUpgradeRequest { + // TODO(CDAP-21168): Add list API filters. +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeResponse.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeResponse.java new file mode 100644 index 000000000000..3d1091838c35 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/ListUpgradeResponse.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.upgrade; + +import java.util.List; +import java.util.Objects; + +/** + * Response for list upgrades HTTP call. + */ +public class ListUpgradeResponse { + + private final List applicationUpgradeDetails; + + public ListUpgradeResponse(List applicationUpgradeDetails) { + this.applicationUpgradeDetails = applicationUpgradeDetails; + } + + public List getApplicationUpgradeDetails() { + return applicationUpgradeDetails; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ListUpgradeResponse)) { + return false; + } + ListUpgradeResponse that = (ListUpgradeResponse) o; + return Objects.equals(applicationUpgradeDetails, that.applicationUpgradeDetails); + } + + @Override + public int hashCode() { + return Objects.hashCode(applicationUpgradeDetails); + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/PluginUpgradeDetail.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/PluginUpgradeDetail.java new file mode 100644 index 000000000000..134820928a36 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/upgrade/PluginUpgradeDetail.java @@ -0,0 +1,65 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.upgrade; + +import java.util.Objects; + +/** + * Upgrade details of a plugin. This extends the Artifact Upgrade detail class. + */ +public class PluginUpgradeDetail extends ArtifactUpgradeDetail { + + private final String pluginName; + private final String pluginType; + + public PluginUpgradeDetail(ArtifactUpgradeDetail artifactUpgradeDetail, + String pluginName, String pluginType) { + super(artifactUpgradeDetail.getArtifactName(), artifactUpgradeDetail.getCurrentVersion(), + artifactUpgradeDetail.getLatestVersion()); + this.pluginName = pluginName; + this.pluginType = pluginType; + } + + public String getPluginName() { + return pluginName; + } + + public String getPluginType() { + return pluginType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PluginUpgradeDetail)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PluginUpgradeDetail that = (PluginUpgradeDetail) o; + return Objects.equals(pluginName, that.pluginName) && Objects.equals( + pluginType, that.pluginType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), pluginName, pluginType); + } +}