Skip to content

Commit 79fb360

Browse files
authored
[Python] Expand SDF by default in PortableRunner (#37965)
* [python] Expand SDF in PortableRunner default optimization Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion. Refs #24422. * [python] Fix formatting for PortableRunner SDF optimization * Make expand_sdf portable pre_optimize opt-in * chore: retrigger CI
1 parent ce20f73 commit 79fb360

File tree

6 files changed

+108
-5
lines changed

6 files changed

+108
-5
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
22
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support",
3-
"https://github.com/apache/beam/pull/34830": "testing"
3+
"https://github.com/apache/beam/pull/34830": "testing",
4+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
45
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{
22
"https://github.com/apache/beam/pull/34830": "testing",
3-
"https://github.com/apache/beam/issues/35429": "testing"
3+
"https://github.com/apache/beam/issues/35429": "testing",
4+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
45
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2-
"modification": 2
2+
"modification": 2,
3+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
34
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
{}
1+
{
2+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
3+
}

sdks/python/apache_beam/runners/portability/portable_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ def _optimize_pipeline(
335335
phases = []
336336
for phase_name in pre_optimize.split(','):
337337
# For now, these are all we allow.
338-
if phase_name in ('pack_combiners', 'lift_combiners'):
338+
if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
339339
phases.append(getattr(translations, phase_name))
340340
else:
341341
raise ValueError(

sdks/python/apache_beam/runners/portability/portable_runner_test.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,104 @@ def create_options(self):
460460
return options
461461

462462

463+
class PortableRunnerOptimizationTest(unittest.TestCase):
464+
"""Tests for PortableRunner._optimize_pipeline."""
465+
@staticmethod
466+
def _transform_urns(proto, options):
467+
optimized = PortableRunner._optimize_pipeline(proto, options)
468+
return {
469+
t.spec.urn
470+
for t in optimized.components.transforms.values() if t.spec.urn
471+
}
472+
473+
def test_custom_optimize_expand_sdf(self):
474+
"""Verify that expand_sdf can be requested explicitly.
475+
476+
See https://github.com/apache/beam/issues/24422.
477+
"""
478+
from apache_beam.io import restriction_trackers
479+
from apache_beam.portability import common_urns
480+
481+
class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
482+
def initial_restriction(self, element):
483+
return restriction_trackers.OffsetRange(0, len(element))
484+
485+
def create_tracker(self, restriction):
486+
return restriction_trackers.OffsetRestrictionTracker(restriction)
487+
488+
def restriction_size(self, element, restriction):
489+
return restriction.size()
490+
491+
class ExpandingStringsDoFn(beam.DoFn):
492+
def process(
493+
self,
494+
element,
495+
restriction_tracker=beam.DoFn.RestrictionParam(
496+
ExpandStringsProvider())):
497+
cur = restriction_tracker.current_restriction().start
498+
while restriction_tracker.try_claim(cur):
499+
yield element[cur]
500+
cur += 1
501+
502+
p = beam.Pipeline()
503+
_ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn()))
504+
proto = p.to_runner_api()
505+
506+
transform_urns = self._transform_urns(
507+
proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf']))
508+
509+
self.assertIn(
510+
common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
511+
self.assertIn(
512+
common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
513+
transform_urns)
514+
self.assertIn(
515+
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
516+
transform_urns)
517+
518+
def test_custom_optimize_expands_bounded_read(self):
519+
"""Verify that iobase.Read(BoundedSource) expands with explicit expand_sdf.
520+
521+
This is the end-to-end scenario from
522+
https://github.com/apache/beam/issues/24422: Read transforms like
523+
ReadFromParquet use SDFs internally. With explicit expand_sdf, these are
524+
expanded before reaching the Spark job server as a single ParDo,
525+
executing on one partition with no parallelization.
526+
"""
527+
from apache_beam.io import iobase
528+
from apache_beam.portability import common_urns
529+
530+
class _FakeBoundedSource(iobase.BoundedSource):
531+
def get_range_tracker(self, start_position, stop_position):
532+
return None
533+
534+
def read(self, range_tracker):
535+
return iter([])
536+
537+
def estimate_size(self):
538+
return 0
539+
540+
p = beam.Pipeline()
541+
_ = p | beam.io.Read(_FakeBoundedSource())
542+
proto = p.to_runner_api()
543+
544+
transform_urns = self._transform_urns(
545+
proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf']))
546+
547+
# The SDFBoundedSourceReader DoFn should have been expanded into
548+
# SDF component stages.
549+
self.assertIn(
550+
common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
551+
self.assertIn(
552+
common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
553+
transform_urns)
554+
self.assertIn(
555+
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
556+
transform_urns)
557+
# Reshuffle should be present to enable parallelization.
558+
self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns)
559+
560+
463561
if __name__ == '__main__':
464562
logging.getLogger().setLevel(logging.INFO)
465563
unittest.main()

0 commit comments

Comments
 (0)