|
21 | 21 | import static java.util.Collections.emptyList; |
22 | 22 | import static java.util.Comparator.comparing; |
23 | 23 | import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; |
| 24 | +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; |
24 | 25 | import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; |
25 | 26 | import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; |
| 27 | +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; |
26 | 28 | import static org.awaitility.Awaitility.await; |
27 | 29 | import static org.hamcrest.MatcherAssert.assertThat; |
28 | 30 | import static org.hamcrest.Matchers.containsString; |
| 31 | +import static org.hamcrest.Matchers.is; |
29 | 32 | import static org.hamcrest.Matchers.startsWith; |
30 | 33 | import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
31 | 34 | import static org.junit.jupiter.api.Assertions.assertEquals; |
|
71 | 74 | import org.apache.ignite.internal.testframework.IgniteTestUtils; |
72 | 75 | import org.apache.ignite.internal.tx.Lock; |
73 | 76 | import org.apache.ignite.internal.tx.TxState; |
| 77 | +import org.apache.ignite.internal.tx.impl.DeadlockPreventionPolicyImpl.TxIdComparators; |
| 78 | +import org.apache.ignite.internal.tx.impl.TxManagerImpl; |
74 | 79 | import org.apache.ignite.internal.util.CollectionUtils; |
75 | 80 | import org.apache.ignite.lang.ErrorGroups; |
76 | 81 | import org.apache.ignite.lang.ErrorGroups.Transactions; |
|
90 | 95 | import org.apache.ignite.tx.TransactionException; |
91 | 96 | import org.apache.ignite.tx.TransactionOptions; |
92 | 97 | import org.junit.jupiter.api.AfterEach; |
93 | | -import org.junit.jupiter.api.Disabled; |
| 98 | +import org.junit.jupiter.api.Nested; |
94 | 99 | import org.junit.jupiter.api.Test; |
| 100 | +import org.junit.jupiter.api.condition.EnabledIf; |
95 | 101 | import org.junit.jupiter.params.ParameterizedTest; |
96 | 102 | import org.junit.jupiter.params.provider.Arguments; |
97 | 103 | import org.junit.jupiter.params.provider.MethodSource; |
@@ -1400,34 +1406,112 @@ public void testRollbackDoesNotBlockOnLockConflict(KillTestContext ctx) { |
1400 | 1406 | assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2)), willSucceedFast()); |
1401 | 1407 | } |
1402 | 1408 |
|
1403 | | - @Test |
1404 | | - @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947") |
1405 | | - public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest() throws InterruptedException { |
1406 | | - // Note: reversed tx priority is required for this test. |
1407 | | - ClientTable table = (ClientTable) table(); |
1408 | | - KeyValueView<Tuple, Tuple> kvView = table().keyValueView(); |
| 1409 | + static boolean isUsingReverseComparator() { |
| 1410 | + TxIdComparators comparator = IgniteTestUtils.getFieldValue(null, TxManagerImpl.class, "DEFAULT_TX_ID_COMPARATOR"); |
| 1411 | + return comparator == TxIdComparators.REVERSED; |
| 1412 | + } |
1409 | 1413 |
|
1410 | | - Map<Partition, ClusterNode> map = table.partitionDistribution().primaryReplicasAsync().join(); |
1411 | | - List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, 0, table); |
| 1414 | + @EnabledIf("org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest#isUsingReverseComparator") |
| 1415 | + @Nested |
| 1416 | + class OnConflictDuringFirstRequest { |
| 1417 | + class Data { |
| 1418 | + final List<Tuple> tuples; |
| 1419 | + final ClientLazyTransaction tx1; |
| 1420 | + final ClientLazyTransaction tx2; |
| 1421 | + final CompletableFuture<?> req2Fut; |
| 1422 | + |
| 1423 | + Data(List<Tuple> tuples, ClientLazyTransaction tx1, ClientLazyTransaction tx2, CompletableFuture<?> req2Fut) { |
| 1424 | + this.tuples = tuples; |
| 1425 | + this.tx1 = tx1; |
| 1426 | + this.tx2 = tx2; |
| 1427 | + this.req2Fut = req2Fut; |
| 1428 | + } |
| 1429 | + } |
1412 | 1430 |
|
1413 | | - // We need a waiter for this scenario. |
1414 | | - Tuple key = tuples0.get(0); |
1415 | | - Tuple val = val("1"); |
| 1431 | + Data prepareBlockedTransaction(KillTestContext ctx) throws InterruptedException { |
| 1432 | + ClientTable table = (ClientTable) table(); |
| 1433 | + ClientSql sql = (ClientSql) client().sql(); |
| 1434 | + |
| 1435 | + Map<Partition, ClusterNode> map = table.partitionDistribution().primaryReplicasAsync().join(); |
| 1436 | + Entry<Partition, ClusterNode> mapping = map.entrySet().iterator().next(); |
| 1437 | + List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int) mapping.getKey().id(), table); |
| 1438 | + Ignite server = server(mapping.getValue()); |
| 1439 | + IgniteImpl ignite = unwrapIgniteImpl(server); |
| 1440 | + |
| 1441 | + // Init SQL mappings. |
| 1442 | + Tuple key0 = tuples0.get(0); |
| 1443 | + sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), |
| 1444 | + key0.intValue(0), key0.intValue(0) + ""); |
| 1445 | + await().atMost(2, TimeUnit.SECONDS) |
| 1446 | + .until(() -> sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready)); |
| 1447 | + |
| 1448 | + // We need a waiter for this scenario. |
| 1449 | + Tuple key = tuples0.get(1); |
| 1450 | + |
| 1451 | + ClientLazyTransaction tx2 = (ClientLazyTransaction) client().transactions().begin(); |
| 1452 | + ClientLazyTransaction tx1 = (ClientLazyTransaction) client().transactions().begin(); |
| 1453 | + |
| 1454 | + // Starts the transaction. |
| 1455 | + assertThat(ctx.put.apply(client(), tx1, key), willSucceedIn(120, TimeUnit.SECONDS)); |
| 1456 | + |
| 1457 | + await().atMost(3, TimeUnit.SECONDS).until(() -> { |
| 1458 | + Iterator<Lock> locks = ignite.txManager().lockManager().locks(tx1.startedTx().txId()); |
| 1459 | + |
| 1460 | + int count = CollectionUtils.count(locks); |
| 1461 | + return count == 2; |
| 1462 | + }); |
| 1463 | + |
| 1464 | + // Will wait for lock. |
| 1465 | + CompletableFuture<?> fut2 = ctx.put.apply(client(), tx2, key); |
| 1466 | + Thread.sleep(500); |
| 1467 | + |
| 1468 | + assertThat(fut2.isDone(), is(false)); |
| 1469 | + IgniteTestUtils.assertThrows(AssertionError.class, () -> ClientTransaction.get(tx2), "Transaction is starting"); |
1416 | 1470 |
|
1417 | | - ClientLazyTransaction tx1 = (ClientLazyTransaction) client().transactions().begin(); |
1418 | | - ClientLazyTransaction tx2 = (ClientLazyTransaction) client().transactions().begin(); |
| 1471 | + return new Data(tuples0, tx1, tx2, fut2); |
| 1472 | + } |
| 1473 | + |
| 1474 | + @ParameterizedTest |
| 1475 | + @MethodSource("org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest#killTestContextFactory") |
| 1476 | + public void testRollbackDoesNotBlock(KillTestContext ctx) throws InterruptedException { |
| 1477 | + var test = prepareBlockedTransaction(ctx); |
1419 | 1478 |
|
1420 | | - kvView.put(tx1, key, val); |
| 1479 | + // Rollback should not be blocked. |
| 1480 | + assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); |
| 1481 | + assertThat(test.req2Fut, willThrowFast( |
| 1482 | + ctx.expectedErr, |
| 1483 | + "Can't acquire a lock because transaction is already finished")); |
1421 | 1484 |
|
1422 | | - // Will wait for lock. |
1423 | | - CompletableFuture<Void> fut2 = kvView.putAsync(tx2, key, val); |
1424 | | - assertFalse(fut2.isDone()); |
| 1485 | + assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); |
1425 | 1486 |
|
1426 | | - Thread.sleep(500); |
| 1487 | + var ex = assertThrows(TransactionException.class, () -> ClientTransaction.get(test.tx2)); |
| 1488 | + assertThat(ex.getMessage(), containsString("Transaction is already finished")); |
| 1489 | + assertThat(ex.getMessage(), containsString("committed=false")); |
| 1490 | + |
| 1491 | + KeyValueView<Tuple, Tuple> kvView = table().keyValueView(); |
| 1492 | + assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), willSucceedIn(5, TimeUnit.SECONDS)); |
| 1493 | + } |
1427 | 1494 |
|
1428 | | - // Rollback should not be blocked. |
1429 | | - assertThat(tx2.rollbackAsync(), willSucceedFast()); |
1430 | | - assertThat(tx1.rollbackAsync(), willSucceedFast()); |
| 1495 | + @ParameterizedTest |
| 1496 | + @MethodSource("org.apache.ignite.internal.runner.app.client.ItThinClientTransactionsTest#killTestContextFactory") |
| 1497 | + public void testOperationsBlockWaitingForLock(KillTestContext ctx) throws InterruptedException { |
| 1498 | + var test = prepareBlockedTransaction(ctx); |
| 1499 | + |
| 1500 | + CompletableFuture<?> fut3 = ctx.put.apply(client(), test.tx2, test.tuples.get(2)); |
| 1501 | + |
| 1502 | + assertDoesNotThrow(test.tx1::startedTx); |
| 1503 | + |
| 1504 | + assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); |
| 1505 | + |
| 1506 | + // After the lock is open, the requests are free to complete. |
| 1507 | + assertThat(test.req2Fut, willSucceedIn(1, TimeUnit.SECONDS)); |
| 1508 | + assertThat(fut3, willSucceedIn(1, TimeUnit.SECONDS)); |
| 1509 | + |
| 1510 | + assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); |
| 1511 | + |
| 1512 | + KeyValueView<Tuple, Tuple> kvView = table().keyValueView(); |
| 1513 | + assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 3)), willSucceedIn(5, TimeUnit.SECONDS)); |
| 1514 | + } |
1431 | 1515 | } |
1432 | 1516 |
|
1433 | 1517 | @ParameterizedTest |
|
0 commit comments