Skip to content

Commit 786c9eb

Browse files
committed
Getting MultiShard GraphSize Optimization together
1 parent 19707e3 commit 786c9eb

File tree

3 files changed

+151
-108
lines changed

3 files changed

+151
-108
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java

Lines changed: 101 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.google.common.collect.ImmutableList;
5050
import com.google.common.collect.ImmutableMap;
5151
import com.google.common.collect.ImmutableSet;
52+
import com.google.common.collect.Lists;
5253
import java.sql.SQLException;
5354
import java.util.List;
5455
import java.util.concurrent.ExecutionException;
@@ -640,42 +641,47 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
640641

641642
/* Todo in subsequent PR for multishard graphsize support, pass this to table reader. */
642643
DataSourceProvider dataSourceProvider = getDataSourceProvider(perSourceDiscoveries);
643-
for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) {
644-
ImmutableList.Builder<SourceTableReference> tableReferencesBuilder = ImmutableList.builder();
645-
ImmutableList.Builder<TableSplitSpecification> splitSpecsBuilder = ImmutableList.builder();
646-
ImmutableMap.Builder<TableIdentifier, TableReadSpecification<SourceRow>> readSpecsBuilder =
647-
ImmutableMap.builder();
648-
JdbcIOWrapperConfig config = perSourceDiscovery.config();
649-
DataSourceConfiguration dataSourceConfiguration =
650-
perSourceDiscovery.dataSourceConfiguration();
651-
ImmutableList<TableConfig> tableConfigs = perSourceDiscovery.tableConfigs();
652-
653-
if (!config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) {
654-
continue;
655-
}
656-
accumulateSpecs(
657-
perSourceDiscovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
658-
659-
ReadWithUniformPartitions<SourceRow> readWithUniformPartitions =
660-
ReadWithUniformPartitions.<SourceRow>builder()
661-
.setTableSplitSpecifications(splitSpecsBuilder.build())
662-
.setTableReadSpecifications(readSpecsBuilder.build())
663-
.setDataSourceProviderFn(
664-
JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration))
665-
.setDbAdapter(dialectAdapter)
666-
.setWaitOn(waitOn)
667-
.setDbParallelizationForSplitProcess(dbParallelizationForSplitProcess)
668-
.setDbParallelizationForReads(dbParallelizationForReads)
669-
.setAdditionalOperationsOnRanges(additionalOperationsOnRanges)
670-
.build();
671-
672-
LOG.info(
673-
"Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}",
674-
readWithUniformPartitions,
675-
tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()),
676-
config);
677-
tableReadersBuilder.put(tableReferencesBuilder.build(), readWithUniformPartitions);
644+
ImmutableList.Builder<SourceTableReference> tableReferencesBuilder = ImmutableList.builder();
645+
ImmutableList.Builder<TableSplitSpecification> splitSpecsBuilder = ImmutableList.builder();
646+
ImmutableMap.Builder<TableIdentifier, TableReadSpecification<SourceRow>> readSpecsBuilder =
647+
ImmutableMap.builder();
648+
accumulateSpecs(
649+
perSourceDiscoveries, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
650+
651+
ImmutableList<SourceTableReference> tableReferences = tableReferencesBuilder.build();
652+
if (tableReferences.isEmpty()) {
653+
return ImmutableMap.of();
678654
}
655+
656+
ReadWithUniformPartitions<SourceRow> readWithUniformPartitions =
657+
ReadWithUniformPartitions.<SourceRow>builder()
658+
.setTableSplitSpecifications(splitSpecsBuilder.build())
659+
.setTableReadSpecifications(readSpecsBuilder.build())
660+
.setDataSourceProvider(dataSourceProvider)
661+
.setDbAdapter(dialectAdapter)
662+
.setWaitOn(waitOn)
663+
.setDbParallelizationForSplitProcess(dbParallelizationForSplitProcess)
664+
.setDbParallelizationForReads(dbParallelizationForReads)
665+
.setAdditionalOperationsOnRanges(additionalOperationsOnRanges)
666+
.build();
667+
668+
Lists.partition(perSourceDiscoveries, 50)
669+
.forEach(
670+
batch ->
671+
LOG.info(
672+
"Configured Multi-Table ReadWithUniformPartitions for sources batch: {}",
673+
batch.stream()
674+
.map(
675+
d ->
676+
"id="
677+
+ d.config().id()
678+
+ ":"
679+
+ "shard_id="
680+
+ d.config().shardID()
681+
+ ":'"
682+
+ d.sourceSchema().schemaReference().jdbc().toString())
683+
.collect(Collectors.joining(","))));
684+
tableReadersBuilder.put(tableReferences, readWithUniformPartitions);
679685
return tableReadersBuilder.build();
680686
}
681687

