Skip to content

Commit 325b634

Browse files
committed
IGNITE-27947 Apply code-review
** Postpone removal of firstReqToTx resource to after the transaction is commited/rolled back. ** Added test
1 parent cdf9608 commit 325b634

File tree

6 files changed

+257
-9
lines changed

6 files changed

+257
-9
lines changed

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,8 +1225,6 @@ private void processOperationInternal(
12251225
writeError(requestId, opCode, e, ctx, false);
12261226
metrics.requestsFailedIncrement();
12271227
}
1228-
1229-
firstReqToTxResMap.remove(requestId);
12301228
});
12311229
}
12321230

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,11 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
516516
});
517517

518518
InternalTxOptions txOptions = builder.build();
519-
var tx = startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
519+
var tx = new DirectTransactionWithFirstRequest(
520+
startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
521+
reqToTxMap,
522+
requestId
523+
);
520524

521525
// Attach resource id only on first direct request.
522526
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.client.handler.requests.table;
19+
20+
import java.util.Map;
21+
import java.util.UUID;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.apache.ignite.internal.hlc.HybridTimestamp;
24+
import org.apache.ignite.internal.replicator.ZonePartitionId;
25+
import org.apache.ignite.internal.tx.InternalTransaction;
26+
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
27+
import org.apache.ignite.internal.tx.TxState;
28+
import org.apache.ignite.internal.wrapper.Wrapper;
29+
import org.apache.ignite.tx.TransactionException;
30+
import org.jetbrains.annotations.Nullable;
31+
32+
class DirectTransactionWithFirstRequest implements InternalTransaction, Wrapper {
33+
private final InternalTransaction base;
34+
35+
// We could alos just accept a lambda.
36+
private final Map<Long, Long> reqToTxMap;
37+
38+
private final long firstReqId;
39+
40+
DirectTransactionWithFirstRequest(InternalTransaction base, Map<Long, Long> reqToTxMap, long firstReqId) {
41+
this.base = base;
42+
this.reqToTxMap = reqToTxMap;
43+
this.firstReqId = firstReqId;
44+
}
45+
46+
@Override
47+
public UUID id() {
48+
return base.id();
49+
}
50+
51+
@Override
52+
public PendingTxPartitionEnlistment enlistedPartition(ZonePartitionId replicationGroupId) {
53+
return base.enlistedPartition(replicationGroupId);
54+
}
55+
56+
@Override
57+
public TxState state() {
58+
return base.state();
59+
}
60+
61+
@Override
62+
public boolean assignCommitPartition(ZonePartitionId commitPartitionId) {
63+
return base.assignCommitPartition(commitPartitionId);
64+
}
65+
66+
@Override
67+
public ZonePartitionId commitPartition() {
68+
return base.commitPartition();
69+
}
70+
71+
@Override
72+
public void enlist(ZonePartitionId replicationGroupId, int tableId, String primaryNodeConsistentId, long consistencyToken) {
73+
base.enlist(replicationGroupId, tableId, primaryNodeConsistentId, consistencyToken);
74+
}
75+
76+
@Override
77+
public @Nullable HybridTimestamp readTimestamp() {
78+
return base.readTimestamp();
79+
}
80+
81+
@Override
82+
public HybridTimestamp schemaTimestamp() {
83+
return base.schemaTimestamp();
84+
}
85+
86+
@Override
87+
public UUID coordinatorId() {
88+
return base.coordinatorId();
89+
}
90+
91+
@Override
92+
public boolean implicit() {
93+
return base.implicit();
94+
}
95+
96+
@Override
97+
public boolean remote() {
98+
return base.remote();
99+
}
100+
101+
@Override
102+
public boolean remoteOnCoordinator() {
103+
return base.remoteOnCoordinator();
104+
}
105+
106+
@Override
107+
public CompletableFuture<Void> finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full,
108+
@Nullable Throwable finishReason) {
109+
throw new UnsupportedOperationException("DirectTransactionWithFirstRequest should not be finished manually, "
110+
+ "implement this method or used commit/rollback.");
111+
}
112+
113+
@Override
114+
public boolean isFinishingOrFinished() {
115+
return base.isFinishingOrFinished();
116+
}
117+
118+
@Override
119+
public long getTimeout() {
120+
return base.getTimeout();
121+
}
122+
123+
@Override
124+
public CompletableFuture<Void> kill() {
125+
throw new UnsupportedOperationException("DirectTransactionWithFirstRequest should not be killed manually, "
126+
+ "implement this method or used commit/rollback.");
127+
}
128+
129+
@Override
130+
public CompletableFuture<Void> rollbackWithExceptionAsync(Throwable throwable) {
131+
throw new UnsupportedOperationException("DirectTransactionWithFirstRequest should not be rollbackWithExceptionAsync manually, "
132+
+ "implement this method or used commit/rollback.");
133+
}
134+
135+
@Override
136+
public boolean isRolledBackWithTimeoutExceeded() {
137+
return base.isRolledBackWithTimeoutExceeded();
138+
}
139+
140+
@Override
141+
public void processDelayedAck(Object val, @Nullable Throwable err) {
142+
base.processDelayedAck(val, err);
143+
}
144+
145+
@Override
146+
public void commit() throws TransactionException {
147+
try {
148+
base.commit();
149+
} finally {
150+
removeMapping();
151+
}
152+
}
153+
154+
@Override
155+
public CompletableFuture<Void> commitAsync() {
156+
return base.commitAsync().whenComplete((v, err) -> removeMapping());
157+
}
158+
159+
@Override
160+
public void rollback() throws TransactionException {
161+
try {
162+
base.rollback();
163+
} finally {
164+
removeMapping();
165+
}
166+
}
167+
168+
@Override
169+
public CompletableFuture<Void> rollbackAsync() {
170+
return base.rollbackAsync().whenComplete((v, err) -> removeMapping());
171+
}
172+
173+
@Override
174+
public boolean isReadOnly() {
175+
return base.isReadOnly();
176+
}
177+
178+
public InternalTransaction base() {
179+
return base;
180+
}
181+
182+
@Override
183+
public <T> T unwrap(Class<T> classToUnwrap) {
184+
return (T) base;
185+
}
186+
187+
private void removeMapping() {
188+
reqToTxMap.remove(firstReqId);
189+
}
190+
}

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
3939
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
4040
import org.apache.ignite.internal.util.ExceptionUtils;
41+
import org.apache.ignite.internal.wrapper.Wrappers;
4142
import org.apache.ignite.tx.TransactionException;
4243

