Skip to content

Commit 852f3a2

Browse files
authored
NIFI-13937 Add command pg-empty-queues to NiFi CLI (#9459)
Signed-off-by: David Handermann <[email protected]>
1 parent 7de2e68 commit 852f3a2

File tree

4 files changed

+141
-0
lines changed

4 files changed

+141
-0
lines changed

nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
2020
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
21+
import org.apache.nifi.web.api.entity.DropRequestEntity;
2122
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
2223
import org.apache.nifi.web.api.entity.FlowEntity;
2324
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
@@ -62,4 +63,8 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp
6263
FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException;
6364

6465
File exportProcessGroup(String processGroupId, boolean includeReferencedServices, File outputFile) throws NiFiClientException, IOException;
66+
67+
DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException;
68+
69+
DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) throws NiFiClientException, IOException;
6570
}

nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.nifi.toolkit.cli.impl.util.FileUtils;
2525
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
2626
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
27+
import org.apache.nifi.web.api.entity.DropRequestEntity;
2728
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
2829
import org.apache.nifi.web.api.entity.FlowEntity;
2930
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
@@ -286,4 +287,39 @@ public File exportProcessGroup(final String processGroupId, final boolean includ
286287
});
287288
}
288289

290+
@Override
291+
public DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException {
292+
if (StringUtils.isBlank(processGroupId)) {
293+
throw new IllegalArgumentException("Process group id cannot be null or blank");
294+
}
295+
296+
return executeAction("Error emptying queues in Process Group", () -> {
297+
final WebTarget target = processGroupsTarget
298+
.path("{id}/empty-all-connections-requests")
299+
.resolveTemplate("id", processGroupId);
300+
301+
return getRequestBuilder(target).post(null, DropRequestEntity.class);
302+
});
303+
}
304+
305+
@Override
306+
public DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId)
307+
throws NiFiClientException, IOException {
308+
if (StringUtils.isBlank(processGroupId)) {
309+
throw new IllegalArgumentException("Process group id cannot be null or blank");
310+
}
311+
if (StringUtils.isBlank(requestId)) {
312+
throw new IllegalArgumentException("Request id cannot be null or blank");
313+
}
314+
315+
return executeAction("Error getting Drop Request status for Process Group", () -> {
316+
final WebTarget target = processGroupsTarget
317+
.path("{id}/empty-all-connections-requests/{drop-request-id}")
318+
.resolveTemplate("id", processGroupId)
319+
.resolveTemplate("drop-request-id", requestId);
320+
321+
return getRequestBuilder(target).get(DropRequestEntity.class);
322+
});
323+
}
324+
289325
}

nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate;
8686
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreateControllerService;
8787
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices;
88+
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEmptyQueues;
8889
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEnableControllerServices;
8990
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGExport;
9091
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGGetAllVersions;
@@ -163,6 +164,7 @@ protected List<Command> createCommands() {
163164
commands.add(new PGSetParamContext());
164165
commands.add(new PGReplace());
165166
commands.add(new PGExport());
167+
commands.add(new PGEmptyQueues());
166168
commands.add(new GetControllerServices());
167169
commands.add(new GetControllerService());
168170
commands.add(new CreateControllerService());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
18+
19+
import org.apache.commons.cli.MissingOptionException;
20+
import org.apache.nifi.toolkit.cli.api.Context;
21+
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
22+
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
23+
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
24+
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
25+
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
26+
import org.apache.nifi.toolkit.cli.impl.result.VoidResult;
27+
import org.apache.nifi.web.api.entity.DropRequestEntity;
28+
29+
import java.io.IOException;
30+
import java.util.Properties;
31+
32+
/**
33+
* Command to stop the components of a process group.
34+
*/
35+
public class PGEmptyQueues extends AbstractNiFiCommand<VoidResult> {
36+
37+
public static final int MAX_ITERATIONS = 20;
38+
public static final long DELAY_MS = 1000;
39+
40+
public PGEmptyQueues() {
41+
super("pg-empty-queues", VoidResult.class);
42+
}
43+
44+
@Override
45+
public String getDescription() {
46+
return "Empty all queues, recursively, in the specified Process Group. It is recommended to first use pg-stop.";
47+
}
48+
49+
@Override
50+
protected void doInitialize(final Context context) {
51+
addOption(CommandOption.PG_ID.createOption());
52+
}
53+
54+
@Override
55+
public VoidResult doExecute(final NiFiClient client, final Properties properties)
56+
throws NiFiClientException, IOException, MissingOptionException {
57+
58+
final String pgId = getRequiredArg(properties, CommandOption.PG_ID);
59+
60+
final ProcessGroupClient pgClient = client.getProcessGroupClient();
61+
DropRequestEntity requestEntity = pgClient.emptyQueues(pgId);
62+
final String requestId = requestEntity.getDropRequest().getId();
63+
64+
int iterations = 1;
65+
while (!requestEntity.getDropRequest().isFinished() && iterations < MAX_ITERATIONS) {
66+
if (shouldPrint(properties)) {
67+
println("Emptying queues, currently at " + requestEntity.getDropRequest().getPercentCompleted() + "% ("
68+
+ iterations + " of " + MAX_ITERATIONS + ")...");
69+
}
70+
sleep(DELAY_MS);
71+
iterations++;
72+
requestEntity = pgClient.getEmptyQueuesRequest(pgId, requestId);
73+
}
74+
75+
if (shouldPrint(properties)) {
76+
if (requestEntity.getDropRequest().isFinished()) {
77+
println("Drop request completed. Deleted: " + requestEntity.getDropRequest().getDropped());
78+
} else {
79+
println("Drop request didn't complete yet. Thus far, deleted: " + requestEntity.getDropRequest().getDropped());
80+
}
81+
}
82+
83+
return VoidResult.getInstance();
84+
}
85+
86+
private boolean shouldPrint(final Properties properties) {
87+
return isInteractive() || isVerbose(properties);
88+
}
89+
90+
private void sleep(long millis) {
91+
try {
92+
Thread.sleep(millis);
93+
} catch (InterruptedException e) {
94+
Thread.interrupted();
95+
}
96+
}
97+
98+
}

0 commit comments

Comments
 (0)