From 0c95eb62a13f266f671ef58f2aa0de3c80215cad Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sun, 29 Sep 2024 11:50:03 -0600 Subject: [PATCH] Clean up http buffer --- CHANGELOG.md | 4 ++++ clickhouse_connect/__version__.py | 2 +- clickhouse_connect/driver/httputil.py | 22 +++++++++++++--------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c015be8..b3848309 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction. The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`. +## 0.8.1, 2024-09-29 +### Bug Fix +- Fixed an edge case where the HTTP buffer could theoretically return empty blocks. + ## 0.8.0, 2024-09-26 ### Experimental Feature - "New" JSON/Dynamic/Variant DataTypes #### Usage Notes diff --git a/clickhouse_connect/__version__.py b/clickhouse_connect/__version__.py index c5e4522a..398cfc4c 100644 --- a/clickhouse_connect/__version__.py +++ b/clickhouse_connect/__version__.py @@ -1 +1 @@ -version = '0.8.0' +version = '0.8.1' diff --git a/clickhouse_connect/driver/httputil.py b/clickhouse_connect/driver/httputil.py index 7dd73114..58b5460a 100644 --- a/clickhouse_connect/driver/httputil.py +++ b/clickhouse_connect/driver/httputil.py @@ -207,13 +207,15 @@ def zstd_decompress(c: deque) -> Tuple[bytes, int]: def lz_decompress(c: deque) -> Tuple[Optional[bytes], int]: read_amt = 0 - while lz4_decom.needs_input: - data = c.popleft() - read_amt += len(data) - if lz4_decom.unused_data: - data = lz4_decom.unused_data + data - return lz4_decom.decompress(data), read_amt - return None, 0 + data = c.popleft() + read_amt += len(data) + if lz4_decom.unused_data: + read_amt += len(lz4_decom.unused_data) + data = lz4_decom.unused_data + data + block = lz4_decom.decompress(data) + if lz4_decom.unused_data: + read_amt -= len(lz4_decom.unused_data) + return block, read_amt decompress = lz_decompress @@ -225,13 +227,15 @@ def buffered(): current_size = 0 read_gen = response.read_chunked(chunk_size, decompress is None) while True: - while not done and current_size < buffer_size: - chunk = next(read_gen, None) + while not done: + chunk = next(read_gen, None) # Always try to read at least one chunk if there are any left if not chunk: done = True break chunks.append(chunk) current_size += len(chunk) + if current_size > buffer_size: + break if len(chunks) == 0: return if decompress: