Skip to content

Commit 8c10829

Browse files
authored
IGNITE-28380 Fix NPE during txn cleanup (#7887)
1 parent 9108d15 commit 8c10829

File tree

2 files changed

+48
-36
lines changed

2 files changed

+48
-36
lines changed

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxStateCommitPartitionReplicaRequestHandler.java

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919

2020
import static java.util.concurrent.CompletableFuture.completedFuture;
2121
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
2223
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
2324
import static org.apache.ignite.internal.tx.TxState.FINISHING;
2425
import static org.apache.ignite.internal.tx.TxState.PENDING;
@@ -46,6 +47,7 @@
4647
import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
4748
import org.apache.ignite.internal.tx.impl.TxMessageSender;
4849
import org.apache.ignite.internal.tx.impl.TxRecoveryEngine;
50+
import org.apache.ignite.internal.tx.message.RowIdMessage;
4951
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
5052
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
5153
import org.jetbrains.annotations.Nullable;
@@ -117,7 +119,7 @@ public CompletableFuture<TransactionMeta> handle(TxStateCommitPartitionRequest r
117119
request.senderCurrentConsistencyToken(),
118120
senderGroupId,
119121
senderId,
120-
request.rowId().asRowId(),
122+
extractRowId(request.rowId()),
121123
request.newestCommitTimestamp()
122124
);
123125
} else {
@@ -168,6 +170,12 @@ private CompletableFuture<TransactionMeta> triggerTxRecoveryOnTxStateResolutionI
168170
// - txn is not finished, volatile state is lost
169171
// - txn was finished, state was vacuumized
170172
// both mean primary replica resolution path.
173+
if (rowId == null) {
174+
throw new IllegalStateException(format("Failed to resolve transaction state for transaction "
175+
+ "using primary replica path, because row id is not provided [txId={}, commitGroupId={}, tableId={}, "
176+
+ "senderGroupId={}, readTimestamp={}].", txId, commitGroupId, tableId, senderGroupId, readTimestamp));
177+
}
178+
171179
return resolveTxStateFromPrimaryReplica(
172180
txId,
173181
tableId,
@@ -321,4 +329,9 @@ private CompletableFuture<TransactionMeta> resolveTxStateFromPrimaryReplica(
321329
private void markAbandoned(UUID txId) {
322330
txManager.updateTxMeta(txId, stateMeta -> stateMeta != null ? stateMeta.abandoned() : null);
323331
}
332+
333+
@Nullable
334+
private static RowId extractRowId(@Nullable RowIdMessage rowIdMessage) {
335+
return rowIdMessage == null ? null : rowIdMessage.asRowId();
336+
}
324337
}

modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java

Lines changed: 34 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import static java.util.stream.Collectors.toMap;
2525
import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
2626
import static org.apache.ignite.internal.tx.TxStateMeta.builder;
27-
import static org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txStateResolutionParameters;
2827
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
2928
import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry;
3029

@@ -51,9 +50,7 @@
5150
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
5251
import org.apache.ignite.internal.replicator.ZonePartitionId;
5352
import org.apache.ignite.internal.tx.PartitionEnlistment;
54-
import org.apache.ignite.internal.tx.TransactionMeta;
5553
import org.apache.ignite.internal.tx.TxState;
56-
import org.apache.ignite.internal.tx.TxStateMeta;
5754
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
5855
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
5956
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
@@ -149,41 +146,25 @@ private void onCleanupReplicated(CleanupReplicatedInfo info) {
149146
});
150147

151148
if (ctx != null && ctx.partitions.isEmpty()) {
152-
markTxnCleanupReplicated(info.txId(), ctx.txState, ctx.commitPartitionId);
149+
markTxnCleanupReplicated(info.txId(), ctx.txState, ctx.commitTimestamp, ctx.commitPartitionId);
153150

154151
writeIntentsReplicated.remove(info.txId());
155152
}
156153
}
157154

