Enrichment Transform with BigTable handler #30001

Merged: 45 commits, Jan 19, 2024
Commits
0ad521c
enrichment v1
riteshghorse Dec 15, 2023
e83bad7
add documentation
riteshghorse Dec 15, 2023
5162960
add doc comment
riteshghorse Dec 15, 2023
a45392d
rerun
riteshghorse Dec 18, 2023
9fdbeb3
update docs, lint
riteshghorse Dec 18, 2023
c541148
update docs, lint
riteshghorse Dec 18, 2023
5c9be0e
add generic type
riteshghorse Dec 18, 2023
9df679c
add generic type
riteshghorse Dec 18, 2023
883ff0d
adjust doc path
riteshghorse Dec 18, 2023
818bb8a
create test row
riteshghorse Dec 18, 2023
e1feeb8
use request type
riteshghorse Dec 18, 2023
40275e9
use request type
riteshghorse Dec 18, 2023
be67a88
change module name
riteshghorse Dec 20, 2023
27ed250
more tests
riteshghorse Jan 2, 2024
4af90f5
remove non-functional params
riteshghorse Jan 3, 2024
041fcd0
lint, doc
riteshghorse Jan 3, 2024
91f58b5
change types for general use
riteshghorse Jan 4, 2024
9fd6813
callable type
riteshghorse Jan 4, 2024
036eceb
dict type
riteshghorse Jan 4, 2024
021f9c4
update signatures
riteshghorse Jan 9, 2024
062b9ef
fix unit test
riteshghorse Jan 9, 2024
b11d3ea
bigtable with column family, ids, rrio-throttler
riteshghorse Jan 11, 2024
46e3a6d
update tests for row filter
riteshghorse Jan 11, 2024
433d5fa
convert handler types from dict to Row
riteshghorse Jan 11, 2024
d18d583
update tests for bigtable
riteshghorse Jan 12, 2024
c5e792c
ran pydocs
riteshghorse Jan 12, 2024
9285a5b
ran pydocs
riteshghorse Jan 12, 2024
641bdf7
mark postcommit
riteshghorse Jan 12, 2024
c36a21e
remove _test file, fix import
riteshghorse Jan 12, 2024
3989c16
enable postcommit
riteshghorse Jan 12, 2024
7102acd
add more tests
riteshghorse Jan 12, 2024
87e32bb
skip tests when dependencies are not installed
riteshghorse Jan 16, 2024
57efa52
add deleted imports from last commit
riteshghorse Jan 16, 2024
282c608
add skip test condition
riteshghorse Jan 16, 2024
deecdbc
fix import order, add TooManyRequests to try-catch
riteshghorse Jan 16, 2024
253633e
make throttler, repeater non-optional
riteshghorse Jan 16, 2024
932fae3
add exception level and tests
riteshghorse Jan 17, 2024
cf88d6f
correct pydoc statement
riteshghorse Jan 17, 2024
5b702d2
add throttle tests
riteshghorse Jan 17, 2024
18d9539
add bigtable improvements
riteshghorse Jan 18, 2024
6e251ce
default app_profile_id
riteshghorse Jan 18, 2024
20b8ba6
add documentation, ignore None assignment
riteshghorse Jan 18, 2024
27974b8
add to changes.md
riteshghorse Jan 18, 2024
7c9a03c
change test structure that throws exception, skip http test for now
riteshghorse Jan 18, 2024
5a626e3
drop postcommit trigger file
riteshghorse Jan 18, 2024
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/io/requestresponse.py
@@ -150,7 +150,7 @@ def _execute_request(
       try:
         return future.result(timeout=timeout)
       except TooManyRequests as e:
-        _LOGGER.warning(
+        _LOGGER.info(
             'request could not be completed. got code %i from the service.',
             e.code)
         raise e
@@ -264,10 +264,10 @@ def __init__(
       caller: Caller[RequestT, ResponseT],
       timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
       should_backoff: Optional[ShouldBackOff] = None,
-      repeater: Optional[Repeater] = ExponentialBackOffRepeater(),
+      repeater: Repeater = ExponentialBackOffRepeater(),
       cache_reader: Optional[CacheReader] = None,
       cache_writer: Optional[CacheWriter] = None,
-      throttler: Optional[PreCallThrottler] = DefaultThrottler(),
+      throttler: PreCallThrottler = DefaultThrottler(),
   ):
     """
     Instantiates a RequestResponseIO transform.
Expand Down Expand Up @@ -343,8 +343,8 @@ def __init__(
caller: Caller[RequestT, ResponseT],
timeout: Optional[float] = DEFAULT_TIMEOUT_SECS,
should_backoff: Optional[ShouldBackOff] = None,
repeater: Optional[Repeater] = None,
throttler: Optional[PreCallThrottler] = None,
repeater: Repeater = None,
throttler: PreCallThrottler = None,
):
self._caller = caller
self._timeout = timeout
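
The first hunk above lowers the log level for throttled requests from warning to info while still re-raising the error. A minimal, standard-library-only sketch of that pattern follows. `TooManyRequestsSketch` and `execute_request_sketch` are hypothetical names, and the use of `ThreadPoolExecutor` is an assumption, since the diff only shows the `future.result(timeout=timeout)` call.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

_LOGGER = logging.getLogger(__name__)


class TooManyRequestsSketch(Exception):
    """Hypothetical stand-in for a service-side throttling error."""
    code = 429


def execute_request_sketch(call, request, timeout=30.0):
    """Run `call(request)` on a worker thread and wait up to `timeout` seconds.

    A throttling error is logged at INFO level and then re-raised so that
    the caller (for example, a retry loop) can decide what to do next.
    """
    # Assumption: the request runs on an executor; the diff does not show this.
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(call, request)
        try:
            return future.result(timeout=timeout)
        except TooManyRequestsSketch as e:
            _LOGGER.info(
                'request could not be completed. got code %i from the service.',
                e.code)
            raise
```

Logging at info level is a plausible fit when throttling is an expected, retried condition, although the PR itself does not state the motivation.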
10 changes: 9 additions & 1 deletion sdks/python/apache_beam/transforms/enrichment.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import logging
 from typing import Any
 from typing import Callable
 from typing import Dict
@@ -37,6 +37,8 @@

 JoinFn = Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]

+_LOGGER = logging.getLogger(__name__)
+

 def cross_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row:
   """cross_join performs a cross join on two `dict` objects.
@@ -54,6 +56,12 @@ def cross_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row:
     if k not in left:
       # Don't override the values in left.
       left[k] = v
+    elif left[k] != v:
+      _LOGGER.warning(
+          '%s exists in the input row as well the row fetched '
+          'from API but have different values. Using the input '
+          'value, you can override this behavior by passing a '
+          'custom `join_fn`.' % k)
   return beam.Row(**left)
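
To illustrate the conflict rule added in this hunk, here is a minimal sketch that assumes plain dictionaries in place of `beam.Row` so that it runs without Apache Beam installed. `cross_join_sketch` and `prefer_response_join` are hypothetical names, not part of the PR. Unlike the diff, the sketch copies `left` instead of mutating it.

```python
import logging
from typing import Any, Dict

_LOGGER = logging.getLogger(__name__)


def cross_join_sketch(left: Dict[str, Any],
                      right: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `right` into a copy of `left`; `left` wins on conflicting keys."""
    merged = dict(left)  # copy, so the caller's dict is left untouched
    for k, v in right.items():
        if k not in merged:
            merged[k] = v
        elif merged[k] != v:
            # Same key, different values: keep the input value and warn.
            _LOGGER.warning(
                '%s exists in both rows with different values; '
                'keeping the input value.', k)
    return merged


def prefer_response_join(left: Dict[str, Any],
                         right: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical custom join: values from `right` win on conflicts."""
    return {**left, **right}


request = {'product_id': 1, 'quantity': 2}
response = {'product_id': 1, 'quantity': 5, 'name': 'pixel'}

default_result = cross_join_sketch(request, response)
custom_result = prefer_response_join(request, response)
```

Here `default_result` keeps `quantity` from the request and logs one warning, while `custom_result` takes `quantity` from the response. A real `join_fn` would return a `beam.Row`, per the `JoinFn` alias in the diff.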

