diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index b26833333238..e3d6056a5de9 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 2
+ "modification": 1
}
diff --git a/CHANGES.md b/CHANGES.md
index 91bdfef69161..1aee8283bcb7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,14 @@
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().
+* SchemaTransforms implemented with TypedSchemaTransformProvider now produce a
+ configuration Schema with snake_case naming convention
+ ([#31374](https://github.com/apache/beam/pull/31374)). This will make the following
+ cases problematic:
+  * Running a pre-2.57.0 remote SDK pipeline containing a 2.57.0+ Java SchemaTransform
+ * Running a 2.57.0+ remote SDK pipeline containing a pre-2.57.0 Java SchemaTransform
+ * All direct uses of Python's [SchemaAwareExternalTransform](https://github.com/apache/beam/blob/a998107a1f5c3050821eef6a5ad5843d8adb8aec/sdks/python/apache_beam/transforms/external.py#L381)
+ should be updated to use new snake_case parameter names.
## Deprecations
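To illustrate the last bullet above: a direct `SchemaAwareExternalTransform` use is updated by renaming only its keyword arguments. A minimal sketch, modeled on the `bigtableio.py` change further down (the identifier and service address are illustrative):

```python
from apache_beam.transforms.external import SchemaAwareExternalTransform

# Address of an already-running Java expansion service (illustrative).
expansion_service = "localhost:12345"

# Pre-2.57.0 these kwargs were tableId / instanceId / projectId; a 2.57.0+
# Java transform expects the snake_case field names instead.
write = SchemaAwareExternalTransform(
    identifier="beam:schematransform:org.apache.beam:bigtable_write:v1",
    expansion_service=expansion_service,
    rearrange_based_on_discovery=True,
    table_id="my-table",
    instance_id="my-instance",
    project_id="my-project")
```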
diff --git a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
index 5b6d7d916310..81df24223cac 100644
--- a/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
+++ b/sdks/go/pkg/beam/io/xlang/bigtableio/bigtable.go
@@ -62,9 +62,9 @@ import (
)
type bigtableConfig struct {
- InstanceId string `beam:"instanceId"`
- ProjectId string `beam:"projectId"`
- TableId string `beam:"tableId"`
+ InstanceId string `beam:"instance_id"`
+ ProjectId string `beam:"project_id"`
+ TableId string `beam:"table_id"`
}
// Cell represents a single cell in a Bigtable row.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index d5c6c724c6f5..d9b49dd3ca27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.schemas.transforms;
+import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@@ -26,9 +28,14 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
/**
@@ -38,8 +45,12 @@
 * <p>ConfigT should be available in the SchemaRegistry.
 *
 * <p>{@link #configurationSchema()} produces a configuration {@link Schema} that is inferred from
- * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
- * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
+ * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a
+ * {@link SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration
+ * Schema.
+ *
+ * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
+ * {@code snake_case} naming convention.
 *
 * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
@@ -78,10 +89,11 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
}
@Override
- public Schema configurationSchema() {
+ public final Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
- return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
+ // We also establish a `snake_case` convention for all SchemaTransform configurations
+ return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
@@ -90,9 +102,12 @@ public Schema configurationSchema() {
}
}
- /** Produces a {@link SchemaTransform} from a Row configuration. */
+ /**
+ * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to follow
+ * the `snake_case` naming convention.
+ */
@Override
- public SchemaTransform from(Row configuration) {
+ public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}
@@ -103,9 +118,22 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti
private ConfigT configFromRow(Row configuration) {
try {
- return SchemaRegistry.createDefault()
- .getFromRowFunction(configurationClass())
- .apply(configuration);
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row, ConfigT> rowToConfigT =
+ registry.getFromRowFunction(configurationClass());
+
+ // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
+ // camelCase naming convention
+ SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
+ if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
+ && checkNotNull(
+ ((DefaultSchemaProvider) schemaProvider)
+ .getUnderlyingSchemaProvider(configurationClass()))
+ .getClass()
+ .equals(AutoValueSchema.class)) {
+ configuration = configuration.toCamelCase();
+ }
+ return rowToConfigT.apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index b1dc0911a927..2eef0e30f805 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -130,8 +130,8 @@ public void testFrom() {
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("stringField", "field1")
- .withFieldValue("integerField", Integer.valueOf(13))
+ .withFieldValue("string_field", "field1")
+ .withFieldValue("integer_field", Integer.valueOf(13))
.build();
Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public void testDependencies() {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("stringField", "field1")
- .withFieldValue("integerField", Integer.valueOf(13))
+ .withFieldValue("string_field", "field1")
+ .withFieldValue("integer_field", Integer.valueOf(13))
.build();
assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index bfe2fab1f9a2..fb32e18d9374 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -25,7 +25,6 @@
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -132,15 +131,4 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}
-
- // TODO: set global snake_case naming convention and remove these special cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 71183c6b0a03..b490693a9adb 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -176,15 +176,4 @@ public Row apply(KV<String, SnapshotInfo> input) {
}
}
}
-
- // TODO: set global snake_case naming convention and remove these special cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index d5962a737baf..f5ac5bb54ad7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -121,17 +121,17 @@ public void testFindTransformAndMakeItWork() {
assertEquals(
Sets.newHashSet(
- "bootstrapServers",
+ "bootstrap_servers",
"topic",
"schema",
- "autoOffsetResetConfig",
- "consumerConfigUpdates",
+ "auto_offset_reset_config",
+ "consumer_config_updates",
"format",
- "confluentSchemaRegistrySubject",
- "confluentSchemaRegistryUrl",
- "errorHandling",
- "fileDescriptorPath",
- "messageName"),
+ "confluent_schema_registry_subject",
+ "confluent_schema_registry_url",
+ "error_handling",
+ "file_descriptor_path",
+ "message_name"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 0702137cffd3..6f97983d3260 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -258,15 +258,4 @@ synchronized Map<String, SchemaTransformProvider> getAllProviders() {
throw new RuntimeException(e.getMessage());
}
}
-
- // TODO: set global snake_case naming convention and remove these special cases
- @Override
- public SchemaTransform from(Row rowConfig) {
- return super.from(rowConfig.toCamelCase());
- }
-
- @Override
- public Schema configurationSchema() {
- return super.configurationSchema().toSnakeCase();
- }
}
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 8165633cf15e..141544305a38 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -45,27 +45,10 @@ public class ManagedTransformConstants {
public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";
private static final Map<String, String> KAFKA_READ_MAPPINGS =
- ImmutableMap.<String, String>builder()
- .put("topic", "topic")
- .put("bootstrap_servers", "bootstrapServers")
- .put("consumer_config_updates", "consumerConfigUpdates")
- .put("confluent_schema_registry_url", "confluentSchemaRegistryUrl")
- .put("confluent_schema_registry_subject", "confluentSchemaRegistrySubject")
- .put("data_format", "format")
- .put("schema", "schema")
- .put("file_descriptor_path", "fileDescriptorPath")
- .put("message_name", "messageName")
- .build();
+ ImmutableMap.<String, String>builder().put("data_format", "format").build();
private static final Map<String, String> KAFKA_WRITE_MAPPINGS =
- ImmutableMap.<String, String>builder()
- .put("topic", "topic")
- .put("bootstrap_servers", "bootstrapServers")
- .put("producer_config_updates", "producerConfigUpdates")
- .put("data_format", "format")
- .put("file_descriptor_path", "fileDescriptorPath")
- .put("message_name", "messageName")
- .build();
+ ImmutableMap.<String, String>builder().put("data_format", "format").build();
public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index 3a3465406c03..e9edf8751e34 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {
@Test
public void testGetConfigRowFromYamlString() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {
Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
- .withFieldValue("extraString", "abc")
- .withFieldValue("extraInteger", 123)
+ .withFieldValue("extra_string", "abc")
+ .withFieldValue("extra_integer", 123)
.build();
Row returnedRow =
@@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
- .withFieldValue("extraString", "abc")
- .withFieldValue("extraInteger", 123)
+ .withFieldValue("extra_string", "abc")
+ .withFieldValue("extra_integer", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
@Test
public void testBuildWithYamlString() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig config =
ManagedConfig.builder()
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index 0b0ad532dbd4..f7769a9e1d19 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -91,7 +91,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio
@Test
public void testReCreateTransformFromRowWithConfig() {
- String yamlString = "extraString: abc\n" + "extraInteger: 123";
+ String yamlString = "extra_string: abc\n" + "extra_integer: 123";
ManagedConfig originalConfig =
ManagedConfig.builder()
@@ -130,8 +130,8 @@ public void testProtoTranslation() throws Exception {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
- .put("extraString", "abc")
- .put("extraInteger", 123)
+ .put("extra_string", "abc")
+ .put("extra_integer", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
index 260085486c81..7ed364d0e174 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
- .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
+ .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
runTestProviderTest(writeOp);
}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml
index 3967b6095eac..da3bd68546cf 100644
--- a/sdks/java/managed/src/test/resources/test_config.yaml
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
# under the License.
#
-extraString: "abc"
-extraInteger: 123
+extra_string: "abc"
+extra_integer: 123
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index caeed6b7b9b7..29b7b575932d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2599,13 +2599,13 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
- createDisposition=self._create_disposition,
- writeDisposition=self._write_disposition,
- triggeringFrequencySeconds=self._triggering_frequency,
- autoSharding=self._with_auto_sharding,
- numStreams=self._num_storage_api_streams,
- useAtLeastOnceSemantics=self._use_at_least_once,
- errorHandling={
+ create_disposition=self._create_disposition,
+ write_disposition=self._write_disposition,
+ triggering_frequency_seconds=self._triggering_frequency,
+ auto_sharding=self._with_auto_sharding,
+ num_streams=self._num_storage_api_streams,
+ use_at_least_once_semantics=self._use_at_least_once,
+ error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index f8534f38ddfc..0f3944a791bd 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- tableId=self._table_id,
- instanceId=self._instance_id,
- projectId=self._project_id)
+ table_id=self._table_id,
+ instance_id=self._instance_id,
+ project_id=self._project_id)
return (
input
@@ -323,9 +323,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- tableId=self._table_id,
- instanceId=self._instance_id,
- projectId=self._project_id)
+ table_id=self._table_id,
+ instance_id=self._instance_id,
+ project_id=self._project_id)
return (
input.pipeline
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 2799bd1b9e93..67adda5aec03 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,32 +39,6 @@ def snake_case_to_upper_camel_case(string):
return output
-def snake_case_to_lower_camel_case(string):
- """Convert snake_case to lowerCamelCase"""
- if len(string) <= 1:
- return string.lower()
- upper = snake_case_to_upper_camel_case(string)
- return upper[0].lower() + upper[1:]
-
-
-def camel_case_to_snake_case(string):
- """Convert camelCase to snake_case"""
- arr = []
- word = []
- for i, n in enumerate(string):
- # If seeing an upper letter after a lower letter, we just witnessed a word
- # If seeing an upper letter and the next letter is lower, we may have just
- # witnessed an all caps word
- if n.isupper() and ((i > 0 and string[i - 1].islower()) or
- (i + 1 < len(string) and string[i + 1].islower())):
- arr.append(''.join(word))
- word = [n.lower()]
- else:
- word.append(n.lower())
- arr.append(''.join(word))
- return '_'.join(arr).strip('_')
-
-
# Information regarding a Wrapper parameter.
ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
@@ -76,7 +50,7 @@ def get_config_with_descriptions(
descriptions = schematransform.configuration_schema._field_descriptions
fields_with_descriptions = {}
for field in schema.fields:
- fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
+ fields_with_descriptions[field.name] = ParamInfo(
typing_from_runner_api(field.type),
descriptions[field.name],
field.name)
@@ -105,16 +79,11 @@ def __init__(self, expansion_service=None, **kwargs):
expansion_service or self.default_expansion_service
def expand(self, input):
- camel_case_kwargs = {
- snake_case_to_lower_camel_case(k): v
- for k, v in self._kwargs.items()
- }
-
external_schematransform = SchemaAwareExternalTransform(
identifier=self.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- **camel_case_kwargs)
+ **self._kwargs)
return input | external_schematransform
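With the case-conversion helpers deleted, wrapper kwargs now reach the expansion service unchanged. Roughly, under the same assumption of a reachable expansion service (`GenerateSequence` being one of its discoverable transforms):

```python
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider

provider = ExternalTransformProvider("localhost:12345")
GenerateSequence = provider.get('GenerateSequence')

# 'start' and 'end' are already snake_case, so they now map one-to-one onto
# the discovered configuration fields instead of being rewritten to
# lowerCamelCase first.
seq = GenerateSequence(start=0, end=10)
```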
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
index a53001c85fd3..95720cee7eee 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -37,9 +37,7 @@
from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
from apache_beam.transforms.external_transform_provider import ExternalTransform
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
-from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
-from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
from apache_beam.transforms.xlang.io import GenerateSequence
@@ -54,26 +52,6 @@ def test_snake_case_to_upper_camel_case(self):
for case in test_cases:
self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
- def test_snake_case_to_lower_camel_case(self):
- test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
- ("test_double_underscore", "testDoubleUnderscore"),
- ("TEST_CAPITALIZED", "testCapitalized"),
- ("_prepended_underscore", "prependedUnderscore"),
- ("appended_underscore_", "appendedUnderscore")]
- for case in test_cases:
- self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
-
- def test_camel_case_to_snake_case(self):
- test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
- ("TestDoubleUnderscore",
- "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
- ("BEGINNINGAllCaps",
- "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
- ("AllCapsMIDDLEWord", "all_caps_middle_word"),
- ("lowerCamelCase", "lower_camel_case")]
- for case in test_cases:
- self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
-
def test_infer_name_from_identifier(self):
standard_test_cases = [
("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 8a5ffd9f6a9c..005e1af05495 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -30,16 +30,16 @@
mappings:
'ReadFromBigQuery':
query: 'query'
- table: 'tableSpec'
- fields: 'selectedFields'
- row_restriction: 'rowRestriction'
+ table: 'table_spec'
+ fields: 'selected_fields'
+ row_restriction: 'row_restriction'
'WriteToBigQuery':
table: 'table'
- create_disposition: 'createDisposition'
- write_disposition: 'writeDisposition'
- error_handling: 'errorHandling'
+ create_disposition: 'create_disposition'
+ write_disposition: 'write_disposition'
+ error_handling: 'error_handling'
# TODO(https://github.com/apache/beam/issues/30058): Required until autosharding support is fixed
- num_streams: 'numStreams'
+ num_streams: 'num_streams'
underlying_provider:
type: beamJar
transforms:
@@ -56,24 +56,24 @@
mappings:
'ReadFromKafka':
'schema': 'schema'
- 'consumer_config': 'consumerConfigUpdates'
+ 'consumer_config': 'consumer_config_updates'
'format': 'format'
'topic': 'topic'
- 'bootstrap_servers': 'bootstrapServers'
- 'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
- 'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
- 'auto_offset_reset_config': 'autoOffsetResetConfig'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'bootstrap_servers': 'bootstrap_servers'
+ 'confluent_schema_registry_url': 'confluent_schema_registry_url'
+ 'confluent_schema_registry_subject': 'confluent_schema_registry_subject'
+ 'auto_offset_reset_config': 'auto_offset_reset_config'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'WriteToKafka':
'format': 'format'
'topic': 'topic'
- 'bootstrap_servers': 'bootstrapServers'
- 'producer_config_updates': 'producerConfigUpdates'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'bootstrap_servers': 'bootstrap_servers'
+ 'producer_config_updates': 'producer_config_updates'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'schema': 'schema'
underlying_provider:
type: beamJar
@@ -93,24 +93,24 @@
'project': 'project'
'schema': 'schema'
'format': 'format'
- 'subscription_name': 'subscriptionName'
+ 'subscription_name': 'subscription_name'
'location': 'location'
'attributes': 'attributes'
- 'attribute_map': 'attributeMap'
- 'attribute_id': 'attributeId'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'attribute_map': 'attribute_map'
+ 'attribute_id': 'attribute_id'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'WriteToPubSubLite':
'project': 'project'
'format': 'format'
- 'topic_name': 'topicName'
+ 'topic_name': 'topic_name'
'location': 'location'
'attributes': 'attributes'
- 'attribute_id': 'attributeId'
- 'error_handling': 'errorHandling'
- 'file_descriptor_path': 'fileDescriptorPath'
- 'message_name': 'messageName'
+ 'attribute_id': 'attribute_id'
+ 'error_handling': 'error_handling'
+ 'file_descriptor_path': 'file_descriptor_path'
+ 'message_name': 'message_name'
'schema': 'schema'
underlying_provider:
type: beamJar
@@ -205,26 +205,26 @@
config:
mappings:
'ReadFromJdbc':
- driver_class_name: 'driverClassName'
- type: 'jdbcType'
- url: 'jdbcUrl'
+ driver_class_name: 'driver_class_name'
+ type: 'jdbc_type'
+ url: 'jdbc_url'
username: 'username'
password: 'password'
table: 'location'
- query: 'readQuery'
- driver_jars: 'driverJars'
- connection_properties: 'connectionProperties'
- connection_init_sql: 'connectionInitSql'
+ query: 'read_query'
+ driver_jars: 'driver_jars'
+ connection_properties: 'connection_properties'
+ connection_init_sql: 'connection_init_sql'
'WriteToJdbc':
- driver_class_name: 'driverClassName'
- type: 'jdbcType'
- url: 'jdbcUrl'
+ driver_class_name: 'driver_class_name'
+ type: 'jdbc_type'
+ url: 'jdbc_url'
username: 'username'
password: 'password'
table: 'location'
- driver_jars: 'driverJars'
- connection_properties: 'connectionProperties'
- connection_init_sql: 'connectionInitSql'
+ driver_jars: 'driver_jars'
+ connection_properties: 'connection_properties'
+ connection_init_sql: 'connection_init_sql'
'ReadFromMySql': 'ReadFromJdbc'
'WriteToMySql': 'WriteToJdbc'
'ReadFromPostgres': 'ReadFromJdbc'
diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml
index 89b0cc9d553e..8d0037d4dd9f 100644
--- a/sdks/python/apache_beam/yaml/standard_providers.yaml
+++ b/sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -68,20 +68,20 @@
append: 'append'
drop: 'drop'
fields: 'fields'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'MapToFields-java':
language: 'language'
append: 'append'
drop: 'drop'
fields: 'fields'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'Filter-java':
language: 'language'
keep: 'keep'
- error_handling: 'errorHandling'
+ error_handling: 'error_handling'
'Explode':
fields: 'fields'
- cross_product: 'crossProduct'
+ cross_product: 'cross_product'
underlying_provider:
type: beamJar
transforms:
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index d5f6d03c2843..794cad0ec7f4 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -929,7 +929,7 @@ def java_window_into(java_provider, windowing):
return java_provider.create_transform(
'WindowIntoStrategy',
{
- 'serializedWindowingStrategy': windowing_strategy.to_runner_api(
+ 'serialized_windowing_strategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
index a75fc05cba73..ea4f496c2d04 100644
--- a/sdks/python/gen_xlang_wrappers.py
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -233,24 +233,6 @@ def pretty_type(tp):
return (tp, nullable)
-def camel_case_to_snake_case(string):
- """Convert camelCase to snake_case"""
- arr = []
- word = []
- for i, n in enumerate(string):
- # If seeing an upper letter after a lower letter, we just witnessed a word
- # If seeing an upper letter and the next letter is lower, we may have just
- # witnessed an all caps word
- if n.isupper() and ((i > 0 and string[i - 1].islower()) or
- (i + 1 < len(string) and string[i + 1].islower())):
- arr.append(''.join(word))
- word = [n.lower()]
- else:
- word.append(n.lower())
- arr.append(''.join(word))
- return '_'.join(arr).strip('_')
-
-
def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
"""
Generates code for external transform wrapper classes (subclasses of
@@ -287,9 +269,8 @@ def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
parameters = []
for param, info in fields.items():
- pythonic_name = camel_case_to_snake_case(param)
param_details = {
- "name": pythonic_name,
+ "name": param,
"type": info['type'],
"description": info['description'],
}
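The net effect on generated wrappers: the discovered field name becomes the wrapper parameter name verbatim. An illustrative before/after (the field and description are invented for the sketch):

```python
# Before: a Java field 'tableSpec' surfaced as
# camel_case_to_snake_case('tableSpec') == 'table_spec'.
# Now the Java schema itself is snake_case, so the name passes straight through:
param_details = {
    "name": "table_spec",  # == param, no conversion step
    "type": "str",
    "description": "The table to read from.",
}
```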