Skip to content

Commit

Permalink
bigtable and legacy support
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Feb 23, 2024
1 parent 4a13130 commit 8962b2f
Show file tree
Hide file tree
Showing 7 changed files with 546 additions and 64 deletions.
Empty file.
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/transforms/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ def expand(self,
throttler=self._throttler)

# EnrichmentSourceHandler returns a tuple of (request,response).
return fetched_data | beam.Map(
lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict()))
return (
fetched_data
| "enrichment_join" >>
beam.Map(lambda x: self._join_fn(x[0]._asdict(), x[1]._asdict())))

def with_redis_cache(
self,
Expand Down
20 changes: 2 additions & 18 deletions sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#
import logging
from enum import Enum
from typing import Any
from typing import Dict
from typing import Optional
Expand All @@ -28,30 +27,15 @@

import apache_beam as beam
from apache_beam.transforms.enrichment import EnrichmentSourceHandler
from apache_beam.transforms.enrichment_handlers.utils import ExceptionLevel

__all__ = [
'BigTableEnrichmentHandler',
'ExceptionLevel',
]

_LOGGER = logging.getLogger(__name__)


class ExceptionLevel(Enum):
"""ExceptionLevel defines the exception level options to either
log a warning, or raise an exception, or do nothing when a BigTable query
returns an empty row.
Members:
- RAISE: Raise the exception.
- WARN: Log a warning for exception without raising it.
- QUIET: Neither log nor raise the exception.
"""
RAISE = 0
WARN = 1
QUIET = 2


class BigTableEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
"""A handler for :class:`apache_beam.transforms.enrichment.Enrichment`
transform to interact with GCP BigTable.
Expand All @@ -70,7 +54,7 @@ class BigTableEnrichmentHandler(EnrichmentSourceHandler[beam.Row, beam.Row]):
encoding (str): encoding type to convert the string to bytes and vice-versa
from BigTable. Default is `utf-8`.
exception_level: a `enum.Enum` value from
``apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel``
``apache_beam.transforms.enrichment_handlers.utils.ExceptionLevel``
to set the level when an empty row is returned from the BigTable query.
Defaults to ``ExceptionLevel.WARN``.
include_timestamp (bool): If enabled, the timestamp associated with the
Expand Down
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/transforms/enrichment_handlers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum

__all__ = [
'ExceptionLevel',
]


class ExceptionLevel(Enum):
"""Options to set the severity of exceptions.
You can set the exception level option to either
log a warning, or raise an exception, or do nothing when an empty row
is fetched from the external service.
Members:
- RAISE: Raise the exception.
- WARN: Log a warning for exception without raising it.
- QUIET: Neither log nor raise the exception.
"""
RAISE = 0
WARN = 1
QUIET = 2
Loading

0 comments on commit 8962b2f

Please sign in to comment.