diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 3742f47..32cbacb 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -228,6 +228,9 @@ defmodule Ch.Connection do path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) + headers = headers ++ [{"Content-Encoding", "gzip"}] + body = body |> compress_body() + result = if is_function(body, 2) do request_chunked(conn, "POST", path, headers, body, opts) @@ -279,6 +282,35 @@ defmodule Ch.Connection do do: receive_full_response(conn, timeout(conn, opts)) end + defp compress_body(body) when is_binary(body) or is_list(body), do: :zlib.gzip(body) + defp compress_body(body) when is_function(body, 2) do + eof = make_ref() + + body + |> Stream.concat([eof]) + |> Stream.transform( + fn -> + z = :zlib.open() + # https://github.com/erlang/otp/blob/OTP-26.0/erts/preloaded/src/zlib.erl#L551 + :ok = :zlib.deflateInit(z, :default, :deflated, 16 + 15, 8, :default) + z + end, + fn + ^eof, z -> + buf = :zlib.deflate(z, [], :finish) + {buf, z} + + data, z -> + buf = :zlib.deflate(z, data) + {buf, z} + end, + fn z -> + :ok = :zlib.deflateEnd(z) + :ok = :zlib.close(z) + end + ) + end + @spec stream_body(conn, Mint.Types.request_ref(), Enumerable.t()) :: {:ok, conn} | {:disconnect, Mint.Types.error(), conn} defp stream_body(conn, ref, stream) do