Skip to content

Commit

Permalink
Fix parsing bug for partition spec config (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Nov 4, 2023
1 parent b264544 commit 4d52833
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -100,6 +101,8 @@ public class IcebergSinkConfig extends AbstractConfig {
public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts

@VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

public static final ConfigDef CONFIG_DEF = newConfigDef();

public static String version() {
Expand Down Expand Up @@ -349,11 +352,11 @@ public TableSinkConfig tableConfig(String tableName) {
Pattern routeRegex = routeRegexStr == null ? null : Pattern.compile(routeRegexStr);

String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, tablesDefaultIdColumns());
List<String> idColumns = stringToList(idColumnsStr);
List<String> idColumns = stringToList(idColumnsStr, ",");

String partitionByStr =
tableConfig.getOrDefault(PARTITION_BY, tablesDefaultPartitionBy());
List<String> partitionBy = stringToList(partitionByStr);
List<String> partitionBy = stringToList(partitionByStr, COMMA_NO_PARENS_REGEX);

String commitBranch =
tableConfig.getOrDefault(COMMIT_BRANCH, tablesDefaultCommitBranch());
Expand All @@ -362,12 +365,13 @@ public TableSinkConfig tableConfig(String tableName) {
});
}

private List<String> stringToList(String value) {
@VisibleForTesting
static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
return ImmutableList.of();
}

return Arrays.stream(value.split(",")).map(String::trim).collect(toList());
return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
}

public String tablesCdcField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -57,4 +58,34 @@ public void testGetDefault() {
IcebergSinkConfig config = new IcebergSinkConfig(props);
assertThat(config.commitIntervalMs()).isEqualTo(300_000);
}

@Test
public void testStringToList() {
List<String> result = IcebergSinkConfig.stringToList(null, ",");
assertThat(result).isEmpty();

result = IcebergSinkConfig.stringToList("", ",");
assertThat(result).isEmpty();

result = IcebergSinkConfig.stringToList("one ", ",");
assertThat(result).contains("one");

result = IcebergSinkConfig.stringToList("one, two", ",");
assertThat(result).contains("one", "two");

result = IcebergSinkConfig.stringToList("bucket(id, 4)", ",");
assertThat(result).contains("bucket(id", "4)");

result =
IcebergSinkConfig.stringToList("bucket(id, 4)", IcebergSinkConfig.COMMA_NO_PARENS_REGEX);
assertThat(result).contains("bucket(id, 4)");

result =
IcebergSinkConfig.stringToList(
"bucket(id, 4), type", IcebergSinkConfig.COMMA_NO_PARENS_REGEX);
assertThat(result).contains("bucket(id, 4)", "type");
}

@Test
public void testStringWithParensToList() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.List;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.BooleanType;
Expand Down Expand Up @@ -157,13 +160,17 @@ public void testCreatePartitionSpec() {
"s");
PartitionSpec spec = SchemaUtils.createPartitionSpec(SCHEMA_FOR_SPEC, partitionFields);
assertThat(spec.isPartitioned()).isTrue();
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("year"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("month"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("day"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("hour"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("bucket"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("truncate"));
assertThat(spec.fields()).anyMatch(val -> val.transform().toString().startsWith("identity"));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.year()));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.month()));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.day()));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.hour()));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.bucket(4)));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.truncate(10)));
assertThat(spec.fields()).anyMatch(val -> matchingTransform(val, Transforms.identity()));
}

boolean matchingTransform(PartitionField partitionField, Transform<?, ?> expectedTransform) {
return partitionField.transform().equals(expectedTransform);
}

@ParameterizedTest
Expand Down

0 comments on commit 4d52833

Please sign in to comment.