Skip to content

Commit c3fcdcc

Browse files
CyrillKirill Sizov
andauthored
IGNITE-28385 Fix missing enlistment in client commit request handler (#7899)
Co-authored-by: Kirill Sizov <sizov.kirill.y@gmail.com>
1 parent a1df12f commit c3fcdcc

File tree

10 files changed

+92
-23
lines changed

10 files changed

+92
-23
lines changed

modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,11 @@ public static boolean merge(InternalTable table, int partId, String consistentId
172172
if (existing == null) {
173173
tx.enlist(replicationGroupId, table.tableId(), consistentId, token);
174174
} else {
175+
boolean tokenMatch = existing.consistencyToken() == token;
176+
existing.addTableId(table.tableId());
177+
175178
// Enlistment tokens should be equal on commit.
176-
return !commit || existing.consistencyToken() == token;
179+
return !commit || tokenMatch;
177180
}
178181

179182
return true;
@@ -207,5 +210,10 @@ String consistentId() {
207210
long token() {
208211
return token;
209212
}
213+
214+
@Override
215+
public String toString() {
216+
return "(tableId=" + tableId + ", partId=" + partitionId + ", node=" + consistentId + ")";
217+
}
210218
}
211219
}

modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,13 @@ public CompletableFuture<Void> cleanup(
272272
}
273273

274274
@Override
275-
public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) {
275+
public CompletableFuture<Void> cleanup(
276+
ZonePartitionId commitPartitionId,
277+
String node,
278+
UUID txId,
279+
boolean commit,
280+
@Nullable HybridTimestamp commitTimestamp
281+
) {
276282
return nullCompletedFuture();
277283
}
278284

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,13 @@ private CompletableFuture<TransactionResult> finishAndCleanup(
224224
.map(entry -> new EnlistedPartitionGroup(entry.getKey(), entry.getValue().tableIds()))
225225
.collect(toList());
226226
return finishTransaction(enlistedPartitionGroups, txId, commit, commitTimestamp)
227-
.thenCompose(txResult ->
228-
txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
229-
.thenApply(v -> txResult)
230-
);
227+
.thenCompose(txResult -> {
228+
boolean actualCommit = txResult.transactionState() == COMMITTED;
229+
HybridTimestamp actualCommitTs = txResult.commitTimestamp();
230+
231+
return txManager.cleanup(replicationGroupId, enlistedPartitions, actualCommit, actualCommitTs, txId)
232+
.thenApply(v -> txResult);
233+
});
231234
}
232235

233236
private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) {

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxRecoveryMessageHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.partition.replicator.handlers;
1919

20+
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
2021
import static org.apache.ignite.internal.tx.TxState.isFinalState;
2122

2223
import java.util.UUID;
@@ -73,7 +74,8 @@ public CompletableFuture<?> handle(TxRecoveryMessage request, UUID senderId) {
7374
// Check whether a transaction has already been finished.
7475
if (txMeta != null && isFinalState(txMeta.txState())) {
7576
// Tx recovery message is processed on the commit partition.
76-
return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, senderId);
77+
boolean commit = txMeta.txState() == COMMITTED;
78+
return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, senderId, commit, txMeta.commitTimestamp());
7779
}
7880

7981
LOG.info(

modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2050,7 +2050,7 @@ private static void configureTxManager(TxManager txManager) {
20502050
doAnswer(invocation -> nullCompletedFuture())
20512051
.when(txManager).finish(any(), any(), anyBoolean(), any(), anyBoolean(), anyBoolean(), any(), any());
20522052
doAnswer(invocation -> nullCompletedFuture())
2053-
.when(txManager).cleanup(any(), anyString(), any());
2053+
.when(txManager).cleanup(any(), anyString(), any(), anyBoolean(), any());
20542054
}
20552055

20562056
private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) {

modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -991,7 +991,7 @@ private static void configureTxManager(TxManager txManager) {
991991
doAnswer(invocation -> nullCompletedFuture())
992992
.when(txManager).finish(any(), any(), anyBoolean(), any(), anyBoolean(), anyBoolean(), any(), any());
993993
doAnswer(invocation -> nullCompletedFuture())
994-
.when(txManager).cleanup(any(), anyString(), any());
994+
.when(txManager).cleanup(any(), anyString(), any(), anyBoolean(), any());
995995
}
996996

997997
private void upsertInNewTxFor(TestKey key) {

modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,9 +263,17 @@ CompletableFuture<Void> cleanup(
263263
* @param commitPartitionId Commit partition id.
264264
* @param node Target node.
265265
* @param txId Transaction id.
266+
* @param commit Whether the transaction was committed.
267+
* @param commitTimestamp Commit timestamp, if committed.
266268
* @return Completable future of Void.
267269
*/
268-
CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId);
270+
CompletableFuture<Void> cleanup(
271+
ZonePartitionId commitPartitionId,
272+
String node,
273+
UUID txId,
274+
boolean commit,
275+
@Nullable HybridTimestamp commitTimestamp
276+
);
269277

270278
/**
271279
* Locally vacuums no longer needed transactional resources, like txnState both persistent and volatile.

modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,23 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, ZonePartitionId
188188
}
189189

190190
/**
191-
* Sends unlock request to the nodes than initiated recovery.
191+
* Sends cleanup request to the node that initiated recovery.
192192
*
193193
* @param commitPartitionId Commit partition id.
194194
* @param node Target node.
195195
* @param txId Transaction id.
196+
* @param commit Whether the transaction was committed.
197+
* @param commitTimestamp Commit timestamp, if committed.
196198
* @return Completable future of Void.
197199
*/
198-
public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) {
199-
return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
200+
public CompletableFuture<Void> cleanup(
201+
ZonePartitionId commitPartitionId,
202+
String node,
203+
UUID txId,
204+
boolean commit,
205+
@Nullable HybridTimestamp commitTimestamp
206+
) {
207+
return sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0);
200208
}
201209

202210
/**

modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,8 +1163,14 @@ public CompletableFuture<Void> cleanup(
11631163
}
11641164

11651165
@Override
1166-
public CompletableFuture<Void> cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) {
1167-
return txCleanupRequestSender.cleanup(commitPartitionId, node, txId);
1166+
public CompletableFuture<Void> cleanup(
1167+
ZonePartitionId commitPartitionId,
1168+
String node,
1169+
UUID txId,
1170+
boolean commit,
1171+
@Nullable HybridTimestamp commitTimestamp
1172+
) {
1173+
return txCleanupRequestSender.cleanup(commitPartitionId, node, txId, commit, commitTimestamp);
11681174
}
11691175

11701176
@Override

modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxRecoveryEngine.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
2323
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
2424
import static org.apache.ignite.internal.tx.TxState.ABORTED;
25+
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
2526
import static org.apache.ignite.internal.tx.TxState.FINISHING;
2627
import static org.apache.ignite.internal.tx.TxState.isFinalState;
2728
import static org.apache.ignite.internal.tx.TxStateMetaFinishing.castToFinishing;
@@ -33,6 +34,7 @@
3334
import java.util.UUID;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.function.Function;
37+
import org.apache.ignite.internal.hlc.HybridTimestamp;
3638
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
3739
import org.apache.ignite.internal.network.ClusterNodeResolver;
3840
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -127,10 +129,20 @@ public CompletableFuture<TransactionMeta> triggerTxRecovery(
127129
})
128130
.thenCompose(Function.identity())
129131
.whenComplete((v, ex) -> {
130-
runCleanupOnNode(commitPartitionId, txId, commitPartitionNode);
132+
// Cleanup must use the actual resolved tx state; using commit=false unconditionally
133+
// would corrupt data when the tx is actually COMMITTED (abort cleanup races with
134+
// commit cleanup, producing a mix of committed and aborted rows).
135+
boolean commit = v != null && v.txState() == COMMITTED;
136+
@Nullable HybridTimestamp commitTs = v != null ? v.commitTimestamp() : null;
137+
138+
runCleanupOnNode(commitPartitionId, txId, commitPartitionNode, commit, commitTs);
131139

132140
if (senderGroupId != null && senderId != null) {
133-
runCleanupOnNode(senderGroupId, txId, senderId);
141+
String senderConsistentId = clusterNodeResolver.getConsistentIdById(senderId);
142+
143+
if (senderConsistentId != null) {
144+
runCleanupOnNode(senderGroupId, txId, senderConsistentId, commit, commitTs);
145+
}
134146
}
135147
});
136148
}
@@ -163,13 +175,21 @@ private static PendingTxPartitionEnlistment createAbandonedTxRecoveryEnlistment(
163175
*
164176
* @param groupId Group id.
165177
* @param txId Transaction id.
166-
* @param nodeId Node id (inconsistent).
178+
* @param nodeId Node id (ephemeral).
179+
* @param commit Whether the transaction was committed.
180+
* @param commitTimestamp Commit timestamp, if committed.
167181
*/
168-
public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId, UUID txId, UUID nodeId) {
169-
// Get node id of the sender to send back cleanup requests.
182+
public CompletableFuture<Void> runCleanupOnNode(
183+
ZonePartitionId groupId,
184+
UUID txId,
185+
UUID nodeId,
186+
boolean commit,
187+
@Nullable HybridTimestamp commitTimestamp
188+
) {
170189
String nodeConsistentId = clusterNodeResolver.getConsistentIdById(nodeId);
171190

172-
return nodeConsistentId == null ? nullCompletedFuture() : runCleanupOnNode(groupId, txId, nodeConsistentId);
191+
return nodeConsistentId == null ? nullCompletedFuture()
192+
: runCleanupOnNode(groupId, txId, nodeConsistentId, commit, commitTimestamp);
173193
}
174194

175195
/**
@@ -178,8 +198,16 @@ public CompletableFuture<Void> runCleanupOnNode(ZonePartitionId groupId, UUID tx
178198
* @param commitPartitionId Commit partition id.
179199
* @param txId Transaction id.
180200
* @param nodeName Node consistent id.
201+
* @param commit Whether the transaction was committed.
202+
* @param commitTimestamp Commit timestamp, if committed.
181203
*/
182-
private CompletableFuture<Void> runCleanupOnNode(ZonePartitionId commitPartitionId, UUID txId, String nodeName) {
183-
return txManager.cleanup(commitPartitionId, nodeName, txId);
204+
private CompletableFuture<Void> runCleanupOnNode(
205+
ZonePartitionId commitPartitionId,
206+
UUID txId,
207+
String nodeName,
208+
boolean commit,
209+
@Nullable HybridTimestamp commitTimestamp
210+
) {
211+
return txManager.cleanup(commitPartitionId, nodeName, txId, commit, commitTimestamp);
184212
}
185213
}

0 commit comments

Comments
 (0)