Skip to content

Commit

Permalink
Improve DLT UX (#1474)
Browse files Browse the repository at this point in the history
## Changes
- Load events with `retry` instead of `setTimeout`
- Better handle cancellation
- Show spinning icon for more DLT states
- Merge Event Log and Run Status tree sections
- Remove run "cause" item, since the same thing is visible in one of the
events

<img width="602" alt="Screenshot 2024-11-28 at 16 24 04"
src="https://github.com/user-attachments/assets/0481ca7e-d00d-4256-99ff-192fd59f696a">


## Tests
<!-- How is this tested? -->
  • Loading branch information
ilia-db authored Dec 4, 2024
1 parent ba9518d commit 27f8a9b
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 264 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,10 @@ export async function locationToRange(
}

// Cell URIs are private and there is no public API to generate them.
// Here we generate a URI for a cell in the same way as VS Code does it, but without appending base64 schema to it (vsode can still parse such uris).
// Here we generate a URI for a cell in the same way as VS Code does it internally.
// https://github.com/microsoft/vscode/blob/9508be851891834c4036da28461824c664dfa2c0/src/vs/workbench/services/notebook/common/notebookDocumentService.ts#L45C41-L45C47
// As an alternative we can access these URIs by relying on open notebook editors, which means you won't get diagnostics in the problems panel unless you open a notebook.
// As an alternative we can access these URIs by relying on open notebook editors,
// which means you won't get diagnostics in the problems panel unless you open a notebook.
// (Which is how it actually is for disgnostics that python extension provides)
function generateNotebookCellURI(notebook: Uri, handle: number): Uri {
const lengths = ["W", "X", "Y", "Z", "a", "b", "c", "d", "e", "f"];
Expand Down
1 change: 1 addition & 0 deletions packages/databricks-vscode/src/bundle/run/JobRunStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class JobRunStatus extends BundleRunStatus {
return;
}

this.runState = "cancelling";
const client = await this.authProvider.getWorkspaceClient();
await (
await client.jobs.cancelRun({run_id: parseInt(this.runId)})
Expand Down
144 changes: 76 additions & 68 deletions packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@
import {BundleRunStatus} from "./BundleRunStatus";
import {AuthProvider} from "../../configuration/auth/AuthProvider";
import {onError} from "../../utils/onErrorDecorator";
import {pipelines, WorkspaceClient} from "@databricks/databricks-sdk";
import {
logging,
pipelines,
retry,
Time,
TimeUnits,
WorkspaceClient,
} from "@databricks/databricks-sdk";
import {
LinearRetryPolicy,
RetriableError,
} from "@databricks/databricks-sdk/dist/retries/retries";
import {Loggers} from "../../logger";

function isRunning(status?: pipelines.UpdateInfoState) {
if (status === undefined) {
Expand All @@ -16,7 +28,7 @@ export class PipelineRunStatus extends BundleRunStatus {
public data: pipelines.UpdateInfo | undefined;
public events: pipelines.PipelineEvent[] | undefined;

private interval?: NodeJS.Timeout;
private logger = logging.NamedLogger.getOrCreate(Loggers.Extension);

constructor(
private readonly authProvider: AuthProvider,
Expand Down Expand Up @@ -47,43 +59,53 @@ export class PipelineRunStatus extends BundleRunStatus {
return;
}

if (this.runId === undefined) {
const runId = this.runId;
if (runId === undefined) {
throw new Error("No update id");
}

const client = await this.authProvider.getWorkspaceClient();
this.runState = "running";

this.interval = setInterval(async () => {
try {
if (this.runId === undefined) {
throw new Error("No update id");
}
const getUpdateResponse = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: this.runId,
});
this.data = getUpdateResponse.update;

if (this.data?.creation_time !== undefined) {
this.events = await this.fetchUpdateEvents(
client,
this.data?.creation_time,
this.data?.update_id
);
}
try {
await retry({
timeout: new Time(48, TimeUnits.hours),
retryPolicy: new LinearRetryPolicy(
new Time(5, TimeUnits.seconds)
),
fn: async () => {
if (this.runState !== "running") {
return;
}
await this.updateRunData(runId);
if (isRunning(this.data?.state)) {
throw new RetriableError();
} else {
this.runState = "completed";
}
},
});
} catch (e) {
this.runState = "error";
throw e;
}
}

// If update is completed, we stop polling.
if (!isRunning(this.data?.state)) {
this.markCompleted();
} else {
this.onDidChangeEmitter.fire();
}
} catch (e) {
this.runState = "error";
throw e;
}
}, 5_000);
private async updateRunData(runId: string) {
const client = await this.authProvider.getWorkspaceClient();
const getUpdateResponse = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: runId,
});
this.data = getUpdateResponse.update;
this.onDidChangeEmitter.fire();
if (this.data?.creation_time !== undefined) {
this.events = await this.fetchUpdateEvents(
client,
this.data?.creation_time,
this.data?.update_id
);
this.onDidChangeEmitter.fire();
}
}

private async fetchUpdateEvents(
Expand All @@ -106,47 +128,33 @@ export class PipelineRunStatus extends BundleRunStatus {
return events;
}

private markCompleted() {
if (this.interval !== undefined) {
clearInterval(this.interval);
this.interval = undefined;
}
this.runState = "completed";
}

private markCancelled() {
if (this.interval !== undefined) {
clearInterval(this.interval);
this.interval = undefined;
}
this.runState = "cancelled";
}

async cancel() {
if (this.runState !== "running" || this.runId === undefined) {
this.markCancelled();
this.runState = "cancelled";
return;
}

const client = await this.authProvider.getWorkspaceClient();
const update = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: this.runId,
});
// Only stop the pipeline if the tracked update is still running. The stop API stops the
// latest update, which might not be the tracked update.
if (isRunning(update.update?.state)) {
await (
await client.pipelines.stop({
this.runState = "cancelling";
try {
const client = await this.authProvider.getWorkspaceClient();
const update = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: this.runId,
});
// Only stop the pipeline if the tracked update is still running. The stop API stops the
// latest update, which might not be the tracked update.
if (isRunning(update.update?.state)) {
const stopRequest = await client.pipelines.stop({
pipeline_id: this.pipelineId,
})
).wait();
});
await stopRequest.wait();
}
await this.updateRunData(this.runId);
this.runState = "cancelled";
} catch (e) {
this.logger.error("Failed to cancel pipeline run", e);
this.runState = "error";
throw e;
}
const getUpdateResponse = await client.pipelines.getUpdate({
pipeline_id: this.pipelineId,
update_id: this.runId,
});
this.data = getUpdateResponse.update;
this.markCancelled();
}
}
1 change: 1 addition & 0 deletions packages/databricks-vscode/src/bundle/run/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export type RunState =
| "unknown"
| "error"
| "timeout"
| "cancelling"
| "cancelled";
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ describe("Deploy and run pipeline", async function () {
resourceExplorerView,
"Pipelines",
pipelineName,
"Completed",
"Success",
// Long timeout, as the pipeline will be waiting for its cluster to start
15 * 60 * 1000
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ export class JobRunStatusTreeNode implements BundleResourceExplorerTreeNode {
};
}

const status = JobRunStateUtils.getSimplifiedRunState(this.runDetails);
const icon = RunStateUtils.getThemeIconForStatus(status);
const status =
this.runMonitor?.runState === "cancelling"
? "Cancelling"
: JobRunStateUtils.getSimplifiedRunState(this.runDetails);

return {
label: "Run Status",
iconPath: icon,
iconPath: RunStateUtils.getThemeIconForStatus(status),
description: status,
contextValue: ContextUtils.getContextString({
nodeType: this.type,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {ThemeColor, ThemeIcon} from "vscode";
import {ContextUtils} from "./utils";
import {TreeItemTreeNode} from "../TreeItemTreeNode";
import {
EventLevel,
PipelineEvent,
} from "@databricks/databricks-sdk/dist/apis/pipelines";

export class PipelineEventTreeNode<T> extends TreeItemTreeNode<T> {
constructor(
public event: PipelineEvent,
parent: T
) {
super(
{
label: event.message ?? event.event_type ?? "unknown",
iconPath: getEventIcon(event.level),
tooltip: event.message,
contextValue: ContextUtils.getContextString({
nodeType: "pipeline_run_event",
hasPipelineDetails: hasDetails(event),
}),
},
parent
);
}
}

function hasDetails(event: PipelineEvent): boolean {
return (
event.error?.exceptions !== undefined &&
event.error.exceptions.length > 0
);
}

function getEventIcon(level: EventLevel | undefined): ThemeIcon {
switch (level) {
case "ERROR":
return new ThemeIcon(
"error",
new ThemeColor("list.errorForeground")
);
case "INFO":
return new ThemeIcon("info");
case "METRICS":
return new ThemeIcon("dashboard");
case "WARN":
return new ThemeIcon(
"warning",
new ThemeColor("list.warningForeground")
);
default:
return new ThemeIcon("question");
}
}
Loading

0 comments on commit 27f8a9b

Please sign in to comment.