Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test code to overwrite SQL in Beam Python JDBC #30417

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from parameterized import parameterized

import apache_beam as beam
from apache_beam import DoFn
from apache_beam import ParDo
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.jdbc import WriteToJdbc
Expand Down Expand Up @@ -197,8 +199,6 @@ def test_xlang_jdbc_write_read(self, database):
p.not_use_test_runner_api = True
result = (
p
# TODO(https://github.com/apache/beam/issues/20446) Add test with
# overridden read_query
| 'Read from jdbc' >> ReadFromJdbc(
table_name=table_name,
driver_class_name=self.driver_class_name,
Expand All @@ -209,6 +209,26 @@ def test_xlang_jdbc_write_read(self, database):

assert_that(result, equal_to(expected_row))

with TestPipeline() as p:
p.not_use_test_runner_api = True

class ExtractCount(DoFn):
def process(self, element):
yield element[0]

result = (
p
| 'Read from jdbc override query' >> ReadFromJdbc(
table_name=table_name,
query=f'select count(*) from {table_name}',
driver_class_name=self.driver_class_name,
jdbc_url=self.jdbc_url,
username=self.username,
password=self.password,
classpath=classpath)
| 'ExtractCount' >> ParDo(ExtractCount()))
assert_that(result, equal_to([ROW_COUNT]))

# Try the same read using the partitioned reader code path.
# Outputs should be the same.
with TestPipeline() as p:
Expand Down
Loading