Skip to content

Commit 0ee463d

Browse files
marinov-codeVENTSISLAV MARINOV
andauthored
Replacing CompletableFuture.supplyAsync() with fixed thread pool executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which may not have enough threads. (#7908)
* Those tests have a race condition: it assumes all tasks start before await() times out — which is not guaranteed. * Replacing CompletableFuture.supplyAsync() with fixed thread pool executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which may not have enough threads. * Replacing CompletableFuture.supplyAsync() with fixed thread pool executor. CompletableFuture.supplyAsync() uses the common ForkJoinPool, which may not have enough threads. --------- Co-authored-by: VENTSISLAV MARINOV <ventsislav.marinov@sas.com>
1 parent c1ba95c commit 0ee463d

1 file changed

Lines changed: 40 additions & 34 deletions

File tree

geode-wan/src/test/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionServiceTest.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.concurrent.Callable;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
2527
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicInteger;
2729

@@ -115,64 +117,66 @@ public void cancelNotRunningExecutionReturnsError() {
115117

116118
@Test
117119
public void cancelAllExecutionsWithRunningExecutionsReturnsCanceledExecutions() {
118-
CountDownLatch latch = new CountDownLatch(2);
120+
int executions = 2;
121+
CountDownLatch latch = new CountDownLatch(executions);
122+
ExecutorService executorService = Executors.newFixedThreadPool(executions);
119123
Callable<CliFunctionResult> firstExecution = () -> {
120124
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
121125
return null;
122126
};
123127

124-
CompletableFuture
125-
.supplyAsync(() -> {
126-
try {
127-
return service.execute(firstExecution, "myRegion", "mySender1");
128-
} catch (Exception e) {
129-
return null;
130-
}
131-
});
128+
executorService.submit(() -> {
129+
try {
130+
return service.execute(firstExecution, "myRegion", "mySender1");
131+
} catch (Exception e) {
132+
return null;
133+
}
134+
});
132135

133136
Callable<CliFunctionResult> secondExecution = () -> {
134137
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
135138
return null;
136139
};
137140

138-
CompletableFuture
139-
.supplyAsync(() -> {
140-
try {
141-
return service.execute(secondExecution, "myRegion", "mySender");
142-
} catch (Exception e) {
143-
return null;
144-
}
145-
});
141+
executorService.submit(() -> {
142+
try {
143+
return service.execute(secondExecution, "myRegion", "mySender");
144+
} catch (Exception e) {
145+
return null;
146+
}
147+
});
146148

147149
// Wait for the functions to start execution
148-
await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(2));
150+
await().untilAsserted(
151+
() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));
149152

150153
// Cancel the function execution
151154
String executionsString = service.cancelAll();
152155

153156
assertThat(executionsString).isEqualTo("[(myRegion,mySender1), (myRegion,mySender)]");
154157
await().untilAsserted(() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(0));
158+
executorService.shutdown();
155159
}
156160

157161
@Test
158162
public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
159163
int executions = 5;
160164
CountDownLatch latch = new CountDownLatch(executions);
165+
ExecutorService executorService = Executors.newFixedThreadPool(executions);
161166
for (int i = 0; i < executions; i++) {
162167
Callable<CliFunctionResult> execution = () -> {
163168
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
164169
return null;
165170
};
166171

167172
final String regionName = String.valueOf(i);
168-
CompletableFuture
169-
.supplyAsync(() -> {
170-
try {
171-
return service.execute(execution, regionName, "mySender1");
172-
} catch (Exception e) {
173-
return null;
174-
}
175-
});
173+
executorService.submit(() -> {
174+
try {
175+
return service.execute(execution, regionName, "mySender1");
176+
} catch (Exception e) {
177+
return null;
178+
}
179+
});
176180
}
177181

178182
// Wait for the functions to start execution
@@ -183,6 +187,7 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
183187
for (int i = 0; i < executions; i++) {
184188
latch.countDown();
185189
}
190+
executorService.shutdown();
186191
}
187192

188193
@Test
@@ -193,6 +198,7 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
193198
int executions = 4;
194199
CountDownLatch latch = new CountDownLatch(executions);
195200
AtomicInteger concurrentExecutions = new AtomicInteger(0);
201+
ExecutorService executorService = Executors.newFixedThreadPool(executions);
196202
for (int i = 0; i < executions; i++) {
197203
Callable<CliFunctionResult> execution = () -> {
198204
concurrentExecutions.incrementAndGet();
@@ -202,14 +208,13 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
202208
};
203209

204210
final String regionName = String.valueOf(i);
205-
CompletableFuture
206-
.supplyAsync(() -> {
207-
try {
208-
return service.execute(execution, regionName, "mySender1");
209-
} catch (Exception e) {
210-
return null;
211-
}
212-
});
211+
executorService.submit(() -> {
212+
try {
213+
return service.execute(execution, regionName, "mySender1");
214+
} catch (Exception e) {
215+
return null;
216+
}
217+
});
213218
}
214219

215220
// Wait for the functions to start execution
@@ -225,6 +230,7 @@ public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
225230
}
226231

227232
await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0));
233+
executorService.shutdown();
228234
}
229235

230236
}

0 commit comments

Comments
 (0)