Skip to content

Commit

Permalink
new-api
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Dec 16, 2023
1 parent 3b49063 commit 2f1d1ef
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 167 deletions.
155 changes: 118 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82)
```elixir
defp deps do
[
{:ch, "~> 0.2.0"}
{:ch, "~> 0.3.0"}
]
end
```
Expand Down Expand Up @@ -51,19 +51,79 @@ defaults = [

{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3")

{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
Ch.query(pid, "SELECT * FROM system.numbers LIMIT {$0:UInt8}", [3])


{:ok, %Ch.Result{rows: [[0], [1], [2]]}} =
Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3})
```

Note on datetime encoding in query parameters:

- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone
- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse's timezone

```elixir
Mix.install([:ch, :tz])

{:ok, pid} = Ch.start_link()
naive = ~N[2023-12-16 12:00:00]

%Ch.Result{rows: [["UTC"]]} = Ch.query!(pid, "SELECT timezone()")

%Ch.Result{rows: [[~N[2023-12-16 12:00:00]]]} =
Ch.query!(pid, "SELECT {naive:DateTime}", %{"naive" => naive})

# https://clickhouse.com/docs/en/operations/settings/settings#session_timezone
%Ch.Result{rows: [["Europe/Berlin"]]} =
Ch.query!(pid, "SELECT timezone()", [], settings: [session_timezone: "Europe/Berlin"])

%Ch.Result{rows: [[~N[2023-12-16 11:00:00]]]} =
Ch.query!(pid, "SELECT {naive:DateTime}", %{"naive" => naive}, settings: [session_timezone: "Europe/Berlin"])

:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase)

%Ch.Result{rows: [[taipei]]} =
Ch.query!(pid, "SELECT {naive:DateTime('Asia/Taipei')}", %{"naive" => naive})

"#DateTime<2023-12-16 12:00:00+08:00 CST Asia/Taipei>" = inspect(taipei)
```

- `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse
- encoding non UTC `%DateTime{}` raises `ArgumentError`

```elixir
{:ok, pid} = Ch.start_link()

```

- encoding non-UTC `%DateTime{}` requires a time zone database to be configured

```elixir
Mix.install([:ch, :tz])

:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase)

utc = ~U[2023-12-16 10:20:51Z]
taipei = DateTime.new!(~D[2023-12-16], ~T[18:20:51], "Asia/Taipei")
berlin = DateTime.new!(~D[2023-12-16], ~T[11:20:51], "Europe/Berlin")

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {utc:DateTime}", %{"utc" => utc})

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {taipei:DateTime}", %{"taipei" => taipei})

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {berlin:DateTime}", %{"berlin" => berlin})

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {utc:DateTime}", %{"utc" => utc}, settings: [session_timezone: "Europe/Berlin"])

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {taipei:DateTime('UTC')}", %{"taipei" => taipei})

%Ch.Result{rows: []} =
Ch.query!(pid, "SELECT {taipei:DateTime('Asia/Taipei')}", %{"taipei" => taipei})

```

#### Insert rows

Expand All @@ -75,17 +135,14 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)")

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1])

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1})

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (fast)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -98,20 +155,24 @@ types = [Ch.Types.u64()]
# or
types = [:u64]

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
```
rows = [[0], [1], [2]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

Note that RowBinary format encoding requires `:types` option to be provided.
%Ch.Result{num_rows: 3} =
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary])
```

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes), which additionally sends a header with the column names and types so that ClickHouse can check them against the table's columns.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id"]
types = ["UInt64"]

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
header = Ch.RowBinary.encode_names_and_types(names, types)
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
Expand All @@ -121,10 +182,10 @@ rows = [[0], [1]]

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
csv = "0\n1\n2"

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
%Ch.Result{num_rows: 3} =
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as chunked RowBinary stream
Expand All @@ -134,17 +195,31 @@ csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
chunked = Stream.chunk_every(stream, 100)
encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
ten_encoded_chunks = Stream.take(encoded, 10)
row_binary =
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(1_000_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(100)

%Ch.Result{num_rows: 1000} =
Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
stream = Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary)
%Ch.Result{num_rows: 100_000_000} = Ch.query!(pid, stream)
```

This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream, resulting in lower memory usage.

#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n"
row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"])

%Ch.Result{num_rows: 3} = Ch.query!(pid, [sql | row_binary], %{"ego" => -1})
```

#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

```elixir
Expand Down Expand Up @@ -179,13 +254,13 @@ CREATE TABLE ch_nulls (
""")

types = ["Nullable(UInt8)", "UInt8", "UInt8"]
inserted_rows = [[nil, nil, nil]]
selected_rows = [[nil, 0, 0]]
rows = [[nil, nil, nil]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types)
Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: ^selected_rows} =
%Ch.Result{rows: [[nil, 0, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls")
```

Expand All @@ -197,13 +272,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function
sql = """
INSERT INTO ch_nulls
SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8')
FORMAT RowBinary\
FORMAT RowBinary
"""

Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"])
types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{rows: [[0], [10]]} =
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
%Ch.Result{num_rows: 1} =
Ch.query!(pid, [sql | row_binary])

%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b")
```

#### UTF-8 in RowBinary
Expand All @@ -218,8 +297,10 @@ Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory")
bin = "\x61\xF0\x80\x80\x80b"
utf8 = "a�b"

row_binary = Ch.RowBinary.encode(:string, bin)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"])
Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: [[^utf8]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8")
Expand Down Expand Up @@ -268,7 +349,7 @@ utc = DateTime.utc_now()
taipei = DateTime.shift_zone!(utc, "Asia/Taipei")

# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei
Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"])
Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"])
```

## Benchmarks
Expand Down
27 changes: 16 additions & 11 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ defmodule Ch do
DBConnection.child_spec(Connection, opts)
end

@type statement :: iodata | Enumerable.t()
@type params :: %{String.t() => term} | [{String.t(), term}]
@type option ::
{:settings, Keyword.t()}
| {:database, String.t()}
| {:username, String.t()}
| {:password, String.t()}
| {:command, Ch.Query.command()}
| {:decode, boolean}
| {:headers, String.t()}

@type options :: [option | DBConnection.connection_option()]

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
Expand All @@ -42,9 +55,8 @@ defmodule Ch do
* `:password` - User password
"""
@spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
@spec query(DBConnection.conn(), statement, params, options) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

Expand All @@ -57,26 +69,19 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), statement, params, options) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
@spec stream(DBConnection.t(), statement, params, options) :: DBConnection.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
end

@doc false
@spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
def run(conn, f, opts \\ []) when is_function(f, 1) do
DBConnection.run(conn, f, opts)
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType

Expand Down
Loading

0 comments on commit 2f1d1ef

Please sign in to comment.