Skip to content

Commit ce15f41

Browse files
authored
IGNITE-28344 Don't cancel job future when canceling the job (#7861)
1 parent 8c10829 commit ce15f41

File tree

14 files changed

+394
-112
lines changed

14 files changed

+394
-112
lines changed

modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientComputeTest.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static java.util.concurrent.CompletableFuture.completedFuture;
2121
import static java.util.concurrent.CompletableFuture.failedFuture;
22-
import static org.apache.ignite.compute.JobStatus.CANCELED;
2322
import static org.apache.ignite.compute.JobStatus.COMPLETED;
2423
import static org.apache.ignite.compute.JobStatus.EXECUTING;
2524
import static org.apache.ignite.compute.JobStatus.FAILED;
@@ -85,6 +84,7 @@
8584
import org.apache.ignite.compute.JobDescriptor;
8685
import org.apache.ignite.compute.JobExecution;
8786
import org.apache.ignite.compute.JobExecutionContext;
87+
import org.apache.ignite.compute.JobStatus;
8888
import org.apache.ignite.compute.JobTarget;
8989
import org.apache.ignite.compute.TaskDescriptor;
9090
import org.apache.ignite.compute.TaskStatus;
@@ -295,8 +295,11 @@ void testCancelOnSpecificNodeAsync(boolean asyncJob) {
295295

296296
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
297297

298-
await().until(execution1::stateAsync, willBe(jobStateWithStatus(CANCELED)));
299-
await().until(execution2::stateAsync, willBe(jobStateWithStatus(CANCELED)));
298+
// Async job completes normally after cooperative cancellation (returns from isCancelled() check) — COMPLETED.
299+
// Sync job throws RuntimeException on thread interruption — FAILED (not CancellationException).
300+
JobStatus expectedStatus = asyncJob ? COMPLETED : FAILED;
301+
await().until(execution1::stateAsync, willBe(jobStateWithStatus(expectedStatus)));
302+
await().until(execution2::stateAsync, willBe(jobStateWithStatus(expectedStatus)));
300303
}
301304

