@@ -414,7 +414,9 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(
414414
415415 var options = writeOptions != null
416416 ? writeOptions
417- : new ClientWriteOptions ().transactionChunkSize (DEFAULT_MAX_METHOD_PARALLEL_REQS );
417+ : new ClientWriteOptions ()
418+ .transactionChunkSize (1 )
419+ .maxParallelRequests (DEFAULT_MAX_METHOD_PARALLEL_REQS );
418420
419421 if (options .getAdditionalHeaders () == null ) {
420422 options .additionalHeaders (new HashMap <>());
@@ -434,19 +436,50 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(
434436 return this .writeTransactions (storeId , emptyTransaction , writeOptions );
435437 }
436438
437- var futureResponse = this .writeTransactions (storeId , transactions .get (0 ), options );
438-
439- for (int i = 1 ; i < transactions .size (); i ++) {
440- final int index = i ; // Must be final in this scope for closure.
439+ int maxParallelRequests = options .getMaxParallelRequests () != null
440+ ? options .getMaxParallelRequests ()
441+ : DEFAULT_MAX_METHOD_PARALLEL_REQS ;
441442
442- // The resulting completable future of this chain will result in either:
443- // 1. The first exception thrown in a failed completion. Other thenCompose() will not be evaluated.
444- // 2. The final successful ClientWriteResponse.
445- futureResponse = futureResponse .thenCompose (
446- _response -> this .writeTransactions (storeId , transactions .get (index ), options ));
443+ if (maxParallelRequests <= 1 ) {
444+ var futureResponse = this .writeTransactions (storeId , transactions .get (0 ), options );
445+ for (int i = 1 ; i < transactions .size (); i ++) {
446+ final int index = i ;
447+ futureResponse = futureResponse .thenCompose (
448+ _response -> this .writeTransactions (storeId , transactions .get (index ), options ));
449+ }
450+ return futureResponse ;
447451 }
448452
449- return futureResponse ;
453+ var executor = Executors .newScheduledThreadPool (maxParallelRequests );
454+ var latch = new CountDownLatch (transactions .size ());
455+ var failure = new AtomicReference <Throwable >();
456+ var lastResponse = new AtomicReference <ClientWriteResponse >();
457+
458+ Consumer <ClientWriteRequest > singleWriteRequest =
459+ tx -> this .writeTransactions (storeId , tx , options ).whenComplete ((response , throwable ) -> {
460+ try {
461+ if (throwable != null ) {
462+ failure .compareAndSet (null , throwable );
463+ } else {
464+ lastResponse .set (response );
465+ }
466+ } finally {
467+ latch .countDown ();
468+ }
469+ });
470+
471+ try {
472+ transactions .forEach (tx -> executor .execute (() -> singleWriteRequest .accept (tx )));
473+ latch .await ();
474+ if (failure .get () != null ) {
475+ return CompletableFuture .failedFuture (failure .get ());
476+ }
477+ return CompletableFuture .completedFuture (lastResponse .get ());
478+ } catch (Exception e ) {
479+ return CompletableFuture .failedFuture (e );
480+ } finally {
481+ executor .shutdown ();
482+ }
450483 }
451484
452485 private <T > Stream <List <T >> chunksOf (int chunkSize , List <T > list ) {
0 commit comments