Skip to content

Commit 613f646

Browse files
authored
IGNITE-28315 Improve index row resolution for RO scan (#7895)
1 parent 98261ce commit 613f646

File tree

1 file changed

+63
-8
lines changed

1 file changed

+63
-8
lines changed

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static org.apache.ignite.internal.tx.TxStateMetaUnknown.txStateMetaUnknown;
5050
import static org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txStateResolutionParameters;
5151
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
52+
import static org.apache.ignite.internal.util.CollectionUtils.view;
5253
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
5354
import static org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
5455
import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
@@ -208,11 +209,13 @@
208209
import org.apache.ignite.internal.tx.message.TxMessageGroup;
209210
import org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
210211
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
212+
import org.apache.ignite.internal.util.CompletableFutures;
211213
import org.apache.ignite.internal.util.Cursor;
212214
import org.apache.ignite.internal.util.CursorUtils;
213215
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
214216
import org.apache.ignite.internal.util.IgniteUtils;
215217
import org.apache.ignite.internal.util.Lazy;
218+
import org.apache.ignite.internal.util.Pair;
216219
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
217220
import org.apache.ignite.lang.ErrorGroups.Replicator;
218221
import org.apache.ignite.lang.ErrorGroups.Transactions;
@@ -1309,19 +1312,71 @@ private CompletableFuture<Void> continueReadOnlyIndexScan(
13091312
return nullCompletedFuture();
13101313
}
13111314

1312-
IndexRow indexRow = cursor.next();
1315+
List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>> indexRowWithWriteIntentFutures = null;
1316+
int resultStartIndex = result.size();
1317+
1318+
while (result.size() < batchSize && cursor.hasNext()) {
1319+
IndexRow indexRow = cursor.next();
1320+
RowId rowId = indexRow.rowId();
13131321

1314-
RowId rowId = indexRow.rowId();
1322+
CompletableFuture<@Nullable TimedBinaryRow> resolutionResult = resolvePlainReadResult(rowId, null, readTimestamp);
13151323

1316-
return resolvePlainReadResult(rowId, null, readTimestamp).thenComposeAsync(resolvedReadResult -> {
1317-
BinaryRow binaryRow = upgrade(binaryRow(resolvedReadResult), tableVersion);
1324+
if (resolutionResult.isDone() && !resolutionResult.isCompletedExceptionally()) {
1325+
BinaryRow binaryRow = upgrade(binaryRow(resolutionResult.join()), tableVersion);
13181326

1319-
if (binaryRow != null && indexRowMatches(indexRow, binaryRow, schemaAwareIndexStorage)) {
1320-
result.add(binaryRow);
1327+
if (binaryRow != null && indexRowMatches(indexRow, binaryRow, schemaAwareIndexStorage)) {
1328+
result.add(binaryRow);
1329+
}
1330+
} else {
1331+
if (indexRowWithWriteIntentFutures == null) {
1332+
indexRowWithWriteIntentFutures = new ArrayList<>();
1333+
}
1334+
1335+
indexRowWithWriteIntentFutures.add(new Pair<>(indexRow, resolutionResult));
1336+
result.add(null); // Placeholder; will be filled or removed after write intent resolution.
13211337
}
1338+
}
1339+
1340+
if (nullOrEmpty(indexRowWithWriteIntentFutures)) {
1341+
return nullCompletedFuture();
1342+
}
1343+
1344+
List<Pair<IndexRow, CompletableFuture<TimedBinaryRow>>> finalIndexRowWithWriteIntentFutures = indexRowWithWriteIntentFutures;
1345+
return CompletableFutures.allOf(view(indexRowWithWriteIntentFutures, Pair::getSecond))
1346+
.thenComposeAsync(unused -> {
1347+
int futureIdx = 0;
1348+
1349+
ListIterator<BinaryRow> it = result.listIterator(resultStartIndex);
13221350

1323-
return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
1324-
}, scanRequestExecutor);
1351+
while (it.hasNext()) {
1352+
BinaryRow row = it.next();
1353+
1354+
if (row == null) {
1355+
Pair<IndexRow, CompletableFuture<TimedBinaryRow>> indexRowWithWriteIntent
1356+
= finalIndexRowWithWriteIntentFutures.get(futureIdx);
1357+
IndexRow indexRow = indexRowWithWriteIntent.getFirst();
1358+
TimedBinaryRow resolved = indexRowWithWriteIntent.getSecond().join();
1359+
futureIdx++;
1360+
1361+
BinaryRow binaryRow = upgrade(binaryRow(resolved), tableVersion);
1362+
1363+
if (binaryRow != null && indexRowMatches(indexRow, binaryRow, schemaAwareIndexStorage)) {
1364+
it.set(binaryRow);
1365+
} else {
1366+
it.remove();
1367+
}
1368+
}
1369+
}
1370+
1371+
assert futureIdx == finalIndexRowWithWriteIntentFutures.size()
1372+
: "Expected " + finalIndexRowWithWriteIntentFutures.size() + " iterations but was " + futureIdx;
1373+
1374+
if (result.size() < batchSize && cursor.hasNext()) {
1375+
return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
1376+
}
1377+
1378+
return nullCompletedFuture();
1379+
}, scanRequestExecutor);
13251380
}
13261381

13271382
/**

0 commit comments

Comments
 (0)