302305
@Test
@@ -329,7 +332,8 @@ void changeJobPriority() {
329332

330333
// Cancel task 1, task 3 should start executing
331334
assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
332-
await().until(execution1::stateAsync, willBe(jobStateWithStatus(CANCELED)));
335+
// SleepJob throws RuntimeException, not CancellationException.
336+
await().until(execution1::stateAsync, willBe(jobStateWithStatus(FAILED)));
333337
await().until(execution3::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
334338

335339
// Task 2 is still queued
@@ -408,9 +412,10 @@ void testCancelBroadcastAllNodes() {
408412

409413
cancelHandle.cancel();
410414

415+
// SleepJob throws RuntimeException on interrupt, not CancellationException.
411416
await().until(() -> executions, contains(
412-
jobExecutionWithStatus(CANCELED),
413-
jobExecutionWithStatus(CANCELED)
417+
jobExecutionWithStatus(FAILED),
418+
jobExecutionWithStatus(FAILED)
414419
));
415420

416421
assertThat(broadcastExecution.resultsAsync(), willThrow(ComputeException.class));
@@ -725,7 +730,7 @@ void testCancelColocatedTuple(int key, int port) {
725730

726731
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
727732

728-
await().until(tupleExecution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
733+
await().until(tupleExecution::stateAsync, willBe(jobStateWithStatus(FAILED)));
729734
}
730735

731736
@ParameterizedTest
@@ -747,7 +752,7 @@ void testCancelColocatedPojo(int key, int port) {
747752

748753
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
749754

750-
await().until(pojoExecution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
755+
await().until(pojoExecution::stateAsync, willBe(jobStateWithStatus(FAILED)));
751756
}
752757

753758
@Test

modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ void cancelsJob(boolean local) {
679679
// RuntimeException is thrown when SleepJob catches the InterruptedException
680680
assertThat(runtimeException.toString(), containsString(InterruptedException.class.getName()));
681681

682-
await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
682+
await().until(execution::stateAsync, willBe(jobStateWithStatus(FAILED)));
683683
}
684684

685685
@ParameterizedTest
@@ -729,7 +729,31 @@ void cancelsQueuedJob(boolean local) {
729729

730730
// Cancel running task
731731
assertThat(cancelHandle1.cancelAsync(), willCompleteSuccessfully());
732-
await().until(execution1::stateAsync, willBe(jobStateWithStatus(CANCELED)));
732+
await().until(execution1::stateAsync, willBe(jobStateWithStatus(FAILED)));
733+
}
734+
735+
@ParameterizedTest(name = "local: {0}")
736+
@ValueSource(booleans = {true, false})
737+
void asyncJobCompletesNormallyAfterCooperativeCancellation(boolean local) {
738+
Ignite executeNode = local ? node(0) : node(1);
739+
740+
CancelHandle cancelHandle = CancelHandle.create();
741+
742+
JobExecution<String> execution = submit(
743+
JobTarget.node(clusterNode(executeNode)),
744+
asyncDelayedCompleteJob(),
745+
cancelHandle.token(),
746+
null
747+
);
748+
749+
await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
750+
751+
cancelHandle.cancel();
752+
753+
// The async job detects cancellation via isCancelled(), does cleanup, then completes with a result.
754+
// Cooperative cancellation should honor the result — status must be COMPLETED, not CANCELED.
755+
assertThat(execution.resultAsync(), willBe(is("completed-after-cancel")));
756+
await().until(execution::stateAsync, willBe(jobStateWithStatus(COMPLETED)));
733757
}
734758

735759
@ParameterizedTest
@@ -963,6 +987,10 @@ private TaskDescriptor<Void, Void> infiniteMapReduceTask() {
963987
return TaskDescriptor.<Void, Void>builder(jobClassName("InfiniteMapReduceTask")).units(units()).build();
964988
}
965989

990+
private JobDescriptor<Void, String> asyncDelayedCompleteJob() {
991+
return JobDescriptor.<Void, String>builder(jobClassName("AsyncDelayedCompleteJob")).units(units()).build();
992+
}
993+
966994
private JobDescriptor<Tuple, Integer> tupleJob() {
967995
return JobDescriptor.<Tuple, Integer>builder(jobClassName("TupleJob")).units(units()).build();
968996
}

modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.compute;
1919

2020
import static org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
21+
import static org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELED;
2122
import static org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
2223
import static org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
2324
import static org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
@@ -134,14 +135,17 @@ void failoverCandidateLeavesCluster() throws Exception {
134135

135136
String jobClassName = InteractiveJobs.globalJob().name();
136137

137-
// When node is shut down gracefully, the job execution is interrupted and event could be logged anyway
138-
// So there would be 2 events from a worker node, 1 failed events from a worker node and 1 failed event from the coordinator
139-
await().until(logInspector::events, contains(
138+
// When node is shut down gracefully, the job execution is interrupted and CancellationException is thrown.
139+
// There would be 2 events from a worker node (QUEUED, EXECUTING), then a CANCELED event from the worker node
140+
// and a FAILED event from the coordinator. The order of the last two events is not determined.
141+
await().until(logInspector::events, hasSize(4));
142+
143+
assertThat(logInspector.events(), containsInRelativeOrder(
140144
jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName, workerNodeName),
141-
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, workerNodeName),
142-
jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, workerNodeName),
143-
jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, workerNodeName)
145+
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, workerNodeName)
144146
));
147+
assertThat(logInspector.events(), hasItem(jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, workerNodeName)));
148+
assertThat(logInspector.events(), hasItem(jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, workerNodeName)));
145149
}
146150

147151
@Test
@@ -177,9 +181,9 @@ void failoverAndThenStopWorker() throws Exception {
177181
// Then the job is failed, because there are no more failover workers.
178182
execution.assertFailed();
179183

180-
// When node is shut down gracefully, the job execution is interrupted and event could be logged anyway
181-
// So there would be 4 events from worker nodes, 2 failed events from both worker nodes and 1 failed event from the coordinator
182-
// The order of failed events is not determined
184+
// Each worker node logs QUEUED, EXECUTING, CANCELED (interrupted on shutdown → CancellationException).
185+
// The coordinator logs FAILED (no more failover candidates).
186+
// The order of the last 3 terminal events is not determined.
183187
await().until(logInspector::events, hasSize(7));
184188

185189
String jobClassName = InteractiveJobs.globalJob().name();
@@ -189,12 +193,18 @@ void failoverAndThenStopWorker() throws Exception {
189193
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, workerNodeName),
190194
jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName, failoverWorker),
191195
jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName, failoverWorker),
192-
jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, failoverWorker)
196+
// Interrupted on shutdown → CancellationException → CANCELED
197+
jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, failoverWorker)
193198
));
194199

195-
// Failed event from second worker node
200+
// First worker was also interrupted on shutdown → CANCELED
196201
assertThat(logInspector.events(), hasItem(
197-
jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, workerNodeName)
202+
jobEvent(COMPUTE_JOB_CANCELED, jobId, jobClassName, workerNodeName)
203+
));
204+
205+
// Coordinator's failover gives up (no more candidates) → FAILED, targetNode is the last worker attempted
206+
assertThat(logInspector.events(), hasItem(
207+
jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName, failoverWorker)
198208
));
199209
}
200210

modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.CancellationException;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.ConcurrentHashMap;
3132
import java.util.concurrent.LinkedBlockingQueue;
@@ -172,7 +173,7 @@ private static Signal listenSignal() {
172173
try {
173174
return GLOBAL_SIGNALS.take();
174175
} catch (InterruptedException e) {
175-
throw new RuntimeException(e);
176+
throw new CancellationException();
176177
}
177178
}
178179

@@ -233,7 +234,7 @@ private static Signal listenSignal(BlockingQueue<Signal> channel) {
233234
try {
234235
return channel.take();
235236
} catch (InterruptedException e) {
236-
throw new RuntimeException(e);
237+
throw new CancellationException();
237238
}
238239
}
239240

modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Set;
3030
import java.util.UUID;
3131
import java.util.concurrent.BlockingQueue;
32+
import java.util.concurrent.CancellationException;
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.LinkedBlockingQueue;
3435
import java.util.concurrent.TimeUnit;
@@ -166,7 +167,7 @@ private Signal listenSignal() {
166167
return GLOBAL_SIGNALS.take();
167168
} catch (InterruptedException e) {
168169
if (throwExceptionOnInterruption) {
169-
throw new RuntimeException(e);
170+
throw new CancellationException();
170171
} else {
171172
Thread.currentThread().interrupt();
172173
return Signal.CHECK_CANCEL;
@@ -201,7 +202,7 @@ public CompletableFuture<List<MapReduceJob<String, String>>> splitAsync(TaskExec
201202
).collect(toList()));
202203
case CHECK_CANCEL:
203204
if (context.isCancelled()) {
204-
throw new RuntimeException("Task is cancelled");
205+
throw new CancellationException("Task is cancelled");
205206
}
206207
break;
207208
default:
@@ -229,7 +230,7 @@ public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context,
229230
return completedFuture(new ArrayList<>(results.values()));
230231
case CHECK_CANCEL:
231232
if (context.isCancelled()) {
232-
throw new RuntimeException("Task is cancelled");
233+
throw new CancellationException("Task is cancelled");
233234
}
234235
break;
235236
default:
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.example.jobs.embedded;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import org.apache.ignite.compute.ComputeJob;
22+
import org.apache.ignite.compute.JobExecutionContext;
23+
24+
/**
25+
* Async compute job that returns a future immediately. A background thread polls
26+
* {@code isCancelled()}, then completes the future with a result after a brief delay.
27+
* This tests that cooperative cancellation of async jobs is honored.
28+
*/
29+
public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
30+
@Override
31+
public CompletableFuture<String> executeAsync(JobExecutionContext context, Void arg) {
32+
CompletableFuture<String> result = new CompletableFuture<>();
33+
34+
Thread thread = new Thread(() -> {
35+
while (!context.isCancelled()) {
36+
try {
37+
Thread.sleep(50);
38+
} catch (InterruptedException e) {
39+
break;
40+
}
41+
}
42+
43+
// Simulate cleanup after cancellation
44+
try {
45+
Thread.sleep(500);
46+
} catch (InterruptedException ignored) {
47+
// ignored
48+
}
49+
50+
result.complete("completed-after-cancel");
51+
});
52+
thread.setDaemon(true);
53+
thread.start();
54+
55+
return result;
56+
}
57+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.example.jobs.standalone;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import org.apache.ignite.compute.ComputeJob;
22+
import org.apache.ignite.compute.JobExecutionContext;
23+
24+
/**
25+
* Async compute job that returns a future immediately. A background thread polls
26+
* {@code isCancelled()}, then completes the future with a result after a brief delay.
27+
* This tests that cooperative cancellation of async jobs is honored.
28+
*/
29+
public class AsyncDelayedCompleteJob implements ComputeJob<Void, String> {
30+
@Override
31+
public CompletableFuture<String> executeAsync(JobExecutionContext context, Void arg) {
32+
CompletableFuture<String> result = new CompletableFuture<>();
33+
34+
Thread thread = new Thread(() -> {
35+
while (!context.isCancelled()) {
36+
try {
37+
Thread.sleep(50);
38+
} catch (InterruptedException e) {
39+
break;
40+
}
41+
}
42+
43+
// Simulate cleanup after cancellation
44+
try {
45+
Thread.sleep(500);
46+
} catch (InterruptedException ignored) {
47+
// ignored
48+
}
49+
50+
result.complete("completed-after-cancel");
51+
});
52+
thread.setDaemon(true);
53+
thread.start();
54+
55+
return result;
56+
}
57+
}

0 commit comments

Comments
 (0)