Skip to content

Commit c9e1fa0

Browse files
committed
[Enrichment] Expose max_batch_duration_secs in SQL/BQ handlers (#38243)
1 parent ac161e8 commit c9e1fa0

File tree

5 files changed

+113
-7
lines changed

5 files changed

+113
-7
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
## Bugfixes
8787

8888
* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
89+
* Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
8990

9091
## Security Fixes
9192

sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
7373
names to fetch.
7474
7575
This handler pulls data from BigQuery per element by default. To change this
76-
behavior, set the `min_batch_size` and `max_batch_size` parameters.
77-
These min and max values for batch size are sent to the
76+
behavior, set the `min_batch_size`, `max_batch_size`, and
77+
`max_batch_duration_secs` parameters.
78+
These batching values are sent to the
7879
:class:`apache_beam.transforms.util.BatchElements` transform.
7980
8081
NOTE: Elements cannot be batched when using the `query_fn` parameter.
@@ -91,6 +92,7 @@ def __init__(
9192
query_fn: Optional[QueryFn] = None,
9293
min_batch_size: int = 1,
9394
max_batch_size: int = 10000,
95+
max_batch_duration_secs: Optional[float] = None,
9496
throw_exception_on_empty_results: bool = True,
9597
**kwargs,
9698
):
@@ -124,11 +126,14 @@ def __init__(
124126
querying BigQuery. Defaults to 1 if `query_fn` is not specified.
125127
max_batch_size (int): Maximum number of rows to batch together.
126128
Defaults to 10,000 if `query_fn` is not specified.
129+
max_batch_duration_secs (float): Maximum amount of time in seconds to
130+
buffer a batch before emitting it. If not provided, batching duration
131+
is determined by `BatchElements` defaults.
127132
**kwargs: Additional keyword arguments to pass to `bigquery.Client`.
128133
129134
Note:
130-
* `min_batch_size` and `max_batch_size` cannot be defined if the
131-
`query_fn` is provided.
135+
* `min_batch_size`, `max_batch_size`, and `max_batch_duration_secs`
136+
are not used if `query_fn` is provided.
132137
* Either `fields` or `condition_value_fn` must be provided for query
133138
construction if `query_fn` is not provided.
134139
* Ensure appropriate permissions are granted for BigQuery access.
@@ -156,6 +161,9 @@ def __init__(
156161
if not query_fn:
157162
self._batching_kwargs['min_batch_size'] = min_batch_size
158163
self._batching_kwargs['max_batch_size'] = max_batch_size
164+
if max_batch_duration_secs is not None:
165+
self._batching_kwargs[
166+
'max_batch_duration_secs'] = max_batch_duration_secs
159167

160168
def __enter__(self):
161169
self.client = bigquery.Client(project=self.project, **self.kwargs)

sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,41 @@ def test_batch_mode_emits_empty_rows_for_all_unmatched_duplicate_keys(self):
112112
[(requests[0], beam.Row()), (requests[1], beam.Row())],
113113
)
114114

115+
def test_batch_elements_kwargs_include_max_batch_duration_secs(self):
116+
handler = BigQueryEnrichmentHandler(
117+
project=self.project,
118+
table_name='project.dataset.table',
119+
row_restriction_template="id='{}'",
120+
fields=['id'],
121+
min_batch_size=2,
122+
max_batch_size=10,
123+
max_batch_duration_secs=0.75,
124+
)
125+
126+
self.assertEqual(
127+
handler.batch_elements_kwargs(),
128+
{
129+
'min_batch_size': 2,
130+
'max_batch_size': 10,
131+
'max_batch_duration_secs': 0.75,
132+
})
133+
134+
def test_batch_elements_kwargs_omit_max_batch_duration_secs_by_default(self):
135+
handler = BigQueryEnrichmentHandler(
136+
project=self.project,
137+
table_name='project.dataset.table',
138+
row_restriction_template="id='{}'",
139+
fields=['id'],
140+
min_batch_size=2,
141+
max_batch_size=10,
142+
)
143+
144+
self.assertEqual(
145+
handler.batch_elements_kwargs(), {
146+
'min_batch_size': 2,
147+
'max_batch_size': 10,
148+
})
149+
115150

116151
if __name__ == '__main__':
117152
unittest.main()

sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,9 @@ class CloudSQLEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
243243
the desired column names.
244244
245245
This handler queries the Cloud SQL database per element by default.
246-
To enable batching, set the `min_batch_size` and `max_batch_size` parameters.
247-
These values control the batching behavior in the
246+
To enable batching, set the `min_batch_size`, `max_batch_size`, and
247+
`max_batch_duration_secs` parameters. These values control batching behavior
248+
in the
248249
:class:`apache_beam.transforms.util.BatchElements` transform.
249250
250251
NOTE: Batching is not supported when using the CustomQueryConfig.
@@ -257,6 +258,7 @@ def __init__(
257258
column_names: Optional[list[str]] = None,
258259
min_batch_size: int = 1,
259260
max_batch_size: int = 10000,
261+
max_batch_duration_secs: Optional[float] = None,
260262
**kwargs,
261263
):
262264
"""
@@ -290,11 +292,15 @@ def __init__(
290292
querying the database. Defaults to 1 if `query_fn` is not used.
291293
max_batch_size (int): Maximum number of rows to batch together. Defaults
292294
to 10,000 if `query_fn` is not used.
295+
max_batch_duration_secs (float): Maximum amount of time in seconds to
296+
buffer a batch before emitting it. If not provided, batching duration
297+
is determined by `BatchElements` defaults.
293298
**kwargs: Additional keyword arguments for database connection or query
294299
handling.
295300
296301
Note:
297-
* Cannot use `min_batch_size` or `max_batch_size` with `query_fn`.
302+
* `min_batch_size`, `max_batch_size`, and `max_batch_duration_secs`
303+
are not used with `query_fn`.
298304
* Either `where_clause_fields` or `where_clause_value_fn` must be provided
299305
for query construction if `query_fn` is not provided.
300306
* Ensure that the database user has the necessary permissions to query the
@@ -313,6 +319,9 @@ def __init__(
313319
f"WHERE {query_config.where_clause_template}")
314320
self._batching_kwargs['min_batch_size'] = min_batch_size
315321
self._batching_kwargs['max_batch_size'] = max_batch_size
322+
if max_batch_duration_secs is not None:
323+
self._batching_kwargs[
324+
'max_batch_duration_secs'] = max_batch_duration_secs
316325

317326
def __enter__(self):
318327
connector = self._connection_config.get_connector_handler()

sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql_test.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,59 @@ def test_custom_query_config_cache_key_error(self):
213213
with self.assertRaises(NotImplementedError):
214214
handler.get_cache_key(request)
215215

216+
def test_batch_elements_kwargs_include_max_batch_duration_secs(self):
217+
connection_config = ExternalSQLDBConnectionConfig(
218+
db_adapter=DatabaseTypeAdapter.POSTGRESQL,
219+
host='localhost',
220+
port=5432,
221+
user='user',
222+
password='password',
223+
db_id='db')
224+
query_config = TableFieldsQueryConfig(
225+
table_id='my_table',
226+
where_clause_template='id = :id',
227+
where_clause_fields=['id'])
228+
229+
handler = CloudSQLEnrichmentHandler(
230+
connection_config=connection_config,
231+
query_config=query_config,
232+
min_batch_size=2,
233+
max_batch_size=10,
234+
max_batch_duration_secs=0.5)
235+
236+
self.assertEqual(
237+
handler.batch_elements_kwargs(),
238+
{
239+
'min_batch_size': 2,
240+
'max_batch_size': 10,
241+
'max_batch_duration_secs': 0.5,
242+
})
243+
244+
def test_batch_elements_kwargs_omit_max_batch_duration_secs_by_default(self):
245+
connection_config = ExternalSQLDBConnectionConfig(
246+
db_adapter=DatabaseTypeAdapter.POSTGRESQL,
247+
host='localhost',
248+
port=5432,
249+
user='user',
250+
password='password',
251+
db_id='db')
252+
query_config = TableFieldsQueryConfig(
253+
table_id='my_table',
254+
where_clause_template='id = :id',
255+
where_clause_fields=['id'])
256+
257+
handler = CloudSQLEnrichmentHandler(
258+
connection_config=connection_config,
259+
query_config=query_config,
260+
min_batch_size=2,
261+
max_batch_size=10)
262+
263+
self.assertEqual(
264+
handler.batch_elements_kwargs(), {
265+
'min_batch_size': 2,
266+
'max_batch_size': 10,
267+
})
268+
216269
def test_extract_parameter_names(self):
217270
"""Test parameter extraction from SQL templates."""
218271
connection_config = ExternalSQLDBConnectionConfig(

0 commit comments

Comments
 (0)