158-
private void markTxnCleanupReplicated(UUID txId, TxState state, ZonePartitionId commitPartitionId) {
155+
private void markTxnCleanupReplicated(
156+
UUID txId,
157+
TxState state,
158+
@Nullable HybridTimestamp commitTimestamp,
159+
ZonePartitionId commitPartitionId
160+
) {
159161
long cleanupCompletionTimestamp = System.currentTimeMillis();
160162

161-
TxStateMeta txStateMeta = txStateVolatileStorage.state(txId);
162-
final CompletableFuture<HybridTimestamp> commitTimestampFuture;
163-
if (state == TxState.COMMITTED && (txStateMeta == null || txStateMeta.commitTimestamp() == null)) {
164-
commitTimestampFuture = placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartitionId)
165-
.thenCompose(replicaMeta -> {
166-
String primaryNode = replicaMeta.getLeaseholder();
167-
HybridTimestamp startTime = replicaMeta.getStartTime();
168-
return txMessageSender.resolveTxStateFromCommitPartition(
169-
txStateResolutionParameters().txId(txId).commitGroupId(commitPartitionId).build(),
170-
primaryNode,
171-
startTime.longValue()
172-
)
173-
.thenApply(TransactionMeta::commitTimestamp);
174-
}
175-
);
176-
} else {
177-
HybridTimestamp existingCommitTs = txStateMeta == null ? null : txStateMeta.commitTimestamp();
178-
commitTimestampFuture = CompletableFuture.completedFuture(existingCommitTs);
179-
}
180-
181-
commitTimestampFuture.thenAccept(commitTimestamp ->
182-
txStateVolatileStorage.updateMeta(txId, oldMeta -> builder(oldMeta, state)
183-
.commitPartitionId(commitPartitionId)
184-
.commitTimestamp(commitTimestamp)
185-
.cleanupCompletionTimestamp(cleanupCompletionTimestamp)
186-
.build())
163+
txStateVolatileStorage.updateMeta(txId, oldMeta -> builder(oldMeta, state)
164+
.commitPartitionId(commitPartitionId)
165+
.commitTimestamp(commitTimestamp)
166+
.cleanupCompletionTimestamp(cleanupCompletionTimestamp)
167+
.build()
187168
);
188169
}
189170

@@ -228,7 +209,12 @@ public CompletableFuture<Void> cleanup(
228209
if (commitPartitionId != null) {
229210
writeIntentsReplicated.put(
230211
txId,
231-
new CleanupContext(commitPartitionId, enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
212+
new CleanupContext(
213+
commitPartitionId,
214+
enlistedPartitions.keySet(),
215+
commit ? TxState.COMMITTED : TxState.ABORTED,
216+
commitTimestamp
217+
)
232218
);
233219
}
234220

@@ -280,8 +266,12 @@ private CompletableFuture<Void> cleanup(
280266
// Start tracking the partitions we want to learn the replication confirmation from.
281267
writeIntentsReplicated.put(
282268
txId,
283-
new CleanupContext(commitPartitionId, new HashSet<>(partitionIds.keySet()),
284-
commit ? TxState.COMMITTED : TxState.ABORTED)
269+
new CleanupContext(
270+
commitPartitionId,
271+
new HashSet<>(partitionIds.keySet()),
272+
commit ? TxState.COMMITTED : TxState.ABORTED,
273+
commitTimestamp
274+
)
285275
);
286276
}
287277

@@ -492,10 +482,19 @@ private static class CleanupContext {
492482
*/
493483
private final TxState txState;
494484

495-
private CleanupContext(ZonePartitionId commitPartitionId, Set<ZonePartitionId> partitions, TxState txState) {
485+
@Nullable
486+
private final HybridTimestamp commitTimestamp;
487+
488+
private CleanupContext(
489+
ZonePartitionId commitPartitionId,
490+
Set<ZonePartitionId> partitions,
491+
TxState txState,
492+
@Nullable HybridTimestamp commitTimestamp
493+
) {
496494
this.commitPartitionId = commitPartitionId;
497495
this.partitions = partitions;
498496
this.txState = txState;
497+
this.commitTimestamp = commitTimestamp;
499498
}
500499
}
501500

0 commit comments

Comments
 (0)