diff --git a/CHANGELOG.md b/CHANGELOG.md index deaa40c..973e6dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144 - fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147 - make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148 +- make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149 ## 0.2.2 (2023-12-23) diff --git a/README.md b/README.md index 7d84aeb..cf9b1a6 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,7 @@ row_binary = Ch.RowBinary.encode_rows(rows, types) ]) ``` -#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats) +#### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats) ```elixir {:ok, pid} = Ch.start_link() @@ -133,26 +133,20 @@ csv = "0\n1" Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv]) ``` -#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream +#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream ```elixir {:ok, pid} = Ch.start_link() Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null") -row_binary = +DBConnection.run(pid, fn conn -> Stream.repeatedly(fn -> [:rand.uniform(100)] end) |> Stream.chunk_every(100_000) |> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) - |> Stream.take(10) - -%Ch.Result{num_rows: 1_000_000} = - Ch.query(pid, - Stream.concat( - ["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], - row_binary - ) - ) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n")) +end) ``` #### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function diff --git a/lib/ch.ex b/lib/ch.ex index 91b7336..b5c60ca 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -102,10 +102,10 @@ defmodule Ch do @doc """ Returns a stream for a query on a connection. """ - @spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) - DBConnection.stream(conn, query, params, opts) + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} end if Code.ensure_loaded?(Ecto.ParameterizedType) do diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 18442be..a44ddcf 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -183,21 +183,48 @@ defmodule Ch.Connection do end @impl true - def handle_execute(%Query{statement: statement} = query, params, opts, conn) do + def handle_execute(%Query{statement: statement} = query, params, opts, conn) + when is_list(statement) or is_binary(statement) do {query_params, extra_headers} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - result = - if is_list(statement) or is_binary(statement) do - request(conn, "POST", path, headers, statement, opts) - else - request_chunked(conn, "POST", path, headers, statement, opts) + with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do + {:ok, query, responses, conn} + end + end + + def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do + {query_params, extra_headers} = params + + path = path(conn, query_params, opts) + headers = headers(conn, extra_headers, opts) + + with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do + case HTTP.stream_request_body(conn, ref, statement) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} end + end + end - with {:ok, conn, responses} <- result do - {:ok, query, responses, conn} + def handle_execute(%Query{} = query, {:stream, ref, :cont, body}, _opts, conn) do + case HTTP.stream_request_body(conn, ref, body) do + {:ok, conn} -> {:ok, query, ref, conn} + {:error, conn, reason} -> {:disconnect, reason, conn} + end + end + + def handle_execute(%Query{} = query, {:stream, ref, :done}, opts, conn) do + case HTTP.stream_request_body(conn, ref, :eof) do + {:ok, conn} -> + with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do + {:ok, query, responses, conn} + end + + {:error, conn, reason} -> + {:disconnect, reason, conn} end end @@ -226,40 +253,6 @@ defmodule Ch.Connection do end end - @spec request_chunked( - conn, - method :: String.t(), - path :: String.t(), - Mint.Types.headers(), - body :: Enumerable.t(), - [Ch.query_option()] - ) :: - {:ok, conn, [response]} - | {:error, Error.t(), conn} - | {:disconnect, Mint.Types.error(), conn} - def request_chunked(conn, method, path, headers, stream, opts) do - with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream), - {:ok, conn} <- stream_body(conn, ref, stream), - do: receive_full_response(conn, timeout(conn, opts)) - end - - @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: - {:ok, conn} | {:disconnect, Mint.Types.error(), conn} - defp stream_body(conn, ref, stream) do - result = - stream - |> Stream.concat([:eof]) - |> Enum.reduce_while({:ok, conn}, fn - chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)} - _chunk, {:error, _conn, _reason} = error -> {:halt, error} - end) - - case result do - {:ok, _conn} = ok -> ok - {:error, conn, reason} -> {:disconnect, reason, conn} - end - end - # stacktrace is a bit cleaner with this function inlined @compile inline: [send_request: 5] defp send_request(conn, method, path, headers, body) do diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex new file mode 100644 index 0000000..012c7c7 --- /dev/null +++ b/lib/ch/stream.ex @@ -0,0 +1,43 @@ +defmodule Ch.Stream do + @moduledoc false + + @derive {Inspect, only: []} + defstruct [:conn, :query, :params, :opts] + + @type t :: %__MODULE__{ + conn: DBConnection.conn(), + ref: Mint.Types.request_ref(), + query: Ch.Query.t(), + params: Ch.params(), + opts: [Ch.query_option()] + } + + defimpl Enumerable do + def reduce(stream, acc, fun) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts} + DBConnection.reduce(stream, acc, fun) + end + + def member?(_, _), do: {:error, __MODULE__} + def count(_), do: {:error, __MODULE__} + def slice(_), do: {:error, __MODULE__} + end + + defimpl Collectable do + def into(stream) do + %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream + ref = DBConnection.execute!(conn, query, params, Keyword.put(opts, :stream, true)) + {%{stream | ref: ref}, &collect/2} + end + + defp collect(%{conn: conn, query: query} = stream, {:cont, data}) do + DBConnection.execute!(conn, query, {:stream, data}) + stream + end + + defp collect(%{conn: conn, query: query}, :done) do + DBConnection.execute!(conn, query, {:stream, :eof}) + end + end +end diff --git a/test/ch/stream_test.exs b/test/ch/stream_test.exs index 405398d..759f251 100644 --- a/test/ch/stream_test.exs +++ b/test/ch/stream_test.exs @@ -6,7 +6,7 @@ defmodule Ch.StreamTest do {:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})} end - describe "Ch.stream/4" do + describe "enumerable Ch.stream/4" do test "emits %Ch.Result{}", %{conn: conn} do count = 1_000_000 @@ -37,4 +37,21 @@ defmodule Ch.StreamTest do end end end + + describe "collectable Ch.stream/4" do + test "inserts chunks", %{conn: conn} do + Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory") + + assert :asdf = + DBConnection.run(conn, fn conn -> + Stream.repeatedly(fn -> [:rand.uniform(100)] end) + |> Stream.chunk_every(100_000) + |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end) + |> Stream.take(10) + |> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n")) + end) + + assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[0]] + end + end end