Skip to content

Commit

Permalink
Ensure consistent current_time across microbatch models in an invoc…
Browse files Browse the repository at this point in the history
…ation (#10830)

* Add test that checks microbatch models are all operating with the same `current_time`

* Set an `invocated_at` on the `RuntimeConfig` and plumb to `MicrobatchBuilder`

* Add changie doc

* Rename `invocated_at` to `invoked_at`

* Simplify conditional logic for setting MicrobatchBuilder.batch_current_time

* Rename `batch_current_time` to `default_end_time` for MicrobatchBuilder
  • Loading branch information
QMalcolm authored Oct 15, 2024
1 parent ffa75ca commit d18f50b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241007-115853.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Ensure microbatch models use same `current_time` value
time: 2024-10-07T11:58:53.460941-05:00
custom:
Author: QMalcolm
Issue: "10819"
6 changes: 5 additions & 1 deletion core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import itertools
import os
from copy import deepcopy
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Expand All @@ -15,6 +16,8 @@
Type,
)

import pytz

from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
Expand Down Expand Up @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
invoked_at: datetime = field(default_factory=lambda: datetime.now(pytz.UTC))

def __post_init__(self):
self.validate()
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
is_incremental: bool,
event_time_start: Optional[datetime],
event_time_end: Optional[datetime],
default_end_time: Optional[datetime] = None,
):
if model.config.incremental_strategy != "microbatch":
raise DbtInternalError(
Expand All @@ -35,10 +36,11 @@ def __init__(
event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None
)
self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None
self.default_end_time = default_end_time or datetime.now(pytz.UTC)

def build_end_time(self):
    """Return the end time used to bound this model's batches.

    An explicitly provided (non-`None`) `event_time_end` always wins.
    Otherwise the builder's `default_end_time` is used rather than reading
    the clock again here, so that repeated calls yield the same value
    instead of drifting with wall-clock time.
    """
    # Deliberately no `datetime.now(...)` here: a fresh clock read per call
    # would bypass `default_end_time` entirely.
    return self.event_time_end or self.default_end_time

def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
Expand Down
1 change: 1 addition & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ def _execute_microbatch_materialization(
is_incremental=self._is_incremental(model),
event_time_start=getattr(self.config.args, "EVENT_TIME_START", None),
event_time_end=getattr(self.config.args, "EVENT_TIME_END", None),
default_end_time=self.config.invoked_at,
)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
Expand Down
32 changes: 32 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
from datetime import datetime
from unittest import mock

import pytest
import pytz

from dbt.events.types import LogModelResult
from dbt.tests.util import (
Expand Down Expand Up @@ -40,6 +42,11 @@
select * from {{ ref('input_model') }}
"""

# SQL for a second microbatch model that sits downstream of `microbatch_model`:
# it is configured as an incremental model using the `microbatch` strategy with
# daily batches, keyed on `id`, using `event_time` as the event-time column and
# beginning at 2020-01-01, and it simply selects everything from
# `ref('microbatch_model')`.
microbatch_model_downstream_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('microbatch_model') }}
"""


microbatch_model_ref_render_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
Expand Down Expand Up @@ -671,3 +678,28 @@ def test_run_with_event_time(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--full-refresh"])
self.assert_row_count(project, "microbatch_model", 3)


class TestMicrbobatchModelsRunWithSameCurrentTime(BaseMicrobatchTest):
    """Verify that microbatch models in one invocation agree on their last batch.

    NOTE(review): the class name contains a typo ("Micrbobatch"); it is kept
    as-is here so the test's identifier does not change.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the input model plus two microbatch models for the project."""
        project_models = {}
        project_models["input_model.sql"] = input_model_sql
        project_models["microbatch_model.sql"] = microbatch_model_sql
        project_models["second_microbatch_model.sql"] = microbatch_model_downstream_sql
        return project_models

    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_microbatch(self, project) -> None:
        """Run once starting from today and compare the last successful batches."""
        # Only the start is pinned (to today's UTC date); the end is left for
        # dbt to default.
        today = datetime.now(pytz.UTC).strftime("%Y-%m-%d")
        run_dbt(["run", "--event-time-start", today])

        artifact = get_artifact(project.project_root, "target", "run_results.json")
        results = artifact["results"]

        # Entries 1 and 2 are presumably the two microbatch models (entry 0
        # being the input model) -- relies on result ordering; verify if the
        # project's models change.
        first_last_batch, second_last_batch = (
            results[position]["batch_results"]["successful"][-1] for position in (1, 2)
        )

        # they should have the same last batch because they are using the _same_ "current_time"
        assert first_last_batch == second_last_batch

0 comments on commit d18f50b

Please sign in to comment.