Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Please see SECURITY.md for reporting security vulnerabilities.
Creating Issues
===============

In order to file bugs or new feature requests, please use http://issues.cask.co.
In order to file bugs or new feature requests, please use https://cdap.atlassian.net/jira/

Feature Requests
================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,15 @@ public void complete() {
} catch (Exception e) {
// If there is exception, use the remote execution controller to try killing the remote process
try {
LOG.debug("Force termination of remote process for program run {}", programRunId);
remoteProcessController.kill(RuntimeJobStatus.RUNNING);
RuntimeJobStatus currentStatus = remoteProcessController.getStatus();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prior logic:

  • While the status is RUNNING, keep checking every second.
  • If it stays RUNNING for more than 5 seconds, throw an IllegalStateException.
  • So the moment getStatus() is called, the 5-second check completes without any gap and control immediately goes to the catch block for force termination.

I agree that within these few milliseconds the Dataproc job status could become DONE.

But even with the new extra check, the gap still exists, so this intermittent wrongful killing of the pipeline could still happen.

My point is that there is not much of a time gap between the existing getStatus() == RUNNING check and remoteProcessController.kill(), which is similar to the gap in the extra check.

if (currentStatus == RuntimeJobStatus.RUNNING || currentStatus == RuntimeJobStatus.STARTING) {
LOG.debug("Force termination of remote process for program run {} as it is in state {}", programRunId, currentStatus);
remoteProcessController.kill(currentStatus);
} else {
LOG.debug("Skipping termination of remote process for program run {} as it is already in terminal state {}", programRunId, currentStatus);
}
} catch (Exception ex) {
LOG.warn("Failed to terminate remote process for program run {}", programRunId, ex);
LOG.warn("Failed to get status or terminate remote process for program run {} during force kill attempt", programRunId, ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.cdap.cdap.internal.app.runtime.distributed.remote;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobStatus;
import org.apache.twill.api.TwillController;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link RemoteExecutionTwillController}, focusing on the force-kill
 * decision made in {@code complete()} when status polling fails.
 */
public class RemoteExecutionTwillControllerTest {

  private CConfiguration cConf;
  private ProgramRunId programRunId;
  private RemoteProcessController remoteProcessController;
  private ScheduledExecutorService scheduler;
  private RemoteExecutionService remoteExecutionService;
  private CompletableFuture<Void> startupCompletionFuture;

  @Before
  public void setUp() {
    cConf = CConfiguration.create();
    // Short poll interval so the tests complete quickly.
    cConf.setLong(Constants.RuntimeMonitor.POLL_TIME_MS, 100);
    programRunId = new ProgramRunId(NamespaceId.DEFAULT.getNamespace(), "testapp", ProgramType.WORKFLOW, "testworkflow", "testrun");
    remoteProcessController = Mockito.mock(RemoteProcessController.class);
    scheduler = Executors.newSingleThreadScheduledExecutor();
    remoteExecutionService = Mockito.mock(RemoteExecutionService.class);
    startupCompletionFuture = new CompletableFuture<>();
  }

  @After
  public void tearDown() {
    // Shut down the per-test scheduler so worker threads do not leak across tests.
    scheduler.shutdownNow();
  }

  /**
   * Creates a controller under test wired to the mocked collaborators.
   *
   * @param terminateWithController whether the remote process should be terminated together
   *     with the controller
   */
  private RemoteExecutionTwillController createController(boolean terminateWithController) {
    return new RemoteExecutionTwillController(cConf, programRunId, startupCompletionFuture,
                                              remoteProcessController, scheduler, remoteExecutionService,
                                              terminateWithController);
  }

  @Test
  public void testComplete_KillSkippedForTerminalState() throws Exception {
    RemoteExecutionTwillController controller = createController(true);
    startupCompletionFuture.complete(null);

    // Chain the stubbing: the first getStatus() call throws to drive complete() into its
    // catch block; the follow-up status check inside the catch block then reports the
    // terminal COMPLETED state. A second when(...) on a mock already stubbed to throw
    // would itself throw during test setup, so consecutive stubbing must be chained.
    when(remoteProcessController.getStatus())
        .thenThrow(new RuntimeException("Simulated poll failure"))
        .thenReturn(RuntimeJobStatus.COMPLETED);

    controller.complete();

    // Verify kill was NOT called because the status was terminal
    verify(remoteProcessController, never()).kill(any());
  }

  @Test
  public void testComplete_KillCalledForRunningState() throws Exception {
    RemoteExecutionTwillController controller = createController(true);
    startupCompletionFuture.complete(null);

    // Chained stubbing (see above): first call throws to enter the catch block, the
    // status check in the catch block then reports the job as still RUNNING.
    when(remoteProcessController.getStatus())
        .thenThrow(new RuntimeException("Simulated poll failure"))
        .thenReturn(RuntimeJobStatus.RUNNING);

    controller.complete();

    // Verify kill WAS called with the observed non-terminal status
    verify(remoteProcessController).kill(RuntimeJobStatus.RUNNING);
  }

  @Test
  public void testComplete_StatusCheckFailsInCatch() throws Exception {
    RemoteExecutionTwillController controller = createController(true);
    startupCompletionFuture.complete(null);

    // Both the initial poll and the status check inside the catch block fail.
    when(remoteProcessController.getStatus())
        .thenThrow(new RuntimeException("Simulated poll failure"))
        .thenThrow(new RuntimeException("Simulated getStatus failure in catch"));

    controller.complete();

    // Verify kill was NOT called because getStatus failed in the catch
    verify(remoteProcessController, never()).kill(any());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,19 @@ public final ClusterStatus deleteClusterWithStatus(ProvisionerContext context, C
// Status details is specific to dataproc jobs, so it was not added to RuntimeJobDetail spi.
String statusDetails = ((DataprocRuntimeJobDetail) jobDetail).getJobStatusDetails();
if (statusDetails != null) {
ProgramRunFailureException e = new ProgramRunFailureException(
String.format("Dataproc job '%s' status details: %s",
((DataprocRuntimeJobDetail) jobDetail).getJobId(), statusDetails));
LOG.error("Dataproc Job {}", jobDetail.getStatus(), e);
// Check if the failure is due to attempting to cancel a job already DONE
if (jobDetail.getStatus() == RuntimeJobStatus.FAILED && statusDetails.contains("is not supported in the current state: DONE")) {
Copy link
Contributor

@sahusanket sahusanket Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Dataproc exception seems to be covered under FAILED_PRECONDITION,

and we are already handling it in DataprocRuntimeJobManager.java#L923.

So this check might not work.

We are currently assuming failure for all FAILED_PRECONDITION conditions; maybe we can add a specific check there instead.

LOG.warn("Attempted to cancel Dataproc job {} which was already DONE. This is not a pipeline failure. Continuing with deprovisioning.", ((DataprocRuntimeJobDetail) jobDetail).getJobId());
} else {
ProgramRunFailureException e = new ProgramRunFailureException(
String.format("Dataproc job '%s' status details: %s",
((DataprocRuntimeJobDetail) jobDetail).getJobId(), statusDetails));
LOG.error("Dataproc Job {} failed with error: {}", jobDetail.getStatus(), statusDetails, e);
// Rethrow only if it's not the specific Cancel-on-DONE issue
// We need to be careful here, as this method is expected to return ClusterStatus.DELETING
// and an exception here might halt the deprovisioning process in the caller.
// For now, let's just log and not throw for the specific case.
}
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,64 @@ public void testCommonDataprocLabelsInvalidLabel() {
Map<String, String> labels = provisioner.getCommonDataprocLabels(context);
Assert.assertNull(labels.get("email"));
}

@Test
public void testDeleteClusterWithStatus_CancelOnDoneJob() throws Exception {
  // Minimal provisioner context required to build a DataprocConf.
  context.addProperty("accountKey", "testKey");
  context.addProperty(DataprocConf.PROJECT_ID_KEY, "testProject");
  context.addProperty("region", "testRegion");
  context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING));

  DataprocRuntimeJobManager mockJobManager = Mockito.mock(DataprocRuntimeJobManager.class);
  DataprocRuntimeJobDetail mockJobDetail = Mockito.mock(DataprocRuntimeJobDetail.class);
  Mockito.when(provisioner.getRuntimeJobManager(context)).thenReturn(Optional.of(mockJobManager));
  Mockito.when(mockJobManager.getDetail(context.getProgramRunInfo())).thenReturn(Optional.of(mockJobDetail));

  // Job reports FAILED, but only because a cancel was attempted on an already-DONE job.
  Mockito.when(mockJobDetail.getStatus()).thenReturn(RuntimeJobStatus.FAILED);
  Mockito.when(mockJobDetail.getJobId()).thenReturn("test-job-id");
  String doneStateDetails = "Cancellation for task: Task(ofr-2hc-battery-management-dev/test-cluster-uuid/job-test-job-id) is not supported in the current state: DONE.";
  Mockito.when(mockJobDetail.getJobStatusDetails()).thenReturn(doneStateDetails);

  Mockito.when(cluster.getName()).thenReturn("testClusterName");
  DataprocConf dataprocConf = DataprocConf.create(provisioner.createContextProperties(context));
  // Stub out the real cluster deletion call.
  Mockito.doNothing().when(provisioner).doDeleteCluster(context, cluster, dataprocConf);

  ClusterStatus resultStatus = provisioner.deleteClusterWithStatus(context, cluster);

  // No exception should propagate and deprovisioning should continue; log output
  // is intentionally not asserted here.
  Assert.assertEquals(ClusterStatus.DELETING, resultStatus);
}

@Test
public void testDeleteClusterWithStatus_OtherJobFailure() throws Exception {
  // Minimal provisioner context required to build a DataprocConf.
  context.addProperty("accountKey", "testKey");
  context.addProperty(DataprocConf.PROJECT_ID_KEY, "testProject");
  context.addProperty("region", "testRegion");
  context.setErrorCategory(new ErrorCategory(ErrorCategoryEnum.DEPROVISIONING));

  DataprocRuntimeJobManager mockJobManager = Mockito.mock(DataprocRuntimeJobManager.class);
  DataprocRuntimeJobDetail mockJobDetail = Mockito.mock(DataprocRuntimeJobDetail.class);
  Mockito.when(provisioner.getRuntimeJobManager(context)).thenReturn(Optional.of(mockJobManager));
  Mockito.when(mockJobManager.getDetail(context.getProgramRunInfo())).thenReturn(Optional.of(mockJobDetail));

  // Job reports FAILED with a genuine (non cancel-on-DONE) failure message.
  Mockito.when(mockJobDetail.getStatus()).thenReturn(RuntimeJobStatus.FAILED);
  Mockito.when(mockJobDetail.getJobId()).thenReturn("test-job-id");
  String genuineFailureDetails = "Some other fatal error";
  Mockito.when(mockJobDetail.getJobStatusDetails()).thenReturn(genuineFailureDetails);

  Mockito.when(cluster.getName()).thenReturn("testClusterName");
  DataprocConf dataprocConf = DataprocConf.create(provisioner.createContextProperties(context));
  // Stub out the real cluster deletion call.
  Mockito.doNothing().when(provisioner).doDeleteCluster(context, cluster, dataprocConf);

  ClusterStatus resultStatus = provisioner.deleteClusterWithStatus(context, cluster);

  // This is not the suppressed cancel-on-DONE case, so the error path logs instead of
  // rethrowing; the returned status therefore remains DELETING.
  Assert.assertEquals(ClusterStatus.DELETING, resultStatus);
}
}