diff --git a/README.md b/README.md index 4f55203..7216182 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Your ideas are welcome [here.](https://github.com/plausible/ch/issues/82) ```elixir defp deps do [ - {:ch, "~> 0.2.0"} + {:ch, "~> 0.3.0"} ] end ``` @@ -51,9 +51,7 @@ defaults = [ {:ok, %Ch.Result{rows: [[0], [1], [2]]}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT 3") - -{:ok, %Ch.Result{rows: [[0], [1], [2]]}} = - Ch.query(pid, "SELECT * FROM system.numbers LIMIT {$0:UInt8}", [3]) + {:ok, %Ch.Result{rows: [[0], [1], [2]]}} = Ch.query(pid, "SELECT * FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 3}) @@ -61,9 +59,71 @@ defaults = [ Note on datetime encoding in query parameters: -- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse server's timezone +- `%NaiveDateTime{}` is encoded as text to make it assume the column's or ClickHouse's timezone + +```elixir +Mix.install([:ch, :tz]) + +{:ok, pid} = Ch.start_link() +naive = ~N[2023-12-16 12:00:00] + +%Ch.Result{rows: [["UTC"]]} = Ch.query!(pid, "SELECT timezone()") + +%Ch.Result{rows: [[~N[2023-12-16 12:00:00]]]} = + Ch.query!(pid, "SELECT {naive:DateTime}", %{"naive" => naive}) + +# https://clickhouse.com/docs/en/operations/settings/settings#session_timezone +%Ch.Result{rows: [["Europe/Berlin"]]} = + Ch.query!(pid, "SELECT timezone()", [], settings: [session_timezone: "Europe/Berlin"]) + +%Ch.Result{rows: [[~N[2023-12-16 11:00:00]]]} = + Ch.query!(pid, "SELECT {naive:DateTime}", %{"naive" => naive}, settings: [session_timezone: "Europe/Berlin"]) + +:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase) + +%Ch.Result{rows: [[taipei]]} = + Ch.query!(pid, "SELECT {naive:DateTime('Asia/Taipei')}", %{"naive" => naive}) + +"#DateTime<2023-12-16 12:00:00+08:00 CST Asia/Taipei>" = inspect(taipei) +``` + - `%DateTime{time_zone: "Etc/UTC"}` is encoded as unix timestamp and is treated as UTC timestamp by ClickHouse -- encoding non UTC `%DateTime{}` raises `ArgumentError` + +```elixir +{:ok, pid} = Ch.start_link() + +``` + +- encoding non-UTC `%DateTime{}` requires a timezone database be configured + +```elixir +Mix.install([:ch, :tz]) + +:ok = Calendar.put_time_zone_database(Tz.TimeZoneDatabase) + +utc = ~U[2023-12-16 10:20:51Z] +taipei = DateTime.new!(~D[2023-12-16], ~T[18:20:51], "Asia/Taipei") +berlin = DateTime.new!(~D[2023-12-16], ~T[11:20:51], "Europe/Berlin") + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {utc:DateTime}", %{"utc" => utc}) + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {taipei:DateTime}", %{"taipei" => taipei}) + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {berlin:DateTime}", %{"berlin" => berlin}) + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {utc:DateTime}", %{"utc" => utc}, settings: [session_timezone: "Europe/Berlin"]) + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {taipei:DateTime('UTC')}", %{"ts" => taipei}) + +%Ch.Result{rows: []} = + Ch.query!(pid, "SELECT {taipei:DateTime('Asia/Taipei')}", %{"ts" => taipei}) + +``` #### Insert rows @@ -75,9 +135,6 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES (0), (1)") -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({$0:UInt8}), ({$1:UInt32})", [0, 1]) - %Ch.Result{num_rows: 2} = Ch.query!(pid, "INSERT INTO ch_demo(id) VALUES ({a:UInt16}), ({b:UInt64})", %{"a" => 0, "b" => 1}) @@ -85,7 +142,7 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2}) ``` -#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient) +#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (fast) ```elixir {:ok, pid} = Ch.start_link() @@ -98,20 +155,24 @@ types = [Ch.Types.u64()] # or types = [:u64] -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types) -``` +rows = [[0], [1], [2]] +row_binary = Ch.RowBinary.encode_rows(rows, types) -Note that RowBinary format encoding requires `:types` option to be provided. +%Ch.Result{num_rows: 3} = + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary]) +``` Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check. ```elixir -sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes" -opts = [names: ["id"], types: ["UInt64"]] -rows = [[0], [1]] +names = ["id"] +types = ["UInt64"] -%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts) +header = Ch.RowBinary.encode_names_and_types(names, types) +row_binary = Ch.RowBinary.encode_rows(rows, types) + +%Ch.Result{num_rows: 3} = + Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary]) ``` #### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) @@ -121,10 +182,10 @@ rows = [[0], [1]] Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) +csv = "0\n1\n2" -%Ch.Result{num_rows: 2} = - Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false) +%Ch.Result{num_rows: 3} = + Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` #### Insert rows as chunked RowBinary stream @@ -134,17 +195,31 @@ csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n) Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end) -chunked = Stream.chunk_every(stream, 100) -encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) -ten_encoded_chunks = Stream.take(encoded, 10) +row_binary = + Stream.repeatedly(fn -> [:rand.uniform(100)] end) + |> Stream.chunk_every(1_000_000) + |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.take(100) -%Ch.Result{num_rows: 1000} = - Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false) +stream = Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary) +%Ch.Result{num_rows: 100_000_000} = Ch.query(pid, stream) ``` This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage. +#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function + +```elixir +{:ok, pid} = Ch.start_link() + +Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") + +sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n" +row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"]) + +%Ch.Result{num_rows: 3} = Ch.query!(pid, [sql | row_binary], %{"ego" => -1}) +``` + #### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings) ```elixir @@ -179,13 +254,13 @@ CREATE TABLE ch_nulls ( """) types = ["Nullable(UInt8)", "UInt8", "UInt8"] -inserted_rows = [[nil, nil, nil]] -selected_rows = [[nil, 0, 0]] +rows = [[nil, nil, nil]] +row_binary = Ch.RowBinary.encode_rows(rows, types) %Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types) + Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary]) -%Ch.Result{rows: ^selected_rows} = +%Ch.Result{rows: [[nil, 0, 0]]} = Ch.query!(pid, "SELECT * FROM ch_nulls") ``` @@ -197,13 +272,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function sql = """ INSERT INTO ch_nulls SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8') - FORMAT RowBinary\ + FORMAT RowBinary """ -Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]) +types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"] +row_binary = Ch.RowBinary.encode_rows(rows, types) -%Ch.Result{rows: [[0], [10]]} = - Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b") +%Ch.Result{num_rows: 1} = + Ch.query!(pid, [sql | row_binary]) + +%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} = + Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b") ``` #### UTF-8 in RowBinary @@ -218,8 +297,10 @@ Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory") bin = "\x61\xF0\x80\x80\x80b" utf8 = "a�b" +row_binary = Ch.RowBinary.encode(:string, bin) + %Ch.Result{num_rows: 1} = - Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"]) + Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary]) %Ch.Result{rows: [[^utf8]]} = Ch.query!(pid, "SELECT * FROM ch_utf8") @@ -268,7 +349,7 @@ utc = DateTime.utc_now() taipei = DateTime.shift_zone!(utc, "Asia/Taipei") # ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei -Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"]) +Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"]) ``` ## Benchmarks diff --git a/lib/ch.ex b/lib/ch.ex index a027102..5959ca7 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -29,6 +29,19 @@ defmodule Ch do DBConnection.child_spec(Connection, opts) end + @type statement :: iodata | Enumerable.t() + @type params :: %{String.t() => term} | [{String.t(), term}] + @type option :: + {:settings, Keyword.t()} + | {:database, String.t()} + | {:username, String.t()} + | {:password, String.t()} + | {:command, Ch.Query.command()} + | {:decode, boolean} + | {:headers, String.t()} + + @type options :: [option | DBConnection.connection_option()] + @doc """ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or `{:error, Exception.t()}` if there was a database error. @@ -42,9 +55,8 @@ defmodule Ch do * `:password` - User password """ - @spec query(DBConnection.conn(), iodata, params, Keyword.t()) :: + @spec query(DBConnection.conn(), statement, params, options) :: {:ok, Result.t()} | {:error, Exception.t()} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) @@ -57,26 +69,19 @@ defmodule Ch do Runs a query and returns the result or raises `Ch.Error` if there was an error. See `query/4`. """ - @spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t() - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() + @spec query!(DBConnection.conn(), statement, params, options) :: Result.t() def query!(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.execute!(conn, query, params, opts) end @doc false - @spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), statement, params, options) :: DBConnection.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.stream(conn, query, params, opts) end - @doc false - @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any - def run(conn, f, opts \\ []) when is_function(f, 1) do - DBConnection.run(conn, f, opts) - end - if Code.ensure_loaded?(Ecto.ParameterizedType) do @behaviour Ecto.ParameterizedType diff --git a/lib/ch/query.ex b/lib/ch/query.ex index f321376..ce4a24e 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -1,16 +1,16 @@ defmodule Ch.Query do @moduledoc "Query struct wrapping the SQL statement." - defstruct [:statement, :command, :encode, :decode] + defstruct [:statement, :command, :decode] - @type t :: %__MODULE__{statement: iodata, command: atom, encode: boolean, decode: boolean} + @type t :: %__MODULE__{statement: Ch.statement(), command: command, decode: boolean} + @type params :: [{String.t(), String.t()}] @doc false - @spec build(iodata, Keyword.t()) :: t + @spec build(Ch.statement(), [Ch.option()]) :: t def build(statement, opts \\ []) do command = Keyword.get(opts, :command) || extract_command(statement) - encode = Keyword.get(opts, :encode, true) decode = Keyword.get(opts, :decode, true) - %__MODULE__{statement: statement, command: command, encode: encode, decode: decode} + %__MODULE__{statement: statement, command: command, decode: decode} end statements = [ @@ -43,6 +43,13 @@ defmodule Ch.Query do {"WATCH", :watch} ] + command_union = + statements + |> Enum.map(fn {_, command} -> command end) + |> Enum.reduce(&{:|, [], [&1, &2]}) + + @type command :: unquote(command_union) + defp extract_command(statement) for {statement, command} <- statements do @@ -54,8 +61,8 @@ defmodule Ch.Query do extract_command(rest) end - defp extract_command([first_segment | _] = statement) do - extract_command(first_segment) || extract_command(IO.iodata_to_binary(statement)) + defp extract_command([first_segment | _]) do + extract_command(first_segment) end defp extract_command(_other), do: nil @@ -64,118 +71,47 @@ end defimpl DBConnection.Query, for: Ch.Query do alias Ch.{Query, Result, RowBinary} - @spec parse(Query.t(), Keyword.t()) :: Query.t() + @spec parse(Query.t(), [Ch.option()]) :: Query.t() def parse(query, _opts), do: query - @spec describe(Query.t(), Keyword.t()) :: Query.t() + @spec describe(Query.t(), [Ch.option()]) :: Query.t() def describe(query, _opts), do: query - @spec encode(Query.t(), params, Keyword.t()) :: {query_params, Mint.Types.headers(), body} - when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(), - query_params: [{String.t(), String.t()}], - body: iodata | Enumerable.t() - - def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do - body = - case data do - _ when is_list(data) or is_binary(data) -> [statement, ?\n | data] - _ -> Stream.concat([[statement, ?\n]], data) - end - - {_query_params = [], headers(opts), body} - end - - def encode(%Query{command: :insert, statement: statement}, params, opts) do - cond do - names = Keyword.get(opts, :names) -> - types = Keyword.fetch!(opts, :types) - header = RowBinary.encode_names_and_types(names, types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n, header | data]} - - format_row_binary?(statement) -> - types = Keyword.fetch!(opts, :types) - data = RowBinary.encode_rows(params, types) - {_query_params = [], headers(opts), [statement, ?\n | data]} - - true -> - {query_params(params), headers(opts), statement} - end - end - + @spec encode(Query.t(), Ch.params(), [Ch.option()]) :: + {Ch.Query.params(), Mint.Types.headers(), Ch.statement()} def encode(%Query{statement: statement}, params, opts) do - types = Keyword.get(opts, :types) - default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes" - format = Keyword.get(opts, :format) || default_format - {query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement} - end - - defp format_row_binary?(statement) when is_binary(statement) do - statement |> String.trim_trailing() |> String.ends_with?("RowBinary") + format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes") + headers = Keyword.get(opts, :headers, []) + {query_params(params), [{"x-clickhouse-format", format} | headers], statement} end - defp format_row_binary?(statement) when is_list(statement) do - statement - |> IO.iodata_to_binary() - |> format_row_binary?() - end - - @spec decode(Query.t(), [response], Keyword.t()) :: Result.t() + @spec decode(Query.t(), [response], [Ch.option()]) :: Result.t() when response: Mint.Types.status() | Mint.Types.headers() | binary - def decode(%Query{command: :insert}, responses, _opts) do - [_status, headers | _data] = responses - - num_rows = - if summary = get_header(headers, "x-clickhouse-summary") do - %{"written_rows" => written_rows} = Jason.decode!(summary) - String.to_integer(written_rows) - end - - %Result{num_rows: num_rows, rows: nil, command: :insert, headers: headers} - end - - def decode(%Query{decode: false, command: command}, responses, _opts) when is_list(responses) do + def decode(%Query{command: command, decode: decode}, responses, _opts) + when is_list(responses) do # TODO potentially fails on x-progress-headers [_status, headers | data] = responses - %Result{rows: data, command: command, headers: headers} - end + format = get_header(headers, "x-clickhouse-format") - def decode(%Query{command: command}, responses, opts) when is_list(responses) do - # TODO potentially fails on x-progress-headers - [_status, headers | data] = responses - - case get_header(headers, "x-clickhouse-format") do - "RowBinary" -> - types = Keyword.fetch!(opts, :types) - rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows(types) - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} - - "RowBinaryWithNamesAndTypes" -> + cond do + decode and format == "RowBinaryWithNamesAndTypes" -> rows = data |> IO.iodata_to_binary() |> RowBinary.decode_rows() - %Result{num_rows: length(rows), rows: rows, command: command, headers: headers} - - _other -> - %Result{rows: data, command: command, headers: headers} - end - end + %Result{num_rows: length(rows), rows: rows, command: command} - # TODO merge :stream `decode/3` with "normal" `decode/3` clause above - @spec decode(Query.t(), {:stream, nil, responses}, Keyword.t()) :: responses - when responses: [Mint.Types.response()] - def decode(_query, {:stream, nil, responses}, _opts), do: responses + format == nil -> + num_rows = + if summary = get_header(headers, "x-clickhouse-summary") do + %{"written_rows" => written_rows} = Jason.decode!(summary) + String.to_integer(written_rows) + end - @spec decode(Query.t(), {:stream, [atom], [Mint.Types.response()]}, Keyword.t()) :: [[term]] - def decode(_query, {:stream, types, responses}, _opts) do - decode_stream_data(responses, types) - end + %Result{num_rows: num_rows, command: command} - defp decode_stream_data([{:data, _ref, data} | rest], types) do - [RowBinary.decode_rows(data, types) | decode_stream_data(rest, types)] + true -> + %Result{rows: data, command: command} + end end - defp decode_stream_data([_ | rest], types), do: decode_stream_data(rest, types) - defp decode_stream_data([] = done, _types), do: done - defp get_header(headers, key) do case List.keyfind(headers, key, 0) do {_, value} -> value @@ -183,15 +119,13 @@ defimpl DBConnection.Query, for: Ch.Query do end end - defp query_params(params) when is_map(params) do - Enum.map(params, fn {k, v} -> {"param_#{k}", encode_param(v)} end) + @compile inline: [query_params: 1] + defp query_params(params) do + Enum.map(params, &__MODULE__.query_param/1) end - defp query_params(params) when is_list(params) do - params - |> Enum.with_index() - |> Enum.map(fn {v, idx} -> {"param_$#{idx}", encode_param(v)} end) - end + @doc false + def query_param({k, v}), do: {"param_#{k}", encode_param(v)} defp encode_param(n) when is_integer(n), do: Integer.to_string(n) defp encode_param(f) when is_float(f), do: Float.to_string(f) @@ -266,9 +200,6 @@ defimpl DBConnection.Query, for: Ch.Query do end defp escape_param([], param), do: param - - @spec headers(Keyword.t()) :: Mint.Types.headers() - defp headers(opts), do: Keyword.get(opts, :headers, []) end defimpl String.Chars, for: Ch.Query do diff --git a/lib/ch/result.ex b/lib/ch/result.ex index ddca4df..24ce2fe 100644 --- a/lib/ch/result.ex +++ b/lib/ch/result.ex @@ -2,21 +2,19 @@ defmodule Ch.Result do @moduledoc """ Result struct returned from any successful query. Its fields are: - * `command` - An atom of the query command, for example: `:select`, `:insert`; + * `command` - An atom of the query command, for example: `:select`, `:insert` * `rows` - The result set. One of: - a list of lists, each inner list corresponding to a - row, each element in the inner list corresponds to a column; + row, each element in the inner list corresponds to a column - raw iodata when the response is not automatically decoded, e.g. `x-clickhouse-format: CSV` - * `num_rows` - The number of fetched or affected rows; - * `headers` - The HTTP response headers + * `num_rows` - The number of fetched or affected rows """ defstruct [:command, :num_rows, :rows, :headers] @type t :: %__MODULE__{ - command: atom, + command: Ch.Query.command() | nil, num_rows: non_neg_integer | nil, - rows: [[term]] | iodata | nil, - headers: Mint.Types.headers() + rows: [[term]] | iodata | nil } end