Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] BigQuery handler for enrichment transform #31295

Merged
merged 8 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* Beam YAML now supports the jinja templating syntax.
Template variables can be passed with the (json-formatted) `--jinja_variables` flag.
* DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series.([#31185](https://github.com/apache/beam/pull/31185)).
* Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295))

## Breaking Changes

Expand Down
61 changes: 31 additions & 30 deletions sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,23 @@ def _validate_batch_query_fn(query_fn, min_batch_size, max_batch_size):

def _validate_bigquery_metadata(
table_name, row_restriction_template, fields, condition_value_fn, query_fn):
if query_fn and bool(table_name or row_restriction_template or fields or
condition_value_fn):
raise ValueError(
"Please provide either `query_fn` or the parameters "
"`table_name`, `row_restriction_template`, and `fields` "
"together.")
elif not query_fn and not (table_name and row_restriction_template and
(fields or condition_value_fn)):
raise ValueError(
"Please provide either `query_fn` or the parameters "
"`table_name`, `row_restriction_template`, and"
"`fields/condition_value_fn` together.")
if not query_fn and ((fields and condition_value_fn) or
(not fields and not condition_value_fn)):
raise ValueError(
"Please provide exactly one of `fields` or "
"`condition_value_fn`")
if query_fn:
if bool(table_name or row_restriction_template or fields or
condition_value_fn):
raise ValueError(
"Please provide either `query_fn` or the parameters `table_name`, "
"`row_restriction_template`, and `fields/condition_value_fn` "
"together.")
else:
if not (table_name and row_restriction_template):
raise ValueError(
"Please provide either `query_fn` or the parameters "
"`table_name`, `row_restriction_template` together.")
if ((fields and condition_value_fn) or
(not fields and not condition_value_fn)):
raise ValueError(
"Please provide exactly one of `fields` or "
"`condition_value_fn`")


class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, List[Row]],
Expand Down Expand Up @@ -124,17 +124,17 @@ def __init__(
placeholder `{}` of `WHERE` clause in the query.
query_fn: (Optional[Callable[[beam.Row], str]]) A function that takes a
`beam.Row` and returns a complete BigQuery SQL query string.
min_batch_size: (Optional[int]) Minimum number of rows to batch together
when querying BigQuery.
max_batch_size: (Optional[int]) Maximum number of rows to batch together.
min_batch_size (int): Minimum number of rows to batch together when
querying BigQuery. Defaults to 1 if `query_fn` is not specified.
max_batch_size (int): Maximum number of rows to batch together.
Defaults to 10,000 if `query_fn` is not specified.
**kwargs: Additional keyword arguments to pass to `bigquery.Client`.

Note:
* `min_batch_size` and `max_batch_size` won't have any effect if the
* `min_batch_size` and `max_batch_size` cannot be defined if the
`query_fn` is provided.
* Either `fields` or `condition_value_fn` must be provided for query
construction if `query_fn` is not provided.
* If `query_fn` is provided, it overrides the default query construction.
* Ensure appropriate permissions are granted for BigQuery access.
"""
_validate_bigquery_metadata(
Expand All @@ -157,10 +157,11 @@ def __init__(
(self.select_fields, self.table_name, self.row_restriction_template))
self.kwargs = kwargs
self._batching_kwargs = {}
if min_batch_size is not None:
self._batching_kwargs['min_batch_size'] = min_batch_size
if max_batch_size is not None:
self._batching_kwargs['max_batch_size'] = max_batch_size
if not query_fn:
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
self._batching_kwargs['min_batch_size'] = (
min_batch_size if min_batch_size else 1)
self._batching_kwargs['max_batch_size'] = (
max_batch_size if max_batch_size else 10000)

def __enter__(self):
self.client = bigquery.Client(project=self.project, **self.kwargs)
Expand Down Expand Up @@ -235,11 +236,11 @@ def get_cache_key(self, request: Union[beam.Row, List[beam.Row]]):
cache_keys = []
for req in request:
req_dict = req._asdict()
current_values = (
self.condition_value_fn(req) if self.condition_value_fn else
[req_dict[field] for field in self.fields])
key = ";".join(["%s"] * len(current_values))
try:
current_values = (
self.condition_value_fn(req) if self.condition_value_fn else
[req_dict[field] for field in self.fields])
key = ";".join(["%s"] * len(current_values))
cache_keys.extend([key % tuple(current_values)])
except KeyError as e:
raise KeyError(
Expand Down
Loading