@@ -701,58 +707,71 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
701707
/* Todo in subsequent PR for multishard graphsize support: accumulate specs across a list of sourceDiscoveries */
702708
@VisibleForTesting
703709
protected static void accumulateSpecs(
704-
PerSourceDiscovery perSourceDiscovery,
710+
ImmutableList<PerSourceDiscovery> perSourceDiscoveries,
705711
ImmutableList.Builder<SourceTableReference> tableReferencesBuilder,
706712
ImmutableList.Builder<TableSplitSpecification> splitSpecsBuilder,
707713
ImmutableMap.Builder<TableIdentifier, TableReadSpecification<SourceRow>> readSpecsBuilder) {
708714

709-
JdbcIOWrapperConfig config = perSourceDiscovery.config();
710-
SourceSchemaReference sourceSchemaReference =
711-
perSourceDiscovery.sourceSchema().schemaReference();
712-
ImmutableList<TableConfig> tableConfigs = perSourceDiscovery.tableConfigs();
713-
for (TableConfig tableConfig : tableConfigs) {
714-
SourceTableSchema sourceTableSchema =
715-
findSourceTableSchema(perSourceDiscovery.sourceSchema(), tableConfig);
716-
int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema);
717-
TableIdentifier tableIdentifier = getTableIdentifier(tableConfig);
718-
719-
TableSplitSpecification.Builder tableSplitSpecificationBuilder =
720-
TableSplitSpecification.builder()
721-
.setTableIdentifier(tableIdentifier)
722-
.setPartitionColumns(tableConfig.partitionColumns())
723-
.setApproxRowCount(tableConfig.approxRowCount());
724-
if (tableConfig.maxPartitions() != null) {
725-
tableSplitSpecificationBuilder =
726-
tableSplitSpecificationBuilder.setMaxPartitionsHint((long) tableConfig.maxPartitions());
727-
}
728-
if (config.splitStageCountHint() >= 0) {
729-
tableSplitSpecificationBuilder =
730-
tableSplitSpecificationBuilder.setSplitStagesCount((long) config.splitStageCountHint());
731-
}
732-
splitSpecsBuilder.add(tableSplitSpecificationBuilder.build());
733-
734-
TableReadSpecification.Builder<SourceRow> tableReadSpecificationBuilder =
735-
TableReadSpecification.<SourceRow>builder()
736-
.setFetchSize(fetchSize)
737-
.setTableIdentifier(tableIdentifier)
738-
.setRowMapper(
739-
new JdbcSourceRowMapper(
740-
config.valueMappingsProvider(),
741-
sourceSchemaReference,
742-
sourceTableSchema,
743-
config.shardID()));
744-
if (config.maxFetchSize() != null) {
745-
tableReadSpecificationBuilder =
746-
tableReadSpecificationBuilder.setFetchSize(config.maxFetchSize());
715+
for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) {
716+
JdbcIOWrapperConfig config = perSourceDiscovery.config();
717+
SourceSchemaReference sourceSchemaReference =
718+
perSourceDiscovery.sourceSchema().schemaReference();
719+
ImmutableList<TableConfig> tableConfigs = perSourceDiscovery.tableConfigs();
720+
for (TableConfig tableConfig : tableConfigs) {
721+
722+
if (!config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) {
723+
continue;
724+
}
725+
SourceTableSchema sourceTableSchema =
726+
findSourceTableSchema(perSourceDiscovery.sourceSchema(), tableConfig);
727+
int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema);
728+
TableIdentifier tableIdentifier = getTableIdentifier(tableConfig);
729+
730+
TableSplitSpecification.Builder tableSplitSpecificationBuilder =
731+
TableSplitSpecification.builder()
732+
.setTableIdentifier(tableIdentifier)
733+
.setPartitionColumns(tableConfig.partitionColumns())
734+
.setApproxRowCount(tableConfig.approxRowCount());
735+
if (tableConfig.maxPartitions() != null) {
736+
tableSplitSpecificationBuilder =
737+
tableSplitSpecificationBuilder.setMaxPartitionsHint(
738+
(long) tableConfig.maxPartitions());
739+
}
740+
if (config.splitStageCountHint() >= 0) {
741+
tableSplitSpecificationBuilder =
742+
tableSplitSpecificationBuilder.setSplitStagesCount(
743+
(long) config.splitStageCountHint());
744+
}
745+
splitSpecsBuilder.add(tableSplitSpecificationBuilder.build());
746+
747+
TableReadSpecification.Builder<SourceRow> tableReadSpecificationBuilder =
748+
TableReadSpecification.<SourceRow>builder()
749+
.setFetchSize(fetchSize)
750+
.setTableIdentifier(tableIdentifier)
751+
.setRowMapper(
752+
new JdbcSourceRowMapper(
753+
config.valueMappingsProvider(),
754+
sourceSchemaReference,
755+
sourceTableSchema,
756+
config.shardID()));
757+
if (config.maxFetchSize() != null) {
758+
tableReadSpecificationBuilder =
759+
tableReadSpecificationBuilder.setFetchSize(config.maxFetchSize());
760+
}
761+
readSpecsBuilder.put(tableIdentifier, tableReadSpecificationBuilder.build());
762+
763+
tableReferencesBuilder.add(
764+
SourceTableReference.builder()
765+
.setSourceSchemaReference(sourceSchemaReference)
766+
.setSourceTableName(delimitIdentifier(sourceTableSchema.tableName()))
767+
.setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID())
768+
.build());
769+
LOG.info(
770+
"Configuring Multi-Table ReadWithUniformPartitions for source-id {} tables {} with config {}",
771+
config.id(),
772+
tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()),
773+
config);
747774
}
748-
readSpecsBuilder.put(tableIdentifier, tableReadSpecificationBuilder.build());
749-
750-
tableReferencesBuilder.add(
751-
SourceTableReference.builder()
752-
.setSourceSchemaReference(sourceSchemaReference)
753-
.setSourceTableName(delimitIdentifier(sourceTableSchema.tableName()))
754-
.setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID())
755-
.build());
756775
}
757776
}
758777

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitions.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.auto.value.AutoValue;
1919
import com.google.auto.value.extension.memoized.Memoized;
2020
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
21-
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProviderImpl;
2221
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
2322
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.columnboundary.ColumnForBoundaryQuery;
2423
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryTypeMapper;
@@ -569,15 +568,6 @@ public abstract static class Builder<T> {
569568

570569
public abstract Builder<T> setDataSourceProvider(DataSourceProvider value);
571570

572-
// TODO Remove in subsequent PR
573-
private SerializableFunction<Void, DataSource> dataSourceProviderSerializableFunction = null;
574-
575-
// TODO Remove in subsequent PR
576-
public Builder<T> setDataSourceProviderFn(SerializableFunction<Void, DataSource> value) {
577-
this.dataSourceProviderSerializableFunction = value;
578-
return this;
579-
}
580-
581571
public abstract Builder<T> setDbAdapter(UniformSplitterDBAdapter value);
582572

583573
public abstract Builder<T> setCountQueryTimeoutMillis(long value);
@@ -626,15 +616,6 @@ protected String getTransformPrefix() {
626616
}
627617

628618
public ReadWithUniformPartitions<T> build() {
629-
// TODO Remove in subsequent PR
630-
if (this.dataSourceProviderSerializableFunction != null) {
631-
this.setDataSourceProvider(
632-
DataSourceProviderImpl.builder()
633-
.addDataSource(
634-
this.tableSplitSpecifications().get(0).tableIdentifier().dataSourceId(),
635-
this.dataSourceProviderSerializableFunction)
636-
.build());
637-
}
638619
validateParameters();
639620

640621
java.util.Set<TableIdentifier> splitIdentifiers =

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,19 +1009,19 @@ public void testBuildTableReaders() {
10091009

10101010
// Assert
10111011
// Legacy: shard2.t2 (1), shard3.t3a (1), shard3.t3b (1) = 3 transforms
1012-
// Uniform: shard5 (1), shard6 (1) = 2 transforms
1013-
// Total = 5 entries
1014-
assertThat(tableReaders).hasSize(5);
1012+
// Uniform: shard5 and shard6 are combined into 1 transform
1013+
// Total = 4 entries
1014+
assertThat(tableReaders).hasSize(4);
10151015

10161016
// Verify Legacy readers
10171017
long legacyCount =
10181018
tableReaders.values().stream().filter(v -> v instanceof JdbcIO.ReadWithPartitions).count();
10191019
assertThat(legacyCount).isEqualTo(3);
10201020

1021-
// Verify Uniform readers
1021+
// Verify that we have one combined Uniform reader
10221022
long uniformCount =
10231023
tableReaders.values().stream().filter(v -> v instanceof ReadWithUniformPartitions).count();
1024-
assertThat(uniformCount).isEqualTo(2);
1024+
assertThat(uniformCount).isEqualTo(1);
10251025

10261026
// Verify table names in keys
10271027
java.util.List<String> allTableNames =
@@ -1095,6 +1095,10 @@ public void testGetPerSourceDiscoveries() throws RetriableSchemaDiscoveryExcepti
10951095
.containsNoDuplicates();
10961096
}
10971097

1098+
/**
1099+
* Tests that {@link JdbcIoWrapper#getPerSourceDiscoveries} correctly propagates exceptions when
1100+
* schema discovery fails for a shard.
1101+
*/
10981102
@Test
10991103
public void testGetPerSourceDiscoveries_Fails() throws RetriableSchemaDiscoveryException {
11001104
SourceSchemaReference schemaRef =
@@ -1122,6 +1126,10 @@ public void testGetPerSourceDiscoveries_Fails() throws RetriableSchemaDiscoveryE
11221126
assertThrows(RuntimeException.class, () -> JdbcIoWrapper.getPerSourceDiscoveries(group));
11231127
}
11241128

1129+
/**
1130+
* Tests that {@link JdbcIoWrapper#getPerSourceDiscoveries} logs a warning and continues when
1131+
* closing a data source fails after discovery.
1132+
*/
11251133
@Test
11261134
public void testGetPerSourceDiscovery_LogsWarning_WhenCloseFails() throws Exception {
11271135
SourceSchemaReference schemaRef =
@@ -1304,7 +1312,7 @@ public void testAccumulateSpecs() {
13041312
ImmutableMap.builder();
13051313

13061314
JdbcIoWrapper.accumulateSpecs(
1307-
discovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
1315+
ImmutableList.of(discovery), tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
13081316

13091317
ImmutableList<SourceTableReference> tableRefs = tableReferencesBuilder.build();
13101318
ImmutableList<TableSplitSpecification> splitSpecs = splitSpecsBuilder.build();
@@ -1357,7 +1365,7 @@ public void testAccumulateSpecs_Empty() {
13571365
ImmutableMap.builder();
13581366

13591367
JdbcIoWrapper.accumulateSpecs(
1360-
discovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
1368+
ImmutableList.of(discovery), tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
13611369

13621370
assertThat(tableReferencesBuilder.build()).isEmpty();
13631371
assertThat(splitSpecsBuilder.build()).isEmpty();
@@ -1368,6 +1376,41 @@ public void testAccumulateSpecs_Empty() {
13681376
* Tests that {@link JdbcIoWrapper#accumulateSpecs} does not generate split specifications when
13691377
* the uniform partitions feature is disabled.
13701378
*/
1379+
@Test
1380+
public void testAccumulateSpecs_FeatureDisabled() {
1381+
SourceSchemaReference schemaRef =
1382+
SourceSchemaReference.ofJdbc(JdbcSchemaReference.builder().setDbName("testDB").build());
1383+
JdbcIOWrapperConfig config =
1384+
JdbcIOWrapperConfig.builderWithMySqlDefaults()
1385+
.setReadWithUniformPartitionsFeatureEnabled(false)
1386+
.setSourceDbURL("jdbc:mysql://localhost/test")
1387+
.setDbAuth(
1388+
LocalCredentialsProvider.builder().setUserName("user").setPassword("pass").build())
1389+
.setJdbcDriverJars("")
1390+
.setJdbcDriverClassName("com.mysql.cj.jdbc.Driver")
1391+
.setSourceSchemaReference(schemaRef)
1392+
.build();
1393+
TableConfig tableConfig = TableConfig.builder("testTable").setDataSourceId("shard1").build();
1394+
JdbcIoWrapper.PerSourceDiscovery discovery =
1395+
JdbcIoWrapper.PerSourceDiscovery.builder()
1396+
.setConfig(config)
1397+
.setDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("c", "u"))
1398+
.setTableConfigs(ImmutableList.of(tableConfig))
1399+
.setSourceSchema(SourceSchema.builder().setSchemaReference(schemaRef).build())
1400+
.build();
1401+
1402+
ImmutableList.Builder<SourceTableReference> tableReferencesBuilder = ImmutableList.builder();
1403+
ImmutableList.Builder<TableSplitSpecification> splitSpecsBuilder = ImmutableList.builder();
1404+
ImmutableMap.Builder<TableIdentifier, TableReadSpecification<SourceRow>> readSpecsBuilder =
1405+
ImmutableMap.builder();
1406+
1407+
JdbcIoWrapper.accumulateSpecs(
1408+
ImmutableList.of(discovery), tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
1409+
1410+
assertThat(splitSpecsBuilder.build()).isEmpty();
1411+
assertThat(readSpecsBuilder.build()).isEmpty();
1412+
}
1413+
13711414
@Test
13721415
public void testGetDataSourceProvider() {
13731416
JdbcIOWrapperConfig config1 =

0 commit comments

Comments
 (0)