Skip to content

Commit

Permalink
Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presen…
Browse files Browse the repository at this point in the history
…ce caused error (#357)

* Fixed ReplacingMergeTree EngineSpec parsing: is_deleted column presence caused error

* Preserved is_deleted column in engine spec, added the same fix for ReplicatedReplacingMergeTree
  • Loading branch information
AlexTheKing authored Aug 18, 2024
1 parent ea29fb7 commit 699c0ca
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging {
case eg: String if "ReplacingMergeTree" equalsIgnoreCase eg =>
ReplacingMergeTreeEngineSpec(
engine_clause = engineExpr,
version_column = seqToOption(engineArgs).map(_.asInstanceOf[FieldRef]),
version_column = engineArgs.lift(0).map(_.asInstanceOf[FieldRef]),
is_deleted_column = engineArgs.lift(1).map(_.asInstanceOf[FieldRef]),
_sorting_key = tupleIfNeeded(orderByOpt.toList),
_primary_key = tupleIfNeeded(pkOpt.toList),
_partition_key = tupleIfNeeded(partOpt.toList),
Expand All @@ -127,7 +128,8 @@ class AstVisitor extends ClickHouseSQLBaseVisitor[AnyRef] with Logging {
engine_clause = engineExpr,
zk_path = engineArgs.head.asInstanceOf[StringLiteral].value,
replica_name = engineArgs(1).asInstanceOf[StringLiteral].value,
version_column = seqToOption(engineArgs.drop(2)).map(_.asInstanceOf[FieldRef]),
version_column = engineArgs.lift(2).map(_.asInstanceOf[FieldRef]),
is_deleted_column = engineArgs.lift(3).map(_.asInstanceOf[FieldRef]),
_sorting_key = tupleIfNeeded(orderByOpt.toList),
_primary_key = tupleIfNeeded(pkOpt.toList),
_partition_key = tupleIfNeeded(partOpt.toList),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ case class ReplicatedMergeTreeEngineSpec(
case class ReplacingMergeTreeEngineSpec(
engine_clause: String,
version_column: Option[FieldRef] = None,
is_deleted_column: Option[FieldRef] = None,
var _sorting_key: TupleExpr = TupleExpr(List.empty),
var _primary_key: TupleExpr = TupleExpr(List.empty),
var _partition_key: TupleExpr = TupleExpr(List.empty),
Expand All @@ -109,6 +110,7 @@ case class ReplicatedReplacingMergeTreeEngineSpec(
zk_path: String,
replica_name: String,
version_column: Option[FieldRef] = None,
is_deleted_column: Option[FieldRef] = None,
var _sorting_key: TupleExpr = TupleExpr(List.empty),
var _primary_key: TupleExpr = TupleExpr(List.empty),
var _partition_key: TupleExpr = TupleExpr(List.empty),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ class SQLParserSuite extends AnyFunSuite {
assert(actual === expected)
}

test("parse ReplacingMergeTree - 3") {
val ddl = "ReplacingMergeTree(ts, is_deleted) " +
"PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
val actual = parser.parseEngineClause(ddl)
val expected = ReplacingMergeTreeEngineSpec(
engine_clause = "ReplacingMergeTree(ts, is_deleted)",
version_column = Some(FieldRef("ts")),
is_deleted_column = Some(FieldRef("is_deleted")),
_sorting_key = TupleExpr(FieldRef("id") :: Nil),
_partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))),
_settings = Map("index_granularity" -> "8192")
)
assert(actual === expected)
}

test("parse ReplicatedReplacingMergeTree - 1") {
val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}') " +
"PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
Expand Down Expand Up @@ -115,6 +130,25 @@ class SQLParserSuite extends AnyFunSuite {
assert(actual === expected)
}

test("parse ReplicatedReplacingMergeTree - 3") {
val ddl = "ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " +
"ts, is_deleted) PARTITION BY toYYYYMM(created) ORDER BY id SETTINGS index_granularity = 8192"
val actual = parser.parseEngineClause(ddl)
val expected = ReplicatedReplacingMergeTreeEngineSpec(
engine_clause =
"ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/wj_report/wj_respondent', '{replica}', " +
"ts, is_deleted)",
zk_path = "/clickhouse/tables/{shard}/wj_report/wj_respondent",
replica_name = "{replica}",
version_column = Some(FieldRef("ts")),
is_deleted_column = Some(FieldRef("is_deleted")),
_sorting_key = TupleExpr(FieldRef("id") :: Nil),
_partition_key = TupleExpr(List(FuncExpr("toYYYYMM", List(FieldRef("created"))))),
_settings = Map("index_granularity" -> "8192")
)
assert(actual === expected)
}

test("parse Distributed - 1") {
val ddl = "Distributed('default', 'wj_report', 'wj_respondent_local')"
val actual = parser.parseEngineClause(ddl)
Expand Down

0 comments on commit 699c0ca

Please sign in to comment.