Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tidy first] move microbatch compilation to .compile method #11063

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,30 @@
self.relation_exists: bool = False

def compile(self, manifest: Manifest):
# The default compile function is _always_ called. However, we do our
# compilation _later_ in `_execute_microbatch_materialization`. This
# meant the node was being compiled _twice_ for each batch. To get around
# this, we've overriden the default compile method to do nothing
if self.batch_idx is not None:
batch = self.batches[self.batch_idx]

Check warning on line 346 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L345-L346

Added lines #L345 - L346 were not covered by tests

# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

Check warning on line 351 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L350-L351

Added lines #L350 - L351 were not covered by tests
# Create batch context on model node prior to re-compiling
self.node.batch = BatchContext(

Check warning on line 353 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L353

Added line #L353 was not covered by tests
id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(

Check warning on line 359 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L359

Added line #L359 was not covered by tests
self.node,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], self.node.config.batch_size
),
)

# Skips compilation for non-batch runs
return self.node

def set_batch_idx(self, batch_idx: int) -> None:
Expand Down Expand Up @@ -537,25 +557,6 @@
# call materialization_macro to get a batch-level run result
start_time = time.perf_counter()
try:
# LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+)
# TODO: REMOVE before 1.10 GA
model.config["__dbt_internal_microbatch_event_time_start"] = batch[0]
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]
# Create batch context on model node prior to re-compiling
model.batch = BatchContext(
id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size),
event_time_start=batch[0],
event_time_end=batch[1],
)
# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
# Update jinja context with batch context members
jinja_context = microbatch_builder.build_jinja_context_for_batch(
incremental_batch=self.relation_exists
Expand Down
Loading