org.apache.beam
beam-it-jdbc
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/exception/SuitableIndexNotFoundException.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/exception/SuitableIndexNotFoundException.java
index 4a6d662794..3bc8162f7a 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/exception/SuitableIndexNotFoundException.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/exception/SuitableIndexNotFoundException.java
@@ -15,14 +15,14 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.exception;
-import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIOWrapperConfig;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIoWrapperConfigGroup;
/**
* Exception thrown when a suitable indexed column that can act as the partition column is not
* found.
*
* Please refer to {@link
- * com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.JdbcIoWrapper#of(JdbcIOWrapperConfig)}
+ * com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.JdbcIoWrapper#of(JdbcIoWrapperConfigGroup)}
* for details on the cases where this is thrown.
*/
public class SuitableIndexNotFoundException extends SchemaDiscoveryException {
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
index c4ea69b087..802d2f734b 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
@@ -49,6 +49,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -102,25 +103,6 @@ public final class JdbcIoWrapper implements IoWrapper {
*
Retries: Individual shard discovery operations are automatically retried with
* exponential backoff as configured in the {@link JdbcIOWrapperConfig}.
*
- * @param config configuration for reading from a JDBC source.
- * @return JdbcIOWrapper
- * @throws SuitableIndexNotFoundException if a suitable index is not found to act as the partition
- * column. Please refer to {@link JdbcIoWrapper#autoInferTableConfigs(JdbcIOWrapperConfig,
- * SchemaDiscovery, DataSource)} for details on situation where this is thrown.
- */
- /* Todo remove this function in subsequent PR for multishard graphsize support */
- public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexNotFoundException {
- PerSourceDiscovery perSourceDiscovery = getPerSourceDiscovery(config);
- ImmutableMap, PTransform>>
- tableReaders = buildTableReaders(ImmutableList.of(perSourceDiscovery));
- return new JdbcIoWrapper(tableReaders, ImmutableList.of(perSourceDiscovery.sourceSchema()));
- }
-
- /**
- * Construct a JdbcIOWrapper from the configuration.
- *
- * This method performs schema discovery for a single source source.
- *
* @param configGroup configurations for reading from a JDBC source.
* @return JdbcIOWrapper
* @throws SuitableIndexNotFoundException if a suitable index is not found to act as the partition
@@ -659,42 +641,47 @@ private static PTransform> getJdbcIO(
/* Todo in subsequent PR for multishard graphsize support, pass this to table reader. */
DataSourceProvider dataSourceProvider = getDataSourceProvider(perSourceDiscoveries);
- for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) {
- ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder();
- ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder();
- ImmutableMap.Builder> readSpecsBuilder =
- ImmutableMap.builder();
- JdbcIOWrapperConfig config = perSourceDiscovery.config();
- DataSourceConfiguration dataSourceConfiguration =
- perSourceDiscovery.dataSourceConfiguration();
- ImmutableList tableConfigs = perSourceDiscovery.tableConfigs();
-
- if (!config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) {
- continue;
- }
- accumulateSpecs(
- perSourceDiscovery, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
-
- ReadWithUniformPartitions readWithUniformPartitions =
- ReadWithUniformPartitions.builder()
- .setTableSplitSpecifications(splitSpecsBuilder.build())
- .setTableReadSpecifications(readSpecsBuilder.build())
- .setDataSourceProviderFn(
- JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration))
- .setDbAdapter(dialectAdapter)
- .setWaitOn(waitOn)
- .setDbParallelizationForSplitProcess(dbParallelizationForSplitProcess)
- .setDbParallelizationForReads(dbParallelizationForReads)
- .setAdditionalOperationsOnRanges(additionalOperationsOnRanges)
- .build();
-
- LOG.info(
- "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}",
- readWithUniformPartitions,
- tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()),
- config);
- tableReadersBuilder.put(tableReferencesBuilder.build(), readWithUniformPartitions);
+ ImmutableList.Builder tableReferencesBuilder = ImmutableList.builder();
+ ImmutableList.Builder splitSpecsBuilder = ImmutableList.builder();
+ ImmutableMap.Builder> readSpecsBuilder =
+ ImmutableMap.builder();
+ accumulateSpecs(
+ perSourceDiscoveries, tableReferencesBuilder, splitSpecsBuilder, readSpecsBuilder);
+
+ ImmutableList tableReferences = tableReferencesBuilder.build();
+ if (tableReferences.isEmpty()) {
+ return ImmutableMap.of();
}
+
+ ReadWithUniformPartitions readWithUniformPartitions =
+ ReadWithUniformPartitions.builder()
+ .setTableSplitSpecifications(splitSpecsBuilder.build())
+ .setTableReadSpecifications(readSpecsBuilder.build())
+ .setDataSourceProvider(dataSourceProvider)
+ .setDbAdapter(dialectAdapter)
+ .setWaitOn(waitOn)
+ .setDbParallelizationForSplitProcess(dbParallelizationForSplitProcess)
+ .setDbParallelizationForReads(dbParallelizationForReads)
+ .setAdditionalOperationsOnRanges(additionalOperationsOnRanges)
+ .build();
+
+ Lists.partition(perSourceDiscoveries, 50)
+ .forEach(
+ batch ->
+ LOG.info(
+ "Configured Multi-Table ReadWithUniformPartitions for sources batch: {}",
+ batch.stream()
+ .map(
+ d ->
+ "id="
+ + d.config().id()
+ + ":"
+ + "shard_id="
+ + d.config().shardID()
+ + ":'"
+ + d.sourceSchema().schemaReference().jdbc().toString())
+ .collect(Collectors.joining(","))));
+ tableReadersBuilder.put(tableReferences, readWithUniformPartitions);
return tableReadersBuilder.build();
}
@@ -720,58 +707,71 @@ private static PTransform> getJdbcIO(
/* Todo in subsequent PR for multishard graphsize support accumulate specs across a list of sourceDiscovereies */
@VisibleForTesting
protected static void accumulateSpecs(
- PerSourceDiscovery perSourceDiscovery,
+ ImmutableList perSourceDiscoveries,
ImmutableList.Builder tableReferencesBuilder,
ImmutableList.Builder splitSpecsBuilder,
ImmutableMap.Builder> readSpecsBuilder) {
- JdbcIOWrapperConfig config = perSourceDiscovery.config();
- SourceSchemaReference sourceSchemaReference =
- perSourceDiscovery.sourceSchema().schemaReference();
- ImmutableList tableConfigs = perSourceDiscovery.tableConfigs();
- for (TableConfig tableConfig : tableConfigs) {
- SourceTableSchema sourceTableSchema =
- findSourceTableSchema(perSourceDiscovery.sourceSchema(), tableConfig);
- int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema);
- TableIdentifier tableIdentifier = getTableIdentifier(tableConfig);
-
- TableSplitSpecification.Builder tableSplitSpecificationBuilder =
- TableSplitSpecification.builder()
- .setTableIdentifier(tableIdentifier)
- .setPartitionColumns(tableConfig.partitionColumns())
- .setApproxRowCount(tableConfig.approxRowCount());
- if (tableConfig.maxPartitions() != null) {
- tableSplitSpecificationBuilder =
- tableSplitSpecificationBuilder.setMaxPartitionsHint((long) tableConfig.maxPartitions());
- }
- if (config.splitStageCountHint() >= 0) {
- tableSplitSpecificationBuilder =
- tableSplitSpecificationBuilder.setSplitStagesCount((long) config.splitStageCountHint());
- }
- splitSpecsBuilder.add(tableSplitSpecificationBuilder.build());
-
- TableReadSpecification.Builder tableReadSpecificationBuilder =
- TableReadSpecification.builder()
- .setFetchSize(fetchSize)
- .setTableIdentifier(tableIdentifier)
- .setRowMapper(
- new JdbcSourceRowMapper(
- config.valueMappingsProvider(),
- sourceSchemaReference,
- sourceTableSchema,
- config.shardID()));
- if (config.maxFetchSize() != null) {
- tableReadSpecificationBuilder =
- tableReadSpecificationBuilder.setFetchSize(config.maxFetchSize());
+ for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries) {
+ JdbcIOWrapperConfig config = perSourceDiscovery.config();
+ SourceSchemaReference sourceSchemaReference =
+ perSourceDiscovery.sourceSchema().schemaReference();
+      ImmutableList tableConfigs = perSourceDiscovery.tableConfigs();
+      if (!config.readWithUniformPartitionsFeatureEnabled() || tableConfigs.isEmpty()) {
+        continue;
+      }
+
+      for (TableConfig tableConfig : tableConfigs) {
+ SourceTableSchema sourceTableSchema =
+ findSourceTableSchema(perSourceDiscovery.sourceSchema(), tableConfig);
+ int fetchSize = getFetchSize(config, tableConfig, sourceTableSchema);
+ TableIdentifier tableIdentifier = getTableIdentifier(tableConfig);
+
+ TableSplitSpecification.Builder tableSplitSpecificationBuilder =
+ TableSplitSpecification.builder()
+ .setTableIdentifier(tableIdentifier)
+ .setPartitionColumns(tableConfig.partitionColumns())
+ .setApproxRowCount(tableConfig.approxRowCount());
+ if (tableConfig.maxPartitions() != null) {
+ tableSplitSpecificationBuilder =
+ tableSplitSpecificationBuilder.setMaxPartitionsHint(
+ (long) tableConfig.maxPartitions());
+ }
+ if (config.splitStageCountHint() >= 0) {
+ tableSplitSpecificationBuilder =
+ tableSplitSpecificationBuilder.setSplitStagesCount(
+ (long) config.splitStageCountHint());
+ }
+ splitSpecsBuilder.add(tableSplitSpecificationBuilder.build());
+
+ TableReadSpecification.Builder tableReadSpecificationBuilder =
+ TableReadSpecification.builder()
+ .setFetchSize(fetchSize)
+ .setTableIdentifier(tableIdentifier)
+ .setRowMapper(
+ new JdbcSourceRowMapper(
+ config.valueMappingsProvider(),
+ sourceSchemaReference,
+ sourceTableSchema,
+ config.shardID()));
+ if (config.maxFetchSize() != null) {
+ tableReadSpecificationBuilder =
+ tableReadSpecificationBuilder.setFetchSize(config.maxFetchSize());
+ }
+ readSpecsBuilder.put(tableIdentifier, tableReadSpecificationBuilder.build());
+
+ tableReferencesBuilder.add(
+ SourceTableReference.builder()
+ .setSourceSchemaReference(sourceSchemaReference)
+ .setSourceTableName(delimitIdentifier(sourceTableSchema.tableName()))
+ .setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID())
+ .build());
+ LOG.info(
+ "Configuring Multi-Table ReadWithUniformPartitions for source-id {} tables {} with config {}",
+ config.id(),
+ tableConfigs.stream().map(TableConfig::tableName).collect(Collectors.toList()),
+ config);
}
- readSpecsBuilder.put(tableIdentifier, tableReadSpecificationBuilder.build());
-
- tableReferencesBuilder.add(
- SourceTableReference.builder()
- .setSourceSchemaReference(sourceSchemaReference)
- .setSourceTableName(delimitIdentifier(sourceTableSchema.tableName()))
- .setSourceTableSchemaUUID(sourceTableSchema.tableSchemaUUID())
- .build());
}
}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java
index 44e96b523e..d19245157a 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java
@@ -15,9 +15,8 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
-
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
@@ -27,48 +26,44 @@
import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Discover the Collation Mapping information for a given {@link CollationReference}. */
public class CollationMapperDoFn
- extends DoFn>
+ extends DoFn, KV>
implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(CollationMapper.class);
- private final SerializableFunction dataSourceProviderFn;
+ private final DataSourceProvider dataSourceProvider;
private final UniformSplitterDBAdapter dbAdapter;
+ private transient DataSourceManager dataSourceManager;
@JsonIgnore private transient @Nullable DataSource dataSource;
public CollationMapperDoFn(
- SerializableFunction dataSourceProviderFn,
- UniformSplitterDBAdapter dbAdapter) {
- this.dataSourceProviderFn = dataSourceProviderFn;
+ DataSourceProvider dataSourceProvider, UniformSplitterDBAdapter dbAdapter) {
+ this.dataSourceProvider = dataSourceProvider;
this.dbAdapter = dbAdapter;
this.dataSource = null;
}
- @Setup
- public void setup() throws Exception {
- dataSource = dataSourceProviderFn.apply(null);
- }
-
- private Connection acquireConnection() throws SQLException {
- return checkStateNotNull(this.dataSource).getConnection();
+ @StartBundle
+ public void startBundle() throws Exception {
+ this.dataSourceManager =
+ DataSourceManagerImpl.builder().setDataSourceProvider(dataSourceProvider).build();
}
@ProcessElement
public void processElement(
- @Element CollationReference input,
+ @Element KV input,
OutputReceiver> out)
throws SQLException {
-
- try (Connection conn = acquireConnection()) {
- CollationMapper mapper = CollationMapper.fromDB(conn, dbAdapter, input);
- out.output(KV.of(input, mapper));
+ DataSource dataSource = dataSourceManager.getDatasource(input.getKey());
+ try (Connection conn = dataSource.getConnection()) {
+ CollationMapper mapper = CollationMapper.fromDB(conn, dbAdapter, input.getValue());
+ out.output(KV.of(input.getValue(), mapper));
} catch (Exception e) {
logger.error(
"Exception: {} while generating collationMapper for dataSource: {}, collationReference: {}",
@@ -79,4 +74,21 @@ public void processElement(
throw e;
}
}
+
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ cleanupDataSource();
+ }
+
+ @Teardown
+ public void tearDown() throws Exception {
+ cleanupDataSource();
+ }
+
+ void cleanupDataSource() {
+ if (this.dataSourceManager != null) {
+ this.dataSourceManager.closeAll();
+ this.dataSourceManager = null;
+ }
+ }
}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java
index 96424207c8..acd9862228 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java
@@ -16,21 +16,21 @@
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
import com.google.auto.value.AutoValue;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.Map;
-import java.util.stream.Collectors;
-import javax.sql.DataSource;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionView;
@@ -44,10 +44,10 @@ public abstract class CollationMapperTransform
implements Serializable {
/** List of {@link CollationReference} to discover the mapping for. */
- public abstract ImmutableList collationReferences();
+ public abstract ImmutableList> collationReferences();
/** Provider for connection pool. */
- public abstract SerializableFunction dataSourceProviderFn();
+ public abstract DataSourceProvider dataSourceProvider();
/** Provider to dialect specific Collation mapping query. */
public abstract UniformSplitterDBAdapter dbAdapter();
@@ -73,12 +73,10 @@ public PCollectionView