Enable BigQuery CDC configuration for Python BigQuery sink #32529

Merged: 40 commits, Oct 15, 2024

Commit 1487acc: "fixes wrong property reference"

GitHub Actions / Test Results failed Oct 2, 2024 in 0s

8 fail, 23 skipped, 6 pass in 4m 32s

37 tests ±0:  6 ✅ ±0, 23 💤 +1, 8 ❌ -1
 1 suites ±0, 1 files ±0, ⏱️ 4m 32s (-39m 15s)

Results for commit 1487acc (± values compare against earlier commit 72d1bf4).

Annotations

test_all_types (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 7s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_all_types>

    def test_all_types(self):
      table_name = "all_types"
      schema = self.ALL_TYPES_SCHEMA
>     self.run_storage_write_test(table_name, self.ELEMENTS, schema)

apache_beam/io/external/xlang_bigqueryio_it_test.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/external/xlang_bigqueryio_it_test.py:159: in run_storage_write_test
    p
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "84231bf2-f917-4a7f-a28b-4bb0594c6ef5"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError
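
All eight failures are the same construction-time error: the Python BigQuery sink builds its cross-language payload with a cdc_writes_primary_key kwarg, but the external StorageWrite transform's configuration schema only declares primary_key, so dict_to_row rejects the payload before anything runs. The sketch below reproduces that validation step standalone; Config is a hypothetical stand-in for the named tuple that named_tuple_from_schema builds from the schema proto.

    # Standalone sketch of the check that raises above; the real code
    # lives in apache_beam/transforms/external.py (dict_to_row).
    from collections import namedtuple

    # Hypothetical stand-in for named_tuple_from_schema(schema_proto),
    # using the valid-fields list printed in the error message.
    Config = namedtuple(
        'Config',
        ('auto_sharding', 'create_disposition', 'error_handling',
         'num_streams', 'primary_key', 'table',
         'triggering_frequency_seconds', 'use_at_least_once_semantics',
         'use_cdc_writes', 'write_disposition'))

    def dict_to_row(row_type, py_value):
      # Reject any kwarg that has no matching field in the schema.
      extra = set(py_value.keys()) - set(row_type._fields)
      if extra:
        raise ValueError(
            f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
      return row_type(*(py_value.get(f) for f in row_type._fields))

    # The Python sink passed 'cdc_writes_primary_key', which the schema
    # does not define, so pipeline construction fails:
    try:
      dict_to_row(Config, {'table': 'project:dataset.table',
                           'cdc_writes_primary_key': ['name']})
    except ValueError as e:
      print(e)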

test_nested_records_and_lists (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 6s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_nested_records_and_lists>

    def test_nested_records_and_lists(self):
      table_name = "nested_records_and_lists"
      schema = {
          "fields": [{
              "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED"
          },
                     {
                         "name": "struct",
                         "type": "STRUCT",
                         "fields": [{
                             "name": "nested_int", "type": "INTEGER"
                         }, {
                             "name": "nested_str", "type": "STRING"
                         }]
                     },
                     {
                         "name": "repeated_struct",
                         "type": "STRUCT",
                         "mode": "REPEATED",
                         "fields": [{
                             "name": "nested_numeric", "type": "NUMERIC"
                         }, {
                             "name": "nested_bytes", "type": "BYTES"
                         }]
                     }]
      }
      items = [{
          "repeated_int": [1, 2, 3],
          "struct": {
              "nested_int": 1, "nested_str": "a"
          },
          "repeated_struct": [{
              "nested_numeric": Decimal("1.23"), "nested_bytes": b'a'
          },
                              {
                                  "nested_numeric": Decimal("3.21"),
                                  "nested_bytes": b'aa'
                              }]
      }]
    
>     self.run_storage_write_test(table_name, items, schema)

apache_beam/io/external/xlang_bigqueryio_it_test.py:219: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/external/xlang_bigqueryio_it_test.py:159: in run_storage_write_test
    p
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "d7ccdd52-3fab-41ad-8b62-7df19549c593"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_streaming_with_at_least_once (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 8s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_streaming_with_at_least_once>

    def test_streaming_with_at_least_once(self):
      table = 'streaming_with_at_least_once'
>     self.run_streaming(table_name=table, use_at_least_once=True)

apache_beam/io/external/xlang_bigqueryio_it_test.py:399: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/external/xlang_bigqueryio_it_test.py:363: in run_streaming
    p
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "d5281d1d-1b3a-40ec-a11d-a1894a49f053"

py_value = {'auto_sharding': True, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_with_at_least_once_semantics (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 7s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_with_at_least_once_semantics>

    def test_with_at_least_once_semantics(self):
      table_name = "with_at_least_once_semantics"
      schema = self.ALL_TYPES_SCHEMA
>     self.run_storage_write_test(
          table_name, self.ELEMENTS, schema, use_at_least_once=True)

apache_beam/io/external/xlang_bigqueryio_it_test.py:176: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/io/external/xlang_bigqueryio_it_test.py:159: in run_storage_write_test
    p
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "00512774-0a05-4610-b6b4-0db5ca1e0216"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_write_to_dynamic_destinations (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 7s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_write_to_dynamic_destinations>

    def test_write_to_dynamic_destinations(self):
      base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
      spec_with_project = '{}:{}'.format(self.project, base_table_spec)
      tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS]
    
      bq_matchers = [
          BigqueryFullResultMatcher(
              project=self.project,
              query="SELECT * FROM %s" % tables[i],
              data=self.parse_expected_data(self.ELEMENTS[i]))
          for i in range(len(tables))
      ]
    
      with beam.Pipeline(argv=self.args) as p:
        _ = (
>           p
            | beam.Create(self.ELEMENTS)
            | beam.io.WriteToBigQuery(
                table=lambda record: spec_with_project + str(record['int']),
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                schema=self.ALL_TYPES_SCHEMA,
                use_at_least_once=False))

apache_beam/io/external/xlang_bigqueryio_it_test.py:302: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "4fef000f-b9d9-4e18-b93f-e70f9171c8d4"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_write_to_dynamic_destinations_with_beam_rows (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 7s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_write_to_dynamic_destinations_with_beam_rows>

    def test_write_to_dynamic_destinations_with_beam_rows(self):
      base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
      spec_with_project = '{}:{}'.format(self.project, base_table_spec)
      tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS]
    
      bq_matchers = [
          BigqueryFullResultMatcher(
              project=self.project,
              query="SELECT * FROM %s" % tables[i],
              data=self.parse_expected_data(self.ELEMENTS[i]))
          for i in range(len(tables))
      ]
    
      row_elements = [
          beam.Row(
              my_int=e['int'],
              my_float=e['float'],
              my_numeric=e['numeric'],
              my_string=e['str'],
              my_bool=e['bool'],
              my_bytes=e['bytes'],
              my_timestamp=e['timestamp']) for e in self.ELEMENTS
      ]
    
      with beam.Pipeline(argv=self.args) as p:
        _ = (
>           p
            | beam.Create(row_elements)
            | beam.io.WriteToBigQuery(
                table=lambda record: spec_with_project + str(record.my_int),
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                use_at_least_once=False))

apache_beam/io/external/xlang_bigqueryio_it_test.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2295: in expand
    return pcoll | StorageWriteToBigQuery(
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "a9078f9b-c24b-4364-8c52-0ddcaa585c4a"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_write_with_beam_rows (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 8s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_write_with_beam_rows>

    def test_write_with_beam_rows(self):
      table = 'write_with_beam_rows'
      table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
    
      row_elements = [
          beam.Row(
              my_int=e['int'],
              my_float=e['float'],
              my_numeric=e['numeric'],
              my_string=e['str'],
              my_bool=e['bool'],
              my_bytes=e['bytes'],
              my_timestamp=e['timestamp']) for e in self.ELEMENTS
      ]
    
      bq_matcher = BigqueryFullResultMatcher(
          project=self.project,
          query="SELECT * FROM {}.{}".format(self.dataset_id, table),
          data=self.parse_expected_data(self.ELEMENTS))
    
      with beam.Pipeline(argv=self.args) as p:
        _ = (
>           p
            | beam.Create(row_elements)
            | StorageWriteToBigQuery(table=table_id))

apache_beam/io/external/xlang_bigqueryio_it_test.py:243: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "a5c5272a-2065-4b22-9383-14d66dfc310c"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': None, 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError

test_write_with_beam_rows_cdc (apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT) failed

sdks/python/pytest_gcpCrossLanguage.xml [took 7s]
Raw output
ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')
self = <apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT testMethod=test_write_with_beam_rows_cdc>

    def test_write_with_beam_rows_cdc(self):
      table = 'write_with_beam_rows_cdc'
      table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
    
      expected_data_on_bq = [
          # (name, value)
          {
              "name": "cdc_test",
              "value": 5,
          }
      ]
    
      rows_with_cdc = [
          beam.Row(
              cdc_info=beam.Row(
                  mutation_type="UPSERT", change_sequence_number="AAA/2"),
              record=beam.Row(name="cdc_test", value=5)),
          beam.Row(
              cdc_info=beam.Row(
                  mutation_type="UPSERT", change_sequence_number="AAA/1"),
              record=beam.Row(name="cdc_test", value=3))
      ]
    
      bq_matcher = BigqueryFullResultMatcher(
          project=self.project,
          query="SELECT * FROM {}.{}".format(self.dataset_id, table),
          data=self.parse_expected_data(expected_data_on_bq))
    
      with beam.Pipeline(argv=self.args) as p:
        _ = (
>           p
            | beam.Create(rows_with_cdc)
            | StorageWriteToBigQuery(
                table=table_id,
                use_at_least_once=True,
                use_cdc_writes=True,
                cdc_writes_primary_key=["name"]))

apache_beam/io/external/xlang_bigqueryio_it_test.py:278: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/io/gcp/bigquery.py:2637: in expand
    input_beam_rows
apache_beam/pvalue.py:138: in __or__
    return self.pipeline.apply(ptransform, self)
apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
apache_beam/transforms/external.py:429: in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
apache_beam/transforms/external.py:653: in __init__
    payload.payload() if isinstance(payload, PayloadBuilder) else payload)
apache_beam/transforms/external.py:111: in payload
    return self.build().SerializeToString()
apache_beam/transforms/external.py:256: in build
    dict_to_row(self._schema_proto, self._kwargs)))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

schema_proto = fields {
  name: "auto_sharding"
  type {
    nullable: true
    atomic_type: BOOLEAN
  }
}
fields {
  name: "create_d...rite_disposition"
  type {
    nullable: true
    atomic_type: STRING
  }
}
id: "e35130d7-8ab1-4ec6-aa3e-2768c6f33f5e"

py_value = {'auto_sharding': False, 'cdc_writes_primary_key': ['name'], 'create_disposition': 'CREATE_IF_NEEDED', 'error_handling': {'output': 'FailedRowsWithErrors'}, ...}

    def dict_to_row(schema_proto, py_value):
      row_type = named_tuple_from_schema(schema_proto)
      if isinstance(py_value, dict):
        extra = set(py_value.keys()) - set(row_type._fields)
        if extra:
>         raise ValueError(
              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
E         ValueError: Unknown fields: {'cdc_writes_primary_key'}. Valid fields: ('auto_sharding', 'create_disposition', 'error_handling', 'num_streams', 'primary_key', 'table', 'triggering_frequency_seconds', 'use_at_least_once_semantics', 'use_cdc_writes', 'write_disposition')

apache_beam/transforms/external.py:241: ValueError
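
Note that the valid-fields list above already contains primary_key, so the fix in commit 1487acc ("fixes wrong property reference") plausibly amounts to emitting the user-supplied cdc_writes_primary_key value under the field name the schema actually declares. A hedged sketch of that mapping follows; _storage_write_kwargs is an illustrative helper, not Beam's actual internals.

    # Hedged sketch of the likely fix, assuming the payload key simply
    # needs to match the schema's declared field name ('primary_key').
    # The helper name and signature here are illustrative only.
    def _storage_write_kwargs(
        table,
        use_at_least_once=False,
        use_cdc_writes=False,
        cdc_writes_primary_key=None):
      return dict(
          table=table,
          use_at_least_once_semantics=use_at_least_once,
          use_cdc_writes=use_cdc_writes,
          # Previously emitted as 'cdc_writes_primary_key', which
          # dict_to_row rejected; 'primary_key' is a valid schema field.
          primary_key=cdc_writes_primary_key)

    print(_storage_write_kwargs(
        'project:dataset.table',
        use_at_least_once=True,
        use_cdc_writes=True,
        cdc_writes_primary_key=['name']))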

23 skipped tests found

There are 23 skipped tests, see "Raw output" for the full list of skipped tests.
Raw output
apache_beam.examples.ml_transform.ml_transform_it_test
apache_beam.examples.snippets.snippets_test.SnippetsTest ‑ test_model_bigqueryio_xlang
apache_beam.examples.snippets.transforms.elementwise.mltransform_test
apache_beam.examples.snippets.transforms.elementwise.runinference_test
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_streaming_with_auto_sharding
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_streaming_with_fixed_num_streams
apache_beam.ml.inference.huggingface_inference_it_test
apache_beam.ml.inference.huggingface_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.pytorch_inference_test
apache_beam.ml.inference.tensorflow_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.vertex_ai_inference_it_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test
apache_beam.runners.dask.dask_runner_test
apache_beam.testing.analyzers.perf_analysis_test
apache_beam.testing.benchmarks.cloudml.cloudml_benchmark_test
apache_beam.transforms.enrichment_handlers.feast_feature_store_it_test
apache_beam.transforms.enrichment_handlers.feast_feature_store_test
apache_beam.typehints.pytorch_type_compatibility_test
apache_beam.yaml.yaml_ml_test

37 tests found

There are 37 tests, see "Raw output" for the full list of tests.
Raw output
apache_beam.examples.ml_transform.ml_transform_it_test
apache_beam.examples.snippets.snippets_test.SnippetsTest ‑ test_model_bigqueryio_xlang
apache_beam.examples.snippets.transforms.elementwise.mltransform_test
apache_beam.examples.snippets.transforms.elementwise.runinference_test
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_all_types
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_nested_records_and_lists
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_streaming_with_at_least_once
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_streaming_with_auto_sharding
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_streaming_with_fixed_num_streams
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_with_at_least_once_semantics
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_write_to_dynamic_destinations
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_write_to_dynamic_destinations_with_beam_rows
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_write_with_beam_rows
apache_beam.io.external.xlang_bigqueryio_it_test.BigQueryXlangStorageWriteIT ‑ test_write_with_beam_rows_cdc
apache_beam.io.gcp.bigtableio_it_test.TestReadFromBigTableIT ‑ test_read_xlang
apache_beam.io.gcp.bigtableio_it_test.TestWriteToBigtableXlangIT ‑ test_delete_cells_mutation
apache_beam.io.gcp.bigtableio_it_test.TestWriteToBigtableXlangIT ‑ test_delete_cells_with_timerange_mutation
apache_beam.io.gcp.bigtableio_it_test.TestWriteToBigtableXlangIT ‑ test_delete_column_family_mutation
apache_beam.io.gcp.bigtableio_it_test.TestWriteToBigtableXlangIT ‑ test_delete_row_mutation
apache_beam.io.gcp.bigtableio_it_test.TestWriteToBigtableXlangIT ‑ test_set_mutation
apache_beam.ml.inference.huggingface_inference_it_test
apache_beam.ml.inference.huggingface_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.pytorch_inference_test
apache_beam.ml.inference.tensorflow_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.vertex_ai_inference_it_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test
apache_beam.runners.dask.dask_runner_test
apache_beam.testing.analyzers.perf_analysis_test
apache_beam.testing.benchmarks.cloudml.cloudml_benchmark_test
apache_beam.transforms.enrichment_handlers.feast_feature_store_it_test
apache_beam.transforms.enrichment_handlers.feast_feature_store_test
apache_beam.typehints.pytorch_type_compatibility_test
apache_beam.yaml.yaml_ml_test