Add support for tuple ClickHouse #29715

Merged
merged 15 commits into from
Jan 2, 2024
1 change: 1 addition & 0 deletions CHANGES.md
@@ -68,6 +68,7 @@
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
* Adding support for Tuples DataType in ClickHouse (Java) ([Tuple Support](https://github.com/apache/beam/pull/29715)).
Contributor:
We can combine this with LowCardinality DataType above, e.g.

Adding support for LowCardinality and Tuples DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533), [#29715](https://github.com/apache/beam/pull/29715)).

Contributor:
The release cut is today, so this may not get into 2.53.0. We could leave it as is and fix it when the PR is finalized.

* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).

## New Features / Improvements
@@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -112,7 +113,8 @@
* </table>
*
* Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
* supported through LowCardinality DataType in ClickHouse.
* supported through LowCardinality DataType in ClickHouse supported through Tuple DataType in
Contributor:
Consider moving the Tuple documentation into the table above. The sentence here is currently broken.

Contributor:
Please check this sentence.

* ClickHouse see: Example in ClickHouseIOTest.testComplexTupleType.
*
* <p>Nested rows should be unnested using {@link Select#flattenedSchema()}. Type casting should be
* done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before {@link ClickHouseIO}.
@@ -475,6 +477,15 @@ abstract static class Builder<T> {
}
}

private static String tuplePreprocessing(String payload) {
List<String> l =
Arrays.stream(payload.trim().split(","))
.map(s -> s.trim().replaceAll(" +", "':;"))
.collect(Collectors.toList());
String content =
String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
Contributor:
The caller below guards this with if (type.toLowerCase().trim().startsWith("tuple(")) {, so payload is only guaranteed to start with "tuple(" case-insensitively. The case-sensitive replace of "Tuple\(" here won't work for other spellings.

Contributor Author:
The condition checks the lowercased string, but the actual parsing runs on the original string.

Contributor:
I mean that this preprocessing is executed whenever if (type.toLowerCase().trim().startsWith("tuple(")) { holds in the caller below.

So Tuple\( (with the backslash) is what is matched here, which differs from the condition in the caller. Could this cause an issue?

Contributor:
You're right. Would you mind adding some comments about the preprocessing rule with an example, something like // Tuple(a String, b Integer) -> ...

return content;
}
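For illustration, the preprocessing rule requested in the review can be traced with a standalone copy of the method. This is only a sketch: the class name TuplePreprocessingDemo and the sample type strings are assumptions made for the example, while the method body repeats the steps from the diff above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TuplePreprocessingDemo {
  // Standalone copy of the steps in tuplePreprocessing (class name is hypothetical).
  public static String tuplePreprocessing(String payload) {
    // Step 1: split on commas, then replace the spaces between a field name
    // and its type with the "':;" separator.
    List<String> parts =
        Arrays.stream(payload.trim().split(","))
            .map(s -> s.trim().replaceAll(" +", "':;"))
            .collect(Collectors.toList());
    // Step 2: open a quote after every case-sensitive "Tuple(" and after every comma.
    return String.join(",", parts)
        .trim()
        .replaceAll("Tuple\\(", "Tuple('")
        .replaceAll(",", ",'");
  }

  public static void main(String[] args) {
    // Tuple(f0 String, f1 Bool) -> Tuple('f0':;String,'f1':;Bool)
    System.out.println(tuplePreprocessing("Tuple(f0 String, f1 Bool)"));
    // A lowercase spelling passes the caller's guard, but "Tuple\\(" does not match it,
    // so the first field name is left without its opening quote.
    System.out.println(tuplePreprocessing("tuple(f0 String)"));
  }
}
```

With these inputs the first call yields Tuple('f0':;String,'f1':;Bool), the format used in the parser tests, and the second yields tuple(f0':;String), which shows the mismatch raised in the review thread.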
/**
* Returns {@link TableSchema} for a given table.
*
@@ -498,7 +509,13 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) {
String defaultTypeStr = rs.getString("default_type");
String defaultExpression = rs.getString("default_expression");

ColumnType columnType = ColumnType.parse(type);
ColumnType columnType = null;
if (type.toLowerCase().trim().startsWith("tuple(")) {
String content = tuplePreprocessing(type);
columnType = ColumnType.parse(content);
} else {
columnType = ColumnType.parse(type);
}
DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);

Object defaultValue;
@@ -21,9 +21,11 @@
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.data.BinaryStreamUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Days;
@@ -146,6 +148,20 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj
case BOOL:
BinaryStreamUtils.writeBoolean(stream, (Boolean) value);
break;
case TUPLE:
RowWithStorage rowValues = (RowWithStorage) value;
List<Object> tupleValues = rowValues.getValues();
Collection<ColumnType> columnTypesList = columnType.tupleTypes().values();
int index = 0;
for (ColumnType ct : columnTypesList) {
if (ct.nullable()) {
writeNullableValue(stream, ct, tupleValues.get(index));
} else {
writeValue(stream, ct, tupleValues.get(index));
}
index++;
}
break;
}
}
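The TUPLE case above writes the tuple's elements one after another, choosing the nullable or plain writer per element. The sketch below mimics that loop without any Beam or ClickHouse classes. Everything in it is an assumption for illustration: the Element type is hypothetical, only String elements are handled, and the byte layout (a marker byte of 1 for NULL and 0 otherwise, then an unsigned LEB128 length followed by UTF-8 bytes) follows ClickHouse's RowBinary conventions as I understand them, not code taken from this PR.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class TupleWriteSketch {
  /** Hypothetical element descriptor: a String element that may be nullable. */
  public static final class Element {
    final boolean nullable;

    public Element(boolean nullable) {
      this.nullable = nullable;
    }
  }

  // Writes a string as an unsigned LEB128 length followed by its UTF-8 bytes.
  static void writeString(ByteArrayOutputStream out, String value) {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    int len = bytes.length;
    while (len >= 0x80) {
      out.write((len & 0x7F) | 0x80);
      len >>>= 7;
    }
    out.write(len);
    out.write(bytes, 0, bytes.length);
  }

  /** Writes tuple elements in declaration order, with no header or element count. */
  public static byte[] writeTuple(List<Element> types, List<String> values) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < types.size(); i++) {
      String value = values.get(i);
      if (types.get(i).nullable) {
        // Assumed nullable marker: 1 means NULL, 0 means a value follows.
        out.write(value == null ? 1 : 0);
        if (value == null) {
          continue;
        }
      }
      writeString(out, value);
    }
    return out.toByteArray();
  }
}
```

Because the elements are written positionally, the order of the type list has to match the order of the values, which is why the loop in the diff walks both with a shared index.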

@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -111,6 +112,14 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
return Schema.FieldType.STRING;
case BOOL:
return Schema.FieldType.BOOLEAN;
case TUPLE:
List<Schema.Field> fields =
columnType.tupleTypes().entrySet().stream()
.map(x -> Schema.Field.of(x.getKey(), Schema.FieldType.DATETIME))
Hi @mzitnik,
Shouldn't each key inside a tuple be mapped to its respective data type? Is there a reason for using DATETIME?

@wattache (Feb 13, 2024):
Hi,

This quick adaptation seems to work:

case TUPLE:
     List<TableSchema.Column> columns = columnType.tupleTypes().entrySet().stream().map(
                              x -> TableSchema.Column.of(x.getKey(), x.getValue())
                         ).collect(Collectors.toList());
     TableSchema tupleSchema = TableSchema.of(columns.toArray(new TableSchema.Column[0]));
     Schema tupleAsRowSchema = getEquivalentSchema(tupleSchema);
     return Schema.FieldType.row(tupleAsRowSchema);

Hi,

Also, it seems that Array(Tuple(*)) is not supported when calling ClickHouseIO.getTableSchema.

This seems to fix the issue:

if (type.toLowerCase().trim().startsWith("tuple(")) {
     String content = tuplePreprocessing(type);
     columnType = TableSchema.ColumnType.parse(content);
} else if (type.toLowerCase().trim().startsWith("array(tuple(")) {
     String content = tuplePreprocessing(type);
     columnType = TableSchema.ColumnType.parse(content);
}

Contributor Author @mzitnik (Feb 28, 2024):
@erube and @wattache, thanks for the feedback. I will fork and also add some tests to cover Array(Tuple(*)).

.collect(Collectors.toList());
Schema.Field[] array = fields.toArray(new Schema.Field[fields.size()]);
Schema schema = Schema.of(array);
return Schema.FieldType.row(schema);
}
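The point raised in the review comments, that each tuple element should map to its own type rather than a fixed DATETIME, comes down to a recursive mapping. A minimal sketch of that recursion follows. It deliberately avoids the Beam classes: Type is a hypothetical stand-in for ColumnType, and the ROW<...> string is just a readable substitute for a row schema, so none of these names are part of the SDK.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TupleSchemaSketch {
  /** Hypothetical stand-in for ColumnType: a primitive name, or a tuple of named element types. */
  public static final class Type {
    final String name;
    final Map<String, Type> fields;

    private Type(String name, Map<String, Type> fields) {
      this.name = name;
      this.fields = fields;
    }

    public static Type primitive(String name) {
      return new Type(name, Collections.emptyMap());
    }

    public static Type tuple(Map<String, Type> fields) {
      // LinkedHashMap keeps the declared element order.
      return new Type("TUPLE", new LinkedHashMap<>(fields));
    }
  }

  /** Maps a type to its row-like equivalent, recursing into every tuple element. */
  public static String equivalent(Type type) {
    if (!"TUPLE".equals(type.name)) {
      return type.name;
    }
    return type.fields.entrySet().stream()
        // Each element uses its own type (e.getValue()), not a constant.
        .map(e -> e.getKey() + " " + equivalent(e.getValue()))
        .collect(Collectors.joining(", ", "ROW<", ">"));
  }
}
```

The adaptation posted in the thread has the same shape: it builds a schema from the tuple's entries and delegates to the existing schema conversion, so nested tuples are handled by the same call.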

// not possible, errorprone checks for exhaustive switch
@@ -168,7 +177,9 @@ public enum TypeName {
// Composite type
ARRAY,
// Primitive type
BOOL
BOOL,
// Composite type
TUPLE
}

/**
@@ -208,6 +219,7 @@ public abstract static class ColumnType implements Serializable {
public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32);
public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64);
public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL);
public static final ColumnType TUPLE = ColumnType.of(TypeName.TUPLE);

// ClickHouse doesn't allow nested nullables, so boolean flag is enough
public abstract boolean nullable();
@@ -220,6 +232,8 @@ public abstract static class ColumnType implements Serializable {

public abstract @Nullable ColumnType arrayElementType();

public abstract @Nullable Map<String, ColumnType> tupleTypes();

public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
@@ -265,6 +279,14 @@ public static ColumnType array(ColumnType arrayElementType) {
.build();
}

public static ColumnType tuple(Map<String, ColumnType> elements) {
return ColumnType.builder()
.typeName(TypeName.TUPLE)
.nullable(false)
.tupleTypes(elements)
.build();
}

/**
* Parse string with ClickHouse type to {@link ColumnType}.
*
@@ -339,6 +361,8 @@ abstract static class Builder {

public abstract Builder fixedStringSize(Integer size);

public abstract Builder tupleTypes(Map<String, ColumnType> tupleElements);

public abstract ColumnType build();
}
}
49 changes: 49 additions & 0 deletions sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -17,6 +17,10 @@
*/
options {
IGNORE_CASE=true;
DEBUG_PARSER = false;
DEBUG_LOOKAHEAD = false;
DEBUG_TOKEN_MANAGER = false;
STATIC = false;
}

PARSER_BEGIN(ColumnTypeParser)
@@ -99,6 +103,9 @@ TOKEN :
| < EQ : "=" >
| < BOOL : "BOOL" >
| < LOWCARDINALITY : "LOWCARDINALITY" >
| < TUPLE : "TUPLE" >
| < COLON : ":" >
Contributor:
Are COLON and SEMI_COLON for tuple or another feature?

Contributor Author:
I am using that because I have some ambiguity in parsing (I am looking for other options), but for now I decided to leave it that way.

Contributor:
If it gets added to the SDK, then there is an expectation of backward compatibility. So I would prefer to keep it consistent with ClickHouse's SQL syntax.

Contributor Author:
It's a temporary solution, since there is some ambiguity when using JavaCC: it has difficulty distinguishing between tokens and strings.

Contributor Author:
Removed COLON and SEMI_COLON.

| < SEMI_COLON : ";" >
}

public ColumnType columnType() :
@@ -113,6 +120,7 @@ public ColumnType columnType() :
| ct = array()
| ct = nullable()
| ct = lowcardenality()
| ct = tuple()
)
{
return ct;
@@ -263,6 +271,33 @@ private Map<String, Integer> enumElements() :
}
}

private Map.Entry<String, ColumnType> tupleElement() :
{
String key;
ColumnType value;
Token token;
}
{
( (key = string() ) <COLON> <SEMI_COLON> ( value = columnType() ) ) {
return Maps.immutableEntry(key, value);
}
}

private Map<String, ColumnType> tupleElements() :
{
Map.Entry<String, ColumnType> el;
List<Map.Entry<String, ColumnType>> entries = Lists.newArrayList();
}
{
(
( el = tupleElement() { entries.add(el); } )
( <COMMA> ( el = tupleElement() { entries.add(el); } ) )*
)
{
return ImmutableMap.copyOf(entries);
}
}

private ColumnType enum_() :
{
Map<String, Integer> elements;
@@ -289,4 +324,18 @@ private ColumnType lowcardenality() :
(
(<LOWCARDINALITY> <LPAREN> (ct = primitive()) <RPAREN>) { return ct; }
)
}

private ColumnType tuple() :
{
Map<String, ColumnType> elements;
}
{
(
(<TUPLE> <LPAREN> ( elements = tupleElements() ) <RPAREN>)
{
return ColumnType.tuple(elements);
}
)

}
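The review asked for the accepted syntax to stay consistent with ClickHouse's own Tuple(name Type, ...) form. As a rough illustration of what parsing that form directly involves, here is a small hand-written recursive-descent sketch. It is not the approach this PR takes and does not use JavaCC. The class name and the output representation (nested maps for tuples, verbatim strings for every other type) are assumptions, and it only handles named tuple elements.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NamedTupleParserSketch {
  private final String src;
  private int pos = 0;

  private NamedTupleParserSketch(String src) {
    this.src = src;
  }

  /** Parses a type string: tuples become Map<String, Object>, other types stay Strings. */
  public static Object parse(String type) {
    NamedTupleParserSketch p = new NamedTupleParserSketch(type.trim());
    Object result = p.type();
    if (p.pos != p.src.length()) {
      throw new IllegalArgumentException("Trailing input at " + p.pos);
    }
    return result;
  }

  private Object type() {
    skipSpaces();
    // Case-insensitive match, mirroring IGNORE_CASE in the grammar options.
    if (src.regionMatches(true, pos, "Tuple(", 0, 6)) {
      pos += 6;
      Map<String, Object> fields = new LinkedHashMap<>();
      do {
        skipSpaces();
        String name = identifier();
        fields.put(name, type());
        skipSpaces();
      } while (consume(','));
      expect(')');
      return fields;
    }
    return opaqueType();
  }

  // Reads a non-tuple type such as Nullable(String) verbatim, balancing parentheses.
  private String opaqueType() {
    int start = pos;
    int depth = 0;
    while (pos < src.length()) {
      char c = src.charAt(pos);
      if (c == '(') {
        depth++;
      } else if (c == ')') {
        if (depth == 0) {
          break;
        }
        depth--;
      } else if (c == ',' && depth == 0) {
        break;
      }
      pos++;
    }
    return src.substring(start, pos).trim();
  }

  // Reads a field name up to the next whitespace and strips backticks.
  private String identifier() {
    int start = pos;
    while (pos < src.length() && !Character.isWhitespace(src.charAt(pos))) {
      pos++;
    }
    return src.substring(start, pos).replace("`", "");
  }

  private boolean consume(char c) {
    if (pos < src.length() && src.charAt(pos) == c) {
      pos++;
      return true;
    }
    return false;
  }

  private void expect(char c) {
    if (!consume(c)) {
      throw new IllegalArgumentException("Expected '" + c + "' at " + pos);
    }
  }

  private void skipSpaces() {
    while (pos < src.length() && Character.isWhitespace(src.charAt(pos))) {
      pos++;
    }
  }
}
```

Calling NamedTupleParserSketch.parse("Tuple(a String, b Tuple(c Nullable(Int64)))") returns a nested map with a mapped to String and b mapped to an inner map. The field name is separated from its type by position alone, so no extra separator tokens are needed in this simplified setting.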
@@ -138,6 +138,84 @@ public void testArrayOfArrayOfInt64() throws Exception {
assertEquals(15L, sum0);
}

@Test
public void testTupleType() throws Exception {
Schema tupleSchema =
Schema.of(
Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN));
Schema schema = Schema.of(Schema.Field.of("t0", FieldType.row(tupleSchema)));
Row row1Tuple = Row.withSchema(tupleSchema).addValue("tuple").addValue(true).build();

Row row1 = Row.withSchema(schema).addValue(row1Tuple).build();

executeSql(
"CREATE TABLE test_named_tuples (" + "t0 Tuple(`f0` String, `f1` Bool)" + ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
rs.next();
assertEquals("('tuple',true)", rs.getString("t0"));
}

try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) {
rs.next();
assertEquals("tuple", rs.getString("f0"));
assertEquals("true", rs.getString("f1"));
}
}

@Test
public void testComplexTupleType() throws Exception {
Schema sizeSchema =
Schema.of(
Schema.Field.of("width", FieldType.INT64.withNullable(true)),
Schema.Field.of("height", FieldType.INT64.withNullable(true)));

Schema browserSchema =
Schema.of(
Schema.Field.of("name", FieldType.STRING.withNullable(true)),
Schema.Field.of("size", FieldType.row(sizeSchema)),
Schema.Field.of("version", FieldType.STRING.withNullable(true)));

Schema propSchema =
Schema.of(
Schema.Field.of("browser", FieldType.row(browserSchema)),
Schema.Field.of("deviceCategory", FieldType.STRING.withNullable(true)));

Schema schema = Schema.of(Schema.Field.of("prop", FieldType.row(propSchema)));

Row sizeRow = Row.withSchema(sizeSchema).addValue(10L).addValue(20L).build();
Row browserRow =
Row.withSchema(browserSchema).addValue("test").addValue(sizeRow).addValue("1.0.0").build();
Row propRow = Row.withSchema(propSchema).addValue(browserRow).addValue("mobile").build();
Row row1 = Row.withSchema(schema).addValue(propRow).build();

executeSql(
"CREATE TABLE test_named_complex_tuples ("
+ "`prop` Tuple(`browser` Tuple(`name` Nullable(String),`size` Tuple(`width` Nullable(Int64), `height` Nullable(Int64)),`version` Nullable(String)),`deviceCategory` Nullable(String))"
+ ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_complex_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) {
rs.next();
assertEquals("(('test',(10,20),'1.0.0'),'mobile')", rs.getString("prop"));
}

try (ResultSet rs =
executeQuery(
"SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) {
rs.next();
assertEquals("test", rs.getString("name"));
assertEquals("(10,20)", rs.getString("size"));
}
}

@Test
public void testPrimitiveTypes() throws Exception {
Schema schema =
@@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.schemas.Schema;
@@ -196,4 +197,55 @@ public void testEquivalentSchema() {

assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
}

@Test
public void testParseTupleSingle() {
Map<String, ColumnType> m1 = new HashMap<>();
m1.put("s", ColumnType.STRING);
ColumnType columnType01 = ColumnType.parse("Tuple('s':;String)");
assertEquals(ColumnType.tuple(m1), columnType01);
}

@Test
public void testParseTupleDouble() {
Map<String, ColumnType> m2 = new HashMap<>();
m2.put("a1", ColumnType.STRING);
m2.put("b", ColumnType.BOOL);
ColumnType columnType02 = ColumnType.parse("Tuple('a1':;String,'b':;Bool)");
Contributor:
Is :; a standard syntax for ClickHouse? https://clickhouse.com/docs/en/sql-reference/data-types/tuple does not seem to mention this pattern.

Contributor Author @mzitnik (Dec 13, 2023):
@Abacn Removed :;

assertEquals(ColumnType.tuple(m2), columnType02);
}

@Test
public void testTupleNested() {
Map<String, ColumnType> m1 = new HashMap<>();
m1.put("a", ColumnType.STRING);
Map<String, ColumnType> m3 = new HashMap<>();
m3.put("a", ColumnType.STRING);
m3.put("b", ColumnType.BOOL);
m3.put("c", ColumnType.tuple(m1));
ColumnType columnType03 =
ColumnType.parse("Tuple('a':;String,'b':;Bool,'c':;Tuple('a':;String))");
assertEquals(ColumnType.tuple(m3), columnType03);
}

@Test
public void testTupleComplex() {
Map<String, ColumnType> m1 = new HashMap<>();
m1.put("width", ColumnType.INT64.withNullable(true));
m1.put("height", ColumnType.INT64.withNullable(true));

Map<String, ColumnType> m2 = new HashMap<>();
m2.put("name", ColumnType.STRING.withNullable(true));
m2.put("size", ColumnType.tuple(m1));
m2.put("version", ColumnType.STRING.withNullable(true));

Map<String, ColumnType> m3 = new HashMap<>();
m3.put("browser", ColumnType.tuple(m2));
m3.put("deviceCategory", ColumnType.STRING.withNullable(true));

ColumnType columnType03 =
ColumnType.parse(
"Tuple('browser':;Tuple('name':;Nullable(String),'size':;Tuple('width':;Nullable(Int64),'height':;Nullable(Int64)),'version':;Nullable(String)),'deviceCategory':; Nullable(String))");
assertEquals(ColumnType.tuple(m3), columnType03);
}
}