Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make Ch.stream/4 collectable #149

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- drop `:headers` from `%Ch.Result{}` but add `:data` https://github.com/plausible/ch/pull/144
- fix query string escaping for `\t`, `\\`, and `\n` https://github.com/plausible/ch/pull/147
- make `Ch.stream/4` emit `%Ch.Result{data: iodata}` https://github.com/plausible/ch/pull/148
- make `Ch.stream/4` collectable and remove stream support in `Ch.query/4` https://github.com/plausible/ch/pull/149

## 0.2.2 (2023-12-23)

Expand Down
18 changes: 6 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ row_binary = Ch.RowBinary.encode_rows(rows, types)
])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
#### Insert custom [format](https://clickhouse.com/docs/en/interfaces/formats)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -133,26 +133,20 @@ csv = "0\n1"
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream
#### Insert [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

row_binary =
DBConnection.run(pid, fn conn ->
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)

%Ch.Result{num_rows: 1_000_000} =
Ch.query(pid,
Stream.concat(
["INSERT INTO ch_demo(id) FORMAT RowBinary\n"],
row_binary
)
)
|> Stream.take(10)
|> Enum.into(Ch.stream(conn, "INSERT INTO ch_demo(id) FORMAT RowBinary\n"))
end)
```

#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function
Expand Down
6 changes: 3 additions & 3 deletions bench/insert.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ Benchee.run(
|> Stream.run()
end,
"insert stream" => fn rows ->
stream =
DBConnection.run(conn, fn conn ->
rows
|> Stream.chunk_every(60_000)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

Ch.query!(conn, Stream.concat([statement], stream))
|> Enum.into(Ch.stream(conn, statement))
end)
end
},
inputs: %{
Expand Down
4 changes: 2 additions & 2 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ defmodule Ch do
@doc """
Returns a stream for a query on a connection.
"""
@spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t()
@spec stream(DBConnection.t(), statement, params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
Expand Down
74 changes: 33 additions & 41 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,46 @@ defmodule Ch.Connection do
end

@impl true
def handle_execute(%Query{statement: statement} = query, params, opts, conn) do
def handle_execute(%Query{statement: statement} = query, {:stream, params}, opts, conn) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_list(statement) or is_binary(statement) do
request(conn, "POST", path, headers, statement, opts)
else
request_chunked(conn, "POST", path, headers, statement, opts)
with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do
case HTTP.stream_request_body(conn, ref, statement) do
{:ok, conn} -> {:ok, query, ref, conn}
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end
end

# Streams one piece of the request body for an in-flight request `ref`.
#
# `body` is either a chunk of data or `:eof`. After a regular chunk the
# request reference is handed back as the "result" so the caller can keep
# streaming; after `:eof` the full response is awaited and returned instead.
# A failure to write to the socket disconnects the connection.
def handle_execute(%Query{} = query, {:stream, ref, body}, opts, conn) do
  case HTTP.stream_request_body(conn, ref, body) do
    {:error, conn, reason} ->
      {:disconnect, reason, conn}

    # last piece sent: the request is complete, so read the response
    {:ok, conn} when body == :eof ->
      case receive_full_response(conn, timeout(conn, opts)) do
        {:ok, conn, responses} -> {:ok, query, responses, conn}
        # anything else is returned as is
        failure -> failure
      end

    # more data may follow: keep the request open
    {:ok, conn} ->
      {:ok, query, ref, conn}
  end
end

def handle_execute(%Query{statement: statement} = query, params, opts, conn)
when is_list(statement) or is_binary(statement) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- result do
with {:ok, conn, responses} <- request(conn, "POST", path, headers, statement, opts) do
{:ok, query, responses, conn}
end
end
Expand Down Expand Up @@ -226,40 +252,6 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: Enumerable.t(),
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
def request_chunked(conn, method, path, headers, stream, opts) do
with {:ok, conn, ref} <- send_request(conn, method, path, headers, :stream),
{:ok, conn} <- stream_body(conn, ref, stream),
do: receive_full_response(conn, timeout(conn, opts))
end

@spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) ::
{:ok, conn} | {:disconnect, Mint.Types.error(), conn}
defp stream_body(conn, ref, stream) do
result =
stream
|> Stream.concat([:eof])
|> Enum.reduce_while({:ok, conn}, fn
chunk, {:ok, conn} -> {:cont, HTTP.stream_request_body(conn, ref, chunk)}
_chunk, {:error, _conn, _reason} = error -> {:halt, error}
end)

case result do
{:ok, _conn} = ok -> ok
{:error, conn, reason} -> {:disconnect, reason, conn}
end
end

# stacktrace is a bit cleaner with this function inlined
@compile inline: [send_request: 5]
defp send_request(conn, method, path, headers, body) do
Expand Down
20 changes: 18 additions & 2 deletions lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,26 @@ defimpl DBConnection.Query, for: Ch.Query do

@spec encode(Query.t(), Ch.params(), [Ch.query_option()]) ::
{Ch.Query.params(), Mint.Types.headers()}
def encode(%Query{}, params, opts) do
def encode(%Query{}, params, opts) when is_list(params) or is_map(params) do
format = Keyword.get(opts, :format, "RowBinaryWithNamesAndTypes")
headers = Keyword.get(opts, :headers, [])
{query_params(params), [{"x-clickhouse-format", format} | headers]}
end

# stream: insert init
#
# Encodes the parameters for the first request of a streamed insert. The
# regular `encode/3` result for `params` is wrapped in a `:stream` tag, which
# is the shape matched by the `{:stream, params}` clause of
# `Ch.Connection.handle_execute/4`.
@spec encode(Query.t(), {:stream, Ch.params()}, [Ch.query_option()]) ::
{:stream, {Ch.Query.params(), Mint.Types.headers()}}
def encode(query, {:stream, params}, opts) do
{:stream, encode(query, params, opts)}
end

# stream: insert data chunk
#
# Passes the request reference and the chunk (iodata or `:eof`) through
# unchanged; the query and the options are ignored for data chunks.
@spec encode(Query.t(), {:stream, Mint.Types.request_ref(), iodata | :eof}, [Ch.query_option()]) ::
{:stream, Mint.Types.request_ref(), iodata | :eof}
def encode(_query, {:stream, ref, data}, _opts) do
{:stream, ref, data}
end

@spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t()
when response: Mint.Types.status() | Mint.Types.headers() | binary
def decode(%Query{command: command}, responses, opts) when is_list(responses) do
Expand All @@ -110,8 +124,10 @@ defimpl DBConnection.Query, for: Ch.Query do
end
end

# stream result
# stream: select result
# An already built `%Result{}` is returned as is, without further decoding.
def decode(_query, %Result{} = result, _opts), do: result
# stream: insert result
# A bare reference (checked with `is_reference/1`) is returned unchanged; this
# is the shape `handle_execute/4` yields for a non-final insert chunk.
def decode(_query, ref, _opts) when is_reference(ref), do: ref

defp get_header(headers, key) do
case List.keyfind(headers, key, 0) do
Expand Down
43 changes: 43 additions & 0 deletions lib/ch/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Ch.Stream do
  @moduledoc false

  # A lazy handle for a query on a checked-out connection.
  #
  # The struct is both:
  #
  #   * `Enumerable` - reducing it delegates to `DBConnection.reduce/3` over
  #     an equivalent `%DBConnection.Stream{}`;
  #   * `Collectable` - collecting into it opens a streaming request, sends
  #     every collected element as a body chunk and finishes with `:eof`.
  #
  # `ref` is `nil` until `Collectable.into/1` has started the request.

  # connection, query and params are hidden from `inspect/1` output
  @derive {Inspect, only: []}
  defstruct [:conn, :ref, :query, :params, :opts]

  @type t :: %__MODULE__{
          conn: DBConnection.conn(),
          ref: Mint.Types.request_ref() | nil,
          query: Ch.Query.t(),
          params: Ch.params(),
          opts: [Ch.query_option()]
        }

  defimpl Enumerable do
    # Rebuilds the struct as a `%DBConnection.Stream{}` and lets DBConnection
    # drive the reduction.
    def reduce(stream, acc, fun) do
      %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
      stream = %DBConnection.Stream{conn: conn, query: query, params: params, opts: opts}
      DBConnection.reduce(stream, acc, fun)
    end

    # No shortcut implementations: Enum falls back to `reduce/3`.
    def member?(_, _), do: {:error, __MODULE__}
    def count(_), do: {:error, __MODULE__}
    def slice(_), do: {:error, __MODULE__}
  end

  defimpl Collectable do
    # Starts the streaming request and returns the collector. The request
    # reference returned by the initial execute is stored in the struct so
    # that every following chunk is sent on the same request.
    def into(stream) do
      %Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
      ref = DBConnection.execute!(conn, query, {:stream, params}, opts)
      {%{stream | ref: ref}, &collect/2}
    end

    # Sends one chunk. The stream's own options are forwarded so that
    # per-query settings (e.g. `:timeout`) apply to chunk requests as well,
    # not only to the initial one.
    defp collect(%{conn: conn, query: query, ref: ref, opts: opts} = stream, {:cont, data}) do
      # the connection is expected to answer with the same request reference
      ^ref = DBConnection.execute!(conn, query, {:stream, ref, data}, opts)
      stream
    end

    # Finishes the request body with `:eof` and returns whatever the final
    # execute returns. Options are forwarded here too: the connection reads
    # the response timeout from them when `:eof` is sent.
    defp collect(%{conn: conn, query: query, ref: ref, opts: opts}, eof)
         when eof in [:halt, :done] do
      DBConnection.execute!(conn, query, {:stream, ref, :eof}, opts)
    end
  end
end
12 changes: 7 additions & 5 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,13 @@ defmodule Ch.ConnectionTest do
|> Stream.chunk_every(2)
|> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)

assert {:ok, %{num_rows: 3}} =
Ch.query(
conn,
Stream.concat(["insert into #{table}(a, b) format RowBinary\n"], row_binary)
)
assert %{num_rows: 3} =
DBConnection.run(conn, fn conn ->
Enum.into(
row_binary,
Ch.stream(conn, "insert into #{table}(a, b) format RowBinary\n")
)
end)

assert {:ok, %{rows: rows}} =
Ch.query(conn, "select * from {table:Identifier}", %{"table" => table})
Expand Down
74 changes: 38 additions & 36 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -356,14 +356,17 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.close(mint)

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(
conn,
Stream.concat(
["insert into unknown_table(a,b) format RowBinary\n"],
stream
)
)
try do
DBConnection.run(conn, fn conn ->
Enum.into(
stream,
Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
)
end)
rescue
e in [DBConnection.ConnectionError, Mint.TransportError] ->
assert Exception.message(e) =~ "closed"
end
end)

# reconnect
Expand All @@ -374,16 +377,14 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
assert {:error, %Ch.Error{code: 60, message: message}} =
Ch.query(
conn,
Stream.concat(
["insert into unknown_table(a,b) format RowBinary\n"],
stream
)
)

assert message =~ ~r/UNKNOWN_TABLE/
assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
DBConnection.run(conn, fn conn ->
Enum.into(
stream,
Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
)
end)
end

send(test, :done)
end)
Expand Down Expand Up @@ -417,14 +418,17 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(
conn,
Stream.concat(
["insert into unknown_table(a,b) format RowBinary\n"],
stream
)
)
try do
DBConnection.run(conn, fn conn ->
Enum.into(
stream,
Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
)
end)
rescue
e in [DBConnection.ConnectionError, Mint.TransportError] ->
assert Exception.message(e) =~ "closed"
end
end)

# close after first packet from mint arrives
Expand All @@ -439,16 +443,14 @@ defmodule Ch.FaultsTest do
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

spawn_link(fn ->
assert {:error, %Ch.Error{code: 60, message: message}} =
Ch.query(
conn,
Stream.concat(
["insert into unknown_table(a,b) format RowBinary\n"],
stream
)
)

assert message =~ ~r/UNKNOWN_TABLE/
assert_raise Ch.Error, ~r/UNKNOWN_TABLE/, fn ->
DBConnection.run(conn, fn conn ->
Enum.into(
stream,
Ch.stream(conn, "insert into unknown_table(a,b) format RowBinary\n")
)
end)
end

send(test, :done)
end)
Expand Down
Loading