diff --git a/.gitignore b/.gitignore index 785cf8b936..e1b1a6e062 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ # Derby log **/derby.log* + +# build log +**/build.log* diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java index 410e95dc7d..c4ea69b087 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java @@ -15,15 +15,22 @@ */ package com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper; +import static com.google.cloud.teleport.v2.source.reader.io.schema.SchemaDiscoveryImpl.convertException; import static com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo.INDEX_TYPE_TO_CLASS; +import com.google.auto.value.AutoValue; import com.google.cloud.teleport.v2.source.reader.io.IoWrapper; import com.google.cloud.teleport.v2.source.reader.io.datasource.DataSource; import com.google.cloud.teleport.v2.source.reader.io.exception.SuitableIndexNotFoundException; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.DialectAdapter; import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIOWrapperConfig; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIoWrapperConfigGroup; import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.TableConfig; import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.JdbcSourceRowMapper; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProviderImpl; import 
com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.PartitionColumn; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.Range; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableIdentifier; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableReadSpecification; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableSplitSpecification; @@ -42,13 +49,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.sql.SQLException; import java.util.List; -import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration; import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Wait.OnSignal; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.dbcp2.BasicDataSource; @@ -64,6 +78,9 @@ */ public final class JdbcIoWrapper implements IoWrapper { private static final Logger LOG = LoggerFactory.getLogger(JdbcIoWrapper.class); + // We parallelize to 4 threads to work well with the default launcher machine we get. + // With this for 1024 shards test we have the launcher complete in 4 minutes in the load test. 
+ private static final int SOURCE_DISCOVERY_PARALLELISM = 4; private final ImmutableMap< ImmutableList, PTransform>> @@ -73,7 +90,17 @@ public final class JdbcIoWrapper implements IoWrapper { private static final Logger logger = LoggerFactory.getLogger(JdbcIoWrapper.class); /** - * Construct a JdbcIOWrapper from the configuration. + * Construct a JdbcIOWrapper from the configuration group. + * + *

This method performs schema discovery for a single source configuration. + * + *

Error Isolation: If schema discovery fails for the configured shard, this + * method will throw an exception, causing the job to fail-fast. This behavior ensures + * consistency: if the pipeline is successfully constructed, all required shards and tables are + * guaranteed to have been successfully discovered. + * + *

Retries: Individual shard discovery operations are automatically retried with + * exponential backoff as configured in the {@link JdbcIOWrapperConfig}. * * @param config configuration for reading from a JDBC source. * @return JdbcIOWrapper @@ -81,23 +108,104 @@ public final class JdbcIoWrapper implements IoWrapper { * column. Please refer to {@link JdbcIoWrapper#autoInferTableConfigs(JdbcIOWrapperConfig, * SchemaDiscovery, DataSource)} for details on situation where this is thrown. */ + /* Todo remove this function in subsequent PR for multishard graphsize support */ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexNotFoundException { - DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(config); - - javax.sql.DataSource dataSource = dataSourceConfiguration.buildDatasource(); - setDataSourceLoginTimeout((BasicDataSource) dataSource, config); + PerSourceDiscovery perSourceDiscovery = getPerSourceDiscovery(config); + ImmutableMap, PTransform>> + tableReaders = buildTableReaders(ImmutableList.of(perSourceDiscovery)); + return new JdbcIoWrapper(tableReaders, ImmutableList.of(perSourceDiscovery.sourceSchema())); + } - SchemaDiscovery schemaDiscovery = - new SchemaDiscoveryImpl(config.dialectAdapter(), config.schemaDiscoveryBackOff()); + /** + * Construct a JdbcIOWrapper from the configuration. + * + *

This method performs schema discovery for all shards in the configuration group. + * + * @param configGroup configurations for reading from a JDBC source. + * @return JdbcIOWrapper + * @throws SuitableIndexNotFoundException if a suitable index is not found to act as the partition + * column. + */ + public static JdbcIoWrapper of(JdbcIoWrapperConfigGroup configGroup) + throws SuitableIndexNotFoundException { - ImmutableList tableConfigs = - autoInferTableConfigs(config, schemaDiscovery, DataSource.ofJdbc(dataSource)); - SourceSchema sourceSchema = - getSourceSchema(config, schemaDiscovery, DataSource.ofJdbc(dataSource), tableConfigs); + ImmutableList perSourceDiscoveries = getPerSourceDiscoveries(configGroup); ImmutableMap, PTransform>> - tableReaders = - buildTableReaders(config, tableConfigs, dataSourceConfiguration, sourceSchema); - return new JdbcIoWrapper(tableReaders, ImmutableList.of(sourceSchema)); + tableReaders = buildTableReaders(perSourceDiscoveries); + return new JdbcIoWrapper( + tableReaders, + perSourceDiscoveries.stream() + .map(e -> e.sourceSchema()) + .collect(ImmutableList.toImmutableList())); + } + + /** + * Executes isolated schema discovery and table inference for a group of shard configurations. + * + *

Discovery for each shard is performed in parallel to reduce startup latency. Failure in any + * shard discovery will lead to an overall failure of the discovery process (fail-fast). + * + * @param configGroup The group of configurations. + * @return A list of {@link PerSourceDiscovery} results. + */ + @VisibleForTesting + protected static ImmutableList getPerSourceDiscoveries( + JdbcIoWrapperConfigGroup configGroup) { + ExecutorService executor = Executors.newFixedThreadPool(SOURCE_DISCOVERY_PARALLELISM); + try { + List> futures = + configGroup.shardConfigs().stream() + .map(config -> executor.submit(() -> getPerSourceDiscovery(config))) + .collect(Collectors.toList()); + + ImmutableList.Builder discoveries = ImmutableList.builder(); + for (Future future : futures) { + try { + discoveries.add(future.get()); + } catch (InterruptedException | ExecutionException e) { + convertException(e); + } + } + return discoveries.build(); + } finally { + executor.shutdown(); + } + } + + /** + * Executes isolated schema discovery and table inference for a single JdbcIOWrapperConfig. + * + * @param config The configuration. + * @return A {@link PerSourceDiscovery} containing the results of the discovery. 
+ */ + private static PerSourceDiscovery getPerSourceDiscovery(JdbcIOWrapperConfig config) { + PerSourceDiscovery.Builder perSourceDiscoveryBuilder = PerSourceDiscovery.builder(); + DataSourceConfiguration dataSourceConfiguration = + getDataSourceConfiguration( + config.toBuilder().setMaxConnections(SchemaDiscoveryImpl.getParallelism()).build()); + + BasicDataSource dataSource = (BasicDataSource) dataSourceConfiguration.buildDatasource(); + try { + setDataSourceLoginTimeout((BasicDataSource) dataSource, config); + SchemaDiscovery schemaDiscovery = + new SchemaDiscoveryImpl(config.dialectAdapter(), config.schemaDiscoveryBackOff()); + + ImmutableList tableConfigs = + autoInferTableConfigs(config, schemaDiscovery, DataSource.ofJdbc(dataSource)); + SourceSchema sourceSchema = + getSourceSchema(config, schemaDiscovery, DataSource.ofJdbc(dataSource), tableConfigs); + perSourceDiscoveryBuilder.setConfig(config); + perSourceDiscoveryBuilder.setDataSourceConfiguration(getDataSourceConfiguration(config)); + perSourceDiscoveryBuilder.setSourceSchema(sourceSchema); + perSourceDiscoveryBuilder.setTableConfigs(tableConfigs); + } finally { + try { + dataSource.close(); + } catch (SQLException e) { + LOG.warn("Exception while closing datasource {}", dataSource, e); + } + } + return perSourceDiscoveryBuilder.build(); } /** @@ -176,44 +284,71 @@ public ImmutableList discoverTableSchema() { return this.sourceSchema; } - static ImmutableMap< + /** + * Aggregates reader transforms from all provided source discoveries. + * + * @param perSourceDiscoveries List of discovery results for all shards. + * @return A map of table reference groups to their corresponding reader transforms. 
+ */ + @VisibleForTesting + protected static ImmutableMap< + ImmutableList, PTransform>> + buildTableReaders(ImmutableList perSourceDiscoveries) { + return ImmutableMap + ., PTransform>>builder() + .putAll(getMultiTableReadWithUniformPartitionIO(perSourceDiscoveries)) + .putAll(getjdbcIOs(perSourceDiscoveries)) + .build(); + } + + /** + * Builds legacy {@link JdbcIO} read transforms for shards where uniform partitioning is disabled. + * + *

Ignores Configurations maped for readWithUniformPartitions. + * + * @param perSourceDiscoveries List of discovery results. + * @return A map of single-table references to legacy JdbcIO transforms. + */ + private static ImmutableMap< ImmutableList, PTransform>> - buildTableReaders( - JdbcIOWrapperConfig config, - ImmutableList tableConfigs, - DataSourceConfiguration dataSourceConfiguration, - SourceSchema sourceSchema) { - if (config.readWithUniformPartitionsFeatureEnabled() && !tableConfigs.isEmpty()) { - return getMultiTableReadWithUniformPartitionIO( - config, - dataSourceConfiguration, - sourceSchema.schemaReference(), - tableConfigs, - sourceSchema); + getjdbcIOs(ImmutableList perSourceDiscoveries) { + + ImmutableMap.Builder< + ImmutableList, PTransform>> + tableReadersBuilder = ImmutableMap.builder(); + for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) { + JdbcIOWrapperConfig config = perSourceDiscovery.config(); + ImmutableList tableConfigs = perSourceDiscovery.tableConfigs(); + DataSourceConfiguration dataSourceConfiguration = + perSourceDiscovery.dataSourceConfiguration(); + SourceSchema sourceSchema = perSourceDiscovery.sourceSchema(); + if (config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) { + continue; + } + tableConfigs.stream() + .forEach( + tableConfig -> { + SourceTableSchema sourceTableSchema = + findSourceTableSchema(sourceSchema, tableConfig); + int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema); + SourceTableReference sourceTableReference = + SourceTableReference.builder() + .setSourceSchemaReference(sourceSchema.schemaReference()) + .setSourceTableName(delimitIdentifier(sourceTableSchema.tableName())) + .setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID()) + .build(); + tableReadersBuilder.put( + ImmutableList.of(sourceTableReference), + getJdbcIO( + config, + dataSourceConfiguration, + sourceSchema.schemaReference(), + tableConfig, + sourceTableSchema, + fetchSize)); + }); } 
- return tableConfigs.stream() - .map( - tableConfig -> { - SourceTableSchema sourceTableSchema = - findSourceTableSchema(sourceSchema, tableConfig); - int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema); - SourceTableReference sourceTableReference = - SourceTableReference.builder() - .setSourceSchemaReference(sourceSchema.schemaReference()) - .setSourceTableName(delimitIdentifier(sourceTableSchema.tableName())) - .setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID()) - .build(); - return Map.entry( - ImmutableList.of(sourceTableReference), - getJdbcIO( - config, - dataSourceConfiguration, - sourceSchema.schemaReference(), - tableConfig, - sourceTableSchema, - fetchSize)); - }) - .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + return tableReadersBuilder.build(); } private static int getFetchSize( @@ -493,22 +628,110 @@ private static PTransform> getJdbcIO( * @return a map with a single entry where the key is a list of all table references and the value * is the multi-table reader transform. */ - private static ImmutableMap< + /** + * Builds optimized {@link ReadWithUniformPartitions} transforms for shards where the feature is + * enabled. + * + *

Ignores Configurations maped for legacy JdbcIO. + * + * @param perSourceDiscoveries List of discovery results. + * @return A map of multi-table reference lists to RWUPT transforms. + */ + protected static ImmutableMap< ImmutableList, PTransform>> getMultiTableReadWithUniformPartitionIO( - JdbcIOWrapperConfig config, - DataSourceConfiguration dataSourceConfiguration, - SourceSchemaReference sourceSchemaReference, - ImmutableList tableConfigs, - SourceSchema sourceSchema) { + ImmutableList perSourceDiscoveries) { + ImmutableMap.Builder< + ImmutableList, PTransform>> + tableReadersBuilder = ImmutableMap.builder(); - ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder(); - ImmutableMap.Builder> readSpecsBuilder = - ImmutableMap.builder(); - ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder(); + if (perSourceDiscoveries.isEmpty()) { + return ImmutableMap.of(); + } + DialectAdapter dialectAdapter = perSourceDiscoveries.get(0).config().dialectAdapter(); + OnSignal waitOn = perSourceDiscoveries.get(0).config().waitOn(); + Integer dbParallelizationForSplitProcess = + perSourceDiscoveries.get(0).config().dbParallelizationForSplitProcess(); + Integer dbParallelizationForReads = + perSourceDiscoveries.get(0).config().dbParallelizationForReads(); + PTransform>>, ?> additionalOperationsOnRanges = + perSourceDiscoveries.get(0).config().additionalOperationsOnRanges(); + + /* Todo in subsequent PR for multishard graphsize support, pass this to table reader. 
*/ + DataSourceProvider dataSourceProvider = getDataSourceProvider(perSourceDiscoveries); + for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) { + ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder(); + ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder(); + ImmutableMap.Builder> readSpecsBuilder = + ImmutableMap.builder(); + JdbcIOWrapperConfig config = perSourceDiscovery.config(); + DataSourceConfiguration dataSourceConfiguration = + perSourceDiscovery.dataSourceConfiguration(); + ImmutableList tableConfigs = perSourceDiscovery.tableConfigs(); + + if (!config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) { + continue; + } + accumulateSpecs( + perSourceDiscovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder); + + ReadWithUniformPartitions readWithUniformPartitions = + ReadWithUniformPartitions.builder() + .setTableSplitSpecifications(splitSpecsBuilder.build()) + .setTableReadSpecifications(readSpecsBuilder.build()) + .setDataSourceProviderFn( + JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration)) + .setDbAdapter(dialectAdapter) + .setWaitOn(waitOn) + .setDbParallelizationForSplitProcess(dbParallelizationForSplitProcess) + .setDbParallelizationForReads(dbParallelizationForReads) + .setAdditionalOperationsOnRanges(additionalOperationsOnRanges) + .build(); + + LOG.info( + "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}", + readWithUniformPartitions, + tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()), + config); + tableReadersBuilder.put(tableReferencesBuilder.build(), readWithUniformPartitions); + } + return tableReadersBuilder.build(); + } + /** + * Accumulates table-specific specifications (read specs, split specs, and table references) from + * a source discovery into the provided builders. + * + *

This method iterates over the table configurations in the {@link PerSourceDiscovery} and + * constructs: + * + *

+ * + * @param perSourceDiscovery The discovery results for a single source database. + * @param tableReferencesBuilder A builder to collect the resulting {@link SourceTableReference}s. + * @param splitSpecsBuilder A builder to collect the resulting {@link TableSplitSpecification}s. + * @param readSpecsBuilder A builder to collect the resulting {@link TableReadSpecification}s, + * mapped by {@link TableIdentifier}. + */ + /* Todo in subsequent PR for multishard graphsize support accumulate specs across a list of sourceDiscovereies */ + @VisibleForTesting + protected static void accumulateSpecs( + PerSourceDiscovery perSourceDiscovery, + ImmutableList.Builder tableReferencesBuilder, + ImmutableList.Builder splitSpecsBuilder, + ImmutableMap.Builder> readSpecsBuilder) { + + JdbcIOWrapperConfig config = perSourceDiscovery.config(); + SourceSchemaReference sourceSchemaReference = + perSourceDiscovery.sourceSchema().schemaReference(); + ImmutableList tableConfigs = perSourceDiscovery.tableConfigs(); for (TableConfig tableConfig : tableConfigs) { - SourceTableSchema sourceTableSchema = findSourceTableSchema(sourceSchema, tableConfig); + SourceTableSchema sourceTableSchema = + findSourceTableSchema(perSourceDiscovery.sourceSchema(), tableConfig); int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema); TableIdentifier tableIdentifier = getTableIdentifier(tableConfig); @@ -550,26 +773,31 @@ private static PTransform> getJdbcIO( .setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID()) .build()); } + } - ReadWithUniformPartitions readWithUniformPartitions = - ReadWithUniformPartitions.builder() - .setTableSplitSpecifications(splitSpecsBuilder.build()) - .setTableReadSpecifications(readSpecsBuilder.build()) - .setDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration)) - .setDbAdapter(config.dialectAdapter()) - .setWaitOn(config.waitOn()) - .setDbParallelizationForSplitProcess(config.dbParallelizationForSplitProcess()) - 
.setDbParallelizationForReads(config.dbParallelizationForReads()) - .setAdditionalOperationsOnRanges(config.additionalOperationsOnRanges()) - .build(); - - LOG.info( - "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}", - readWithUniformPartitions, - tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()), - config); - - return ImmutableMap.of(tableReferencesBuilder.build(), readWithUniformPartitions); + /** + * Creates a {@link DataSourceProvider} from a list of source discoveries. + * + *

Each discovery provides a {@link DataSourceConfiguration} which is used to create a poolable + * data source. The data sources are mapped by their corresponding configuration's unique ID. + * + * @param perSourceDiscoveries A list of discovery results for one or more source databases. + * @return A {@link DataSourceProvider} that can provide {@link javax.sql.DataSource}s for each + * source. + */ + @VisibleForTesting + protected static DataSourceProvider getDataSourceProvider( + ImmutableList perSourceDiscoveries) { + DataSourceProviderImpl.Builder datasourceProviderBuilder = DataSourceProviderImpl.builder(); + for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) { + DataSourceConfiguration dataSourceConfiguration = + perSourceDiscovery.dataSourceConfiguration(); + + SerializableFunction fn = + JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration); + datasourceProviderBuilder.addDataSource(perSourceDiscovery.config().id(), fn); + } + return datasourceProviderBuilder.build(); } @VisibleForTesting @@ -612,4 +840,40 @@ private JdbcIoWrapper( this.tableReaders = tableReaders; this.sourceSchema = sourceSchema; } + + /** + * Value class that encapsulates all metadata discovered for a specific data source. + * + *

This includes the original configuration specific to the source, the table configurations, + * the discovered schema, and the {@link DataSourceConfiguration}. + */ + @AutoValue + abstract static class PerSourceDiscovery { + abstract JdbcIOWrapperConfig config(); + + abstract DataSourceConfiguration dataSourceConfiguration(); + + abstract ImmutableList tableConfigs(); + + abstract SourceSchema sourceSchema(); + + static Builder builder() { + return new AutoValue_JdbcIoWrapper_PerSourceDiscovery.Builder(); + } + + /** Builder for {@link PerSourceDiscovery}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setConfig(JdbcIOWrapperConfig value); + + public abstract Builder setDataSourceConfiguration(DataSourceConfiguration value); + + public abstract Builder setTableConfigs(ImmutableList value); + + public abstract Builder setSourceSchema(SourceSchema value); + + public abstract PerSourceDiscovery build(); + } + } } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImpl.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImpl.java index bf78a0a518..c1c1f1f4a3 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImpl.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImpl.java @@ -137,8 +137,7 @@ public ImmutableMap> discoverTabl return result.build(); } - @VisibleForTesting - protected static void convertException(Exception e) { + public static void convertException(Exception e) { Throwable cause = e; while (cause != null) { if (cause instanceof SchemaDiscoveryException) { @@ -226,6 +225,10 @@ protected static List> partitionWork(List items, int batchSize) { return Lists.partition(items, batchSize); } + public static long getParallelism() { + return THREAD_POOL_SIZE; + } + 
private T doRetries(SchemaDiscoveryOperation operation) throws SchemaDiscoveryException { BackOff backoff = this.fluentBackoff.backoff(); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/AccumulatingTableReader.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/AccumulatingTableReader.java index 5498bd9803..d3d47bc61a 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/AccumulatingTableReader.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/AccumulatingTableReader.java @@ -110,14 +110,14 @@ private static PCollection getSourceTableReference( "CreateZeroCounts." + groupName, Create.of( tableReferences.stream() - .map(ref -> KV.of(ref.sourceTableName(), 0L)) + .map(ref -> KV.of(ref.sourceTableSchemaUUID(), 0L)) .collect(Collectors.toList()))) .setCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); // 2. Actual counts from the reader. PCollection> actualCounts = groupRows - .apply("ExtractTableNames." + groupName, ParDo.of(new ExtractTableNameFn())) + .apply("ExtractTableNames." + groupName, ParDo.of(new ExtractTableIdFn())) .apply("CountPerTable." + groupName, Count.perElement()); // 3. Aggregate and Emit Completions. 
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFn.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFn.java similarity index 82% rename from v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFn.java rename to v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFn.java index 506564547d..43f78f0f14 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFn.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFn.java @@ -23,13 +23,9 @@ * DoFn to extract the table name from a {@link SourceRow} and delimit it to match the format used * in {@link SourceTableReference}. */ -class ExtractTableNameFn extends DoFn { +class ExtractTableIdFn extends DoFn { @ProcessElement public void processElement(@Element SourceRow row, OutputReceiver out) { - out.output(delimitIdentifier(row.tableName())); - } - - private String delimitIdentifier(String identifier) { - return "\"" + identifier.replaceAll("\"", "\"\"") + "\""; + out.output(row.tableSchemaUUID()); } } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFn.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFn.java index 41e742d7be..2e602f175c 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFn.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFn.java @@ -32,7 +32,7 @@ class GroupCompletionDoFn extends DoFn, SourceTableReference> { public GroupCompletionDoFn(ImmutableList 
tableReferences) { this.tableReferencesMap = tableReferences.stream() - .collect(Collectors.toMap(SourceTableReference::sourceTableName, ref -> ref)); + .collect(Collectors.toMap(SourceTableReference::sourceTableSchemaUUID, ref -> ref)); } @ProcessElement diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java index c2732b0c34..d6b4c2fc47 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java @@ -21,20 +21,29 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider; import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException; +import com.google.cloud.teleport.v2.source.reader.io.exception.SchemaDiscoveryException; import com.google.cloud.teleport.v2.source.reader.io.exception.SuitableIndexNotFoundException; import com.google.cloud.teleport.v2.source.reader.io.jdbc.JdbcSchemaReference; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.DialectAdapter; import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIOWrapperConfig; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIoWrapperConfigGroup; 
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect; import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.TableConfig; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.PartitionColumn; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableIdentifier; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableReadSpecification; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableSplitSpecification; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms.ReadWithUniformPartitions; import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo; @@ -59,6 +68,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; /** Test class for {@link JdbcIoWrapper}. 
*/ @@ -108,18 +119,21 @@ public void testJdbcIoWrapperBasic() throws RetriableSchemaDiscoveryException { .thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType))); JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) .build()); SourceSchema sourceSchema = jdbcIoWrapper.discoverTableSchema().get(0); assertThat(sourceSchema.schemaReference()).isEqualTo(testSourceSchemaReference); @@ -159,19 +173,23 @@ public void testJdbcIoWrapperWithoutInference() throws RetriableSchemaDiscoveryE .thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType))); JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setTableVsPartitionColumns(ImmutableMap.of("testTable", ImmutableList.of("ID"))) - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + 
JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setTableVsPartitionColumns( + ImmutableMap.of("testTable", ImmutableList.of("ID"))) + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) .build()); SourceSchema sourceSchema = jdbcIoWrapper.discoverTableSchema().get(0); assertThat(sourceSchema.schemaReference()).isEqualTo(testSourceSchemaReference); @@ -217,18 +235,21 @@ public void testJdbcIoWrapperNoPrimaryKeyExistsOnTable() SuitableIndexNotFoundException.class, () -> JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) 
.build())); } @@ -262,36 +283,42 @@ public void testJdbcIoWrapperNoIndexException() throws RetriableSchemaDiscoveryE SuitableIndexNotFoundException.class, () -> JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) .build())); /* No Numeric Index on table */ assertThrows( SuitableIndexNotFoundException.class, () -> JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + 
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) .build())); } @@ -324,19 +351,22 @@ public void testJdbcIoWrapperDifferentTables() throws RetriableSchemaDiscoveryEx .thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType))); JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of( - JdbcIOWrapperConfig.builderWithMySqlDefaults() - .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") - .setSourceSchemaReference(testSourceSchemaReference) - .setShardID("test") - .setDbAuth( - LocalCredentialsProvider.builder() - .setUserName("testUser") - .setPassword("testPassword") + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .setTables(ImmutableList.of("spanner_table")) .build()) - .setJdbcDriverJars("") - .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") - .setDialectAdapter(mockDialectAdapter) - .setTables(ImmutableList.of("spanner_table")) .build()); ImmutableMap, PTransform>> tableReaders = jdbcIoWrapper.getTableReaders(); @@ -417,8 +447,12 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc configWithFeatureEnabled.toBuilder() .setReadWithUniformPartitionsFeatureEnabled(false) .build(); - JdbcIoWrapper jdbcIOWrapperWithFeatureEnabled = JdbcIoWrapper.of(configWithFeatureEnabled); - JdbcIoWrapper jdbcIOWrapperWithFeatureDisabled = 
JdbcIoWrapper.of(configWithFeatureDisabled); + JdbcIoWrapper jdbcIOWrapperWithFeatureEnabled = + JdbcIoWrapper.of( + JdbcIoWrapperConfigGroup.builder().addShardConfig(configWithFeatureEnabled).build()); + JdbcIoWrapper jdbcIOWrapperWithFeatureDisabled = + JdbcIoWrapper.of( + JdbcIoWrapperConfigGroup.builder().addShardConfig(configWithFeatureDisabled).build()); assertThat( jdbcIOWrapperWithFeatureDisabled.getTableReaders().values().stream().findFirst().get()) .isInstanceOf(JdbcIO.ReadWithPartitions.class); @@ -428,11 +462,19 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc // We test that setting the fetch size works for both modes. The more detailed testing of the // fetch size getting applied to JdbcIO is covered in {@link ReadWithUniformPartitionTest} assertThat( - JdbcIoWrapper.of(configWithFeatureEnabled.toBuilder().setMaxFetchSize(42).build()) + JdbcIoWrapper.of( + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + configWithFeatureEnabled.toBuilder().setMaxFetchSize(42).build()) + .build()) .getTableReaders()) .hasSize(1); assertThat( - JdbcIoWrapper.of(configWithFeatureDisabled.toBuilder().setMaxFetchSize(42).build()) + JdbcIoWrapper.of( + JdbcIoWrapperConfigGroup.builder() + .addShardConfig( + configWithFeatureDisabled.toBuilder().setMaxFetchSize(42).build()) + .build()) .getTableReaders()) .hasSize(1); } @@ -494,7 +536,8 @@ public void testReadWithUniformPartitionMultiTable() throws RetriableSchemaDisco .setTables(ImmutableList.of("table1", "table2")) .build(); - JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config); + JdbcIoWrapper jdbcIoWrapper = + JdbcIoWrapper.of(JdbcIoWrapperConfigGroup.builder().addShardConfig(config).build()); ImmutableMap, PTransform>> tableReaders = jdbcIoWrapper.getTableReaders(); @@ -553,7 +596,8 @@ public void testReadWithUniformPartitionConfigOverrides() .setDialectAdapter(mockDialectAdapter) .build(); - JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config); + JdbcIoWrapper 
jdbcIoWrapper = + JdbcIoWrapper.of(JdbcIoWrapperConfigGroup.builder().addShardConfig(config).build()); assertThat(jdbcIoWrapper.getTableReaders()).hasSize(1); } @@ -600,7 +644,8 @@ public void testGetJdbcIOWithMaxPartitions() throws RetriableSchemaDiscoveryExce .setDialectAdapter(mockDialectAdapter) .build(); - JdbcIoWrapper jdbcIoWrapper = JdbcIoWrapper.of(config); + JdbcIoWrapper jdbcIoWrapper = + JdbcIoWrapper.of(JdbcIoWrapperConfigGroup.builder().addShardConfig(config).build()); assertThat(jdbcIoWrapper.getTableReaders()).hasSize(1); assertThat(jdbcIoWrapper.getTableReaders().values().iterator().next()) .isInstanceOf(JdbcIO.ReadWithPartitions.class); @@ -774,4 +819,763 @@ public void testGetTableIdentifier() { assertThat(tableIdentifier.tableName()).isEqualTo("\"testTable\""); assertThat(tableIdentifier.dataSourceId()).isEqualTo("testDataSource"); } + + @Test + public void testBuildTableReaders() { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + String colName = "ID"; + SourceColumnType colType = new SourceColumnType("INTEGER", new Long[] {}, null); + + // 1. Legacy Source 1: 0 table configs + JdbcIOWrapperConfig legacyConfig1 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://localhost/test") + .setDbAuth( + LocalCredentialsProvider.builder().setUserName("user").setPassword("pass").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("com.mysql.cj.jdbc.Driver") + .setReadWithUniformPartitionsFeatureEnabled(false) + .setSourceSchemaReference(schemaRef) + .build(); + JdbcIoWrapper.PerSourceDiscovery legacyDiscovery1 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(legacyConfig1) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of()) + .setSourceSchema(SourceSchema.builder().setSchemaReference(schemaRef).build()) + .build(); + + // 2. 
Legacy Source 2: 1 table config + JdbcIOWrapperConfig legacyConfig2 = + legacyConfig1.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setShardID("shard2") + .build(); + TableConfig tableConfig2 = + TableConfig.builder("t2") + .setDataSourceId(legacyConfig2.id()) + .withPartitionColum( + PartitionColumn.builder().setColumnName("ID").setColumnClass(Long.class).build()) + .build(); + SourceTableSchema tableSchema2 = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t2") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + JdbcIoWrapper.PerSourceDiscovery legacyDiscovery2 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(legacyConfig2) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of(tableConfig2)) + .setSourceSchema( + SourceSchema.builder() + .setSchemaReference(schemaRef) + .addTableSchema(tableSchema2) + .build()) + .build(); + + // 3. 
Legacy Source 3: 2 table configs + JdbcIOWrapperConfig legacyConfig3 = + legacyConfig1.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setShardID("shard3") + .build(); + TableConfig tableConfig3a = + TableConfig.builder("t3a") + .setDataSourceId(legacyConfig3.id()) + .withPartitionColum( + PartitionColumn.builder().setColumnName("ID").setColumnClass(Long.class).build()) + .build(); + TableConfig tableConfig3b = + TableConfig.builder("t3b") + .setDataSourceId(legacyConfig3.id()) + .withPartitionColum( + PartitionColumn.builder().setColumnName("ID").setColumnClass(Long.class).build()) + .build(); + SourceTableSchema tableSchema3a = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t3a") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + SourceTableSchema tableSchema3b = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t3b") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + JdbcIoWrapper.PerSourceDiscovery legacyDiscovery3 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(legacyConfig3) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of(tableConfig3a, tableConfig3b)) + .setSourceSchema( + SourceSchema.builder() + .setSchemaReference(schemaRef) + .addTableSchema(tableSchema3a) + .addTableSchema(tableSchema3b) + .build()) + .build(); + + // 4. 
Uniform Source 4: 0 table configs + JdbcIOWrapperConfig uniformConfig4 = + legacyConfig1.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setReadWithUniformPartitionsFeatureEnabled(true) + .build(); + JdbcIoWrapper.PerSourceDiscovery uniformDiscovery4 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(uniformConfig4) + .setDataSourceConfiguration(legacyDiscovery1.dataSourceConfiguration()) + .setTableConfigs(legacyDiscovery1.tableConfigs()) + .setSourceSchema(legacyDiscovery1.sourceSchema()) + .build(); + + // 5. Uniform Source 5: 1 table config + JdbcIOWrapperConfig uniformConfig5 = + uniformConfig4.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setShardID("shard5") + .build(); + TableConfig tableConfig5 = + TableConfig.builder("t5").setDataSourceId(uniformConfig5.id()).build(); + SourceTableSchema tableSchema5 = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t5") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + JdbcIoWrapper.PerSourceDiscovery uniformDiscovery5 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(uniformConfig5) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of(tableConfig5)) + .setSourceSchema( + SourceSchema.builder() + .setSchemaReference(schemaRef) + .addTableSchema(tableSchema5) + .build()) + .build(); + + // 6. 
Uniform Source 6: 2 table configs + JdbcIOWrapperConfig uniformConfig6 = + uniformConfig4.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setShardID("shard6") + .build(); + TableConfig tableConfig6a = + TableConfig.builder("t6a").setDataSourceId(uniformConfig6.id()).build(); + TableConfig tableConfig6b = + TableConfig.builder("t6b").setDataSourceId(uniformConfig6.id()).build(); + SourceTableSchema tableSchema6a = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t6a") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + SourceTableSchema tableSchema6b = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t6b") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType(colName, colType) + .build(); + JdbcIoWrapper.PerSourceDiscovery uniformDiscovery6 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(uniformConfig6) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of(tableConfig6a, tableConfig6b)) + .setSourceSchema( + SourceSchema.builder() + .setSchemaReference(schemaRef) + .addTableSchema(tableSchema6a) + .addTableSchema(tableSchema6b) + .build()) + .build(); + + ImmutableList discoveries = + ImmutableList.of( + legacyDiscovery1, + legacyDiscovery2, + legacyDiscovery3, + uniformDiscovery4, + uniformDiscovery5, + uniformDiscovery6); + + // Act + ImmutableMap, PTransform>> + tableReaders = JdbcIoWrapper.buildTableReaders(discoveries); + + // Assert + // Legacy: shard2.t2 (1), shard3.t3a (1), shard3.t3b (1) = 3 transforms + // Uniform: shard5 (1), shard6 (1) = 2 transforms + // Total = 5 entries + assertThat(tableReaders).hasSize(5); + + // Verify Legacy readers + long legacyCount = + tableReaders.values().stream().filter(v -> v instanceof JdbcIO.ReadWithPartitions).count(); + assertThat(legacyCount).isEqualTo(3); + + // Verify Uniform readers + long uniformCount = + 
tableReaders.values().stream().filter(v -> v instanceof ReadWithUniformPartitions).count(); + assertThat(uniformCount).isEqualTo(2); + + // Verify table names in keys + java.util.List allTableNames = + tableReaders.keySet().stream() + .flatMap(java.util.List::stream) + .map(SourceTableReference::sourceTableName) + .collect(Collectors.toList()); + assertThat(allTableNames) + .containsExactly("\"t2\"", "\"t3a\"", "\"t3b\"", "\"t5\"", "\"t6a\"", "\"t6b\""); + } + + @Test + public void testGetPerSourceDiscoveries() throws RetriableSchemaDiscoveryException { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig shardConfig1 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:db1;create=true") + .setSourceSchemaReference(schemaRef) + .setShardID("shard1") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .setTableVsPartitionColumns(ImmutableMap.of("t1", ImmutableList.of("id"))) + .build(); + JdbcIOWrapperConfig shardConfig2 = + shardConfig1.toBuilder() + .setSourceDbURL("jdbc:derby://myhost/memory:db2;create=true") + .setShardID("shard2") + .build(); + + JdbcIoWrapperConfigGroup group = + JdbcIoWrapperConfigGroup.builder() + .setSourceDbDialect(SQLDialect.MYSQL) + .addShardConfig(shardConfig1) + .addShardConfig(shardConfig2) + .build(); + + // Mock discovery for both shards + when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any())) + .thenReturn(ImmutableList.of("t1")); + when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any())) + .thenReturn( + ImmutableMap.of( + "t1", + ImmutableList.of( + SourceColumnIndexInfo.builder() + .setColumnName("id") + .setIndexType(IndexType.NUMERIC) + .setOrdinalPosition(1) + 
.setIndexName("PRIMARY") + .setIsPrimary(true) + .setCardinality(10L) + .setIsUnique(true) + .build()))); + when(mockDialectAdapter.discoverTableSchema(any(), (SourceSchemaReference) any(), any())) + .thenReturn( + ImmutableMap.of( + "t1", ImmutableMap.of("id", new SourceColumnType("INT", new Long[] {}, null)))); + + ImmutableList discoveries = + JdbcIoWrapper.getPerSourceDiscoveries(group); + + assertThat(discoveries).hasSize(2); + // Note: parallelStream order is not guaranteed. + // We check containment to be safe. + assertThat(discoveries.stream().map(d -> d.config().shardID()).collect(Collectors.toList())) + .containsAnyIn(ImmutableList.of("shard1", "shard2")); + assertThat(discoveries.stream().map(d -> d.config().shardID()).collect(Collectors.toList())) + .containsNoDuplicates(); + } + + @Test + public void testGetPerSourceDiscoveries_Fails() throws RetriableSchemaDiscoveryException { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig shardConfig1 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:db1;create=true") + .setSourceSchemaReference(schemaRef) + .setShardID("shard1") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIoWrapperConfigGroup group = + JdbcIoWrapperConfigGroup.builder() + .setSourceDbDialect(SQLDialect.MYSQL) + .addShardConfig(shardConfig1) + .build(); + + when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any())) + .thenThrow(new RuntimeException("Test Exception")); + + assertThrows(RuntimeException.class, () -> JdbcIoWrapper.getPerSourceDiscoveries(group)); + } + + @Test + public void testGetPerSourceDiscovery_LogsWarning_WhenCloseFails() throws Exception { + SourceSchemaReference 
schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig shardConfig1 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:db1;create=true") + .setSourceSchemaReference(schemaRef) + .setShardID("shard1") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIoWrapperConfigGroup group = + JdbcIoWrapperConfigGroup.builder() + .setSourceDbDialect(SQLDialect.MYSQL) + .addShardConfig(shardConfig1) + .build(); + + // Mock successful discovery so it reaches the finally block + when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any())) + .thenReturn(ImmutableList.of("t1")); + when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any())) + .thenReturn( + ImmutableMap.of( + "t1", + ImmutableList.of( + SourceColumnIndexInfo.builder() + .setIndexType(IndexType.NUMERIC) + .setIndexName("PRIMARY") + .setIsPrimary(true) + .setCardinality(42L) + .setColumnName("ID") + .setIsUnique(true) + .setOrdinalPosition(1) + .build()))); + when(mockDialectAdapter.discoverTableSchema(any(), (SourceSchemaReference) any(), any())) + .thenReturn( + ImmutableMap.of( + "t1", ImmutableMap.of("ID", new SourceColumnType("INTEGER", new Long[] {}, null)))); + + try (MockedConstruction mocked = + mockConstruction( + BasicDataSource.class, + (mock, context) -> { + doThrow(new SQLException("Close Failure")).when(mock).close(); + })) { + // Act + ImmutableList result = + JdbcIoWrapper.getPerSourceDiscoveries(group); + + // Assert + assertThat(result).hasSize(1); + // The SQLException is caught and logged, so it shouldn't propagate. 
+ } + } + + @Test + public void testGetPerSourceDiscovery_SuitableIndexNotFoundException() + throws RetriableSchemaDiscoveryException { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig shardConfig = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:db1;create=true") + .setSourceSchemaReference(schemaRef) + .setShardID("shard1") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIoWrapperConfigGroup group = + JdbcIoWrapperConfigGroup.builder() + .setSourceDbDialect(SQLDialect.MYSQL) + .addShardConfig(shardConfig) + .build(); + + // Force SuitableIndexNotFoundException during autoInferTableConfigs + when(mockDialectAdapter.discoverTables(any(), (SourceSchemaReference) any())) + .thenReturn(ImmutableList.of("t1")); + when(mockDialectAdapter.discoverTableIndexes(any(), (SourceSchemaReference) any(), any())) + .thenReturn(ImmutableMap.of()); // Empty indexes triggers exception + + assertThrows( + SuitableIndexNotFoundException.class, () -> JdbcIoWrapper.getPerSourceDiscoveries(group)); + } + + /** + * Tests that {@link JdbcIoWrapper#getPerSourceDiscoveries} handles InterruptedException during + * parallel discovery. 
+ */ + @Test + public void testGetPerSourceDiscovery_InterruptedException() throws Exception { + JdbcIoWrapperConfigGroup mockGroup = Mockito.mock(JdbcIoWrapperConfigGroup.class); + JdbcIOWrapperConfig mockConfig = Mockito.mock(JdbcIOWrapperConfig.class); + when(mockGroup.shardConfigs()).thenReturn(ImmutableList.of(mockConfig)); + + try (org.mockito.MockedStatic mockedExecutors = + Mockito.mockStatic(java.util.concurrent.Executors.class)) { + java.util.concurrent.ExecutorService mockExecutor = + Mockito.mock(java.util.concurrent.ExecutorService.class); + mockedExecutors + .when(() -> java.util.concurrent.Executors.newFixedThreadPool(Mockito.anyInt())) + .thenReturn(mockExecutor); + + java.util.concurrent.Future mockFuture = + Mockito.mock(java.util.concurrent.Future.class); + when(mockFuture.get()).thenThrow(new InterruptedException("Interrupted")); + when(mockExecutor.submit(any(java.util.concurrent.Callable.class))).thenReturn(mockFuture); + + assertThrows( + SchemaDiscoveryException.class, () -> JdbcIoWrapper.getPerSourceDiscoveries(mockGroup)); + } + } + + @Test + public void testAccumulateSpecs() { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig config = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://localhost/test") + .setDbAuth( + LocalCredentialsProvider.builder().setUserName("user").setPassword("pass").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("com.mysql.cj.jdbc.Driver") + .setSourceSchemaReference(schemaRef) + .setMaxPartitions(10) + .setSplitStageCountHint(5L) + .setMaxFetchSize(42) + .build(); + + SourceColumnType colType = new SourceColumnType("INTEGER", new Long[] {}, null); + SourceTableSchema tableSchema1 = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t1") + .setTableSchemaUUID("uuid1") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType("ID", colType) + .build(); 
+ TableConfig tableConfig1 = + TableConfig.builder("t1") + .setDataSourceId(config.id()) + .setApproxRowCount(1000L) + .setMaxPartitions(10) + .withPartitionColum( + PartitionColumn.builder().setColumnName("ID").setColumnClass(Long.class).build()) + .build(); + + SourceTableSchema tableSchema2 = + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName("t2") + .setTableSchemaUUID("uuid2") + .setEstimatedRowSize(200L) + .addSourceColumnNameToSourceColumnType("ID", colType) + .build(); + TableConfig tableConfig2 = + TableConfig.builder("t2").setDataSourceId(config.id()).setApproxRowCount(2000L).build(); + + JdbcIoWrapper.PerSourceDiscovery discovery = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of(tableConfig1, tableConfig2)) + .setSourceSchema( + SourceSchema.builder() + .setSchemaReference(schemaRef) + .addTableSchema(tableSchema1) + .addTableSchema(tableSchema2) + .build()) + .build(); + + ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder(); + ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder(); + ImmutableMap.Builder> readSpecsBuilder = + ImmutableMap.builder(); + + JdbcIoWrapper.accumulateSpecs( + discovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder); + + ImmutableList tableRefs = tableReferencesBuilder.build(); + ImmutableList splitSpecs = splitSpecsBuilder.build(); + ImmutableMap> readSpecs = + readSpecsBuilder.build(); + + assertThat(tableRefs).hasSize(2); + assertThat(tableRefs.get(0).sourceTableName()).isEqualTo("\"t1\""); + assertThat(tableRefs.get(0).sourceTableSchemaUUID()).isEqualTo("uuid1"); + assertThat(tableRefs.get(1).sourceTableName()).isEqualTo("\"t2\""); + assertThat(tableRefs.get(1).sourceTableSchemaUUID()).isEqualTo("uuid2"); + + assertThat(splitSpecs).hasSize(2); + assertThat(splitSpecs.get(0).tableIdentifier().tableName()).isEqualTo("\"t1\""); + 
assertThat(splitSpecs.get(0).approxRowCount()).isEqualTo(1000L); + assertThat(splitSpecs.get(0).maxPartitionsHint()).isEqualTo(10L); + assertThat(splitSpecs.get(0).splitStagesCount()).isEqualTo(5L); + + assertThat(readSpecs).hasSize(2); + TableIdentifier id1 = + TableIdentifier.builder().setTableName("\"t1\"").setDataSourceId(config.id()).build(); + assertThat(readSpecs.get(id1).fetchSize()).isEqualTo(42); + } + + @Test + public void testAccumulateSpecs_Empty() { + SourceSchemaReference schemaRef = + SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build()); + JdbcIOWrapperConfig config = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://localhost/test") + .setDbAuth( + LocalCredentialsProvider.builder().setUserName("user").setPassword("pass").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("com.mysql.cj.jdbc.Driver") + .setSourceSchemaReference(schemaRef) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discovery = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("driver", "url")) + .setTableConfigs(ImmutableList.of()) + .setSourceSchema(SourceSchema.builder().setSchemaReference(schemaRef).build()) + .build(); + + ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder(); + ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder(); + ImmutableMap.Builder> readSpecsBuilder = + ImmutableMap.builder(); + + JdbcIoWrapper.accumulateSpecs( + discovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder); + + assertThat(tableReferencesBuilder.build()).isEmpty(); + assertThat(splitSpecsBuilder.build()).isEmpty(); + assertThat(readSpecsBuilder.build()).isEmpty(); + } + + /** + * Tests that {@link JdbcIoWrapper#accumulateSpecs} does not generate split specifications when + * the uniform partitions feature is disabled. 
+ */ + @Test + public void testGetDataSourceProvider() { + JdbcIOWrapperConfig config1 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://host1/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db1").build()) + .build(); + JdbcIOWrapperConfig config2 = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://host2/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db2").build()) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discovery1 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config1) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs(ImmutableList.of()) + .setSourceSchema( + SourceSchema.builder().setSchemaReference(config1.sourceSchemaReference()).build()) + .build(); + JdbcIoWrapper.PerSourceDiscovery discovery2 = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config2) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u2")) + .setTableConfigs(ImmutableList.of()) + .setSourceSchema( + SourceSchema.builder().setSchemaReference(config2.sourceSchemaReference()).build()) + .build(); + + DataSourceProvider provider = + JdbcIoWrapper.getDataSourceProvider(ImmutableList.of(discovery1, discovery2)); + + assertThat(provider.getDataSourceIds()).containsExactly(config1.id(), config2.id()); + } + + @Test + public void testGetDataSourceProvider_Empty() { + DataSourceProvider provider = JdbcIoWrapper.getDataSourceProvider(ImmutableList.of()); + assertThat(provider.getDataSourceIds()).isEmpty(); + } + + @Test + public void testGetMultiTableReadWithUniformPartitionIO_Empty() { + 
assertThat(JdbcIoWrapper.getMultiTableReadWithUniformPartitionIO(ImmutableList.of())).isEmpty(); + } + + @Test + public void testGetMultiTableReadWithUniformPartitionIO_FeatureDisabled() { + JdbcIOWrapperConfig config = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://host1/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db1").build()) + .setReadWithUniformPartitionsFeatureEnabled(false) + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discovery = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs( + ImmutableList.of(TableConfig.builder("table1").setDataSourceId("ds1").build())) + .setSourceSchema( + SourceSchema.builder().setSchemaReference(config.sourceSchemaReference()).build()) + .build(); + + assertThat(JdbcIoWrapper.getMultiTableReadWithUniformPartitionIO(ImmutableList.of(discovery))) + .isEmpty(); + } + + @Test + public void testGetMultiTableReadWithUniformPartitionIO_EmptyTableConfigs() { + JdbcIOWrapperConfig config = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://host1/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db1").build()) + .setReadWithUniformPartitionsFeatureEnabled(true) + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discovery = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs(ImmutableList.of()) + .setSourceSchema( + 
SourceSchema.builder().setSchemaReference(config.sourceSchemaReference()).build()) + .build(); + + assertThat(JdbcIoWrapper.getMultiTableReadWithUniformPartitionIO(ImmutableList.of(discovery))) + .isEmpty(); + } + + @Test + public void testGetMultiTableReadWithUniformPartitionIO_Success() { + JdbcIOWrapperConfig config = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:mysql://host1/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db1").build()) + .setReadWithUniformPartitionsFeatureEnabled(true) + .setDialectAdapter(mockDialectAdapter) + .build(); + + String tableName = "table1"; + TableConfig tableConfig = TableConfig.builder(tableName).setDataSourceId("ds1").build(); + SourceSchema sourceSchema = + SourceSchema.builder() + .setSchemaReference(config.sourceSchemaReference()) + .addTableSchema( + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName(tableName) + .setTableSchemaUUID("uuid") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType( + "col1", new SourceColumnType("INTEGER", new Long[] {}, null)) + .build()) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discovery = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(config) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs(ImmutableList.of(tableConfig)) + .setSourceSchema(sourceSchema) + .build(); + + ImmutableMap, PTransform>> + result = JdbcIoWrapper.getMultiTableReadWithUniformPartitionIO(ImmutableList.of(discovery)); + + assertThat(result).hasSize(1); + assertThat(result.values().iterator().next()).isInstanceOf(ReadWithUniformPartitions.class); + } + + @Test + public void testGetMultiTableReadWithUniformPartitionIO_Mix() { + JdbcIOWrapperConfig configUniform = + JdbcIOWrapperConfig.builderWithMySqlDefaults() + 
.setSourceDbURL("jdbc:mysql://host1/test") + .setDbAuth(LocalCredentialsProvider.builder().setUserName("u").setPassword("p").build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("c") + .setSourceSchemaReference(JdbcSchemaReference.builder().setDbName("db1").build()) + .setReadWithUniformPartitionsFeatureEnabled(true) + .setDialectAdapter(mockDialectAdapter) + .build(); + + JdbcIOWrapperConfig configLegacy = + configUniform.toBuilder() + .setId(JdbcIOWrapperConfig.generateId()) + .setSourceDbURL("jdbc:mysql://host2/test") + .setReadWithUniformPartitionsFeatureEnabled(false) + .build(); + + String tableName = "table1"; + TableConfig tableConfig = TableConfig.builder(tableName).setDataSourceId("ds1").build(); + SourceSchema sourceSchema = + SourceSchema.builder() + .setSchemaReference(configUniform.sourceSchemaReference()) + .addTableSchema( + SourceTableSchema.builder(SQLDialect.MYSQL) + .setTableName(tableName) + .setTableSchemaUUID("uuid") + .setEstimatedRowSize(100L) + .addSourceColumnNameToSourceColumnType( + "col1", new SourceColumnType("INTEGER", new Long[] {}, null)) + .build()) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discoveryUniform = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(configUniform) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs(ImmutableList.of(tableConfig)) + .setSourceSchema(sourceSchema) + .build(); + + JdbcIoWrapper.PerSourceDiscovery discoveryLegacy = + JdbcIoWrapper.PerSourceDiscovery.builder() + .setConfig(configLegacy) + .setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u1")) + .setTableConfigs(ImmutableList.of(tableConfig)) + .setSourceSchema(sourceSchema) + .build(); + + ImmutableMap, PTransform>> + result = + JdbcIoWrapper.getMultiTableReadWithUniformPartitionIO( + ImmutableList.of(discoveryUniform, discoveryLegacy)); + + assertThat(result).hasSize(1); + ImmutableList tableRefs = result.keySet().iterator().next(); + 
assertThat(tableRefs).hasSize(1); + assertThat(tableRefs.get(0).sourceTableName()).isEqualTo("\"table1\""); + } } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImplTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImplTest.java index d8f4cfdfc7..61ef28aa84 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImplTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/schema/SchemaDiscoveryImplTest.java @@ -226,6 +226,7 @@ public void testSchemaDiscoveryImpl() throws RetriableSchemaDiscoveryException { .isEqualTo(ImmutableMap.of("testTable", ImmutableMap.of())); verify(mockRetriableSchemaDiscovery, times(expectedCallsCount)) .discoverTableSchema(any(), any(), any()); + assertThat(SchemaDiscoveryImpl.getParallelism()).isEqualTo(4L); } @Test diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFnTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFnTest.java similarity index 88% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFnTest.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFnTest.java index 207b69dfa4..794bf0a8a3 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableNameFnTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/ExtractTableIdFnTest.java @@ -31,9 +31,9 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Test class for {@link ExtractTableNameFn}. 
*/ +/** Test class for {@link ExtractTableIdFn}. */ @RunWith(JUnit4.class) -public class ExtractTableNameFnTest implements Serializable { +public class ExtractTableIdFnTest implements Serializable { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Test @@ -48,9 +48,9 @@ public void testExtractTableNameFn_simpleName() { .build(); PCollection output = - pipeline.apply(Create.of(row)).apply(ParDo.of(new ExtractTableNameFn())); + pipeline.apply(Create.of(row)).apply(ParDo.of(new ExtractTableIdFn())); - PAssert.that(output).containsInAnyOrder("\"table1\""); + PAssert.that(output).containsInAnyOrder(tableSchema.tableSchemaUUID()); pipeline.run(); } @@ -66,10 +66,10 @@ public void testExtractTableNameFn_withQuotes() { .build(); PCollection output = - pipeline.apply(Create.of(row)).apply(ParDo.of(new ExtractTableNameFn())); + pipeline.apply(Create.of(row)).apply(ParDo.of(new ExtractTableIdFn())); - // Delimit logic: table"with"quotes -> "table""with""quotes" - PAssert.that(output).containsInAnyOrder("\"table\"\"with\"\"quotes\""); + // tableSchemaUUID is unique and stable + PAssert.that(output).containsInAnyOrder(tableSchema.tableSchemaUUID()); pipeline.run(); } } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFnTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFnTest.java index 26c8fbd8d4..4a1a15d452 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFnTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/transform/GroupCompletionDoFnTest.java @@ -57,7 +57,7 @@ public void testGroupCompletionDoFn_matchingTable() { GroupCompletionDoFn fn = new GroupCompletionDoFn(tableReferences); PCollection output = - pipeline.apply(Create.of(KV.of("table1", 100L))).apply(ParDo.of(fn)); + 
pipeline.apply(Create.of(KV.of(ref1.sourceTableSchemaUUID(), 100L))).apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(ref1.toBuilder().setRecordCount(100L).build()); pipeline.run(); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java index 917094f973..0b8a7a8f7f 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java @@ -394,7 +394,9 @@ public void shardedDbConfigContainerMySqlTest() { sourceDbToSpannerOptions.setUsername(testUser); sourceDbToSpannerOptions.setPassword(testPassword); sourceDbToSpannerOptions.setTables("table1,table2"); - mockedStaticJdbcIoWrapper.when(() -> JdbcIoWrapper.of(any())).thenReturn(mockJdbcIoWrapper); + mockedStaticJdbcIoWrapper + .when(() -> JdbcIoWrapper.of((JdbcIOWrapperConfig) any())) + .thenReturn(mockJdbcIoWrapper); Shard shard = new Shard("shard1", "localhost", "3306", "user", "password", null, null, null, null);