refactor(batch-exports): Buffer batches while async flushing #25631
Problem
If we keep a connection to ClickHouse open without activity for too long, it will be closed. It's unclear whether the HTTP client, the pod OS, or ClickHouse itself is closing it; based on ClickHouse logs, it appears to be somewhere on the client side. I have tried all the knobs offered by `aiohttp` to keep the connection open for longer, but around 30s seems to be the limit (which, to be fair, is quite a lot).

Anyway, let's not keep the connection inactive for too long. To that end, we read from the socket into a buffer of record batches. This buffer allows us to keep reading even when downstream activities (like flushing) are slow.
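The producer side of this can be sketched as follows (all names here are illustrative, not the actual PostHog code), with a bounded `asyncio.Queue` acting as the record batch buffer between the socket reader and the slower downstream consumer:

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def produce_record_batches(
    batches: AsyncIterator[bytes],
    queue: "asyncio.Queue[Optional[bytes]]",
) -> None:
    # Drain the connection as fast as the buffer allows; `put` only blocks
    # once `maxsize` batches are already waiting for the consumer.
    async for batch in batches:
        await queue.put(batch)
    await queue.put(None)  # sentinel: tell the consumer the stream is done


async def demo() -> List[bytes]:
    async def fake_stream() -> AsyncIterator[bytes]:
        # Stand-in for the bytes coming off the ClickHouse socket.
        for chunk in (b"batch-0", b"batch-1", b"batch-2"):
            yield chunk

    # Bounded queue: the producer backs off instead of growing without limit.
    queue: "asyncio.Queue[Optional[bytes]]" = asyncio.Queue(maxsize=2)
    producer = asyncio.create_task(produce_record_batches(fake_stream(), queue))

    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    await producer
    return received
```

Because the producer only awaits on `queue.put`, the connection keeps seeing activity as long as the buffer has room, regardless of how slow the flush side is.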
Speaking of downstream: the consumer of this queue is a writing loop that writes to one file at a time. After each file is written, we allow the main thread to continue and start writing to a new file. We do this because flushing to a destination can be slow, so we keep writing new files instead of waiting on each flush sequentially.
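The consumer loop above can be sketched like this (again, illustrative names and a simplified shape, using background tasks to stand in for the slow flush; the real code hands control back differently):

```python
import asyncio
from typing import List, Optional


async def consume_record_batches(
    queue: "asyncio.Queue[Optional[bytes]]",
    flush,
    max_file_bytes: int,
) -> None:
    # Illustrative consumer: write one "file" at a time; once it is full,
    # hand it to a background flush task and immediately start the next
    # file instead of waiting for the (slow) flush to finish.
    pending = []
    current: List[bytes] = []
    size = 0
    while (batch := await queue.get()) is not None:
        current.append(batch)
        size += len(batch)
        if size >= max_file_bytes:
            pending.append(asyncio.create_task(flush(b"".join(current))))
            current, size = [], 0
    if current:  # flush whatever remains after the sentinel
        pending.append(asyncio.create_task(flush(b"".join(current))))
    await asyncio.gather(*pending)


async def demo() -> List[bytes]:
    flushed: List[bytes] = []

    async def fake_flush(data: bytes) -> None:
        flushed.append(data)  # stand-in for a slow upload to the destination

    queue: "asyncio.Queue[Optional[bytes]]" = asyncio.Queue()
    for batch in (b"aaaa", b"bbbb", b"cc"):
        queue.put_nowait(batch)
    queue.put_nowait(None)  # end-of-stream sentinel

    await consume_record_batches(queue, fake_flush, max_file_bytes=8)
    return flushed
```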
A potential improvement would be to write to multiple files at a time, round-robin style, but error recovery then becomes a bit more complicated, so I left that out as an improvement for later.
This solution will put more memory and disk pressure on our pods, but our current resource utilization is about 20%-40% of total memory requested, so we have some to spare. The size of the in-memory buffer queue can be capped to ensure we don't OOM.
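For instance, one rough (hypothetical, not the PR's actual configuration) way to derive the queue bound from a memory budget and an estimated record batch size:

```python
def buffer_queue_maxsize(memory_budget_bytes: int, est_batch_bytes: int) -> int:
    # Hypothetical helper: cap the buffer so that even if the consumer
    # stalls completely, buffered record batches stay within the budget.
    return max(1, memory_budget_bytes // est_batch_bytes)


# e.g. a 512 MiB budget with ~8 MiB record batches allows 64 buffered batches
maxsize = buffer_queue_maxsize(512 * 1024 * 1024, 8 * 1024 * 1024)
```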
Changes
- A new `AsyncRecordBatchProducer` class to read record batches from an async iterator of bytes into a queue.
- An `aproduce_query_as_arrow_record_batches` method on our `ClickHouseClient` to asynchronously read record batches from a given ClickHouse query into a queue, using the class mentioned in the previous point.
- `start_produce_batch_export_record_batches` to initialize the production of record batches in batch exports, using the previous two items.
- `consume_batch_export_record_batches` to initialize consumption of record batches from a queue, writing them to a file and issuing a flush when done.
- Bumped the `google-cloud-bigquery` library version. Saw some weird retry errors otherwise.
Does this work well for both Cloud and self-hosted?
How did you test this code?
Ran BigQuery batch export tests, mostly as-is, except that JSON values are now automagically deserialized, so there's no need to call `json.loads` again. All passed: