Add config to force optional cols on auto create and schema evolution
bryanck authored Oct 25, 2023
1 parent 1cdb6d2 commit e778e06
Showing 8 changed files with 142 additions and 98 deletions.
README.md: 57 changes (29 additions, 28 deletions)
@@ -20,34 +20,35 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions`

# Configuration

| Property | Description |
|--------------------------------------------|------------------------------------------------------------------------------------------------------------------|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false` |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition fields to use when creating tables |
| iceberg.tables.cdc-field | Name of the field containing the CDC operation, `I`, `U`, or `D`, default is none |
| iceberg.tables.upsert-mode-enabled | Set to `true` to enable upsert mode, default is `false` |
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name\>.route-regex | The regex used to match a record's `routeField` to a table |
| iceberg.control.topic | Name of the control topic, default is `control-iceberg` |
| iceberg.control.group-id | Name of the consumer group to store offsets, default is `cg-control-<connector name>` |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
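
For illustration only (not part of this commit): a minimal sketch of the sink properties that exercise auto-create, schema evolution, and the new `iceberg.tables.schema-force-optional` flag. Only the `iceberg.*` keys come from the table above; the topic, table, and catalog values are placeholders, and the settings are shown as a Java property map purely for illustration.

```java
import java.util.Map;

public class SinkPropsSketch {
  public static void main(String[] args) {
    // Hypothetical values; only the keys mirror the configuration table above.
    Map<String, String> props =
        Map.of(
            "topics", "events",                            // source Kafka topic (placeholder)
            "iceberg.tables", "db.events",                  // destination table (placeholder)
            "iceberg.tables.auto-create-enabled", "true",   // create the table if it does not exist
            "iceberg.tables.evolve-schema-enabled", "true", // add missing record fields to the table
            "iceberg.tables.schema-force-optional", "true", // new: create/evolve all columns as optional
            "iceberg.catalog.type", "rest");                // catalog props pass through (placeholder)
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```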

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
@@ -76,6 +76,8 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.upsert-mode-enabled";
private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
"iceberg.tables.auto-create-enabled";
private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
"iceberg.tables.schema-force-optional";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
"iceberg.tables.evolve-schema-enabled";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
@@ -164,6 +166,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to automatically create destination tables, false otherwise");
configDef.define(
TABLES_SCHEMA_FORCE_OPTIONAL_PROP,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to set columns as optional during table create and evolution, false to respect schema");
configDef.define(
TABLES_EVOLVE_SCHEMA_ENABLED_PROP,
Type.BOOLEAN,
@@ -415,6 +423,10 @@ public boolean autoCreateEnabled() {
return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP);
}

public boolean schemaForceOptional() {
return getBoolean(TABLES_SCHEMA_FORCE_OPTIONAL_PROP);
}

public boolean evolveSchemaEnabled() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
}
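
A quick sketch (not part of the diff) of how the new flag surfaces through `IcebergSinkConfig`. It assumes the config can be constructed directly from a raw property map, which is typical for `AbstractConfig` subclasses but not shown in this commit; the connector name and table are placeholders.

```java
import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.util.Map;

public class SchemaForceOptionalFlagSketch {
  public static void main(String[] args) {
    // Assumed Map-based constructor; the property keys are the ones defined above.
    IcebergSinkConfig config =
        new IcebergSinkConfig(
            Map.of(
                "name", "events-sink",
                "iceberg.tables", "db.events",
                "iceberg.tables.schema-force-optional", "true"));

    System.out.println(config.schemaForceOptional()); // true
    System.out.println(config.evolveSchemaEnabled()); // false, the default
  }
}
```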
@@ -51,7 +51,7 @@ public IcebergWriter(Table table, String tableName, IcebergSinkConfig config) {

private void initNewWriter() {
this.writer = Utilities.createTableWriter(table, tableName, config);
this.recordConverter = new RecordConverter(table, config.jsonConverter());
this.recordConverter = new RecordConverter(table, config);
}

@Override
@@ -68,9 +68,9 @@ public RecordWriter createWriter(
Table autoCreateTable(String tableName, SinkRecord sample) {
StructType structType;
if (sample.valueSchema() == null) {
structType = SchemaUtils.inferIcebergType(sample.value()).asStructType();
structType = SchemaUtils.inferIcebergType(sample.value(), config).asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema()).asStructType();
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}

org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
@@ -21,6 +21,7 @@
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
@@ -66,7 +67,6 @@
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class RecordConverter {

@@ -80,13 +80,13 @@ public class RecordConverter {

private final Schema tableSchema;
private final NameMapping nameMapping;
private final JsonConverter jsonConverter;
private final IcebergSinkConfig config;
private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();

public RecordConverter(Table table, JsonConverter jsonConverter) {
public RecordConverter(Table table, IcebergSinkConfig config) {
this.tableSchema = table.schema();
this.nameMapping = createNameMapping(table);
this.jsonConverter = jsonConverter;
this.config = config;
}

public Record convert(Object data) {
@@ -175,7 +175,7 @@ private GenericRecord convertToStruct(
if (schemaUpdateConsumer != null && recordFieldValue != null) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.inferIcebergType(recordFieldValue);
Type type = SchemaUtils.inferIcebergType(recordFieldValue, config);
schemaUpdateConsumer.accept(new AddColumn(parentFieldName, recordFieldName, type));
}
} else {
@@ -208,7 +208,7 @@ private GenericRecord convertToStruct(
if (schemaUpdateConsumer != null) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.toIcebergType(recordField.schema());
Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
schemaUpdateConsumer.accept(
new AddColumn(parentFieldName, recordField.name(), type));
}
@@ -375,7 +375,7 @@ protected String convertString(Object value) {
return MAPPER.writeValueAsString(value);
} else if (value instanceof Struct) {
Struct struct = (Struct) value;
byte[] data = jsonConverter.fromConnectData(null, struct.schema(), struct);
byte[] data = config.jsonConverter().fromConnectData(null, struct.schema(), struct);
return new String(data, StandardCharsets.UTF_8);
}
} catch (IOException e) {
@@ -210,18 +210,24 @@ private static Pair<String, Integer> transformArgPair(String argsStr) {
return Pair.of(parts[0].trim(), Integer.parseInt(parts[1].trim()));
}

public static Type toIcebergType(Schema valueSchema) {
return new SchemaGenerator().toIcebergType(valueSchema);
public static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) {
return new SchemaGenerator(config).toIcebergType(valueSchema);
}

public static Type inferIcebergType(Object value) {
return new SchemaGenerator().inferIcebergType(value);
public static Type inferIcebergType(Object value, IcebergSinkConfig config) {
return new SchemaGenerator(config).inferIcebergType(value);
}

static class SchemaGenerator {

private int fieldId = 1;
private final IcebergSinkConfig config;

SchemaGenerator(IcebergSinkConfig config) {
this.config = config;
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Type toIcebergType(Schema valueSchema) {
switch (valueSchema.type()) {
case BOOLEAN:
@@ -255,15 +261,15 @@ Type toIcebergType(Schema valueSchema) {
return DoubleType.get();
case ARRAY:
Type elementType = toIcebergType(valueSchema.valueSchema());
if (valueSchema.valueSchema().isOptional()) {
if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
return ListType.ofOptional(nextId(), elementType);
} else {
return ListType.ofRequired(nextId(), elementType);
}
case MAP:
Type keyType = toIcebergType(valueSchema.keySchema());
Type valueType = toIcebergType(valueSchema.valueSchema());
if (valueSchema.valueSchema().isOptional()) {
if (config.schemaForceOptional() || valueSchema.valueSchema().isOptional()) {
return MapType.ofOptional(nextId(), nextId(), keyType, valueType);
} else {
return MapType.ofRequired(nextId(), nextId(), keyType, valueType);
@@ -275,7 +281,7 @@
field ->
NestedField.of(
nextId(),
field.schema().isOptional(),
config.schemaForceOptional() || field.schema().isOptional(),
field.name(),
toIcebergType(field.schema())))
.collect(toList());
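
To make the behavior change concrete, a sketch (not part of the diff) that runs a Connect schema with one required field through `SchemaUtils.toIcebergType`. The package locations and the Map-based `IcebergSinkConfig` constructor are assumptions based on the imports and signatures shown above; with the flag enabled, both resulting Iceberg columns come out optional, whereas with the default the required field stays required.

```java
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.SchemaUtils;
import java.util.Map;
import org.apache.iceberg.types.Type;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ForceOptionalConversionSketch {
  public static void main(String[] args) {
    // Connect schema with one required and one optional field.
    Schema connectSchema =
        SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)             // required
            .field("name", Schema.OPTIONAL_STRING_SCHEMA) // optional
            .build();

    // Assumed Map-based constructor; placeholder connector name and table.
    IcebergSinkConfig forceOptional =
        new IcebergSinkConfig(
            Map.of(
                "name", "events-sink",
                "iceberg.tables", "db.events",
                "iceberg.tables.schema-force-optional", "true"));

    Type icebergType = SchemaUtils.toIcebergType(connectSchema, forceOptional);
    icebergType.asStructType().fields()
        .forEach(f -> System.out.println(f.name() + " optional=" + f.isOptional()));
    // Prints: id optional=true, name optional=true
    // With schema-force-optional left at false, "id" would remain a required column.
  }
}
```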