refactor(batch-exports): Buffer batches while async flushing #25631

Merged 7 commits on Oct 17, 2024
1 change: 1 addition & 0 deletions posthog/settings/temporal.py
@@ -17,6 +17,7 @@
BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_HTTP_BATCH_SIZE: int = 5000
BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES: int = 1024 * 1024 * 300 # 300MB

UNCONSTRAINED_TIMESTAMP_TEAM_IDS: list[str] = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
ASYNC_ARROW_STREAMING_TEAM_IDS: list[str] = get_list(os.getenv("ASYNC_ARROW_STREAMING_TEAM_IDS", ""))
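
For context, a minimal sketch (editorial, not part of this diff) of how this byte cap feeds the buffer queue added in batch_exports.py below; the Django `settings` import is an assumption that mirrors how the rest of the codebase reads settings:

from django.conf import settings  # assumption: settings exposed via Django, as elsewhere in posthog

from posthog.temporal.batch_exports.batch_exports import RecordBatchQueue

# The buffer is capped by bytes, not by item count: RecordBatchQueue overrides
# qsize() to report buffered bytes, so put() blocks once roughly 300MB of
# record batches are waiting to be flushed.
queue = RecordBatchQueue(max_size_bytes=settings.BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES)
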
131 changes: 131 additions & 0 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -1,3 +1,5 @@
import asyncio
import collections
import collections.abc
import dataclasses
import datetime as dt
@@ -251,6 +253,135 @@ async def iter_records_from_model_view(
yield record_batch


class RecordBatchQueue(asyncio.Queue):
    """A queue of pyarrow RecordBatch instances limited by bytes."""

    def __init__(self, max_size_bytes=0):
        super().__init__(maxsize=max_size_bytes)
        self._bytes_size = 0
        self._schema_set = asyncio.Event()
        self.record_batch_schema = None
        # This is set by `asyncio.Queue.__init__` calling `_init`
        self._queue: collections.deque

    def _get(self) -> pa.RecordBatch:
        """Override parent `_get` to keep track of bytes."""
        item = self._queue.popleft()
        self._bytes_size -= item.get_total_buffer_size()
        return item

    def _put(self, item: pa.RecordBatch) -> None:
        """Override parent `_put` to keep track of bytes."""
        self._bytes_size += item.get_total_buffer_size()

        if not self._schema_set.is_set():
            self.set_schema(item)

        self._queue.append(item)

    def set_schema(self, record_batch: pa.RecordBatch) -> None:
        """Used to keep track of the schema of record batches in the queue."""
        self.record_batch_schema = record_batch.schema
        self._schema_set.set()

    async def get_schema(self) -> pa.Schema:
        """Return the schema of record batches in the queue.

        Currently, this is not enforced. It's purely informational, telling
        consumers of the queue what the record batches look like; it's up to
        the producer to ensure all record batches have the same schema.
        """
        await self._schema_set.wait()
        return self.record_batch_schema

    def qsize(self) -> int:
        """Return the size in bytes of record batches in the queue.

        `asyncio.Queue` uses this to determine when the queue is full, so it
        returns bytes rather than an item count.
        """
        return self._bytes_size

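As an editorial sketch (not part of this PR), here is the queue's byte-based backpressure with a toy producer and consumer; only pyarrow and the class above are assumed:

import asyncio

import pyarrow as pa


async def example() -> None:
    # Tiny 1KB cap so the producer blocks after a couple of batches in this sketch.
    queue = RecordBatchQueue(max_size_bytes=1024)
    done = asyncio.Event()

    async def produce() -> None:
        for _ in range(10):
            batch = pa.RecordBatch.from_pydict({"n": list(range(100))})
            await queue.put(batch)  # suspends while buffered bytes >= max_size_bytes
        done.set()

    async def consume() -> None:
        schema = await queue.get_schema()  # resolves once the first batch is put
        while not (done.is_set() and queue.empty()):
            try:
                batch = await asyncio.wait_for(queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            ...  # flush `batch` to the destination using `schema`

    await asyncio.gather(produce(), consume())
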

def start_produce_batch_export_record_batches(
    client: ClickHouseClient,
    model_name: str,
    is_backfill: bool,
    team_id: int,
    interval_start: str,
    interval_end: str,
    fields: list[BatchExportField] | None = None,
    destination_default_fields: list[BatchExportField] | None = None,
    **parameters,
):
    """Start producing batch export record batches from a model query.

    Depending on the model, we issue a query to ClickHouse and initialize a
    producer to stream record batches to a queue. Callers can then consume from
    this queue as the record batches arrive. The producer runs asynchronously as
    a background task, which is returned.

    Returns:
        A tuple containing the record batch queue, an event used by the producer
        to indicate there is nothing more to produce, and a reference to the
        producer task.
    """
    if fields is None:
        if destination_default_fields is None:
            fields = default_fields()
        else:
            fields = destination_default_fields

    if model_name == "persons":
        view = SELECT_FROM_PERSONS_VIEW

    else:
        if parameters.get("exclude_events", None):
            parameters["exclude_events"] = list(parameters["exclude_events"])
        else:
            parameters["exclude_events"] = []

        if parameters.get("include_events", None):
            parameters["include_events"] = list(parameters["include_events"])
        else:
            parameters["include_events"] = []

        if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
            query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED
        elif is_backfill:
            query_template = SELECT_FROM_EVENTS_VIEW_BACKFILL
        else:
            query_template = SELECT_FROM_EVENTS_VIEW
            lookback_days = settings.OVERRIDE_TIMESTAMP_TEAM_IDS.get(team_id, settings.DEFAULT_TIMESTAMP_LOOKBACK_DAYS)
            parameters["lookback_days"] = lookback_days

        if "_inserted_at" not in [field["alias"] for field in fields]:
            control_fields = [BatchExportField(expression="_inserted_at", alias="_inserted_at")]
        else:
            control_fields = []

        query_fields = ",".join(f"{field['expression']} AS {field['alias']}" for field in fields + control_fields)

        view = query_template.substitute(fields=query_fields)

    parameters["team_id"] = team_id
    parameters["interval_start"] = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
    parameters["interval_end"] = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
    extra_query_parameters = parameters.pop("extra_query_parameters", {}) or {}
    parameters = {**parameters, **extra_query_parameters}

    queue = RecordBatchQueue(max_size_bytes=settings.BATCH_EXPORT_BUFFER_QUEUE_MAX_SIZE_BYTES)
    query_id = uuid.uuid4()
    done_event = asyncio.Event()
    produce_task = asyncio.create_task(
        client.aproduce_query_as_arrow_record_batches(
            view, queue=queue, done_event=done_event, query_parameters=parameters, query_id=str(query_id)
        )
    )

    return queue, done_event, produce_task

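A usage sketch (editorial, not part of this PR) of consuming the values returned above: the caller drains the queue while the producer keeps buffering, then awaits the task so any query error surfaces. The wrapper name, parameter values, and the flush step are illustrative only:

async def consume_batch_export(client: ClickHouseClient, team_id: int) -> None:
    queue, done_event, produce_task = start_produce_batch_export_record_batches(
        client,
        model_name="events",
        is_backfill=False,
        team_id=team_id,
        interval_start="2024-10-17T00:00:00+00:00",
        interval_end="2024-10-17T01:00:00+00:00",
    )

    schema = await queue.get_schema()  # set by the first batch the producer puts
    while not (done_event.is_set() and queue.empty()):
        try:
            record_batch = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        ...  # flush `record_batch` using `schema` while more batches keep buffering

    await produce_task  # re-raises any error hit while producing
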

def iter_records(
    client: ClickHouseClient,
    team_id: int,