Skip to content

Commit

Permalink
Issue One Event Per Node Failure (#8210)
Browse files Browse the repository at this point in the history
* Replaced the FirstRunResultError and AfterFirstRunResultError events with RunResultError.

* Attempts at reasonable unit tests.

* Restore event manager after unit test.
  • Loading branch information
peterallenwebb authored Aug 2, 2023
1 parent 6130a6e commit 5814928
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 57 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Breaking Changes-20230725-171359.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Breaking Changes
body: Removed the FirstRunResultError and AfterFirstRunResultError event types, using
the existing RunResultError in their place.
time: 2023-07-25T17:13:59.441682-04:00
custom:
Author: peterallenwebb
Issue: "7963"
26 changes: 24 additions & 2 deletions core/dbt/events/eventmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from logging.handlers import RotatingFileHandler
import threading
import traceback
from typing import Any, Callable, List, Optional, TextIO
from typing import Any, Callable, List, Optional, TextIO, Protocol
from uuid import uuid4
from dbt.events.format import timestamp_to_datetime_string

Expand Down Expand Up @@ -206,7 +206,7 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
for callback in self.callbacks:
callback(msg)

def add_logger(self, config: LoggerConfig):
def add_logger(self, config: LoggerConfig) -> None:
logger = (
_JsonLogger(self, config)
if config.line_format == LineFormat.Json
Expand All @@ -218,3 +218,25 @@ def add_logger(self, config: LoggerConfig):
def flush(self) -> None:
    """Flush every logger currently registered on this event manager."""
    for logger in self.loggers:
        logger.flush()


class IEventManager(Protocol):
    """Structural interface for an object that can stand in as the event manager.

    Anything exposing these attributes and methods can be installed as the
    active manager, which lets tests substitute a recording implementation.
    """

    # Callables invoked with an EventMsg.
    callbacks: List[Callable[[EventMsg], None]]
    # Identifier of the current invocation, stored as a string.
    invocation_id: str

    def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
        """Handle event ``e``, optionally at an explicitly supplied ``level``."""

    def add_logger(self, config: LoggerConfig) -> None:
        """Register a logger described by ``config``."""


class TestEventManager(IEventManager):
    """Recording event manager intended for unit tests.

    Fired events are not formatted or written anywhere; each one is appended
    to ``event_history`` as an ``(event, level)`` tuple so a test can inspect
    exactly what was emitted.
    """

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it (and warn about the __init__ constructor) in every test
    # module that imports it.
    __test__ = False

    def __init__(self) -> None:
        # (event, level) tuples in the order they were fired.
        self.event_history: List[Any] = []
        # IEventManager only *declares* these attributes; assign concrete
        # values so reading them on a test manager does not raise
        # AttributeError.
        self.callbacks = []
        self.invocation_id = str(uuid4())

    def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
        """Record ``e`` and the requested ``level`` instead of logging them."""
        self.event_history.append((e, level))

    def add_logger(self, config: LoggerConfig) -> None:
        """Loggers are not supported by the recording manager.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
9 changes: 7 additions & 2 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.constants import METADATA_ENV_PREFIX
from dbt.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter
from dbt.events.eventmgr import EventManager, LoggerConfig, LineFormat, NoFilter, IEventManager
from dbt.events.helpers import env_secrets, scrub_secrets
from dbt.events.types import Formatting, Note
from dbt.flags import get_flags, ENABLE_LEGACY_LOGGER
Expand Down Expand Up @@ -182,7 +182,7 @@ def cleanup_event_logger():
# Since dbt-rpc does not do its own log setup, and since some events can
# currently fire before logs can be configured by setup_event_logger(), we
# create a default configuration with default settings and no file output.
EVENT_MANAGER: EventManager = EventManager()
EVENT_MANAGER: IEventManager = EventManager()
EVENT_MANAGER.add_logger(
_get_logbook_log_config(False, True, False, False) # type: ignore
if ENABLE_LEGACY_LOGGER
Expand Down Expand Up @@ -295,3 +295,8 @@ def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
EVENT_MANAGER.invocation_id = str(uuid.uuid4())


def ctx_set_event_manager(event_manager: IEventManager) -> None:
    """Replace the module-level ``EVENT_MANAGER`` with ``event_manager``.

    The manager being replaced is not saved anywhere; a caller that needs to
    put it back must read ``EVENT_MANAGER`` from this module before calling.
    """
    global EVENT_MANAGER
    EVENT_MANAGER = event_manager
20 changes: 1 addition & 19 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2245,25 +2245,7 @@ message CheckNodeTestFailureMsg {
CheckNodeTestFailure data = 2;
}

// Z028
message FirstRunResultError {
string msg = 1;
}

message FirstRunResultErrorMsg {
EventInfo info = 1;
FirstRunResultError data = 2;
}

// Z029
message AfterFirstRunResultError {
string msg = 1;
}

message AfterFirstRunResultErrorMsg {
EventInfo info = 1;
AfterFirstRunResultError data = 2;
}
// Skipped Z028, Z029

// Z030
message EndOfRunSummary {
Expand Down
20 changes: 1 addition & 19 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2171,25 +2171,7 @@ def message(self) -> str:
return f" See test failures:\n {border}\n {msg}\n {border}"


# FirstRunResultError and AfterFirstRunResultError are just splitting the message from the result
# object into multiple log lines
# TODO: is this really needed? See printer.py


class FirstRunResultError(ErrorLevel):
def code(self):
return "Z028"

def message(self) -> str:
return yellow(self.msg)


class AfterFirstRunResultError(ErrorLevel):
def code(self):
return "Z029"

def message(self) -> str:
return self.msg
# Skipped Z028, Z029


class EndOfRunSummary(InfoLevel):
Expand Down
12 changes: 1 addition & 11 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
RunResultErrorNoMessage,
SQLCompiledPath,
CheckNodeTestFailure,
FirstRunResultError,
AfterFirstRunResultError,
EndOfRunSummary,
)

Expand Down Expand Up @@ -118,15 +116,7 @@ def print_run_result_error(result, newline: bool = True, is_warning: bool = Fals
fire_event(CheckNodeTestFailure(relation_name=result.node.relation_name))

elif result.message is not None:
first = True
for line in result.message.split("\n"):
# TODO: why do we format like this? Is there a reason this needs to
# be split instead of sending it as a single log line?
if first:
fire_event(FirstRunResultError(msg=line))
first = False
else:
fire_event(AfterFirstRunResultError(msg=line))
fire_event(RunResultError(msg=result.message))


def print_run_end_messages(results, keyboard_interrupt: bool = False) -> None:
Expand Down
41 changes: 37 additions & 4 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
from typing import TypeVar

from dbt.contracts.results import TimingInfo
from dbt.contracts.results import TimingInfo, RunResult, RunStatus
from dbt.events import AdapterLogger, types
from dbt.events.base_types import (
BaseEvent,
Expand All @@ -14,11 +14,15 @@
WarnLevel,
msg_from_base_event,
)
from dbt.events.functions import msg_to_dict, msg_to_json
from dbt.events.eventmgr import TestEventManager, EventManager
from dbt.events.functions import msg_to_dict, msg_to_json, ctx_set_event_manager
from dbt.events.helpers import get_json_string_utcnow
from dbt.events.types import RunResultError
from dbt.flags import set_from_args
from argparse import Namespace

from dbt.task.printer import print_run_result_error

set_from_args(Namespace(WARN_ERROR=False), None)


Expand Down Expand Up @@ -388,8 +392,6 @@ def test_event_codes(self):
types.RunResultErrorNoMessage(status=""),
types.SQLCompiledPath(path=""),
types.CheckNodeTestFailure(relation_name=""),
types.FirstRunResultError(msg=""),
types.AfterFirstRunResultError(msg=""),
types.EndOfRunSummary(num_errors=0, num_warnings=0, keyboard_interrupt=False),
types.LogSkipBecauseError(schema="", relation="", index=0, total=0),
types.EnsureGitInstalled(),
Expand Down Expand Up @@ -485,3 +487,34 @@ def test_bad_serialization():
str(excinfo.value)
== "[Note]: Unable to parse dict {'param_event_doesnt_have': 'This should break'}"
)


def test_single_run_error():
    """A failed result with a message must produce exactly one RunResultError."""
    # Imported locally so the manager that is live right now can be read; the
    # module-level imports of this file do not expose the EVENT_MANAGER global.
    from dbt.events import functions as event_functions

    previous_mgr = event_functions.EVENT_MANAGER

    # Add a recording event manager to the context, so we can test events.
    event_mgr = TestEventManager()
    ctx_set_event_manager(event_mgr)

    try:
        error_result = RunResult(
            status=RunStatus.Error,
            timing=[],
            thread_id="",
            execution_time=0.0,
            node=None,
            adapter_response=dict(),
            message="oh no!",
            failures=[],
        )

        print_run_result_error(error_result)
        events = [e for e in event_mgr.event_history if isinstance(e[0], RunResultError)]

        assert len(events) == 1
        assert events[0][0].msg == "oh no!"

    finally:
        # Put back the manager that was active before the test, unconditionally,
        # so later tests keep whatever loggers it had configured. This is an
        # early attempt at unit testing events, and we need to think about how
        # it could be done in a thread safe way in the long run.
        ctx_set_event_manager(previous_mgr)

0 comments on commit 5814928

Please sign in to comment.