diff --git a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts index 382469977..e24b6cf4c 100644 --- a/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts +++ b/packages/databricks-vscode/src/bundle/BundlePipelinesManager.ts @@ -570,9 +570,10 @@ export async function locationToRange( } // Cell URIs are private and there is no public API to generate them. -// Here we generate a URI for a cell in the same way as VS Code does it, but without appending base64 schema to it (vsode can still parse such uris). +// Here we generate a URI for a cell in the same way as VS Code does it internally. // https://github.com/microsoft/vscode/blob/9508be851891834c4036da28461824c664dfa2c0/src/vs/workbench/services/notebook/common/notebookDocumentService.ts#L45C41-L45C47 -// As an alternative we can access these URIs by relying on open notebook editors, which means you won't get diagnostics in the problems panel unless you open a notebook. +// As an alternative we can access these URIs by relying on open notebook editors, +// which means you won't get diagnostics in the problems panel unless you open a notebook. // (Which is how it actually is for disgnostics that python extension provides) function generateNotebookCellURI(notebook: Uri, handle: number): Uri { const lengths = ["W", "X", "Y", "Z", "a", "b", "c", "d", "e", "f"]; diff --git a/packages/databricks-vscode/src/bundle/run/JobRunStatus.ts b/packages/databricks-vscode/src/bundle/run/JobRunStatus.ts index cf4feb08a..4746d37c1 100644 --- a/packages/databricks-vscode/src/bundle/run/JobRunStatus.ts +++ b/packages/databricks-vscode/src/bundle/run/JobRunStatus.ts @@ -72,6 +72,7 @@ export class JobRunStatus extends BundleRunStatus { return; } + this.runState = "cancelling"; const client = await this.authProvider.getWorkspaceClient(); await ( await client.jobs.cancelRun({run_id: parseInt(this.runId)}) diff --git a/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts b/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts index 1667efaae..d84fdb691 100644 --- a/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts +++ b/packages/databricks-vscode/src/bundle/run/PipelineRunStatus.ts @@ -2,7 +2,19 @@ import {BundleRunStatus} from "./BundleRunStatus"; import {AuthProvider} from "../../configuration/auth/AuthProvider"; import {onError} from "../../utils/onErrorDecorator"; -import {pipelines, WorkspaceClient} from "@databricks/databricks-sdk"; +import { + logging, + pipelines, + retry, + Time, + TimeUnits, + WorkspaceClient, +} from "@databricks/databricks-sdk"; +import { + LinearRetryPolicy, + RetriableError, +} from "@databricks/databricks-sdk/dist/retries/retries"; +import {Loggers} from "../../logger"; function isRunning(status?: pipelines.UpdateInfoState) { if (status === undefined) { @@ -16,7 +28,7 @@ export class PipelineRunStatus extends BundleRunStatus { public data: pipelines.UpdateInfo | undefined; public events: pipelines.PipelineEvent[] | undefined; - private interval?: NodeJS.Timeout; + private logger = logging.NamedLogger.getOrCreate(Loggers.Extension); constructor( private readonly authProvider: AuthProvider, @@ -47,43 +59,53 @@ export class PipelineRunStatus extends BundleRunStatus { return; } - if (this.runId === undefined) { + const runId = this.runId; + if (runId === undefined) { throw new Error("No update id"); } - const client = await this.authProvider.getWorkspaceClient(); this.runState = "running"; - this.interval = setInterval(async () => { - try { - if (this.runId === undefined) { - throw new Error("No update id"); - } - const getUpdateResponse = await client.pipelines.getUpdate({ - pipeline_id: this.pipelineId, - update_id: this.runId, - }); - this.data = getUpdateResponse.update; - - if (this.data?.creation_time !== undefined) { - this.events = await this.fetchUpdateEvents( - client, - this.data?.creation_time, - this.data?.update_id - ); - } + try { + await retry({ + timeout: new Time(48, TimeUnits.hours), + retryPolicy: new LinearRetryPolicy( + new Time(5, TimeUnits.seconds) + ), + fn: async () => { + if (this.runState !== "running") { + return; + } + await this.updateRunData(runId); + if (isRunning(this.data?.state)) { + throw new RetriableError(); + } else { + this.runState = "completed"; + } + }, + }); + } catch (e) { + this.runState = "error"; + throw e; + } + } - // If update is completed, we stop polling. - if (!isRunning(this.data?.state)) { - this.markCompleted(); - } else { - this.onDidChangeEmitter.fire(); - } - } catch (e) { - this.runState = "error"; - throw e; - } - }, 5_000); + private async updateRunData(runId: string) { + const client = await this.authProvider.getWorkspaceClient(); + const getUpdateResponse = await client.pipelines.getUpdate({ + pipeline_id: this.pipelineId, + update_id: runId, + }); + this.data = getUpdateResponse.update; + this.onDidChangeEmitter.fire(); + if (this.data?.creation_time !== undefined) { + this.events = await this.fetchUpdateEvents( + client, + this.data?.creation_time, + this.data?.update_id + ); + this.onDidChangeEmitter.fire(); + } } private async fetchUpdateEvents( @@ -106,47 +128,33 @@ export class PipelineRunStatus extends BundleRunStatus { return events; } - private markCompleted() { - if (this.interval !== undefined) { - clearInterval(this.interval); - this.interval = undefined; - } - this.runState = "completed"; - } - - private markCancelled() { - if (this.interval !== undefined) { - clearInterval(this.interval); - this.interval = undefined; - } - this.runState = "cancelled"; - } - async cancel() { if (this.runState !== "running" || this.runId === undefined) { - this.markCancelled(); + this.runState = "cancelled"; return; } - const client = await this.authProvider.getWorkspaceClient(); - const update = await client.pipelines.getUpdate({ - pipeline_id: this.pipelineId, - update_id: this.runId, - }); - // Only stop the pipeline if the tracked update is still running. The stop API stops the - // latest update, which might not be the tracked update. - if (isRunning(update.update?.state)) { - await ( - await client.pipelines.stop({ + this.runState = "cancelling"; + try { + const client = await this.authProvider.getWorkspaceClient(); + const update = await client.pipelines.getUpdate({ + pipeline_id: this.pipelineId, + update_id: this.runId, + }); + // Only stop the pipeline if the tracked update is still running. The stop API stops the + // latest update, which might not be the tracked update. + if (isRunning(update.update?.state)) { + const stopRequest = await client.pipelines.stop({ pipeline_id: this.pipelineId, - }) - ).wait(); + }); + await stopRequest.wait(); + } + await this.updateRunData(this.runId); + this.runState = "cancelled"; + } catch (e) { + this.logger.error("Failed to cancel pipeline run", e); + this.runState = "error"; + throw e; } - const getUpdateResponse = await client.pipelines.getUpdate({ - pipeline_id: this.pipelineId, - update_id: this.runId, - }); - this.data = getUpdateResponse.update; - this.markCancelled(); } } diff --git a/packages/databricks-vscode/src/bundle/run/types.ts b/packages/databricks-vscode/src/bundle/run/types.ts index 07cba9de7..0bfc46be0 100644 --- a/packages/databricks-vscode/src/bundle/run/types.ts +++ b/packages/databricks-vscode/src/bundle/run/types.ts @@ -4,4 +4,5 @@ export type RunState = | "unknown" | "error" | "timeout" + | "cancelling" | "cancelled"; diff --git a/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts b/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts index 747a172dd..cf83a029b 100644 --- a/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts +++ b/packages/databricks-vscode/src/test/e2e/deploy_and_run_pipeline.e2e.ts @@ -86,7 +86,7 @@ describe("Deploy and run pipeline", async function () { resourceExplorerView, "Pipelines", pipelineName, - "Completed", + "Success", // Long timeout, as the pipeline will be waiting for its cluster to start 15 * 60 * 1000 ); diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts index 5a6308077..41cdc2713 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/JobRunStatusTreeNode.ts @@ -92,11 +92,14 @@ export class JobRunStatusTreeNode implements BundleResourceExplorerTreeNode { }; } - const status = JobRunStateUtils.getSimplifiedRunState(this.runDetails); - const icon = RunStateUtils.getThemeIconForStatus(status); + const status = + this.runMonitor?.runState === "cancelling" + ? "Cancelling" + : JobRunStateUtils.getSimplifiedRunState(this.runDetails); + return { label: "Run Status", - iconPath: icon, + iconPath: RunStateUtils.getThemeIconForStatus(status), description: status, contextValue: ContextUtils.getContextString({ nodeType: this.type, diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineEventTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineEventTreeNode.ts new file mode 100644 index 000000000..fdc1417cc --- /dev/null +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineEventTreeNode.ts @@ -0,0 +1,55 @@ +import {ThemeColor, ThemeIcon} from "vscode"; +import {ContextUtils} from "./utils"; +import {TreeItemTreeNode} from "../TreeItemTreeNode"; +import { + EventLevel, + PipelineEvent, +} from "@databricks/databricks-sdk/dist/apis/pipelines"; + +export class PipelineEventTreeNode extends TreeItemTreeNode { + constructor( + public event: PipelineEvent, + parent: T + ) { + super( + { + label: event.message ?? event.event_type ?? "unknown", + iconPath: getEventIcon(event.level), + tooltip: event.message, + contextValue: ContextUtils.getContextString({ + nodeType: "pipeline_run_event", + hasPipelineDetails: hasDetails(event), + }), + }, + parent + ); + } +} + +function hasDetails(event: PipelineEvent): boolean { + return ( + event.error?.exceptions !== undefined && + event.error.exceptions.length > 0 + ); +} + +function getEventIcon(level: EventLevel | undefined): ThemeIcon { + switch (level) { + case "ERROR": + return new ThemeIcon( + "error", + new ThemeColor("list.errorForeground") + ); + case "INFO": + return new ThemeIcon("info"); + case "METRICS": + return new ThemeIcon("dashboard"); + case "WARN": + return new ThemeIcon( + "warning", + new ThemeColor("list.warningForeground") + ); + default: + return new ThemeIcon("question"); + } +} diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunEventsTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunEventsTreeNode.ts deleted file mode 100644 index 609531ffe..000000000 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunEventsTreeNode.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { - BundleResourceExplorerTreeItem, - BundleResourceExplorerTreeNode, -} from "./types"; -import {ThemeColor, ThemeIcon, TreeItemCollapsibleState} from "vscode"; -import {ContextUtils} from "./utils"; -import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; -import {TreeItemTreeNode} from "../TreeItemTreeNode"; -import {ConnectionManager} from "../../configuration/ConnectionManager"; -import { - EventLevel, - PipelineEvent, -} from "@databricks/databricks-sdk/dist/apis/pipelines"; - -export class PipelineRunEventsTreeNode - implements BundleResourceExplorerTreeNode -{ - readonly type = "pipeline_run_events"; - - private get update() { - return this.runMonitor?.data; - } - - private get events() { - return this.runMonitor?.events; - } - - public get url() { - const {host} = this.connectionManager.databricksWorkspace ?? {}; - // eslint-disable-next-line @typescript-eslint/naming-convention - const {pipeline_id, update_id} = this.update ?? {}; - if (!host || !pipeline_id || !update_id) { - return undefined; - } - return `${host}#joblist/pipelines/${pipeline_id}/updates/${update_id}`; - } - - constructor( - private readonly connectionManager: ConnectionManager, - private readonly runMonitor: PipelineRunStatus, - public parent?: BundleResourceExplorerTreeNode - ) {} - - getChildren(): BundleResourceExplorerTreeNode[] { - if (this.events === undefined || this.events.length === 0) { - return []; - } - const children: BundleResourceExplorerTreeNode[] = []; - - for (const event of this.events) { - children.push(new PipelineEventTreeNode(event, this)); - } - - return children; - } - - isLoading(): boolean { - return ( - (this.events === undefined || this.events.length === 0) && - (this.runMonitor.runState === "running" || - this.runMonitor.runState === "unknown") - ); - } - - getTreeItem(): BundleResourceExplorerTreeItem { - if (this.isLoading()) { - return { - label: "Event Log", - iconPath: new ThemeIcon("loading~spin"), - contextValue: ContextUtils.getContextString({ - nodeType: this.type, - }), - collapsibleState: TreeItemCollapsibleState.None, - }; - } - - return { - label: "Event Log", - iconPath: new ThemeIcon("inbox"), - contextValue: ContextUtils.getContextString({ - nodeType: this.type, - hasUrl: this.url !== undefined, - }), - collapsibleState: TreeItemCollapsibleState.Expanded, - }; - } -} - -class PipelineEventTreeNode extends TreeItemTreeNode { - constructor( - public event: PipelineEvent, - parent: T - ) { - super( - { - label: event.message ?? event.event_type ?? "unknown", - iconPath: getEventIcon(event.level), - tooltip: event.message, - contextValue: ContextUtils.getContextString({ - nodeType: "pipeline_run_event", - hasPipelineDetails: hasDetails(event), - }), - }, - parent - ); - } -} - -function hasDetails(event: PipelineEvent): boolean { - return ( - event.error?.exceptions !== undefined && - event.error.exceptions.length > 0 - ); -} - -function getEventIcon(level: EventLevel | undefined): ThemeIcon { - switch (level) { - case "ERROR": - return new ThemeIcon( - "error", - new ThemeColor("list.errorForeground") - ); - case "INFO": - return new ThemeIcon("info"); - case "METRICS": - return new ThemeIcon("dashboard"); - case "WARN": - return new ThemeIcon( - "warning", - new ThemeColor("list.warningForeground") - ); - default: - return new ThemeIcon("question"); - } -} diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts index fe020c118..a5d7bec2e 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineRunStatusTreeNode.ts @@ -4,11 +4,12 @@ import { } from "./types"; import {ThemeIcon, TreeItemCollapsibleState} from "vscode"; import {ContextUtils, RunStateUtils} from "./utils"; -import {SimplifiedRunState, sentenceCase} from "./utils/RunStateUtils"; +import {SimplifiedRunState} from "./utils/RunStateUtils"; import {GetUpdateResponse} from "@databricks/databricks-sdk/dist/apis/pipelines"; import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; import {ConnectionManager} from "../../configuration/ConnectionManager"; +import {PipelineEventTreeNode} from "./PipelineEventTreeNode"; function getSimplifiedUpdateState( update?: GetUpdateResponse["update"] @@ -49,6 +50,10 @@ export class PipelineRunStatusTreeNode return this.runMonitor?.data; } + private get events() { + return this.runMonitor?.events; + } + public get url() { const {host} = this.connectionManager.databricksWorkspace ?? {}; // eslint-disable-next-line @typescript-eslint/naming-convention @@ -71,24 +76,12 @@ export class PipelineRunStatusTreeNode } const children: BundleResourceExplorerTreeNode[] = []; - if (this.update.cause) { - children.push( - new TreeItemTreeNode( - { - label: "Cause", - description: this.update.cause, - contextValue: "update_cause", - }, - this - ) - ); - } - if (this.update.creation_time) { children.push( new TreeItemTreeNode( { label: "Start Time", + iconPath: new ThemeIcon("watch"), description: RunStateUtils.humaniseDate( this.update.creation_time ), @@ -99,15 +92,11 @@ export class PipelineRunStatusTreeNode ); } - return children; - } + for (const event of this.events ?? []) { + children.push(new PipelineEventTreeNode(event, this)); + } - isLoading(): boolean { - return ( - this.update === undefined && - (this.runMonitor.runState === "running" || - this.runMonitor.runState === "unknown") - ); + return children; } getTreeItem(): BundleResourceExplorerTreeItem { @@ -122,24 +111,15 @@ export class PipelineRunStatusTreeNode return runMonitorRunStateTreeItem; } - if (this.isLoading()) { - return { - label: "Run Status", - iconPath: new ThemeIcon("loading~spin"), - contextValue: ContextUtils.getContextString({ - nodeType: this.type, - }), - collapsibleState: TreeItemCollapsibleState.None, - }; - } - - const status = getSimplifiedUpdateState(this.update); - const icon = RunStateUtils.getThemeIconForStatus(status); + const status = + this.runMonitor.runState === "cancelling" + ? "Cancelling" + : getSimplifiedUpdateState(this.update); return { label: "Run Status", - iconPath: icon, - description: sentenceCase(this.update?.state), + iconPath: RunStateUtils.getThemeIconForStatus(status), + description: status, contextValue: ContextUtils.getContextString({ nodeType: this.type, hasUrl: this.url !== undefined, diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts index c13a88380..ba8931975 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/PipelineTreeNode.ts @@ -12,7 +12,6 @@ import {ConnectionManager} from "../../configuration/ConnectionManager"; import {PipelineRunStatus} from "../../bundle/run/PipelineRunStatus"; import {TreeItemTreeNode} from "../TreeItemTreeNode"; import {PipelineRunStatusTreeNode} from "./PipelineRunStatusTreeNode"; -import {PipelineRunEventsTreeNode} from "./PipelineRunEventsTreeNode"; import {ThemeIcon} from "vscode"; export class PipelineTreeNode implements BundleResourceExplorerTreeNode { @@ -103,11 +102,6 @@ export class PipelineTreeNode implements BundleResourceExplorerTreeNode { this.connectionManager, runMonitor, this - ), - new PipelineRunEventsTreeNode( - this.connectionManager, - runMonitor, - this ) ); } diff --git a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts index 3fe2d156b..f438db54b 100644 --- a/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts +++ b/packages/databricks-vscode/src/ui/bundle-resource-explorer/utils/RunStateUtils.ts @@ -15,6 +15,7 @@ export type SimplifiedRunState = | "Running" | "Terminating" | "Cancelled" + | "Cancelling" | "Success" | "Unknown" | "Timeout"; @@ -54,9 +55,9 @@ export function getThemeIconForStatus(status: SimplifiedRunState): ThemeIcon { case "Skipped": return new ThemeIcon("testing-skipped-icon"); case "Pending": - return new ThemeIcon("watch"); case "Running": return new ThemeIcon("sync~spin", new ThemeColor("charts.green")); + case "Cancelling": case "Terminating": return new ThemeIcon("sync-ignored", new ThemeColor("charts.red")); case "Terminated": @@ -74,16 +75,6 @@ export function getThemeIconForStatus(status: SimplifiedRunState): ThemeIcon { } } -export function sentenceCase(str?: string, sep: string = "_") { - if (str === undefined) { - return undefined; - } - - return (str.charAt(0).toUpperCase() + str.slice(1).toLowerCase()) - .split(sep) - .join(" "); -} - export function getTreeItemFromRunMonitorStatus( type: BundleResourceExplorerTreeNode["type"], url?: string, @@ -101,7 +92,7 @@ export function getTreeItemFromRunMonitorStatus( }; } - if (runMonitor?.runState === "cancelled") { + if (runMonitor?.runState === "cancelled" && !runMonitor.data) { return { label: "Run Status", iconPath: getThemeIconForStatus("Cancelled"), @@ -113,4 +104,27 @@ export function getTreeItemFromRunMonitorStatus( collapsibleState: TreeItemCollapsibleState.None, }; } + + if (runMonitor?.runState === "error" && !runMonitor.data) { + return { + label: "Run Status", + iconPath: getThemeIconForStatus("Failed"), + description: "Failed to fetch run status", + contextValue: ContextUtils.getContextString({nodeType: type}), + collapsibleState: TreeItemCollapsibleState.None, + }; + } + + if ( + (runMonitor?.runState === "running" || + runMonitor?.runState === "unknown") && + !runMonitor.data + ) { + return { + label: "Run Status", + iconPath: new ThemeIcon("loading~spin"), + contextValue: ContextUtils.getContextString({nodeType: type}), + collapsibleState: TreeItemCollapsibleState.None, + }; + } }