Skip to content

Commit

Permalink
make microbatch models skippable (#11020)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Nov 21, 2024
1 parent 6fccfe8 commit a42303c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241121-112638.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Make microbatch models skippable
time: 2024-11-21T11:26:38.192345-05:00
custom:
Author: michelleark
Issue: "11021"
16 changes: 11 additions & 5 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading
import time
from copy import deepcopy
from dataclasses import asdict, field
from dataclasses import asdict
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -334,9 +334,12 @@ def execute(self, model, manifest):


class MicrobatchModelRunner(ModelRunner):
batch_idx: Optional[int] = None
batches: Dict[int, BatchType] = field(default_factory=dict)
relation_exists: bool = False
def __init__(self, config, adapter, node, node_index: int, num_nodes: int):
super().__init__(config, adapter, node, node_index, num_nodes)

self.batch_idx: Optional[int] = None
self.batches: Dict[int, BatchType] = {}
self.relation_exists: bool = False

def set_batch_idx(self, batch_idx: int) -> None:
self.batch_idx = batch_idx
Expand Down Expand Up @@ -704,8 +707,11 @@ def handle_microbatch_model(
runner: MicrobatchModelRunner,
pool: ThreadPool,
) -> RunResult:
# Initial run computes batch metadata
# Initial run computes batch metadata, unless model is skipped
result = self.call_runner(runner)
if result.status == RunStatus.Skipped:
return result

batch_results: List[RunResult] = []

# Execute batches serially until a relation exists, at which point future batches are run in parallel
Expand Down
24 changes: 24 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

input_model_invalid_sql = """
{{ config(materialized='table', event_time='event_time') }}
select invalid as event_time
"""

input_model_without_event_time_sql = """
{{ config(materialized='table') }}
Expand Down Expand Up @@ -835,6 +841,24 @@ def test_microbatch(
assert len(catch_aw.caught_events) == 1


class TestMicrobatchModelSkipped(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_invalid_sql,
"microbatch_model.sql": microbatch_model_sql,
}

def test_microbatch_model_skipped(self, project) -> None:
run_dbt(["run"], expect_pass=False)

run_results = get_artifact(project.project_root, "target", "run_results.json")

microbatch_result = run_results["results"][1]
assert microbatch_result["status"] == "skipped"
assert microbatch_result["batch_results"] is None


class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest):
@pytest.fixture
def batch_exc_catcher(self) -> EventCatcher:
Expand Down

0 comments on commit a42303c

Please sign in to comment.