Skip to content

Commit 22a3f46

Browse files
committed
IGNITE-27947 Reconciled ClientSql and ClientTable transaction error handling logic. (For the most part anyway)
** The code is still very hard, its just a little less duplicated.
1 parent 6c2bf2f commit 22a3f46

File tree

3 files changed

+67
-64
lines changed

3 files changed

+67
-64
lines changed

modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -377,46 +377,19 @@ public <T> CompletableFuture<AsyncResultSet<T>> executeAsyncInternal(
377377
false
378378
).handle((BiFunction<AsyncResultSet<T>, Throwable, CompletableFuture<AsyncResultSet<T>>>) (r, err) -> {
379379
if (err != null) {
380-
if (tx != null && shouldRecordTransactionFailure(err)) {
381-
tx.recordOperationFailure(err);
380+
if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch, -1)) {
381+
return failedFuture(err);
382382
}
383383

384-
// We should reconcile this code with ClientTable. Should be the same.
385-
if (ctx.firstReqFut != null) {
386-
// Create failed transaction.
387-
long id = -1;
388-
ClientTransaction failed = new ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
389-
ctx.pm, null, ch.observableTimestamp(), 0);
390-
failed.fail();
391-
ctx.firstReqFut.complete(failed);
392-
// Txn was not started, rollback is not required.
393-
return failedFuture(err);
384+
if (tx != null && shouldRecordTransactionFailure(err)) {
385+
tx.recordOperationFailure(err);
394386
}
395387

396388
if (tx == null || !shouldTrackOperation) {
397389
return failedFuture(err);
398390
}
399391

400-
if (ctx.enlistmentToken != null) {
401-
// In case of direct mapping error need to rollback the tx on coordinator.
402-
return tx.rollbackAsync().handle((ignored, err0) -> {
403-
if (err0 != null) {
404-
err.addSuppressed(err0);
405-
}
406-
407-
sneakyThrow(err);
408-
return null;
409-
});
410-
} else {
411-
return tx.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
412-
if (err0 != null) {
413-
err.addSuppressed(err0);
414-
}
415-
416-
sneakyThrow(err);
417-
return null;
418-
});
419-
}
392+
return DirectTxUtils.handleErrorOnOtherRequests(ctx, tx, err);
420393
}
421394

422395
return completedFuture(r);

modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,7 @@ private <T> CompletableFuture<T> doSchemaOutInOpAsync(
507507
if (ex != null) {
508508
Throwable cause = ex;
509509

510-
if (ctx.firstReqFut != null) {
511-
// Create failed transaction.
512-
ClientTransaction failed = new ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
513-
ctx.pm, null, ch.observableTimestamp(), 0);
514-
failed.fail();
515-
ctx.firstReqFut.complete(failed);
516-
// Txn was not started, rollback is not required.
510+
if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch, id)) {
517511
fut.completeExceptionally(unwrapCause(ex));
518512
return null;
519513
}
@@ -546,37 +540,20 @@ private <T> CompletableFuture<T> doSchemaOutInOpAsync(
546540

547541
cause = cause.getCause();
548542
}
543+
}
549544

550-
if (tx0 == null) {
551-
fut.completeExceptionally(ex);
552-
} else {
553-
tx0.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
554-
if (err0 != null) {
555-
ex.addSuppressed(err0);
556-
}
557-
558-
fut.completeExceptionally(ex);
559-
560-
return (T) null;
561-
});
562-
}
545+
if (tx0 == null) {
546+
fut.completeExceptionally(ex);
563547
} else {
564-
// In case of direct mapping error we need to rollback the tx on coordinator.
565-
tx0.rollbackAsync().handle((ignored, err0) -> {
566-
if (err0 != null) {
567-
ex.addSuppressed(err0);
568-
}
569-
570-
fut.completeExceptionally(ex);
571-
572-
return (T) null;
573-
});
548+
DirectTxUtils.handleErrorOnOtherRequests(ctx, tx0, ex)
549+
.whenComplete((ignored, err0) -> fut.completeExceptionally(err0));
574550
}
551+
552+
return null;
575553
} else {
576554
fut.complete(ret);
555+
return null;
577556
}
578-
579-
return null;
580557
});
581558
});
582559
}).exceptionally(ex -> {

modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
2424
import static org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
2525
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
26+
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
2627
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
2728
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
2829

@@ -306,6 +307,58 @@ public static PayloadWriter payloadWriter(WriteContext ctx, @Nullable Transactio
306307
}
307308
}
308309

310+
/**
311+
* Try to handle error on first request. Returns false if not in the context of the first request.
312+
* This method essentially populates context with a failed request instance.
313+
*
314+
* @param ctx The {@link WriteContext} that holds transactional context information.
315+
* @param ch The {@link ReliableChannel} used to resolve the actual communication channel.
316+
* @param id Client Table Id.
317+
* @return Whether the error was handled or not.
318+
*/
319+
public static boolean tryHandleErrorOnFirstRequest(WriteContext ctx, ReliableChannel ch, long id) {
320+
if (ctx.firstReqFut == null) {
321+
return false;
322+
}
323+
324+
// Create failed transaction.
325+
ClientTransaction failed = new ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
326+
ctx.pm, null, ch.observableTimestamp(), 0);
327+
failed.fail();
328+
ctx.firstReqFut.complete(failed);
329+
// Txn was not started, rollback is not required.
330+
return true;
331+
}
332+
333+
/**
334+
* Handles errors after the first request.
335+
* Essentially call the rollback on the transaction and appends any errors to the original error.
336+
*
337+
* @param ctx The {@link WriteContext} that holds transactional context information.
338+
* @param tx The client transaction.
339+
* @param err The error to be handled.
340+
* @return A completable future what always fails with the original error plus any suppressed errors.
341+
* @param <T> type of the expected future.
342+
*/
343+
public static <T> CompletableFuture<T> handleErrorOnOtherRequests(WriteContext ctx, ClientTransaction tx, Throwable err) {
344+
CompletableFuture<Void> rollback;
345+
if (ctx.enlistmentToken != null) {
346+
// In case of direct mapping error need to rollback the tx on coordinator.
347+
rollback = tx.rollbackAsync();
348+
} else {
349+
rollback = tx.rollbackAndDiscardDirectMappings(false);
350+
}
351+
352+
return rollback.handle((ignored, err0) -> {
353+
if (err0 != null) {
354+
err.addSuppressed(err0);
355+
}
356+
357+
sneakyThrow(err);
358+
return null;
359+
});
360+
}
361+
309362
private static CompletableFuture<ClientChannel> resolveChannelInner(
310363
WriteContext ctx,
311364
ReliableChannel ch,

0 commit comments

Comments
 (0)