4949import com .google .common .collect .ImmutableList ;
5050import com .google .common .collect .ImmutableMap ;
5151import com .google .common .collect .ImmutableSet ;
52+ import com .google .common .collect .Lists ;
5253import java .sql .SQLException ;
5354import java .util .List ;
5455import java .util .concurrent .ExecutionException ;
@@ -640,42 +641,47 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
640641
641642 /* Todo in subsequent PR for multishard graphsize support, pass this to table reader. */
642643 DataSourceProvider dataSourceProvider = getDataSourceProvider (perSourceDiscoveries );
643- for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries ) {
644- ImmutableList .Builder <SourceTableReference > tableReferencesBuilder = ImmutableList .builder ();
645- ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder = ImmutableList .builder ();
646- ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder =
647- ImmutableMap .builder ();
648- JdbcIOWrapperConfig config = perSourceDiscovery .config ();
649- DataSourceConfiguration dataSourceConfiguration =
650- perSourceDiscovery .dataSourceConfiguration ();
651- ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
652-
653- if (!config .readWithUniformPartitionsFeatureEnabled () || tableConfigs .isEmpty ()) {
654- continue ;
655- }
656- accumulateSpecs (
657- perSourceDiscovery , tableReferencesBuilder , splitSpecsBuilder , readSpecsBuilder );
658-
659- ReadWithUniformPartitions <SourceRow > readWithUniformPartitions =
660- ReadWithUniformPartitions .<SourceRow >builder ()
661- .setTableSplitSpecifications (splitSpecsBuilder .build ())
662- .setTableReadSpecifications (readSpecsBuilder .build ())
663- .setDataSourceProviderFn (
664- JdbcIO .PoolableDataSourceProvider .of (dataSourceConfiguration ))
665- .setDbAdapter (dialectAdapter )
666- .setWaitOn (waitOn )
667- .setDbParallelizationForSplitProcess (dbParallelizationForSplitProcess )
668- .setDbParallelizationForReads (dbParallelizationForReads )
669- .setAdditionalOperationsOnRanges (additionalOperationsOnRanges )
670- .build ();
671-
672- LOG .info (
673- "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}" ,
674- readWithUniformPartitions ,
675- tableConfigs .stream ().map (TableConfig ::tableName ).collect (Collectors .toList ()),
676- config );
677- tableReadersBuilder .put (tableReferencesBuilder .build (), readWithUniformPartitions );
644+ ImmutableList .Builder <SourceTableReference > tableReferencesBuilder = ImmutableList .builder ();
645+ ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder = ImmutableList .builder ();
646+ ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder =
647+ ImmutableMap .builder ();
648+ accumulateSpecs (
649+ perSourceDiscoveries , tableReferencesBuilder , splitSpecsBuilder , readSpecsBuilder );
650+
651+ ImmutableList <SourceTableReference > tableReferences = tableReferencesBuilder .build ();
652+ if (tableReferences .isEmpty ()) {
653+ return ImmutableMap .of ();
678654 }
655+
656+ ReadWithUniformPartitions <SourceRow > readWithUniformPartitions =
657+ ReadWithUniformPartitions .<SourceRow >builder ()
658+ .setTableSplitSpecifications (splitSpecsBuilder .build ())
659+ .setTableReadSpecifications (readSpecsBuilder .build ())
660+ .setDataSourceProvider (dataSourceProvider )
661+ .setDbAdapter (dialectAdapter )
662+ .setWaitOn (waitOn )
663+ .setDbParallelizationForSplitProcess (dbParallelizationForSplitProcess )
664+ .setDbParallelizationForReads (dbParallelizationForReads )
665+ .setAdditionalOperationsOnRanges (additionalOperationsOnRanges )
666+ .build ();
667+
668+ Lists .partition (perSourceDiscoveries , 50 )
669+ .forEach (
670+ batch ->
671+ LOG .info (
672+ "Configured Multi-Table ReadWithUniformPartitions for sources batch: {}" ,
673+ batch .stream ()
674+ .map (
675+ d ->
676+ "id="
677+ + d .config ().id ()
678+ + ":"
679+ + "shard_id="
680+ + d .config ().shardID ()
681+ + ":'"
682+ + d .sourceSchema ().schemaReference ().jdbc ().toString ())
683+ .collect (Collectors .joining ("," ))));
684+ tableReadersBuilder .put (tableReferences , readWithUniformPartitions );
679685 return tableReadersBuilder .build ();
680686 }
681687
@@ -701,58 +707,71 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
701707 /* TODO: in a subsequent PR for multishard graph-size support, accumulate specs across a list of source discoveries. */
702708 @ VisibleForTesting
703709 protected static void accumulateSpecs (
704- PerSourceDiscovery perSourceDiscovery ,
710+ ImmutableList < PerSourceDiscovery > perSourceDiscoveries ,
705711 ImmutableList .Builder <SourceTableReference > tableReferencesBuilder ,
706712 ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder ,
707713 ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder ) {
708714
709- JdbcIOWrapperConfig config = perSourceDiscovery .config ();
710- SourceSchemaReference sourceSchemaReference =
711- perSourceDiscovery .sourceSchema ().schemaReference ();
712- ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
713- for (TableConfig tableConfig : tableConfigs ) {
714- SourceTableSchema sourceTableSchema =
715- findSourceTableSchema (perSourceDiscovery .sourceSchema (), tableConfig );
716- int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
717- TableIdentifier tableIdentifier = getTableIdentifier (tableConfig );
718-
719- TableSplitSpecification .Builder tableSplitSpecificationBuilder =
720- TableSplitSpecification .builder ()
721- .setTableIdentifier (tableIdentifier )
722- .setPartitionColumns (tableConfig .partitionColumns ())
723- .setApproxRowCount (tableConfig .approxRowCount ());
724- if (tableConfig .maxPartitions () != null ) {
725- tableSplitSpecificationBuilder =
726- tableSplitSpecificationBuilder .setMaxPartitionsHint ((long ) tableConfig .maxPartitions ());
727- }
728- if (config .splitStageCountHint () >= 0 ) {
729- tableSplitSpecificationBuilder =
730- tableSplitSpecificationBuilder .setSplitStagesCount ((long ) config .splitStageCountHint ());
731- }
732- splitSpecsBuilder .add (tableSplitSpecificationBuilder .build ());
733-
734- TableReadSpecification .Builder <SourceRow > tableReadSpecificationBuilder =
735- TableReadSpecification .<SourceRow >builder ()
736- .setFetchSize (fetchSize )
737- .setTableIdentifier (tableIdentifier )
738- .setRowMapper (
739- new JdbcSourceRowMapper (
740- config .valueMappingsProvider (),
741- sourceSchemaReference ,
742- sourceTableSchema ,
743- config .shardID ()));
744- if (config .maxFetchSize () != null ) {
745- tableReadSpecificationBuilder =
746- tableReadSpecificationBuilder .setFetchSize (config .maxFetchSize ());
715+ for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries ) {
716+ JdbcIOWrapperConfig config = perSourceDiscovery .config ();
717+ SourceSchemaReference sourceSchemaReference =
718+ perSourceDiscovery .sourceSchema ().schemaReference ();
719+ ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
720+ for (TableConfig tableConfig : tableConfigs ) {
721+
722+ if (!config .readWithUniformPartitionsFeatureEnabled () || tableConfigs .isEmpty ()) {
723+ continue ;
724+ }
725+ SourceTableSchema sourceTableSchema =
726+ findSourceTableSchema (perSourceDiscovery .sourceSchema (), tableConfig );
727+ int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
728+ TableIdentifier tableIdentifier = getTableIdentifier (tableConfig );
729+
730+ TableSplitSpecification .Builder tableSplitSpecificationBuilder =
731+ TableSplitSpecification .builder ()
732+ .setTableIdentifier (tableIdentifier )
733+ .setPartitionColumns (tableConfig .partitionColumns ())
734+ .setApproxRowCount (tableConfig .approxRowCount ());
735+ if (tableConfig .maxPartitions () != null ) {
736+ tableSplitSpecificationBuilder =
737+ tableSplitSpecificationBuilder .setMaxPartitionsHint (
738+ (long ) tableConfig .maxPartitions ());
739+ }
740+ if (config .splitStageCountHint () >= 0 ) {
741+ tableSplitSpecificationBuilder =
742+ tableSplitSpecificationBuilder .setSplitStagesCount (
743+ (long ) config .splitStageCountHint ());
744+ }
745+ splitSpecsBuilder .add (tableSplitSpecificationBuilder .build ());
746+
747+ TableReadSpecification .Builder <SourceRow > tableReadSpecificationBuilder =
748+ TableReadSpecification .<SourceRow >builder ()
749+ .setFetchSize (fetchSize )
750+ .setTableIdentifier (tableIdentifier )
751+ .setRowMapper (
752+ new JdbcSourceRowMapper (
753+ config .valueMappingsProvider (),
754+ sourceSchemaReference ,
755+ sourceTableSchema ,
756+ config .shardID ()));
757+ if (config .maxFetchSize () != null ) {
758+ tableReadSpecificationBuilder =
759+ tableReadSpecificationBuilder .setFetchSize (config .maxFetchSize ());
760+ }
761+ readSpecsBuilder .put (tableIdentifier , tableReadSpecificationBuilder .build ());
762+
763+ tableReferencesBuilder .add (
764+ SourceTableReference .builder ()
765+ .setSourceSchemaReference (sourceSchemaReference )
766+ .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
767+ .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
768+ .build ());
769+ LOG .info (
770+ "Configuring Multi-Table ReadWithUniformPartitions for source-id {} tables {} with config {}" ,
771+ config .id (),
772+ tableConfigs .stream ().map (TableConfig ::tableName ).collect (Collectors .toList ()),
773+ config );
747774 }
748- readSpecsBuilder .put (tableIdentifier , tableReadSpecificationBuilder .build ());
749-
750- tableReferencesBuilder .add (
751- SourceTableReference .builder ()
752- .setSourceSchemaReference (sourceSchemaReference )
753- .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
754- .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
755- .build ());
756775 }
757776 }
758777
0 commit comments