Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config option to enable case-insensitive column lookup #164

Merged
merged 2 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.upsert-mode-enabled";
private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
"iceberg.tables.auto-create-enabled";
private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
"iceberg.tables.schema-force-optional";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
"iceberg.tables.evolve-schema-enabled";
private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
"iceberg.tables.schema-force-optional";
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
Expand Down Expand Up @@ -175,6 +177,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to set columns as optional during table create and evolution, false to respect schema");
configDef.define(
TABLES_SCHEMA_CASE_INSENSITIVE_PROP,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to look up table columns by case-insensitive name, false for case-sensitive");
configDef.define(
TABLES_EVOLVE_SCHEMA_ENABLED_PROP,
Type.BOOLEAN,
Expand Down Expand Up @@ -427,12 +435,16 @@ public boolean autoCreateEnabled() {
return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP);
}

public boolean evolveSchemaEnabled() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
}

public boolean schemaForceOptional() {
return getBoolean(TABLES_SCHEMA_FORCE_OPTIONAL_PROP);
}

public boolean evolveSchemaEnabled() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
public boolean schemaCaseInsensitive() {
return getBoolean(TABLES_SCHEMA_CASE_INSENSITIVE_PROP);
}

public JsonConverter jsonConverter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ private GenericRecord convertToStruct(

private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) {
if (nameMapping == null) {
return schema.field(fieldName);
return config.schemaCaseInsensitive()
? schema.caseInsensitiveField(fieldName)
: schema.field(fieldName);
}

return structNameMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class RecordConverterTest {

Expand Down Expand Up @@ -154,24 +157,30 @@ public class RecordConverterTest {
private static final List<String> LIST_VAL = ImmutableList.of("hello", "world");
private static final Map<String, String> MAP_VAL = ImmutableMap.of("one", "1", "two", "2");

private static final IcebergSinkConfig CONFIG = mock(IcebergSinkConfig.class);
private static final JsonConverter JSON_CONVERTER = new JsonConverter();

static {
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(
JSON_CONVERTER.configure(
ImmutableMap.of(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
false,
ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName()));
when(CONFIG.jsonConverter()).thenReturn(jsonConverter);
}

private IcebergSinkConfig config;

@BeforeEach
public void before() {
  // Fresh mock per test so stubs (e.g. schemaCaseInsensitive) never leak
  // between test methods; all tests share the statically configured converter.
  IcebergSinkConfig mockConfig = mock(IcebergSinkConfig.class);
  when(mockConfig.jsonConverter()).thenReturn(JSON_CONVERTER);
  this.config = mockConfig;
}

@Test
public void testMapConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = createMapData();
Record record = converter.convert(data);
Expand All @@ -182,7 +191,7 @@ public void testMapConvert() {
public void testNestedMapConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(NESTED_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
Record record = converter.convert(nestedData);
Expand All @@ -194,7 +203,7 @@ public void testNestedMapConvert() {
public void testMapToString() throws Exception {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
Record record = converter.convert(nestedData);
Expand All @@ -208,7 +217,7 @@ public void testMapToString() throws Exception {
public void testStructConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct data = createStructData();
Record record = converter.convert(data);
Expand All @@ -219,7 +228,7 @@ public void testStructConvert() {
public void testNestedStructConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(NESTED_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
Record record = converter.convert(nestedData);
Expand All @@ -231,7 +240,7 @@ public void testNestedStructConvert() {
public void testStructToString() throws Exception {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
Record record = converter.convert(nestedData);
Expand All @@ -252,18 +261,45 @@ public void testNameMapping() {
ImmutableMap.of(
TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)));

RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = ImmutableMap.of("renamed_ii", 123);
Record record = converter.convert(data);
assertThat(record.getField("ii")).isEqualTo(123);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCaseSensitivity(boolean caseInsensitive) {
  // Verifies the iceberg.tables.schema-case-insensitive behavior for both
  // Map and Struct inputs: an upper-case "II" field should resolve to the
  // lower-case "ii" column only when case-insensitive lookup is enabled.
  Table table = mock(Table.class);
  when(table.schema()).thenReturn(SIMPLE_SCHEMA);

  when(config.schemaCaseInsensitive()).thenReturn(caseInsensitive);

  RecordConverter converter = new RecordConverter(table, config);

  Map<String, Object> mapData = ImmutableMap.of("II", 123);
  Record record1 = converter.convert(mapData);

  Struct structData =
      new Struct(SchemaBuilder.struct().field("II", Schema.INT32_SCHEMA).build()).put("II", 123);
  Record record2 = converter.convert(structData);

  if (caseInsensitive) {
    assertThat(record1.getField("ii")).isEqualTo(123);
    assertThat(record2.getField("ii")).isEqualTo(123);
  } else {
    // With case-sensitive lookup the mismatched field is dropped, so the
    // column is unset. isNull() is the idiomatic AssertJ form of isEqualTo(null).
    assertThat(record1.getField("ii")).isNull();
    assertThat(record2.getField("ii")).isNull();
  }
}

@Test
public void testDecimalConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);

RecordConverter converter = new RecordConverter(table, config);

BigDecimal expected = new BigDecimal("123.45");

Expand All @@ -288,7 +324,7 @@ public void testDecimalConversion() {
public void testDateConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

LocalDate expected = LocalDate.of(2023, 11, 15);

Expand All @@ -310,7 +346,7 @@ public void testDateConversion() {
public void testTimeConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

LocalTime expected = LocalTime.of(7, 51, 30, 888_000_000);

Expand Down Expand Up @@ -345,7 +381,7 @@ public void testTimestampWithoutZoneConversion() {
private void convertToTimestamps(Temporal expected, long expectedMillis, TimestampType type) {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

List<Object> inputList =
ImmutableList.of(
Expand Down Expand Up @@ -375,7 +411,7 @@ private void convertToTimestamps(Temporal expected, long expectedMillis, Timesta
public void testMissingColumnDetectionMap() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = Maps.newHashMap(createMapData());
data.put("null", null);
Expand Down Expand Up @@ -413,7 +449,7 @@ public void testMissingColumnDetectionMap() {
public void testMissingColumnDetectionMapNested() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
List<SchemaUpdate> addCols = Lists.newArrayList();
Expand Down Expand Up @@ -449,7 +485,7 @@ public void testMissingColumnDetectionMapNested() {
public void testMissingColumnDetectionStruct() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct data = createStructData();
List<SchemaUpdate> updates = Lists.newArrayList();
Expand Down Expand Up @@ -487,7 +523,7 @@ public void testEvolveTypeDetectionStruct() {

Table table = mock(Table.class);
when(table.schema()).thenReturn(tableSchema);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Schema valueSchema =
SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
Expand Down Expand Up @@ -521,7 +557,7 @@ public void testEvolveTypeDetectionStructNested() {

Table table = mock(Table.class);
when(table.schema()).thenReturn(tableSchema);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Schema structSchema =
SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
Expand All @@ -548,7 +584,7 @@ public void testEvolveTypeDetectionStructNested() {
public void testMissingColumnDetectionStructNested() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
List<SchemaUpdate> addCols = Lists.newArrayList();
Expand Down