Skip to content

Commit 95361c0

Browse files
committed
Refactoring Read With Uniform Partitions for multi shard graph size
1 parent f28a305 commit 95361c0

19 files changed

+1082
-194
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
*/
1616
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
1717

18-
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
19-
2018
import com.fasterxml.jackson.annotation.JsonIgnore;
19+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
2120
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
2221
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
2322
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
@@ -27,48 +26,44 @@
2726
import javax.annotation.Nullable;
2827
import javax.sql.DataSource;
2928
import org.apache.beam.sdk.transforms.DoFn;
30-
import org.apache.beam.sdk.transforms.SerializableFunction;
3129
import org.apache.beam.sdk.values.KV;
3230
import org.slf4j.Logger;
3331
import org.slf4j.LoggerFactory;
3432

3533
/** Discover the Collation Mapping information for a given {@link CollationReference}. */
3634
public class CollationMapperDoFn
37-
extends DoFn<CollationReference, KV<CollationReference, CollationMapper>>
35+
extends DoFn<KV<String, CollationReference>, KV<CollationReference, CollationMapper>>
3836
implements Serializable {
3937

4038
// Log under this DoFn's own class so its messages are not attributed to CollationMapper.
private static final Logger logger = LoggerFactory.getLogger(CollationMapperDoFn.class);
41-
private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
39+
private final DataSourceProvider dataSourceProvider;
4240
private final UniformSplitterDBAdapter dbAdapter;
41+
private transient DataSourceManager dataSourceManager;
4342

4443
@JsonIgnore private transient @Nullable DataSource dataSource;
4544

4645
public CollationMapperDoFn(
47-
SerializableFunction<Void, DataSource> dataSourceProviderFn,
48-
UniformSplitterDBAdapter dbAdapter) {
49-
this.dataSourceProviderFn = dataSourceProviderFn;
46+
DataSourceProvider dataSourceProvider, UniformSplitterDBAdapter dbAdapter) {
47+
this.dataSourceProvider = dataSourceProvider;
5048
this.dbAdapter = dbAdapter;
5149
this.dataSource = null;
5250
}
5351

54-
@Setup
55-
public void setup() throws Exception {
56-
dataSource = dataSourceProviderFn.apply(null);
57-
}
58-
59-
private Connection acquireConnection() throws SQLException {
60-
return checkStateNotNull(this.dataSource).getConnection();
52+
@StartBundle
53+
public void startBundle() throws Exception {
54+
this.dataSourceManager =
55+
DataSourceManagerImpl.builder().setDataSourceProvider(dataSourceProvider).build();
6156
}
6257

6358
@ProcessElement
6459
public void processElement(
65-
@Element CollationReference input,
60+
@Element KV<String, CollationReference> input,
6661
OutputReceiver<KV<CollationReference, CollationMapper>> out)
6762
throws SQLException {
68-
69-
try (Connection conn = acquireConnection()) {
70-
CollationMapper mapper = CollationMapper.fromDB(conn, dbAdapter, input);
71-
out.output(KV.of(input, mapper));
63+
DataSource dataSource = dataSourceManager.getDatasource(input.getKey());
64+
try (Connection conn = dataSource.getConnection()) {
65+
CollationMapper mapper = CollationMapper.fromDB(conn, dbAdapter, input.getValue());
66+
out.output(KV.of(input.getValue(), mapper));
7267
} catch (Exception e) {
7368
logger.error(
7469
"Exception: {} while generating collationMapper for dataSource: {}, collationReference: {}",
@@ -79,4 +74,21 @@ public void processElement(
7974
throw e;
8075
}
8176
}
77+
78+
@FinishBundle
79+
public void finishBundle() throws Exception {
80+
cleanupDataSource();
81+
}
82+
83+
@Teardown
84+
public void tearDown() throws Exception {
85+
cleanupDataSource();
86+
}
87+
88+
void cleanupDataSource() {
89+
if (this.dataSourceManager != null) {
90+
this.dataSourceManager.closeAll();
91+
this.dataSourceManager = null;
92+
}
93+
}
8294
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@
1616
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
1717

1818
import com.google.auto.value.AutoValue;
19+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
1920
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
2021
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
2122
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
23+
import com.google.common.base.Preconditions;
2224
import com.google.common.collect.ImmutableList;
2325
import java.io.Serializable;
2426
import java.util.Map;
25-
import java.util.stream.Collectors;
26-
import javax.sql.DataSource;
2727
import org.apache.beam.sdk.coders.CannotProvideCoderException;
2828
import org.apache.beam.sdk.coders.KvCoder;
2929
import org.apache.beam.sdk.transforms.Create;
3030
import org.apache.beam.sdk.transforms.PTransform;
3131
import org.apache.beam.sdk.transforms.ParDo;
32-
import org.apache.beam.sdk.transforms.SerializableFunction;
3332
import org.apache.beam.sdk.transforms.View;
33+
import org.apache.beam.sdk.values.KV;
3434
import org.apache.beam.sdk.values.PBegin;
3535
import org.apache.beam.sdk.values.PCollectionView;
3636

@@ -44,10 +44,10 @@ public abstract class CollationMapperTransform
4444
implements Serializable {
4545

4646
/**
 * List of {@link CollationReference} to discover the mapping for, each keyed by the id of the
 * data source used to discover it.
 */
47-
public abstract ImmutableList<CollationReference> collationReferences();
47+
public abstract ImmutableList<KV<String, CollationReference>> collationReferences();
4848

4949
/** Provider for connection pool. */
50-
public abstract SerializableFunction<Void, DataSource> dataSourceProviderFn();
50+
public abstract DataSourceProvider dataSourceProvider();
5151

5252
/** Provider to dialect specific Collation mapping query. */
5353
public abstract UniformSplitterDBAdapter dbAdapter();
@@ -73,12 +73,10 @@ public PCollectionView<Map<CollationReference, CollationMapper>> expand(PBegin i
7373
.apply("To Empty Map View", View.asMap());
7474
}
7575
return input
76-
.apply(
77-
"Create-Collation-References",
78-
Create.of(collationReferences().stream().distinct().collect(Collectors.toList())))
76+
.apply("Create-Collation-References", Create.of(collationReferences()))
7977
.apply(
8078
"Generate-Mappers",
81-
ParDo.of(new CollationMapperDoFn(dataSourceProviderFn(), dbAdapter())))
79+
ParDo.of(new CollationMapperDoFn(dataSourceProvider(), dbAdapter())))
8280
.setCoder(
8381
KvCoder.of(
8482
input.getPipeline().getCoderRegistry().getCoder(CollationReference.class),
@@ -96,13 +94,40 @@ public static Builder builder() {
9694

9795
@AutoValue.Builder
9896
public abstract static class Builder {
97+
private ImmutableList<CollationReference> collationReferencesToDiscover;
98+
99+
public Builder setCollationReferencesToDiscover(ImmutableList<CollationReference> value) {
100+
this.collationReferencesToDiscover = value;
101+
return this;
102+
}
103+
104+
abstract Builder setCollationReferences(ImmutableList<KV<String, CollationReference>> value);
99105

100-
public abstract Builder setCollationReferences(ImmutableList<CollationReference> value);
106+
public abstract Builder setDataSourceProvider(DataSourceProvider value);
101107

102-
public abstract Builder setDataSourceProviderFn(SerializableFunction<Void, DataSource> value);
108+
abstract DataSourceProvider dataSourceProvider();
103109

104110
public abstract Builder setDbAdapter(UniformSplitterDBAdapter value);
105111

106-
public abstract CollationMapperTransform build();
112+
public abstract CollationMapperTransform autoBuild();
113+
114+
public CollationMapperTransform build() {
115+
ImmutableList<CollationReference> deDupedRefs =
116+
collationReferencesToDiscover.stream()
117+
.distinct()
118+
.collect(ImmutableList.toImmutableList());
119+
ImmutableList<String> ids =
120+
ImmutableList.copyOf(this.dataSourceProvider().getDataSourceIds());
121+
Preconditions.checkState(ids.size() > 0, "No DataSources Configured for collation detection");
122+
ImmutableList.Builder<KV<String, CollationReference>> collationReferencesBuilder =
123+
ImmutableList.builder();
124+
// Round-robin collations across available shards.
125+
// All shards of the same database should have the same collation mapping.
126+
for (int i = 0; i < deDupedRefs.size(); i++) {
127+
collationReferencesBuilder.add(KV.of(ids.get(i % ids.size()), deDupedRefs.get(i)));
128+
}
129+
this.setCollationReferences(collationReferencesBuilder.build());
130+
return autoBuild();
131+
}
107132
}
108133
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/MultiTableReadAll.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2121

2222
import com.google.auto.value.AutoValue;
23+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProvider;
24+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.DataSourceProviderImpl;
2325
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableIdentifier;
2426
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.TableReadSpecification;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -83,7 +85,7 @@ public abstract class MultiTableReadAll<ParameterT, OutputT>
8385
private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
8486

8587
@Pure
86-
protected abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
88+
protected abstract @Nullable DataSourceProvider getDataSourceProvider();
8789

8890
@Pure
8991
protected abstract @Nullable ValueProvider<QueryProvider> getQueryProvider();
@@ -121,8 +123,8 @@ public static Builder builder() {
121123

122124
@AutoValue.Builder
123125
abstract static class Builder<ParameterT, OutputT> {
124-
abstract Builder<ParameterT, OutputT> setDataSourceProviderFn(
125-
SerializableFunction<Void, DataSource> dataSourceProviderFn);
126+
abstract Builder<ParameterT, OutputT> setDataSourceProvider(
127+
DataSourceProvider dataSourceProvider);
126128

127129
abstract Builder<ParameterT, OutputT> setQueryProvider(ValueProvider<QueryProvider> query);
128130

@@ -151,8 +153,8 @@ abstract Builder<ParameterT, OutputT> setTableIdentifierFn(
151153
* @return a new transform instance with the data source configured.
152154
*/
153155
public MultiTableReadAll<ParameterT, OutputT> withDataSourceConfiguration(
154-
DataSourceConfiguration config) {
155-
return withDataSourceProviderFn(DataSourceProviderFromDataSourceConfiguration.of(config));
156+
String id, DataSourceConfiguration config) {
157+
return withDataSourceProviderFn(id, DataSourceProviderFromDataSourceConfiguration.of(config));
156158
}
157159

158160
/**
@@ -162,13 +164,34 @@ public MultiTableReadAll<ParameterT, OutputT> withDataSourceConfiguration(
162164
* @return a new transform instance.
163165
*/
164166
public MultiTableReadAll<ParameterT, OutputT> withDataSourceProviderFn(
165-
SerializableFunction<Void, DataSource> dataSourceProviderFn) {
166-
if (getDataSourceProviderFn() != null) {
167+
String id, SerializableFunction<Void, DataSource> dataSourceProviderFn) {
168+
if (getDataSourceProvider() != null) {
167169
throw new IllegalArgumentException(
168170
"A dataSourceConfiguration or dataSourceProviderFn has "
169171
+ "already been provided, and does not need to be provided again.");
170172
}
171-
return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
173+
return withDataSourceProvider(
174+
DataSourceProviderImpl.builder().addDataSource(id, dataSourceProviderFn).build());
175+
}
176+
177+
/**
178+
* Configures a provider function for the data source.
179+
*
180+
* @param dataSourceProvider the data source provider function.
181+
* @return a new transform instance.
182+
*/
183+
public MultiTableReadAll<ParameterT, OutputT> withDataSourceProvider(
184+
DataSourceProvider dataSourceProvider) {
185+
if (dataSourceProvider == null) {
186+
throw new IllegalArgumentException(
187+
"DataSource can not be null "
188+
+ "already been provided, and does not need to be provided again.");
189+
}
190+
if (getDataSourceProvider() != null) {
191+
throw new IllegalArgumentException(
192+
"A dataSource has " + "already been provided, and does not need to be provided again.");
193+
}
194+
return toBuilder().setDataSourceProvider(dataSourceProvider).build();
172195
}
173196

174197
/**
@@ -322,7 +345,7 @@ public PCollection<OutputT> expand(PCollection<ParameterT> input) {
322345
.apply(
323346
ParDo.of(
324347
new MultiTableReadFn<>(
325-
checkStateNotNull(getDataSourceProviderFn()),
348+
checkStateNotNull(getDataSourceProvider()),
326349
checkStateNotNull(getQueryProvider()),
327350
checkStateNotNull(getParameterSetter()),
328351
getTableReadSpecifications(),
@@ -371,8 +394,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
371394
if (getCoder() != null) {
372395
builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
373396
}
374-
if (getDataSourceProviderFn() instanceof HasDisplayData) {
375-
((HasDisplayData) getDataSourceProviderFn()).populateDisplayData(builder);
397+
if (getDataSourceProvider() instanceof HasDisplayData) {
398+
((HasDisplayData) getDataSourceProvider()).populateDisplayData(builder);
376399
}
377400
}
378401

0 commit comments

Comments
 (0)