diff --git a/CHANGELOG.md b/CHANGELOG.md index deaa40c..973e6dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 - fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 - make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148 +- make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149 ## 0.2.2 (2023-12-23) diff --git a/README.md b/README.md index 7d84aeb..cf9b1a6 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,7 @@ row_binary = Ch.RowBinary.encode_rows(rows, types) ]) ``` -#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) +#### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats) ```elixir {:ok, pid} = Ch.start_link() @@ -133,26 +133,20 @@ csv = "0\n1" Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` -#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream +#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream ```elixir {:ok, pid} = Ch.start_link() Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -row_binary = +DBConnection.run(pid, fn conn -> Stream.repeatedly(fn -> [:rand.uniform(100)] end) |> Stream.chunk_every(100_000) |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) - |> Stream.take(10) - -%Ch.Result{num_rows: 1_000_000} = - Ch.query(pid, - Stream.concat( - ["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], - row_binary - ) - ) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) +end) ``` #### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function diff --git a/bench/insert.exs b/bench/insert.exs index b852196..7c38b36 100644 --- a/bench/insert.exs +++ b/bench/insert.exs @@ -43,12 +43,12 @@ Benchee.run( |> Stream.run() end, "insert stream" => fn rows -> - stream = + DBConnection.run(conn, fn conn -> rows |> Stream.chunk_every(60_000) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - - Ch.query!(conn, Stream.concat([statement], stream)) + |> Enum.into(Ch.stream(conn, statement)) + end) end }, inputs: %{ diff --git a/lib/ch.ex b/lib/ch.ex index 91b7336..b5c60ca 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -102,10 +102,10 @@ defmodule Ch do @doc """ Returns a stream for a query on a connection. """ - @spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) - DBConnection.stream(conn, query, params, opts) + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} end if Code.ensure_loaded?(Ecto.ParameterizedType) do diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 18442be..a40f2c8 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -183,20 +183,46 @@ defmodule Ch.Connection do end @impl true - def handle_execute(%Query{statement: statement} = query, params, opts, conn) do + def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do {query_params, extra_headers} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - result = - if is_list(statement) or is_binary(statement) do - request(conn, "POST", path, headers, statement, opts) - else - request_chunked(conn, "POST", path, headers, statement, opts) + with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do + case HTTP.stream_request_body(conn, ref, statement) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} end + end + end + + def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do + case HTTP.stream_request_body(conn, ref, body) do + {:ok, conn} -> + case body do + :eof -> + with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do + {:ok, query, responses, conn} + end + + _other -> + {:ok, query, ref, conn} + end + + {:error, conn, reason} -> + {:disconnect, reason, conn} + end + end + + def handle_execute(%Query{statement: statement} = query, params, opts, conn) + when is_list(statement) or is_binary(statement) do + {query_params, extra_headers} = params + + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) - with {:ok, conn, responses} <- result do + with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do {:ok, query, responses, conn} end end @@ -226,40 +252,6 @@ defmodule Ch.Connection do end end - @spec request_chunked( - conn, - method :: String.t(), - path :: String.t(), - Mint.Types.headers(), - body :: Enumerable.t(), - [Ch.query_option()] - ) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - # stacktrace is a bit cleaner with this function inlined @compile inline: [send_request: 5] defp send_request(conn, method, path, headers, body) do diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 2f850d4..3fa2de8 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -78,12 +78,26 @@ defimpl DBConnection.Query, for: Ch.Query do @spec encode(Query.t(), Ch.params(), [Ch.query_option()]) :: {Ch.Query.params(), Mint.Types.headers()} - def encode(%Query{}, params, opts) do + def encode(%Query{}, params, opts) when is_list(params) or is_map(params) do format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes") headers = Keyword.get(opts, :headers, []) {query_params(params), [{"x-clickhouse-format", format} | headers]} end + # stream: insert init + @spec encode(Query.t(), {:stream, Ch.params()}, [Ch.query_option()]) :: + {:stream, {Ch.Query.params(), Mint.Types.headers()}} + def encode(query, {:stream, params}, opts) do + {:stream, encode(query, params, opts)} + end + + # stream: insert data chunk + @spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) :: + {:stream, Mint.Types.request_ref(), iodata | :eof} + def encode(_query, {:stream, ref, data}, _opts) do + {:stream, ref, data} + end + @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() when response: Mint.Types.status() | Mint.Types.headers() | binary def decode(%Query{command: command}, responses, opts) when is_list(responses) do @@ -110,8 +124,10 @@ defimpl DBConnection.Query, for: Ch.Query do end end - # stream result + # stream: select result def decode(_query, %Result{} = result, _opts), do: result + # stream: insert result + def decode(_query, ref, _opts) when is_reference(ref), do: ref defp get_header(headers, key) do case List.keyfind(headers, key, 0) do diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex new file mode 100644 index 0000000..6569782 --- /dev/null +++ b/lib/ch/stream.ex @@ -0,0 +1,43 @@ +defmodule Ch.Stream do + @moduledoc false + + @derive {Inspect, only: []} + defstruct [:conn, :ref, :query, :params, :opts] + + @type t :: %__MODULE__{ + conn: DBConnection.conn(), + ref: Mint.Types.request_ref() | nil, + query: Ch.Query.t(), + params: Ch.params(), + opts: [Ch.query_option()] + } + + defimpl Enumerable do + def reduce(stream, acc, fun) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} + DBConnection.reduce(stream, acc, fun) + end + + def member?(_, _), do: {:error, __MODULE__} + def count(_), do: {:error, __MODULE__} + def slice(_), do: {:error, __MODULE__} + end + + defimpl Collectable do + def into(stream) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + ref = DBConnection.execute!(conn, query, {:stream, params}, opts) + {%{stream | ref: ref}, &collect/2} + end + + defp collect(%{conn: conn, query: query, ref: ref} = stream, {:cont, data}) do + ^ref = DBConnection.execute!(conn, query, {:stream, ref, data}) + stream + end + + defp collect(%{conn: conn, query: query, ref: ref}, eof) when eof in [:halt, :done] do + DBConnection.execute!(conn, query, {:stream, ref, :eof}) + end + end +end diff --git a/test/ch/connection_test.exs b/test/ch/connection_test.exs index 729d873..c045756 100644 --- a/test/ch/connection_test.exs +++ b/test/ch/connection_test.exs @@ -258,11 +258,13 @@ defmodule Ch.ConnectionTest do |> Stream.chunk_every(2) |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end) - assert {:ok, %{num_rows: 3}} = - Ch.query( - conn, - Stream.concat(["insert into #{table}(a, b) format RowBinary\n"], row_binary) - ) + assert %{num_rows: 3} = + DBConnection.run(conn, fn conn -> + Enum.into( + row_binary, + Ch.stream(conn, "insert into #{table}(a, b) format RowBinary\n") + ) + end) assert {:ok, %{rows: rows}} = Ch.query(conn, "select * from {table:Identifier}", %{"table" => table}) diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index bdf8d34..dc09bb8 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -356,14 +356,17 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.close(mint) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) + try do + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + rescue + e in [DBConnection.ConnectionError, Mint.TransportError] -> + assert Exception.message(e) =~ "closed" + end end) # reconnect @@ -374,16 +377,14 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + end send(test, :done) end) @@ -417,14 +418,17 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) + try do + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + rescue + e in [DBConnection.ConnectionError, Mint.TransportError] -> + assert Exception.message(e) =~ "closed" + end end) # close after first packet from mint arrives @@ -439,16 +443,14 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - Stream.concat( - ["insert into unknown_table(a,b) format RowBinary\n"], - stream - ) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ + assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn -> + DBConnection.run(conn, fn conn -> + Enum.into( + stream, + Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n") + ) + end) + end send(test, :done) end) diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 405398d..7d0fa84 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -6,7 +6,7 @@ defmodule Ch.StreamTest do {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} end - describe "Ch.stream/4" do + describe "enumerable Ch.stream/4" do test "emits %Ch.Result{}", %{conn: conn} do count = 1_000_000 @@ -37,4 +37,21 @@ defmodule Ch.StreamTest do end end end + + describe "collectable Ch.stream/4" do + test "inserts chunks", %{conn: conn} do + Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory") + + assert %Ch.Result{command: :insert, num_rows: 1_000_000} = + DBConnection.run(conn, fn conn -> + Stream.repeatedly(fn -> [:rand.uniform(100)] end) + |> Stream.chunk_every(100_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n")) + end) + + assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]] + end + end end