diff --git a/README.md b/README.md index b9299c9b..3fff89b4 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions | iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` | | iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` | | iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema | +| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive | | iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create | | iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence | | iceberg.table.\.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified | diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java index d3ade9f2..1c212879 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java @@ -77,10 +77,12 @@ public class IcebergSinkConfig extends AbstractConfig { "iceberg.tables.upsert-mode-enabled"; private static final String TABLES_AUTO_CREATE_ENABLED_PROP = "iceberg.tables.auto-create-enabled"; - private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP = - "iceberg.tables.schema-force-optional"; private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP = "iceberg.tables.evolve-schema-enabled"; + private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP = + "iceberg.tables.schema-force-optional"; + private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP = + "iceberg.tables.schema-case-insensitive"; private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic"; private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id"; private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms"; @@ -175,6 +177,12 @@ private static ConfigDef newConfigDef() { false, Importance.MEDIUM, "Set to true to set columns as optional during table create and evolution, false to respect schema"); + configDef.define( + TABLES_SCHEMA_CASE_INSENSITIVE_PROP, + Type.BOOLEAN, + false, + Importance.MEDIUM, + "Set to true to look up table columns by case-insensitive name, false for case-sensitive"); configDef.define( TABLES_EVOLVE_SCHEMA_ENABLED_PROP, Type.BOOLEAN, @@ -427,12 +435,16 @@ public boolean autoCreateEnabled() { return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP); } + public boolean evolveSchemaEnabled() { + return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP); + } + public boolean schemaForceOptional() { return getBoolean(TABLES_SCHEMA_FORCE_OPTIONAL_PROP); } - public boolean evolveSchemaEnabled() { - return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP); + public boolean schemaCaseInsensitive() { + return getBoolean(TABLES_SCHEMA_CASE_INSENSITIVE_PROP); } public JsonConverter jsonConverter() { diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java index 568b3054..5f8d90ea 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java @@ -250,7 +250,9 @@ private GenericRecord convertToStruct( private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) { if (nameMapping == null) { - return schema.field(fieldName); + return config.schemaCaseInsensitive() + ? schema.caseInsensitiveField(fieldName) + : schema.field(fieldName); } return structNameMap diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java index 8f96be5a..e90855b2 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java @@ -77,7 +77,10 @@ import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class RecordConverterTest { @@ -154,24 +157,30 @@ public class RecordConverterTest { private static final List LIST_VAL = ImmutableList.of("hello", "world"); private static final Map MAP_VAL = ImmutableMap.of("one", "1", "two", "2"); - private static final IcebergSinkConfig CONFIG = mock(IcebergSinkConfig.class); + private static final JsonConverter JSON_CONVERTER = new JsonConverter(); static { - JsonConverter jsonConverter = new JsonConverter(); - jsonConverter.configure( + JSON_CONVERTER.configure( ImmutableMap.of( JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false, ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); - when(CONFIG.jsonConverter()).thenReturn(jsonConverter); + } + + private IcebergSinkConfig config; + + @BeforeEach + public void before() { + this.config = mock(IcebergSinkConfig.class); + when(config.jsonConverter()).thenReturn(JSON_CONVERTER); } @Test public void testMapConvert() { Table table = mock(Table.class); when(table.schema()).thenReturn(SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map data = createMapData(); Record record = converter.convert(data); @@ -182,7 +191,7 @@ public void testMapConvert() { public void testNestedMapConvert() { Table table = mock(Table.class); when(table.schema()).thenReturn(NESTED_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map nestedData = createNestedMapData(); Record record = converter.convert(nestedData); @@ -194,7 +203,7 @@ public void testNestedMapConvert() { public void testMapToString() throws Exception { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map nestedData = createNestedMapData(); Record record = converter.convert(nestedData); @@ -208,7 +217,7 @@ public void testMapToString() throws Exception { public void testStructConvert() { Table table = mock(Table.class); when(table.schema()).thenReturn(SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Struct data = createStructData(); Record record = converter.convert(data); @@ -219,7 +228,7 @@ public void testStructConvert() { public void testNestedStructConvert() { Table table = mock(Table.class); when(table.schema()).thenReturn(NESTED_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Struct nestedData = createNestedStructData(); Record record = converter.convert(nestedData); @@ -231,7 +240,7 @@ public void testNestedStructConvert() { public void testStructToString() throws Exception { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Struct nestedData = createNestedStructData(); Record record = converter.convert(nestedData); @@ -252,18 +261,45 @@ public void testNameMapping() { ImmutableMap.of( TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map data = ImmutableMap.of("renamed_ii", 123); Record record = converter.convert(data); assertThat(record.getField("ii")).isEqualTo(123); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCaseSensitivity(boolean caseInsensitive) { + Table table = mock(Table.class); + when(table.schema()).thenReturn(SIMPLE_SCHEMA); + + when(config.schemaCaseInsensitive()).thenReturn(caseInsensitive); + + RecordConverter converter = new RecordConverter(table, config); + + Map mapData = ImmutableMap.of("II", 123); + Record record1 = converter.convert(mapData); + + Struct structData = + new Struct(SchemaBuilder.struct().field("II", Schema.INT32_SCHEMA).build()).put("II", 123); + Record record2 = converter.convert(structData); + + if (caseInsensitive) { + assertThat(record1.getField("ii")).isEqualTo(123); + assertThat(record2.getField("ii")).isEqualTo(123); + } else { + assertThat(record1.getField("ii")).isEqualTo(null); + assertThat(record2.getField("ii")).isEqualTo(null); + } + } + @Test public void testDecimalConversion() { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + + RecordConverter converter = new RecordConverter(table, config); BigDecimal expected = new BigDecimal("123.45"); @@ -288,7 +324,7 @@ public void testDecimalConversion() { public void testDateConversion() { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); LocalDate expected = LocalDate.of(2023, 11, 15); @@ -310,7 +346,7 @@ public void testDateConversion() { public void testTimeConversion() { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); LocalTime expected = LocalTime.of(7, 51, 30, 888_000_000); @@ -345,7 +381,7 @@ public void testTimestampWithoutZoneConversion() { private void convertToTimestamps(Temporal expected, long expectedMillis, TimestampType type) { Table table = mock(Table.class); when(table.schema()).thenReturn(SIMPLE_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); List inputList = ImmutableList.of( @@ -375,7 +411,7 @@ private void convertToTimestamps(Temporal expected, long expectedMillis, Timesta public void testMissingColumnDetectionMap() { Table table = mock(Table.class); when(table.schema()).thenReturn(ID_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map data = Maps.newHashMap(createMapData()); data.put("null", null); @@ -413,7 +449,7 @@ public void testMissingColumnDetectionMap() { public void testMissingColumnDetectionMapNested() { Table table = mock(Table.class); when(table.schema()).thenReturn(ID_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Map nestedData = createNestedMapData(); List addCols = Lists.newArrayList(); @@ -449,7 +485,7 @@ public void testMissingColumnDetectionMapNested() { public void testMissingColumnDetectionStruct() { Table table = mock(Table.class); when(table.schema()).thenReturn(ID_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Struct data = createStructData(); List updates = Lists.newArrayList(); @@ -487,7 +523,7 @@ public void testEvolveTypeDetectionStruct() { Table table = mock(Table.class); when(table.schema()).thenReturn(tableSchema); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Schema valueSchema = SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA); @@ -521,7 +557,7 @@ public void testEvolveTypeDetectionStructNested() { Table table = mock(Table.class); when(table.schema()).thenReturn(tableSchema); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Schema structSchema = SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA); @@ -548,7 +584,7 @@ public void testEvolveTypeDetectionStructNested() { public void testMissingColumnDetectionStructNested() { Table table = mock(Table.class); when(table.schema()).thenReturn(ID_SCHEMA); - RecordConverter converter = new RecordConverter(table, CONFIG); + RecordConverter converter = new RecordConverter(table, config); Struct nestedData = createNestedStructData(); List addCols = Lists.newArrayList();