Skip to content

Commit 72a145d

Browse files
[spanner-to-sourcedb] Add Integration Tests for retryDLQ and retryAllDLQ mode for sharded and non-sharded setup (#3564)
b/457948107 This PR introduces comprehensive integration tests for the Dead Letter Queue (DLQ) retry mechanisms in the Spanner-to-Source Dataflow template. It adds end-to-end verification for both the concurrent batch retry flow (`retryDLQ`) and the streaming retry flow (`retryAllDLQ`), covering both non-sharded and multi-shard schema routing scenarios. ### Tests Added 1. **`SpannerToSourceDBMySQLRetryDLQIT`**: Tests the `retryDLQ` batch job. Validates that it correctly processes and retries DLQ events *alongside* an actively running streaming pipeline by utilizing the active `dlqPubSubConsumer` flow. Uses the overrides file. 2. **`SpannerToSourceDBMySQLRetryAllDLQIT`**: Tests the `retryAllDLQ` batch job. Validates that it correctly processes and retries ALL DLQ events offline when the main pipeline has been safely drained or stopped, utilizing the file-based consumer. Uses the overrides file. 3. **`SpannerToSourceDBShardedMySQLRetryDLQIT`**: The sharded equivalent of `retryDLQ`. Validates that the pipeline successfully evaluates native `migration_shard_id` columns in the source Spanner database to reliably route DLQ/retry events across multiple distributed target instances. Uses session file. 4. **`SpannerToSourceDBShardedMySQLRetryAllDLQIT`**: The sharded equivalent of `retryAllDLQ`. Validates the behavior of dynamic custom shard routing (where ShardIdColumn is not present) by relying on a custom Java shard ID fetcher. Uses overrides file. ### Features & Edge Cases Covered * **DLQ State Integrity:** Consistently verifying that *fixed* items successfully rewrite to MySQL upon retry, while genuinely *un-fixable* items explicitly route back into their appropriate (`severe/` or `retry/`) error buckets without stalling progress. * **Handling Database Constraints:** Generating and resolving retriable database exceptions, including testing resilience against missing parent rows (Foreign Key Violations). * **Handling Logic/Processing Errors:** Simulating unrecoverable severe errors originating inside the runtime execution pipeline (failed custom transformations). * **Active User-Intervention Simulation:** Orchestrating mid-flight environmental repairs. Before retry pipelines are run, the tests actively "fix" the previously generated errors by manually injecting parent rows via JDBC queries and swapping the custom pipeline transformer from `bad` mode to `semi-fixed` mode. * **Multi-Shard Target Routing:** Validating sharding logic by testing both ShardIdColumn flow and Custom Sharding Jar flow. ### Test Setup & Simulated Schema Divergences To accurately simulate volatile production environments, these integration tests operate against highly divergent test schemas between the Spanner source and MySQL target: * Robust type mapping validations utilizing a heavily populated, extensive `AllDataTypes` reference table layout. * **Mismatched Primary Keys**: Guaranteeing successful execution logic in pipelines where the Spanner structure and downstream Source primary keys intentionally diverge. * **Altered / Divergent Columns**: Accommodating architectures where columns have been asynchronously added, deleted, or explicitly renamed across Spanner / Source database platforms.
1 parent d871e1e commit 72a145d

19 files changed

Lines changed: 7345 additions & 1 deletion

File tree

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.custom;
17+
18+
import com.google.cloud.teleport.v2.spanner.utils.IShardIdFetcher;
19+
import com.google.cloud.teleport.v2.spanner.utils.ShardIdRequest;
20+
import com.google.cloud.teleport.v2.spanner.utils.ShardIdResponse;
21+
import java.util.Map;
22+
23+
/**
24+
* A custom shard ID fetcher implementation used in sharded integration tests.
25+
*
26+
* <p>This fetcher evaluates the Spanner record's primary keys (either {@code CustomerId} or {@code
27+
* id}) and applies a modulo operation to dynamically route records to specific logical shards
28+
* ({@code testShardA} for odd numbers, {@code testShardB} for even numbers).
29+
*
30+
* <p>It is specifically designed for integration testing (e.g., verifying {@code retryAllDLQ}
31+
* behavior in a sharded schema) to ensure that the pipeline correctly extracts IDs and routes rows
32+
* to their target shards when native {@code migration_shard_id} schema columns are omitted.
33+
*/
34+
public class CustomShardIdFetcherForRetryIT implements IShardIdFetcher {
35+
36+
@Override
37+
public void init(String parameters) {}
38+
39+
@Override
40+
public ShardIdResponse getShardId(ShardIdRequest shardIdRequest) {
41+
Map<String, Object> keys = shardIdRequest.getSpannerRecord();
42+
43+
// Use the Primary Key to identify the correct logical shard
44+
if (keys != null) {
45+
if (keys.containsKey("CustomerId")) {
46+
long customerId = Long.parseLong(keys.get("CustomerId").toString());
47+
long shardIdx = customerId % 2;
48+
49+
ShardIdResponse response = new ShardIdResponse();
50+
if (shardIdx == 0) {
51+
response.setLogicalShardId("testShardB");
52+
} else {
53+
response.setLogicalShardId("testShardA");
54+
}
55+
return response;
56+
} else if (keys.containsKey("id")) {
57+
// Handle AllDataTypes which uses 'id' instead of 'CustomerId'
58+
long id = Long.parseLong(keys.get("id").toString());
59+
long shardIdx = id % 2;
60+
61+
ShardIdResponse response = new ShardIdResponse();
62+
if (shardIdx == 0) {
63+
response.setLogicalShardId("testShardB");
64+
} else {
65+
response.setLogicalShardId("testShardA");
66+
}
67+
return response;
68+
}
69+
}
70+
71+
return new ShardIdResponse();
72+
}
73+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.custom;
17+
18+
import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
19+
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
20+
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
21+
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
/**
28+
* Custom transformation class used for DLQ testing in Spanner to Source reverse replication.
29+
*
30+
* <p>Overview: This transformer injects severe transformation errors to test the pipeline's Dead
31+
* Letter Queue (DLQ) processing and error handling.
32+
*
33+
* <p>Test Usage modes: - mode=bad: Intentionally throws InvalidTransformationException on rows with
34+
* ID 999 and 888 in the AllDataTypes table. Used during the main pipeline run to force these rows
35+
* into the severe DLQ error bucket. - mode=semi-fixed: Intentionally throws an exception only on
36+
* row ID 888. This simulates a scenario where a user pushes a corrected transformation JAR for the
37+
* retry pipeline. Row 999 successfully migrates during the retry, while row 888 correctly fails
38+
* again and gets routed back into the DLQ.
39+
*
40+
* <p>For the Orders table, it translates the OrderSource column into a custom LegacyOrderSystem
41+
* column to validate standard data transformation operations.
42+
*/
43+
public class SpannerToSourceDbRetryTransformation implements ISpannerMigrationTransformer {
44+
45+
private static final Logger LOG =
46+
LoggerFactory.getLogger(SpannerToSourceDbRetryTransformation.class);
47+
48+
private String mode = "bad";
49+
50+
@Override
51+
public void init(String parameters) {
52+
LOG.info("init called with {}", parameters);
53+
if (parameters != null) {
54+
String[] parts = parameters.split(",");
55+
for (String part : parts) {
56+
String[] kv = part.split("=");
57+
if (kv.length == 2) {
58+
if (kv[0].trim().equals("mode")) {
59+
this.mode = kv[1].trim();
60+
}
61+
}
62+
}
63+
}
64+
LOG.info("Mode set to {}", this.mode);
65+
}
66+
67+
@Override
68+
public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
69+
throws InvalidTransformationException {
70+
return new MigrationTransformationResponse(new HashMap<>(), false);
71+
}
72+
73+
@Override
74+
public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
75+
throws InvalidTransformationException {
76+
String tableName = request.getTableName();
77+
Map<String, Object> requestRow = request.getRequestRow();
78+
Map<String, Object> responseRow = new HashMap<>();
79+
80+
LOG.info("Processing table {} in mode {}", tableName, mode);
81+
82+
if (tableName.equalsIgnoreCase("Orders")) {
83+
Object sourceObj = requestRow.get("OrderSource");
84+
String source = (sourceObj != null) ? (String) sourceObj : "UNKNOWN_SYSTEM";
85+
// Enclose in single quotes since DML generator injects custom responses directly as raw
86+
// values
87+
String legacySys = "'" + source + "_v1'";
88+
responseRow.put("LegacyOrderSystem", legacySys);
89+
return new MigrationTransformationResponse(responseRow, false);
90+
} else if (tableName.equalsIgnoreCase("AllDataTypes")) {
91+
Object idObj = requestRow.get("id");
92+
Long id = null;
93+
if (idObj instanceof Number) {
94+
id = ((Number) idObj).longValue();
95+
} else if (idObj instanceof String) {
96+
id = Long.parseLong((String) idObj);
97+
}
98+
99+
if (id != null) {
100+
if (mode.equalsIgnoreCase("bad")) {
101+
if (id == 999 || id == 888) {
102+
LOG.info("Crashing on id {} for table {} in mode {}", id, tableName, mode);
103+
throw new InvalidTransformationException("Simulated failure for id " + id);
104+
}
105+
} else if (mode.equalsIgnoreCase("semi-fixed")) {
106+
if (id == 888) {
107+
LOG.info("Crashing on id {} for table {} in mode {}", id, tableName, mode);
108+
throw new InvalidTransformationException("Simulated failure for id " + id);
109+
}
110+
}
111+
}
112+
return new MigrationTransformationResponse(responseRow, false);
113+
}
114+
115+
return new MigrationTransformationResponse(responseRow, false);
116+
}
117+
118+
@Override
119+
public MigrationTransformationResponse transformFailedSpannerMutation(
120+
MigrationTransformationRequest request) throws InvalidTransformationException {
121+
return new MigrationTransformationResponse(new HashMap<>(), false);
122+
}
123+
}

0 commit comments

Comments
 (0)