Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up http buffer #402

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a
instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction.
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`.

## 0.8.1, 2024-09-29
### Bug Fix
- Fixed an edge case where the HTTP buffer could theoretically return empty blocks.

## 0.8.0, 2024-09-26
### Experimental Feature - "New" JSON/Dynamic/Variant DataTypes
#### Usage Notes
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '0.8.0'
version = '0.8.1'
22 changes: 13 additions & 9 deletions clickhouse_connect/driver/httputil.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,15 @@ def zstd_decompress(c: deque) -> Tuple[bytes, int]:

def lz_decompress(c: deque) -> Tuple[Optional[bytes], int]:
read_amt = 0
while lz4_decom.needs_input:
data = c.popleft()
read_amt += len(data)
if lz4_decom.unused_data:
data = lz4_decom.unused_data + data
return lz4_decom.decompress(data), read_amt
return None, 0
data = c.popleft()
read_amt += len(data)
if lz4_decom.unused_data:
read_amt += len(lz4_decom.unused_data)
data = lz4_decom.unused_data + data
block = lz4_decom.decompress(data)
if lz4_decom.unused_data:
read_amt -= len(lz4_decom.unused_data)
return block, read_amt

decompress = lz_decompress

Expand All @@ -225,13 +227,15 @@ def buffered():
current_size = 0
read_gen = response.read_chunked(chunk_size, decompress is None)
while True:
while not done and current_size < buffer_size:
chunk = next(read_gen, None)
while not done:
chunk = next(read_gen, None) # Always try to read at least one chunk if there are any left
if not chunk:
done = True
break
chunks.append(chunk)
current_size += len(chunk)
if current_size > buffer_size:
break
if len(chunks) == 0:
return
if decompress:
Expand Down