4344
/**
@@ -89,7 +90,7 @@ public static CompletableFuture<ResponseWriter> process(
8990
// Update causality. Used to assign commit timestamp after all enlistments.
9091
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
9192

92-
ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
93+
ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class);
9394

9495
// Enforce cleanup.
9596
tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.ignite.internal.table.TableViewInternal;
3131
import org.apache.ignite.internal.tx.InternalTransaction;
3232
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
33+
import org.apache.ignite.internal.wrapper.Wrappers;
3334
import org.apache.ignite.lang.ErrorGroups.Client;
3435
import org.apache.ignite.lang.IgniteException;
3536

@@ -76,8 +77,7 @@ public static CompletableFuture<ResponseWriter> process(
7677
}
7778

7879
tx = resources.remove(actualResourceId).get(InternalTransaction.class);
79-
80-
reqToTxMap.remove(reqId);
80+
// Will not remove right away from reqToTxMap, it will be remove automatically on rollback.
8181
} else {
8282
tx = resources.remove(resourceId).get(InternalTransaction.class);
8383

@@ -100,7 +100,7 @@ public static CompletableFuture<ResponseWriter> process(
100100
if (cnt > 0) {
101101
in.unpackLong(); // Unpack causality.
102102

103-
ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
103+
ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class);
104104

105105
// Enforce cleanup.
106106
tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());

modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.stream.Stream;
6262
import org.apache.ignite.Ignite;
6363
import org.apache.ignite.client.IgniteClient;
64+
import org.apache.ignite.client.handler.ClientInboundMessageHandler;
6465
import org.apache.ignite.internal.app.IgniteImpl;
6566
import org.apache.ignite.internal.client.ClientChannel;
6667
import org.apache.ignite.internal.client.ClientTransactionInflights;
@@ -1415,12 +1416,20 @@ static boolean isUsingReverseComparator() {
14151416
@Nested
14161417
class OnConflictDuringFirstRequest {
14171418
class Data {
1419+
final IgniteImpl ignite;
14181420
final List<Tuple> tuples;
14191421
final ClientLazyTransaction tx1;
14201422
final ClientLazyTransaction tx2;
14211423
final CompletableFuture<?> req2Fut;
14221424

1423-
Data(List<Tuple> tuples, ClientLazyTransaction tx1, ClientLazyTransaction tx2, CompletableFuture<?> req2Fut) {
1425+
Data(
1426+
IgniteImpl ignite,
1427+
List<Tuple> tuples,
1428+
ClientLazyTransaction tx1,
1429+
ClientLazyTransaction tx2,
1430+
CompletableFuture<?> req2Fut
1431+
) {
1432+
this.ignite = ignite;
14241433
this.tuples = tuples;
14251434
this.tx1 = tx1;
14261435
this.tx2 = tx2;
@@ -1468,7 +1477,7 @@ Data prepareBlockedTransaction(KillTestContext ctx) throws InterruptedException
14681477
assertThat(fut2.isDone(), is(false));
14691478
IgniteTestUtils.assertThrows(AssertionError.class, () -> ClientTransaction.get(tx2), "Transaction is starting");
14701479

1471-
return new Data(tuples0, tx1, tx2, fut2);
1480+
return new Data(ignite, tuples0, tx1, tx2, fut2);
14721481
}
14731482

14741483
@ParameterizedTest
@@ -1512,6 +1521,52 @@ public void testOperationsBlockWaitingForLock(KillTestContext ctx) throws Interr
15121521
KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
15131522
assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 3)), willSucceedIn(5, TimeUnit.SECONDS));
15141523
}
1524+
1525+
@ParameterizedTest
1526+
@MethodSource("org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest#killTestContextFactory")
1527+
public void testCancelByRequestIdNotAvailable(KillTestContext ctx) throws InterruptedException {
1528+
var test = prepareBlockedTransaction(ctx);
1529+
1530+
// Remove firstReqMapping from the server side.
1531+
{
1532+
Map<Long, Long> firstReqToTxResMap = IgniteTestUtils.getFieldValue(
1533+
test.ignite.clientInboundMessageHandler(),
1534+
ClientInboundMessageHandler.class,
1535+
"firstReqToTxResMap"
1536+
);
1537+
1538+
CompletableFuture<Object> reqInfoFut = IgniteTestUtils.getFieldValue(test.tx2, ClientLazyTransaction.class,
1539+
"requestInfoFuture");
1540+
Object requestInfo = reqInfoFut.join();
1541+
long firstReqId = IgniteTestUtils.getFieldValue(requestInfo, "firstReqId");
1542+
firstReqToTxResMap.remove(firstReqId);
1543+
}
1544+
1545+
// Will block because of the error.
1546+
var rollbackTx2Fut1 = test.tx2.rollbackAsync();
1547+
Thread.sleep(1_000);
1548+
assertThat(rollbackTx2Fut1.isDone(), is(false));
1549+
1550+
// Do another concurrent rollback call just to make sure.
1551+
// If we allow multiple rollback requests by id to be sent concurrently, the outcome might be different.
1552+
var rollbackTx2Fut2 = test.tx2.rollbackAsync();
1553+
Thread.sleep(1_000);
1554+
assertThat(rollbackTx2Fut2.isDone(), is(false));
1555+
1556+
// Now unblock the transaction.
1557+
assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS));
1558+
1559+
// Requests should rollback.
1560+
assertThat(rollbackTx2Fut1, willSucceedIn(1, TimeUnit.SECONDS));
1561+
assertThat(rollbackTx2Fut2, willSucceedIn(1, TimeUnit.SECONDS));
1562+
1563+
var ex = assertThrows(TransactionException.class, () -> ClientTransaction.get(test.tx2));
1564+
assertThat(ex.getMessage(), containsString("Transaction is already finished"));
1565+
assertThat(ex.getMessage(), containsString("committed=false"));
1566+
1567+
KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
1568+
assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), willSucceedIn(5, TimeUnit.SECONDS));
1569+
}
15151570
}
15161571

15171572
@ParameterizedTest

0 commit comments

Comments
 (0)