4949import com .google .common .collect .ImmutableList ;
5050import com .google .common .collect .ImmutableMap ;
5151import com .google .common .collect .ImmutableSet ;
52+ import com .google .common .collect .Lists ;
5253import java .sql .SQLException ;
5354import java .util .List ;
5455import java .util .concurrent .ExecutionException ;
@@ -638,42 +639,47 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
638639
639640 /* Data source provider built from all per-source discoveries; it is passed to the multi-table reader below. */
640641 DataSourceProvider dataSourceProvider = getDataSourceProvider (perSourceDiscoveries );
641- for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries ) {
642- ImmutableList .Builder <SourceTableReference > tableReferencesBuilder = ImmutableList .builder ();
643- ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder = ImmutableList .builder ();
644- ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder =
645- ImmutableMap .builder ();
646- JdbcIOWrapperConfig config = perSourceDiscovery .config ();
647- DataSourceConfiguration dataSourceConfiguration =
648- perSourceDiscovery .dataSourceConfiguration ();
649- ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
650-
651- if (!config .readWithUniformPartitionsFeatureEnabled () || tableConfigs .isEmpty ()) {
652- continue ;
653- }
654- accumulateSpecs (
655- perSourceDiscovery , tableReferencesBuilder , splitSpecsBuilder , readSpecsBuilder );
656-
657- ReadWithUniformPartitions <SourceRow > readWithUniformPartitions =
658- ReadWithUniformPartitions .<SourceRow >builder ()
659- .setTableSplitSpecifications (splitSpecsBuilder .build ())
660- .setTableReadSpecifications (readSpecsBuilder .build ())
661- .setDataSourceProviderFn (
662- JdbcIO .PoolableDataSourceProvider .of (dataSourceConfiguration ))
663- .setDbAdapter (dialectAdapter )
664- .setWaitOn (waitOn )
665- .setDbParallelizationForSplitProcess (dbParallelizationForSplitProcess )
666- .setDbParallelizationForReads (dbParallelizationForReads )
667- .setAdditionalOperationsOnRanges (additionalOperationsOnRanges )
668- .build ();
669-
670- LOG .info (
671- "Configured Multi-Table ReadWithUniformPartitions {} for tables {} with config {}" ,
672- readWithUniformPartitions ,
673- tableConfigs .stream ().map (TableConfig ::tableName ).collect (Collectors .toList ()),
674- config );
675- tableReadersBuilder .put (tableReferencesBuilder .build (), readWithUniformPartitions );
642+ ImmutableList .Builder <SourceTableReference > tableReferencesBuilder = ImmutableList .builder ();
643+ ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder = ImmutableList .builder ();
644+ ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder =
645+ ImmutableMap .builder ();
646+ accumulateSpecs (
647+ perSourceDiscoveries , tableReferencesBuilder , splitSpecsBuilder , readSpecsBuilder );
648+
649+ ImmutableList <SourceTableReference > tableReferences = tableReferencesBuilder .build ();
650+ if (tableReferences .isEmpty ()) {
651+ return ImmutableMap .of ();
676652 }
653+
654+ ReadWithUniformPartitions <SourceRow > readWithUniformPartitions =
655+ ReadWithUniformPartitions .<SourceRow >builder ()
656+ .setTableSplitSpecifications (splitSpecsBuilder .build ())
657+ .setTableReadSpecifications (readSpecsBuilder .build ())
658+ .setDataSourceProvider (dataSourceProvider )
659+ .setDbAdapter (dialectAdapter )
660+ .setWaitOn (waitOn )
661+ .setDbParallelizationForSplitProcess (dbParallelizationForSplitProcess )
662+ .setDbParallelizationForReads (dbParallelizationForReads )
663+ .setAdditionalOperationsOnRanges (additionalOperationsOnRanges )
664+ .build ();
665+
666+ Lists .partition (perSourceDiscoveries , 50 )
667+ .forEach (
668+ batch ->
669+ LOG .info (
670+ "Configured Multi-Table ReadWithUniformPartitions for sources batch: {}" ,
671+ batch .stream ()
672+ .map (
673+ d ->
674+ "id="
675+ + d .config ().id ()
676+ + ":"
677+ + "shard_id="
678+ + d .config ().shardID ()
679+ + ":'"
680+ + d .sourceSchema ().schemaReference ().jdbc ().toString ())
681+ .collect (Collectors .joining ("," ))));
682+ tableReadersBuilder .put (tableReferences , readWithUniformPartitions );
677683 return tableReadersBuilder .build ();
678684 }
679685
@@ -699,58 +705,71 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
699705 /* Todo in subsequent PR for multishard graphsize support accumulate specs across a list of sourceDiscovereies */
700706 @ VisibleForTesting
701707 protected static void accumulateSpecs (
702- PerSourceDiscovery perSourceDiscovery ,
708+ ImmutableList < PerSourceDiscovery > perSourceDiscoveries ,
703709 ImmutableList .Builder <SourceTableReference > tableReferencesBuilder ,
704710 ImmutableList .Builder <TableSplitSpecification > splitSpecsBuilder ,
705711 ImmutableMap .Builder <TableIdentifier , TableReadSpecification <SourceRow >> readSpecsBuilder ) {
706712
707- JdbcIOWrapperConfig config = perSourceDiscovery .config ();
708- SourceSchemaReference sourceSchemaReference =
709- perSourceDiscovery .sourceSchema ().schemaReference ();
710- ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
711- for (TableConfig tableConfig : tableConfigs ) {
712- SourceTableSchema sourceTableSchema =
713- findSourceTableSchema (perSourceDiscovery .sourceSchema (), tableConfig );
714- int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
715- TableIdentifier tableIdentifier = getTableIdentifier (tableConfig );
716-
717- TableSplitSpecification .Builder tableSplitSpecificationBuilder =
718- TableSplitSpecification .builder ()
719- .setTableIdentifier (tableIdentifier )
720- .setPartitionColumns (tableConfig .partitionColumns ())
721- .setApproxRowCount (tableConfig .approxRowCount ());
722- if (tableConfig .maxPartitions () != null ) {
723- tableSplitSpecificationBuilder =
724- tableSplitSpecificationBuilder .setMaxPartitionsHint ((long ) tableConfig .maxPartitions ());
725- }
726- if (config .splitStageCountHint () >= 0 ) {
727- tableSplitSpecificationBuilder =
728- tableSplitSpecificationBuilder .setSplitStagesCount ((long ) config .splitStageCountHint ());
729- }
730- splitSpecsBuilder .add (tableSplitSpecificationBuilder .build ());
731-
732- TableReadSpecification .Builder <SourceRow > tableReadSpecificationBuilder =
733- TableReadSpecification .<SourceRow >builder ()
734- .setFetchSize (fetchSize )
735- .setTableIdentifier (tableIdentifier )
736- .setRowMapper (
737- new JdbcSourceRowMapper (
738- config .valueMappingsProvider (),
739- sourceSchemaReference ,
740- sourceTableSchema ,
741- config .shardID ()));
742- if (config .maxFetchSize () != null ) {
743- tableReadSpecificationBuilder =
744- tableReadSpecificationBuilder .setFetchSize (config .maxFetchSize ());
713+ for (PerSourceDiscovery perSourceDiscovery : perSourceDiscoveries ) {
714+ JdbcIOWrapperConfig config = perSourceDiscovery .config ();
715+ SourceSchemaReference sourceSchemaReference =
716+ perSourceDiscovery .sourceSchema ().schemaReference ();
717+ ImmutableList <TableConfig > tableConfigs = perSourceDiscovery .tableConfigs ();
718+ for (TableConfig tableConfig : tableConfigs ) {
719+
720+ if (!config .readWithUniformPartitionsFeatureEnabled () || tableConfigs .isEmpty ()) {
721+ continue ;
722+ }
723+ SourceTableSchema sourceTableSchema =
724+ findSourceTableSchema (perSourceDiscovery .sourceSchema (), tableConfig );
725+ int fetchSize = getFetchSize (config , tableConfig , sourceTableSchema );
726+ TableIdentifier tableIdentifier = getTableIdentifier (tableConfig );
727+
728+ TableSplitSpecification .Builder tableSplitSpecificationBuilder =
729+ TableSplitSpecification .builder ()
730+ .setTableIdentifier (tableIdentifier )
731+ .setPartitionColumns (tableConfig .partitionColumns ())
732+ .setApproxRowCount (tableConfig .approxRowCount ());
733+ if (tableConfig .maxPartitions () != null ) {
734+ tableSplitSpecificationBuilder =
735+ tableSplitSpecificationBuilder .setMaxPartitionsHint (
736+ (long ) tableConfig .maxPartitions ());
737+ }
738+ if (config .splitStageCountHint () >= 0 ) {
739+ tableSplitSpecificationBuilder =
740+ tableSplitSpecificationBuilder .setSplitStagesCount (
741+ (long ) config .splitStageCountHint ());
742+ }
743+ splitSpecsBuilder .add (tableSplitSpecificationBuilder .build ());
744+
745+ TableReadSpecification .Builder <SourceRow > tableReadSpecificationBuilder =
746+ TableReadSpecification .<SourceRow >builder ()
747+ .setFetchSize (fetchSize )
748+ .setTableIdentifier (tableIdentifier )
749+ .setRowMapper (
750+ new JdbcSourceRowMapper (
751+ config .valueMappingsProvider (),
752+ sourceSchemaReference ,
753+ sourceTableSchema ,
754+ config .shardID ()));
755+ if (config .maxFetchSize () != null ) {
756+ tableReadSpecificationBuilder =
757+ tableReadSpecificationBuilder .setFetchSize (config .maxFetchSize ());
758+ }
759+ readSpecsBuilder .put (tableIdentifier , tableReadSpecificationBuilder .build ());
760+
761+ tableReferencesBuilder .add (
762+ SourceTableReference .builder ()
763+ .setSourceSchemaReference (sourceSchemaReference )
764+ .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
765+ .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
766+ .build ());
767+ LOG .info (
768+ "Configuring Multi-Table ReadWithUniformPartitions for source-id {} tables {} with config {}" ,
769+ config .id (),
770+ tableConfigs .stream ().map (TableConfig ::tableName ).collect (Collectors .toList ()),
771+ config );
745772 }
746- readSpecsBuilder .put (tableIdentifier , tableReadSpecificationBuilder .build ());
747-
748- tableReferencesBuilder .add (
749- SourceTableReference .builder ()
750- .setSourceSchemaReference (sourceSchemaReference )
751- .setSourceTableName (delimitIdentifier (sourceTableSchema .tableName ()))
752- .setSourceTableSchemaUUID (sourceTableSchema .tableSchemaUUID ())
753- .build ());
754773 }
755774 }
756775
0 commit comments