refactor: Heartbeat constantly with heartbeater (#22701)
* refactor: Heartbeat constantly with heartbeatter

* fix: Typo in heartbeater

* fix: ValueError is now non-retryable
tomasfarias authored and thmsobrmlr committed Jun 6, 2024
1 parent e62b108 commit 85526c0
Showing 6 changed files with 98 additions and 96 deletions.
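The heart of the refactor is visible in the BigQuery diff below: rather than calling activity.heartbeat() by hand only when a chunk happens to be flushed, the activity body is wrapped in the Heartbeater context manager and simply keeps heartbeater.details up to date while Heartbeater heartbeats in the background. A rough sketch of that pattern (iter_record_chunks and insert_into_destination_activity are illustrative names, not part of this diff):

import datetime as dt

from temporalio import activity

from posthog.temporal.common.heartbeat import Heartbeater


async def iter_record_chunks(inputs):
    # Illustrative stand-in for the real ClickHouse record iterator.
    yield dt.datetime.now(dt.timezone.utc), 0


@activity.defn
async def insert_into_destination_activity(inputs) -> int:
    # Before: activity.heartbeat(last_inserted_at.isoformat()) was called only
    # when a chunk was flushed, so a slow export could go a long time between
    # heartbeats.
    # After: Heartbeater heartbeats continuously in the background; the
    # activity just keeps the details it wants reported up to date.
    async with Heartbeater() as heartbeater:
        records_exported = 0
        async for last_inserted_at, count in iter_record_chunks(inputs):
            records_exported += count
            heartbeater.details = (str(last_inserted_at),)
        return records_exported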
170 changes: 86 additions & 84 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -38,6 +38,7 @@
)
from posthog.temporal.batch_exports.utils import peek_first_and_rewind
from posthog.temporal.common.clickhouse import get_client
+from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.temporal.common.utils import (
BatchExportHeartbeatDetails,
@@ -262,91 +263,92 @@ async def worker_shutdown_handler():

asyncio.create_task(worker_shutdown_handler())

-    with bigquery_client(inputs) as bq_client:
-        with BatchExportTemporaryFile() as jsonl_file:
-            rows_exported = get_rows_exported_metric()
-            bytes_exported = get_bytes_exported_metric()
-
-            async def flush_to_bigquery(bigquery_table, table_schema):
-                logger.debug(
-                    "Loading %s records of size %s bytes",
-                    jsonl_file.records_since_last_reset,
-                    jsonl_file.bytes_since_last_reset,
+    async with Heartbeater() as heartbeater:
+        with bigquery_client(inputs) as bq_client:
+            with BatchExportTemporaryFile() as jsonl_file:
+                rows_exported = get_rows_exported_metric()
+                bytes_exported = get_bytes_exported_metric()
+
+                async def flush_to_bigquery(bigquery_table, table_schema):
+                    logger.debug(
+                        "Loading %s records of size %s bytes",
+                        jsonl_file.records_since_last_reset,
+                        jsonl_file.bytes_since_last_reset,
+                    )
+                    await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
+
+                    rows_exported.add(jsonl_file.records_since_last_reset)
+                    bytes_exported.add(jsonl_file.bytes_since_last_reset)
+
+                first_record, records_iterator = peek_first_and_rewind(records_iterator)
+
+                if inputs.use_json_type is True:
+                    json_type = "JSON"
+                    json_columns = ["properties", "set", "set_once", "person_properties"]
+                else:
+                    json_type = "STRING"
+                    json_columns = []
+
+                if inputs.batch_export_schema is None:
+                    schema = [
+                        bigquery.SchemaField("uuid", "STRING"),
+                        bigquery.SchemaField("event", "STRING"),
+                        bigquery.SchemaField("properties", json_type),
+                        bigquery.SchemaField("elements", "STRING"),
+                        bigquery.SchemaField("set", json_type),
+                        bigquery.SchemaField("set_once", json_type),
+                        bigquery.SchemaField("distinct_id", "STRING"),
+                        bigquery.SchemaField("team_id", "INT64"),
+                        bigquery.SchemaField("ip", "STRING"),
+                        bigquery.SchemaField("site_url", "STRING"),
+                        bigquery.SchemaField("timestamp", "TIMESTAMP"),
+                        bigquery.SchemaField("bq_ingested_timestamp", "TIMESTAMP"),
+                    ]
+
+                else:
+                    column_names = [column for column in first_record.schema.names if column != "_inserted_at"]
+                    record_schema = first_record.select(column_names).schema
+                    schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)
+
+                bigquery_table = await create_table_in_bigquery(
+                    inputs.project_id,
+                    inputs.dataset_id,
+                    inputs.table_id,
+                    schema,
+                    bq_client,
+                )
-                )
-                await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
-
-                rows_exported.add(jsonl_file.records_since_last_reset)
-                bytes_exported.add(jsonl_file.bytes_since_last_reset)
-
-            first_record, records_iterator = peek_first_and_rewind(records_iterator)
-
-            if inputs.use_json_type is True:
-                json_type = "JSON"
-                json_columns = ["properties", "set", "set_once", "person_properties"]
-            else:
-                json_type = "STRING"
-                json_columns = []
-
-            if inputs.batch_export_schema is None:
-                schema = [
-                    bigquery.SchemaField("uuid", "STRING"),
-                    bigquery.SchemaField("event", "STRING"),
-                    bigquery.SchemaField("properties", json_type),
-                    bigquery.SchemaField("elements", "STRING"),
-                    bigquery.SchemaField("set", json_type),
-                    bigquery.SchemaField("set_once", json_type),
-                    bigquery.SchemaField("distinct_id", "STRING"),
-                    bigquery.SchemaField("team_id", "INT64"),
-                    bigquery.SchemaField("ip", "STRING"),
-                    bigquery.SchemaField("site_url", "STRING"),
-                    bigquery.SchemaField("timestamp", "TIMESTAMP"),
-                    bigquery.SchemaField("bq_ingested_timestamp", "TIMESTAMP"),
-                ]
-
-            else:
-                column_names = [column for column in first_record.schema.names if column != "_inserted_at"]
-                record_schema = first_record.select(column_names).schema
-                schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)
-
-            bigquery_table = await create_table_in_bigquery(
-                inputs.project_id,
-                inputs.dataset_id,
-                inputs.table_id,
-                schema,
-                bq_client,
-            )
-
-            # Columns need to be sorted according to BigQuery schema.
-            record_columns = [field.name for field in schema] + ["_inserted_at"]
-
-            for record_batch in records_iterator:
-                for record in record_batch.select(record_columns).to_pylist():
-                    inserted_at = record.pop("_inserted_at")
-
-                    for json_column in json_columns:
-                        if json_column in record and (json_str := record.get(json_column, None)) is not None:
-                            record[json_column] = json.loads(json_str)
-
-                    # TODO: Parquet is a much more efficient format to send data to BigQuery.
-                    jsonl_file.write_records_to_jsonl([record])
-
-                    if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
-                        await flush_to_bigquery(bigquery_table, schema)
-
-                        last_inserted_at = inserted_at.isoformat()
-                        activity.heartbeat(last_inserted_at)
-
-                        jsonl_file.reset()
-
-            if jsonl_file.tell() > 0 and inserted_at is not None:
-                await flush_to_bigquery(bigquery_table, schema)
-
-                last_inserted_at = inserted_at.isoformat()
-                activity.heartbeat(last_inserted_at)
-
-                jsonl_file.reset()
-
-            return jsonl_file.records_total

+                # Columns need to be sorted according to BigQuery schema.
+                record_columns = [field.name for field in schema] + ["_inserted_at"]
+
+                for record_batch in records_iterator:
+                    for record in record_batch.select(record_columns).to_pylist():
+                        inserted_at = record.pop("_inserted_at")
+
+                        for json_column in json_columns:
+                            if json_column in record and (json_str := record.get(json_column, None)) is not None:
+                                record[json_column] = json.loads(json_str)
+
+                        # TODO: Parquet is a much more efficient format to send data to BigQuery.
+                        jsonl_file.write_records_to_jsonl([record])
+
+                        if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
+                            await flush_to_bigquery(bigquery_table, schema)
+
+                            last_inserted_at = inserted_at.isoformat()
+                            heartbeater.details = (str(last_inserted_at),)
+
+                            jsonl_file.reset()
+
+                if jsonl_file.tell() > 0 and inserted_at is not None:
+                    await flush_to_bigquery(bigquery_table, schema)
+
+                    last_inserted_at = inserted_at.isoformat()
+                    heartbeater.details = (str(last_inserted_at),)
+
+                    jsonl_file.reset()
+
+                return jsonl_file.records_total


@workflow.defn(name="bigquery-export")
6 changes: 3 additions & 3 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -46,7 +46,7 @@
)
from posthog.temporal.batch_exports.utils import peek_first_and_rewind, try_set_batch_export_run_to_running
from posthog.temporal.common.clickhouse import get_client
-from posthog.temporal.common.heartbeat import Heartbeatter
+from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import bind_temporal_worker_logger


@@ -462,7 +462,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
extra_query_parameters=query_parameters,
)

-async with Heartbeatter() as heartbeatter:
+async with Heartbeater() as heartbeater:
async with s3_upload as s3_upload:

async def flush_to_s3(
@@ -484,7 +484,7 @@ async def flush_to_s3(
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

-heartbeatter.details = (str(last_inserted_at), s3_upload.to_state())
+heartbeater.details = (str(last_inserted_at), s3_upload.to_state())

first_record_batch, record_iterator = peek_first_and_rewind(record_iterator)
first_record_batch = cast_record_batch_json_columns(first_record_batch)
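The tuple assigned to heartbeater.details is what Temporal persists between attempts, which is why the S3 activity reports both the last inserted timestamp and the multipart upload state. A rough sketch of how a retried attempt can read those details back via the Temporal SDK (this is not PostHog's should_resume_from_activity_heartbeat helper, whose signature is not shown in this diff):

import datetime as dt

from temporalio import activity


def previous_attempt_details() -> tuple[dt.datetime | None, object | None]:
    # Heartbeat details from the previous attempt are exposed on activity.info().
    # The S3 activity reports (last_inserted_at, s3_upload_state).
    details = activity.info().heartbeat_details
    if not details:
        return None, None
    last_inserted_at = dt.datetime.fromisoformat(str(details[0]))
    upload_state = details[1] if len(details) > 1 else None
    return last_inserted_at, upload_state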
10 changes: 5 additions & 5 deletions posthog/temporal/batch_exports/squash_person_overrides.py
@@ -12,7 +12,7 @@

from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.clickhouse import get_client
-from posthog.temporal.common.heartbeat import Heartbeatter
+from posthog.temporal.common.heartbeat import Heartbeater

EPOCH = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc)

@@ -243,7 +243,7 @@ async def optimize_person_distinct_id_overrides(dry_run: bool) -> None:
activity.logger.debug("Optimize query: %s", optimize_query)
return

-async with Heartbeatter():
+async with Heartbeater():
async with get_client(mutations_sync=2) as clickhouse_client:
await clickhouse_client.execute_query(
optimize_query.format(database=settings.CLICKHOUSE_DATABASE, cluster=settings.CLICKHOUSE_CLUSTER)
@@ -292,7 +292,7 @@ async def create_table(inputs: TableActivityInputs) -> None:
activity.logger.debug("Query: %s", create_table_query)
return

-async with Heartbeatter():
+async with Heartbeater():
async with get_client() as clickhouse_client:
await clickhouse_client.execute_query(create_table_query, query_parameters=inputs.query_parameters)

@@ -321,7 +321,7 @@ async def drop_table(inputs: TableActivityInputs) -> None:
activity.logger.debug("Query: %s", drop_table_query)
return

-async with Heartbeatter():
+async with Heartbeater():
async with get_client() as clickhouse_client:
await clickhouse_client.execute_query(drop_table_query)

@@ -554,7 +554,7 @@ async def wait_for_mutation(inputs: MutationActivityInputs) -> None:
database=settings.CLICKHOUSE_DATABASE,
cluster=settings.CLICKHOUSE_CLUSTER,
)
-async with Heartbeatter():
+async with Heartbeater():
async with get_client() as clickhouse_client:
prepared_submit_query = clickhouse_client.prepare_query(submit_query, inputs.query_parameters)
query_command = parse_mutation_command(prepared_submit_query)
2 changes: 1 addition & 1 deletion posthog/temporal/common/heartbeat.py
@@ -4,7 +4,7 @@
from temporalio import activity


-class Heartbeatter:
+class Heartbeater:
"""Regular heartbeatting during Temporal activity execution.
This class manages two heartbeat tasks via a context manager:
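For context, the idea behind a heartbeater like this is a background task that heartbeats on a fixed interval with whatever details the activity last assigned. The sketch below is a simplified single-task illustration, not PostHog's implementation (which, per the docstring above, manages two heartbeat tasks):

import asyncio

from temporalio import activity


class SimpleHeartbeater:
    """Simplified sketch: heartbeat at a fixed interval with the latest details."""

    def __init__(self, interval: float = 10.0) -> None:
        self.details: tuple = ()
        self._interval = interval
        self._task: asyncio.Task | None = None

    async def __aenter__(self) -> "SimpleHeartbeater":
        self._task = asyncio.create_task(self._heartbeat_forever())
        return self

    async def __aexit__(self, *exc_info) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        # One final heartbeat so the most recent details are not lost.
        activity.heartbeat(*self.details)

    async def _heartbeat_forever(self) -> None:
        while True:
            activity.heartbeat(*self.details)
            await asyncio.sleep(self._interval)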
4 changes: 2 additions & 2 deletions posthog/temporal/common/utils.py
@@ -1,8 +1,8 @@
import collections.abc
+import abc
import dataclasses
import datetime as dt
import typing
-import abc


class EmptyHeartbeatError(Exception):
@@ -138,7 +138,7 @@ async def should_resume_from_activity_heartbeat(
# Ideally, any new exceptions should be added to the previous blocks after the first time and we will never land here.
heartbeat_details = None
received = False
logger.exception("Did not receive details from previous activity Excecution due to an unexpected error")
logger.exception("Did not receive details from previous activity Execution due to an unexpected error")

else:
received = True
@@ -516,7 +516,7 @@ async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
assert len(runs) == 1

run = runs[0]
-assert run.status == "FailedRetryable"
+assert run.status == "Failed"
assert run.latest_error == "ValueError: A useful error message"


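The test change above covers the second half of the commit: a ValueError now fails the batch export run outright ("Failed") instead of leaving it retryable ("FailedRetryable"). In Temporal's Python SDK this is typically achieved either by raising ApplicationError(..., non_retryable=True) or by listing the exception type in the activity's retry policy; a sketch of the latter follows (the activity name and timeouts are illustrative, not taken from this diff):

from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


async def run_export_activity(inputs) -> None:
    # Executed from inside a workflow definition: a raised ValueError fails
    # the activity immediately instead of being retried.
    await workflow.execute_activity(
        "insert-into-bigquery-activity",  # illustrative activity name
        inputs,
        start_to_close_timeout=timedelta(hours=1),
        retry_policy=RetryPolicy(
            maximum_attempts=10,
            non_retryable_error_types=["ValueError"],
        ),
    )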
