Skip to content

Commit

Permalink
make Ch.stream/4 collectable
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Jan 11, 2024
1 parent a10488f commit 2b34876
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144
- fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147
- make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148
- make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149

## 0.2.2 (2023-12-23)

Expand Down
18 changes: 6 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ row_binary = Ch.RowBinary.encode_rows(rows, types)
])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
#### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -133,26 +133,20 @@ csv = "0\n1"
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

row_binary =
DBConnection.run(pid, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)

%Ch.Result{num_rows: 1_000_000} =
Ch.query(pid,
Stream.concat(
["INSERT INTO ch_demo(id) FORMAT RowBinary\n"],
row_binary
)
)
|> Stream.take(10)
|> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
end)
```

#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function
Expand Down
4 changes: 2 additions & 2 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ defmodule Ch do
@doc """
Returns a stream for a query on a connection.
"""
@spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t()
@spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
Expand Down
74 changes: 33 additions & 41 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,46 @@ defmodule Ch.Connection do
end

@impl true
def handle_execute(%Query{statement: statement} = query, params, opts, conn) do
def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_list(statement) or is_binary(statement) do
request(conn, "POST", path, headers, statement, opts)
else
request_chunked(conn, "POST", path, headers, statement, opts)
with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do
case HTTP.stream_request_body(conn, ref, statement) do
{:ok, conn} -> {:ok, query, ref, conn}
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end
end

def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
case HTTP.stream_request_body(conn, ref, body) do
{:ok, conn} ->
case body do
:eof ->
with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
{:ok, query, responses, conn}
end

_other ->
{:ok, query, ref, conn}
end

{:error, conn, reason} ->
{:disconnect, reason, conn}
end
end

def handle_execute(%Query{statement: statement} = query, params, opts, conn)
when is_list(statement) or is_binary(statement) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- result do
with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do
{:ok, query, responses, conn}
end
end
Expand Down Expand Up @@ -226,40 +252,6 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: Enumerable.t(),
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
{:ok, conn} <- stream_body(conn, ref, stream),
do: receive_full_response(conn, timeout(conn, opts))
end

@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
{:ok, conn} | {:disconnect, Mint.Types.error(), conn}
defp stream_body(conn, ref, stream) do
result =
stream
|> Stream.concat([:eof])
|> Enum.reduce_while({:ok, conn}, fn
chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
_chunk, {:error, _conn, _reason} = error -> {:halt, error}
end)

case result do
{:ok, _conn} = ok -> ok
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end

# stacktrace is a bit cleaner with this function inlined
@compile inline: [send_request: 5]
defp send_request(conn, method, path, headers, body) do
Expand Down
20 changes: 18 additions & 2 deletions lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,26 @@ defimpl DBConnection.Query, for: Ch.Query do

@spec encode(Query.t(), Ch.params(), [Ch.query_option()]) ::
{Ch.Query.params(), Mint.Types.headers()}
def encode(%Query{}, params, opts) do
def encode(%Query{}, params, opts) when is_list(params) or is_map(params) do
format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes")
headers = Keyword.get(opts, :headers, [])
{query_params(params), [{"x-clickhouse-format", format} | headers]}
end

# stream: insert init
@spec encode(Query.t(), {:stream, Ch.params()}, [Ch.query_option()]) ::
{:stream, {Ch.Query.params(), Mint.Types.headers()}}
def encode(query, {:stream, params}, opts) do
{:stream, encode(query, params, opts)}
end

# stream: insert data chunk
@spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) ::
{:stream, Mint.Types.request_ref(), iodata | :eof}
def encode(_query, {:stream, ref, data}, _opts) do
{:stream, ref, data}
end

@spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t()
when response: Mint.Types.status() | Mint.Types.headers() | binary
def decode(%Query{command: command}, responses, opts) when is_list(responses) do
Expand All @@ -110,8 +124,10 @@ defimpl DBConnection.Query, for: Ch.Query do
end
end

# stream result
# stream: select result
def decode(_query, %Result{} = result, _opts), do: result
# stream: insert result
def decode(_query, ref, _opts) when is_reference(ref), do: ref

defp get_header(headers, key) do
case List.keyfind(headers, key, 0) do
Expand Down
43 changes: 43 additions & 0 deletions lib/ch/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Ch.Stream do
@moduledoc false

@derive {Inspect, only: []}
defstruct [:conn, :ref, :query, :params, :opts]

@type t :: %__MODULE__{
conn: DBConnection.conn(),
ref: Mint.Types.request_ref() | nil,
query: Ch.Query.t(),
params: Ch.params(),
opts: [Ch.query_option()]
}

defimpl Enumerable do
def reduce(stream, acc, fun) do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
DBConnection.reduce(stream, acc, fun)
end

def member?(_, _), do: {:error, __MODULE__}
def count(_), do: {:error, __MODULE__}
def slice(_), do: {:error, __MODULE__}
end

defimpl Collectable do
def into(stream) do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
ref = DBConnection.execute!(conn, query, {:stream, params}, opts)
{%{stream | ref: ref}, &collect/2}
end

defp collect(%{conn: conn, query: query, ref: ref} = stream, {:cont, data}) do
^ref = DBConnection.execute!(conn, query, {:stream, ref, data})
stream
end

defp collect(%{conn: conn, query: query, ref: ref}, eof) when eof in [:halt, :done] do
DBConnection.execute!(conn, query, {:stream, ref, :eof})
end
end
end
19 changes: 18 additions & 1 deletion test/ch/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Ch.StreamTest do
{:ok, conn: start_supervised!({Ch, database: Ch.Test.database()})}
end

describe "Ch.stream/4" do
describe "enumerable Ch.stream/4" do
test "emits %Ch.Result{}", %{conn: conn} do
count = 1_000_000

Expand Down Expand Up @@ -37,4 +37,21 @@ defmodule Ch.StreamTest do
end
end
end

describe "collectable Ch.stream/4" do
test "inserts chunks", %{conn: conn} do
Ch.query!(conn, "create table collect_stream(i UInt64) engine Memory")

assert %Ch.Result{command: :insert, num_rows: 1_000_000} =
DBConnection.run(conn, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)
|> Enum.into(Ch.stream(conn, "insert into collect_stream(i) format RowBinary\n"))
end)

assert Ch.query!(conn, "select count(*) from collect_stream").rows == [[1_000_000]]
end
end
end

0 comments on commit 2b34876

Please sign in to comment.