diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java index 456df8ed880c..e46dcf195632 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java @@ -18,6 +18,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -62,4 +63,8 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException; File exportProcessGroup(String processGroupId, boolean includeReferencedServices, File outputFile) throws NiFiClientException, IOException; + + DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException; + + DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java index 87ce433fb433..13d9d325e5f3 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java @@ -24,6 +24,7 @@ import org.apache.nifi.toolkit.cli.impl.util.FileUtils; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.FlowComparisonEntity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; @@ -286,4 +287,39 @@ public File exportProcessGroup(final String processGroupId, final boolean includ }); } + @Override + public DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + return executeAction("Error emptying queues in Process Group", () -> { + final WebTarget target = processGroupsTarget + .path("{id}/empty-all-connections-requests") + .resolveTemplate("id", processGroupId); + + return getRequestBuilder(target).post(null, DropRequestEntity.class); + }); + } + + @Override + public DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) + throws NiFiClientException, IOException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + if (StringUtils.isBlank(requestId)) { + throw new IllegalArgumentException("Request id cannot be null or blank"); + } + + return executeAction("Error getting Drop Request status for Process Group", () -> { + final WebTarget target = processGroupsTarget + .path("{id}/empty-all-connections-requests/{drop-request-id}") + .resolveTemplate("id", processGroupId) + .resolveTemplate("drop-request-id", requestId); + + return getRequestBuilder(target).get(DropRequestEntity.class); + }); + } + } \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index cca3b4d8a239..d3b6092918da 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -85,6 +85,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreateControllerService; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices; +import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEmptyQueues; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEnableControllerServices; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGExport; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGGetAllVersions; @@ -163,6 +164,7 @@ protected List createCommands() { commands.add(new PGSetParamContext()); commands.add(new PGReplace()); commands.add(new PGExport()); + commands.add(new PGEmptyQueues()); commands.add(new GetControllerServices()); commands.add(new GetControllerService()); commands.add(new CreateControllerService()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGEmptyQueues.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGEmptyQueues.java new file mode 100644 index 000000000000..f41ed4dad18e --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGEmptyQueues.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; + +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.VoidResult; +import org.apache.nifi.web.api.entity.DropRequestEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command to stop the components of a process group. + */ +public class PGEmptyQueues extends AbstractNiFiCommand { + + public static final int MAX_ITERATIONS = 20; + public static final long DELAY_MS = 1000; + + public PGEmptyQueues() { + super("pg-empty-queues", VoidResult.class); + } + + @Override + public String getDescription() { + return "Empty all queues, recursively, in the specified Process Group. It is recommended to first use pg-stop."; + } + + @Override + protected void doInitialize(final Context context) { + addOption(CommandOption.PG_ID.createOption()); + } + + @Override + public VoidResult doExecute(final NiFiClient client, final Properties properties) + throws NiFiClientException, IOException, MissingOptionException { + + final String pgId = getRequiredArg(properties, CommandOption.PG_ID); + + final ProcessGroupClient pgClient = client.getProcessGroupClient(); + DropRequestEntity requestEntity = pgClient.emptyQueues(pgId); + final String requestId = requestEntity.getDropRequest().getId(); + + int iterations = 1; + while (!requestEntity.getDropRequest().isFinished() && iterations < MAX_ITERATIONS) { + if (shouldPrint(properties)) { + println("Emptying queues, currently at " + requestEntity.getDropRequest().getPercentCompleted() + "% (" + + iterations + " of " + MAX_ITERATIONS + ")..."); + } + sleep(DELAY_MS); + iterations++; + requestEntity = pgClient.getEmptyQueuesRequest(pgId, requestId); + } + + if (shouldPrint(properties)) { + if (requestEntity.getDropRequest().isFinished()) { + println("Drop request completed. Deleted: " + requestEntity.getDropRequest().getDropped()); + } else { + println("Drop request didn't complete yet. Thus far, deleted: " + requestEntity.getDropRequest().getDropped()); + } + } + + return VoidResult.getInstance(); + } + + private boolean shouldPrint(final Properties properties) { + return isInteractive() || isVerbose(properties); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.interrupted(); + } + } + +}