From a234b6b7b7c33ce5ddbbd1d4016be6ec20a0c191 Mon Sep 17 00:00:00 2001 From: Deepak Kovvuri Date: Tue, 30 Apr 2024 13:48:18 +0000 Subject: [PATCH] Initial Commit for Workflow Triggers --- .../@aws-cdk/aws-glue-alpha/lib/constants.ts | 41 ++ packages/@aws-cdk/aws-glue-alpha/lib/index.ts | 4 +- .../lib/triggers/conditional-triggers.ts | 7 - .../lib/triggers/notify-event-trigger.ts | 10 - .../lib/triggers/on-demand-trigger.ts | 8 - .../lib/triggers/scheduled-trigger.ts | 12 - .../lib/triggers/trigger-options.ts | 238 ++++++++++ .../aws-glue-alpha/lib/triggers/trigger.ts | 7 - .../aws-glue-alpha/lib/triggers/workflow.ts | 433 ++++++++++++++++++ .../aws-glue-alpha/test/integ.workflow.ts | 60 +++ .../test/workflow-triggers.test.ts | 289 ++++++++++++ 11 files changed, 1064 insertions(+), 45 deletions(-) delete mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/conditional-triggers.ts delete mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/notify-event-trigger.ts delete mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/on-demand-trigger.ts delete mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/scheduled-trigger.ts create mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger-options.ts delete mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger.ts create mode 100644 packages/@aws-cdk/aws-glue-alpha/lib/triggers/workflow.ts create mode 100644 packages/@aws-cdk/aws-glue-alpha/test/integ.workflow.ts create mode 100644 packages/@aws-cdk/aws-glue-alpha/test/workflow-triggers.test.ts diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/constants.ts b/packages/@aws-cdk/aws-glue-alpha/lib/constants.ts index 7b1cfd7896fdf..fac94436e128b 100644 --- a/packages/@aws-cdk/aws-glue-alpha/lib/constants.ts +++ b/packages/@aws-cdk/aws-glue-alpha/lib/constants.ts @@ -238,3 +238,44 @@ export enum JobType { } +/** + * Represents the logical operator for combining multiple conditions in the Glue Trigger API. + */ +export enum PredicateLogical { + /** All conditions must be true for the predicate to be true. */ + AND = 'AND', + + /** At least one condition must be true for the predicate to be true. */ + ANY = 'ANY', +} + +/** + * Represents the logical operator for evaluating a single condition in the Glue Trigger API. + */ +export enum ConditionLogicalOperator { + /** The condition is true if the values are equal. */ + EQUALS = 'EQUALS', +} + +/** + * Represents the state of a crawler for a condition in the Glue Trigger API. + */ +export enum CrawlerState { + /** The crawler is currently running. */ + RUNNING = 'RUNNING', + + /** The crawler is in the process of being cancelled. */ + CANCELLING = 'CANCELLING', + + /** The crawler has been cancelled. */ + CANCELLED = 'CANCELLED', + + /** The crawler has completed its operation successfully. */ + SUCCEEDED = 'SUCCEEDED', + + /** The crawler has failed to complete its operation. */ + FAILED = 'FAILED', + + /** The crawler encountered an error during its operation. */ + ERROR = 'ERROR', +} diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/index.ts b/packages/@aws-cdk/aws-glue-alpha/lib/index.ts index 5c5d13f9b5c76..0d81d99cd1e96 100644 --- a/packages/@aws-cdk/aws-glue-alpha/lib/index.ts +++ b/packages/@aws-cdk/aws-glue-alpha/lib/index.ts @@ -23,4 +23,6 @@ export * from './jobs/spark-ui-utils'; // export * from './jobs/spark-etl-job'; //export * from './jobs/streaming-job'; export * from './table-base'; -export * from './table-deprecated'; \ No newline at end of file +export * from './table-deprecated'; +export * from './triggers/workflow'; +export * from './triggers/trigger-options'; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/conditional-triggers.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/conditional-triggers.ts deleted file mode 100644 index 487bf3b1ed291..0000000000000 --- a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/conditional-triggers.ts +++ /dev/null @@ -1,7 +0,0 @@ -/** - * Conditional Trigger Class - * - * Conditional triggers have a predicate and actions associated with them. - * When the predicateCondition is true, the trigger actions will be executed. - * - */ diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/notify-event-trigger.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/notify-event-trigger.ts deleted file mode 100644 index aaed7b7b623c8..0000000000000 --- a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/notify-event-trigger.ts +++ /dev/null @@ -1,10 +0,0 @@ -/** - * Notify Event Trigger Class - * - * Workflows are mandatory for this trigger type. There are two types of notify event triggers, - * batching and non-batching trigger. - * For batching triggers, developers must specify BatchSize but for non-batching BatchSize will - * be set to 1. - * For both triggers, BatchWindow will be default to 900 seconds. - * - */ diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/on-demand-trigger.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/on-demand-trigger.ts deleted file mode 100644 index f9aa131a1f7d2..0000000000000 --- a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/on-demand-trigger.ts +++ /dev/null @@ -1,8 +0,0 @@ -/** - * On Demand Trigger Class - * - * On demand triggers can start glue jobs or crawlers. - * The trigger method will take an optional description but abstract the requirement of an - * actions list using the job or crawler objects using conditional types. - * - */ diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/scheduled-trigger.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/scheduled-trigger.ts deleted file mode 100644 index c34e61330a519..0000000000000 --- a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/scheduled-trigger.ts +++ /dev/null @@ -1,12 +0,0 @@ -/** - * Scheduled Trigger Base Class - * - * Schedule triggers are a way for developers to create jobs using cron expressions. - * We’ll provide daily, weekly, and monthly convenience functions, as well as a custom function - * that will allow developers to create their own custom timing using the existing - * event Schedule object without having to build their own cron expressions. - * - * The trigger method will take an optional description and list of Actions - * which can refer to Jobs or crawlers via conditional types. - * - */ diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger-options.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger-options.ts new file mode 100644 index 0000000000000..2157249295724 --- /dev/null +++ b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger-options.ts @@ -0,0 +1,238 @@ +/** + * Triggers + * + * In AWS Glue, developers can use workflows to create and visualize complex extract, + * transform, and load (ETL) activities involving multiple crawlers, jobs, and triggers. + * + */ + +import * as cdk from 'aws-cdk-lib/core'; +import { JobState, CrawlerState, ConditionLogicalOperator, PredicateLogical } from '../constants'; +import { IJob } from '../jobs/job'; // Use IJob interface instead of concrete class +import { CfnCrawler } from 'aws-cdk-lib/aws-glue'; +import { ISecurityConfiguration } from '../security-configuration'; +import * as events from 'aws-cdk-lib/aws-events'; + +/** + * Represents a trigger action. + */ +export interface Action { + /** + * The job to be executed. + * + * @default - no job is executed + */ + readonly job?: IJob; + + /** + * The job arguments used when this trigger fires. + * + * @default - no arguments are passed to the job + */ + readonly arguments?: { [key: string]: string }; + + /** + * The job run timeout. This is the maximum time that a job run can consume resources before it is terminated and enters TIMEOUT status. + * + * @default - the default timeout value set in the job definition + */ + readonly timeout?: cdk.Duration; + + /** + * The `SecurityConfiguration` to be used with this action. + * + * @default - no security configuration is used + */ + readonly securityConfiguration?: ISecurityConfiguration; + + /** + * The name of the crawler to be used with this action. + * + * @default - no crawler is used + */ + readonly crawler?: CfnCrawler; +} + +/** + * Represents a trigger schedule. + */ +export class TriggerSchedule { + /** + * Creates a new TriggerSchedule instance with a cron expression. + * + * @param options The cron options for the schedule. + * @returns A new TriggerSchedule instance. + */ + public static cron(options: events.CronOptions): TriggerSchedule { + return new TriggerSchedule(events.Schedule.cron(options).expressionString); + } + + /** + * Creates a new TriggerSchedule instance with a custom expression. + * + * @param expression The custom expression for the schedule. + * @returns A new TriggerSchedule instance. + */ + public static expression(expression: string): TriggerSchedule { + return new TriggerSchedule(expression); + } + + /** + * @param expressionString The expression string for the schedule. + */ + private constructor(public readonly expressionString: string) {} +} + +/** + * Represents a trigger predicate. + */ +export interface Predicate { + /** + * The logical operator to be applied to the conditions. + * + * @default - ConditionLogical.AND if multiple conditions are provided, no logical operator if only one condition + */ + readonly logical?: PredicateLogical; + + /** + * A list of the conditions that determine when the trigger will fire. + * + * @default - no conditions are provided + */ + readonly conditions?: Condition[]; +} + +/** + * Represents a trigger condition. + */ +export interface Condition { + /** + * The logical operator for the condition. + * + * @default ConditionLogicalOperator.EQUALS + */ + readonly logicalOperator?: ConditionLogicalOperator; + + /** + * The job to which this condition applies. + * + * @default - no job is specified + */ + readonly job?: IJob; + + /** + * The condition job state. + * + * @default - no job state is specified + */ + readonly state?: JobState; + + /** + * The name of the crawler to which this condition applies. + * + * @default - no crawler is specified + */ + readonly crawlerName?: string; + + /** + * The condition crawler state. + * + * @default - no crawler state is specified + */ + readonly crawlState?: CrawlerState; +} + +/** + * Represents event trigger batch condition. + */ +export interface EventBatchingCondition { + /** + * Number of events that must be received from Amazon EventBridge before EventBridge event trigger fires. + */ + readonly batchSize: number; + + /** + * Window of time in seconds after which EventBridge event trigger fires. + * + * @default - 900 seconds + */ + readonly batchWindow?: cdk.Duration; +} + +/** + * Properties for configuring a Glue Trigger + */ +export interface TriggerOptions { + /** + * A name for the trigger. + * + * @default - no name is provided + */ + readonly name?: string; + + /** + * A description for the trigger. + * + * @default - no description + */ + readonly description?: string; + + /** + * The actions initiated by this trigger. + */ + readonly actions: Action[]; +} + +/** + * Properties for configuring an on-demand Glue Trigger. + */ +export interface OnDemandTriggerOptions extends TriggerOptions {} + +/** + * Properties for configuring a daily-scheduled Glue Trigger. + */ +export interface DailyScheduleTriggerOptions extends TriggerOptions { + /** + * Whether to start the trigger on creation or not. + * + * @default - false + */ + readonly startOnCreation?: boolean; +} + +/** + * Properties for configuring a weekly-scheduled Glue Trigger. + */ +export interface WeeklyScheduleTriggerOptions extends DailyScheduleTriggerOptions {} + +/** + * Properties for configuring a custom-scheduled Glue Trigger. + */ +export interface CustomScheduledTriggerOptions extends WeeklyScheduleTriggerOptions { + /** + * The custom schedule for the trigger. + */ + readonly schedule: TriggerSchedule; +} + +/** + * Properties for configuring an Event Bridge based Glue Trigger. + */ +export interface NotifyEventTriggerOptions extends TriggerOptions { + /** + * Batch condition for the trigger. + * + * @default - no batch condition + */ + readonly eventBatchingCondition?: EventBatchingCondition; +} + +/** + * Properties for configuring a Condition (Predicate) based Glue Trigger. + */ +export interface ConditionalTriggerOptions extends DailyScheduleTriggerOptions{ + /** + * The predicate for the trigger. + */ + readonly predicate: Predicate; +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger.ts deleted file mode 100644 index c40c3d0efe805..0000000000000 --- a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/trigger.ts +++ /dev/null @@ -1,7 +0,0 @@ -/** - * Workflow Trigger Base Class - * - * In AWS Glue, developers can use workflows to create and visualize complex extract, - * transform, and load (ETL) activities involving multiple crawlers, jobs, and triggers. - * - */ diff --git a/packages/@aws-cdk/aws-glue-alpha/lib/triggers/workflow.ts b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/workflow.ts new file mode 100644 index 0000000000000..537fd9b51d1bc --- /dev/null +++ b/packages/@aws-cdk/aws-glue-alpha/lib/triggers/workflow.ts @@ -0,0 +1,433 @@ +/** + * This module defines a construct for creating and managing AWS Glue Workflows and Triggers. + * + * AWS Glue Workflows are orchestration services that allow you to create, manage, and monitor complex extract, transform, and load (ETL) activities involving multiple crawlers, jobs, and triggers. Workflows are designed to allow you to manage interdependent jobs and crawlers as a single unit, making it easier to orchestrate and monitor complex ETL pipelines. + * + * Triggers are used to initiate an AWS Glue Workflow. You can configure different types of triggers, such as on-demand, scheduled, event-based, or conditional triggers, to start your Workflow based on specific conditions or events. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html + * @see https://docs.aws.amazon.com/glue/latest/dg/about-triggers.html + * + * ## Usage Example + * + * ```typescript + * import * as cdk from 'aws-cdk-lib'; + * import * as glue from 'aws-glue-cdk-lib'; + * + * const app = new cdk.App(); + * const stack = new cdk.Stack(app, 'TestStack'); + * + * // Create a Glue Job + * const job = new glue.Job(stack, 'TestJob', { + * // Job properties + * }); + * + * // Create a Glue Workflow + * const workflow = new glue.Workflow(stack, 'TestWorkflow', { + * // Workflow properties + * }); + * + * // Add an on-demand trigger to the Workflow + * workflow.addOnDemandTrigger('OnDemandTrigger', { + * actions: [{ job: job }], + * }); + * ``` + */ + +import * as cdk from 'aws-cdk-lib/core'; +import * as constructs from 'constructs'; +import { CfnWorkflow, CfnTrigger } from 'aws-cdk-lib/aws-glue'; +import { + ConditionLogicalOperator, + PredicateLogical, +} from '../constants'; +import { + Action, + TriggerSchedule, + OnDemandTriggerOptions, + WeeklyScheduleTriggerOptions, + DailyScheduleTriggerOptions, + CustomScheduledTriggerOptions, + NotifyEventTriggerOptions, + ConditionalTriggerOptions, +} from './trigger-options'; + +/** + * The base interface for Glue Workflow + * + * @see {@link Workflow} + * @see https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html + */ +export interface IWorkflow extends cdk.IResource { + /** + * The name of the workflow + * @attribute + */ + readonly workflowName: string; + + /** + * The ARN of the workflow + * @attribute + */ + readonly workflowArn: string; + + /** + * Add an on-demand trigger to the workflow + */ + addOnDemandTrigger(id: string, options: OnDemandTriggerOptions): CfnTrigger; + + /** + * Add an daily-scheduled trigger to the workflow + */ + addDailyScheduledTrigger(id: string, options: DailyScheduleTriggerOptions): CfnTrigger; + + /** + * Add an weekly-scheduled trigger to the workflow + */ + addWeeklyScheduledTrigger(id: string, options: WeeklyScheduleTriggerOptions): CfnTrigger; + + /** + * Add an custom-scheduled trigger to the workflow + */ + addCustomScheduledTrigger(id: string, options: CustomScheduledTriggerOptions): CfnTrigger; +} + +/** + * Properties for importing a Workflow using its attributes + */ +export interface WorkflowAttributes { + /** + * The name of the workflow to import + */ + readonly workflowName: string; + /** + * The ARN of the workflow to import + * + * @default - derived from the workflow name + */ + readonly workflowArn?: string; +} + +/** + * Properties for defining a Workflow + */ +export interface WorkflowProps { + /** + * Name of the workflow + * + * @default - a name will be generated + */ + readonly workflowName?: string; + + /** + * A description of the workflow + * + * @default - no description + */ + readonly description?: string; + + /** + * A map of properties to use when this workflow is executed + * + * @default - no default run properties + */ + readonly defaultRunProperties?: { [key: string]: string }; + + /** + * The maximum number of concurrent runs allowed for the workflow + * + * @default - no limit + */ + readonly maxConcurrentRuns?: number; +} + +/** + * Base abstract class for Workflow + * + * @see https://docs.aws.amazon.com/glue/latest/dg/about-triggers.html + */ +export abstract class WorkflowBase extends cdk.Resource implements IWorkflow { + /** + * Extract workflowName from arn + */ + protected static extractNameFromArn(scope: constructs.Construct, workflowArn: string): string { + return cdk.Stack.of(scope).splitArn( + workflowArn, + cdk.ArnFormat.SLASH_RESOURCE_NAME).resourceName!; + } + + public abstract readonly workflowName: string; + public abstract readonly workflowArn: string; + + /** + * Add an on-demand trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided. + * @returns The created CfnTrigger resource. + */ + public addOnDemandTrigger(id: string, options: OnDemandTriggerOptions): CfnTrigger { + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'ON_DEMAND', + actions: options.actions?.map(this.renderAction), + description: options.description || undefined, + }); + + return trigger; + } + + /** + * Add a daily-scheduled trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided. + * @returns The created CfnTrigger resource. + */ + public addDailyScheduledTrigger(id: string, options: DailyScheduleTriggerOptions): CfnTrigger { + const dailySchedule = TriggerSchedule.cron({ + minute: '0', + hour: '0', + }); + + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'SCHEDULED', + actions: options.actions?.map(this.renderAction), + schedule: dailySchedule.expressionString, + startOnCreation: options.startOnCreation ?? false, + }); + + return trigger; + } + + /** + * Add a weekly-scheduled trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided. + * @returns The created CfnTrigger resource. + */ + public addWeeklyScheduledTrigger(id: string, options: WeeklyScheduleTriggerOptions): CfnTrigger { + const weeklySchedule = TriggerSchedule.cron({ + minute: '0', + hour: '0', + weekDay: 'SUN', + }); + + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'SCHEDULED', + actions: options.actions?.map(this.renderAction), + schedule: weeklySchedule.expressionString, + startOnCreation: options.startOnCreation ?? false, + }); + + return trigger; + } + + /** + * Add a custom-scheduled trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided. + * @returns The created CfnTrigger resource. + */ + public addCustomScheduledTrigger(id: string, options: CustomScheduledTriggerOptions): CfnTrigger { + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'SCHEDULED', + actions: options.actions?.map(this.renderAction), + schedule: options.schedule.expressionString, + startOnCreation: options.startOnCreation ?? false, + }); + + return trigger; + } + + /** + * Add an Event Bridge based trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided. + * @returns The created CfnTrigger resource. + */ + public addNotifyEventTrigger(id: string, options: NotifyEventTriggerOptions): CfnTrigger { + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'EVENT', + actions: options.actions?.map(this.renderAction), + eventBatchingCondition: this.renderEventBatchingCondition(options), + description: options.description ?? undefined, + }); + + return trigger; + } + + /** + * Add a Condition (Predicate) based trigger to the workflow. + * + * @param id The id of the trigger. + * @param options Additional options for the trigger. + * @throws If both job and crawler are provided, or if neither job nor crawler is provided for any condition. + * @throws If a job is provided without a job state, or if a crawler is provided without a crawler state for any condition. + * @returns The created CfnTrigger resource. + */ + public addconditionalTrigger(id: string, options: ConditionalTriggerOptions): CfnTrigger { + const trigger = new CfnTrigger(this, id, { + ...options, + workflowName: this.workflowName, + type: 'CONDITIONAL', + actions: options.actions?.map(this.renderAction), + predicate: this.renderPredicate(options), + eventBatchingCondition: this.renderEventBatchingCondition(options), + description: options.description ?? undefined, + }); + + return trigger; + } + + private renderAction(action: Action): CfnTrigger.ActionProperty { + // Validate that either job or crawler is provided, but not both + if (!action.job && !action.crawler) { + throw new Error('You must provide either a job or a crawler for the action.'); + } else if (action.job && action.crawler) { + throw new Error('You cannot provide both a job and a crawler for the action.'); + } + + return { + jobName: action.job?.jobName, + arguments: action.arguments, + timeout: action.timeout?.toMinutes(), + securityConfiguration: action.securityConfiguration?.securityConfigurationName, + crawlerName: action.crawler?.name, + }; + } + + private renderPredicate(props: ConditionalTriggerOptions): CfnTrigger.PredicateProperty { + const conditions = props.predicate.conditions?.map(condition => { + // Validate that either job or crawler is provided, but not both + if (!condition.job && !condition.crawlerName) { + throw new Error('You must provide either a job or a crawler for the condition.'); + } else if (condition.job && condition.crawlerName) { + throw new Error('You cannot provide both a job and a crawler for the condition.'); + } + + // Validate that if job is provided, job state is also provided + if (condition.job && !condition.state) { + throw new Error('If you provide a job for the condition, you must also provide a job state.'); + } + + // Validate that if crawler is provided, crawler state is also provided + if (condition.crawlerName && !condition.crawlState) { + throw new Error('If you provide a crawler for the condition, you must also provide a crawler state.'); + } + + return { + logicalOperator: condition.logicalOperator ?? ConditionLogicalOperator.EQUALS, + jobName: condition.job?.jobName ?? undefined, + state: condition.state ?? undefined, + crawlerName: condition.crawlerName ?? undefined, + crawlState: condition.crawlState ?? undefined, + }; + }); + + return { + logical: props.predicate.conditions?.length === 1 ? undefined : props.predicate.logical ?? PredicateLogical.AND, + conditions: conditions, + }; + } + + private renderEventBatchingCondition(props: NotifyEventTriggerOptions): CfnTrigger.EventBatchingConditionProperty { + + const defaultBatchSize = 1; + const defaultBatchWindow = cdk.Duration.seconds(900).toSeconds(); + + if (!props.eventBatchingCondition) { + return { + batchSize: defaultBatchSize, + batchWindow: defaultBatchWindow, + }; + } + + return { + batchSize: props.eventBatchingCondition.batchSize || defaultBatchSize, + batchWindow: props.eventBatchingCondition.batchWindow?.toSeconds() || defaultBatchWindow, + }; + } + + protected buildWorkflowArn(scope: constructs.Construct, workflowName: string): string { + return cdk.Stack.of(scope).formatArn({ + service: 'glue', + resource: 'workflow', + resourceName: workflowName, + }); + } +} + +/** + * A class used for defining a Glue Workflow + * + * @resource AWS::Glue::Workflow + */ +export class Workflow extends WorkflowBase { + /** + * Import a workflow from its name + */ + public static fromWorkflowName(scope: constructs.Construct, id: string, workflowName: string): IWorkflow { + return this.fromWorkflowAttributes(scope, id, { + workflowName, + }); + } + + /** + * Import an workflow from it's name + */ + public static fromWorkflowArn(scope: constructs.Construct, id: string, workflowArn: string): IWorkflow { + return this.fromWorkflowAttributes(scope, id, { + workflowName: this.extractNameFromArn(scope, workflowArn), + workflowArn, + }); + } + + /** + * Import an existing workflow + */ + public static fromWorkflowAttributes(scope: constructs.Construct, id: string, attrs: WorkflowAttributes): IWorkflow { + class Import extends WorkflowBase { + public readonly workflowName = attrs.workflowName; + public readonly workflowArn = this.buildWorkflowArn(scope, this.workflowName); + } + + return new Import(scope, id); + } + + public readonly workflowName: string; + public readonly workflowArn: string; + + constructor(scope: constructs.Construct, id: string, props?: WorkflowProps) { + super(scope, id, { + physicalName: props?.workflowName, + }); + + const resource = new CfnWorkflow(this, 'Resource', { + name: this.physicalName, + description: props?.description, + defaultRunProperties: props?.defaultRunProperties, + maxConcurrentRuns: props?.maxConcurrentRuns, + }); + + this.workflowName = this.getResourceNameAttribute(resource.ref); + this.workflowArn = this.buildWorkflowArn(this, this.workflowName); + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue-alpha/test/integ.workflow.ts b/packages/@aws-cdk/aws-glue-alpha/test/integ.workflow.ts new file mode 100644 index 0000000000000..5fb91a398d4ad --- /dev/null +++ b/packages/@aws-cdk/aws-glue-alpha/test/integ.workflow.ts @@ -0,0 +1,60 @@ +import * as integ from '@aws-cdk/integ-tests-alpha'; +import * as cdk from 'aws-cdk-lib'; +import * as glue from '../lib'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as path from 'path'; + +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'GlueWorkflowTriggerStack'); + +const workflow = new glue.Workflow(stack, 'Workflow', { + description: 'MyWorkflow', +}); + +const role = new iam.Role(stack, 'JobRole', { + assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'), +}); + +const script = glue.Code.fromAsset(path.join(__dirname, 'job-script', 'hello_world.py')); + +const OutboundJob = new glue.PySparkEtlJob(stack, 'OutboundJob', { + script: script, + role, + glueVersion: glue.GlueVersion.V4_0, + workerType: glue.WorkerType.G_2X, + numberOrWorkers: 2, +}); + +const InboundJob = new glue.PySparkEtlJob(stack, 'InboundJob', { + script: script, + role, + glueVersion: glue.GlueVersion.V4_0, + workerType: glue.WorkerType.G_2X, + numberOrWorkers: 2, +}); + +workflow.addOnDemandTrigger('OnDemandTrigger', { + actions: [{ job: InboundJob }], +}); + +workflow.addconditionalTrigger('ConditionalTrigger', { + actions: [{ job: OutboundJob }], + predicate: { + conditions: [ + { + job: InboundJob, + state: glue.JobState.SUCCEEDED, + }, + ], + }, +}); + +new cdk.CfnOutput(stack, 'WorkflowName', { + value: workflow.workflowName, +}); + +new integ.IntegTest(app, 'aws-cdk-glue-workflow-trigger-integ', { + testCases: [stack], +}); + +app.synth(); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue-alpha/test/workflow-triggers.test.ts b/packages/@aws-cdk/aws-glue-alpha/test/workflow-triggers.test.ts new file mode 100644 index 0000000000000..74d80b7b455c4 --- /dev/null +++ b/packages/@aws-cdk/aws-glue-alpha/test/workflow-triggers.test.ts @@ -0,0 +1,289 @@ +import * as cdk from 'aws-cdk-lib'; +import { Template, Capture } from 'aws-cdk-lib/assertions'; +import * as glue from '../lib'; +import { TriggerSchedule } from '../lib/triggers/trigger-options'; +import * as iam from 'aws-cdk-lib/aws-iam'; + +describe('Workflow and Triggers', () => { + let stack: cdk.Stack; + let workflow: glue.Workflow; + let job: glue.PySparkEtlJob; + let role: iam.Role; + + beforeEach(() => { + stack = new cdk.Stack(); + workflow = new glue.Workflow(stack, 'Workflow', { + description: 'MyWorkflow', + }); + + role = new iam.Role(stack, 'JobRole', { + assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'), + }); + + job = new glue.PySparkEtlJob(stack, 'Job', { + script: glue.Code.fromAsset('test/job-script/hello_world.py'), + role, + glueVersion: glue.GlueVersion.V4_0, + workerType: glue.WorkerType.G_2X, + numberOrWorkers: 10, + }); + }); + + test('creates a workflow with triggers and actions', () => { + workflow.addOnDemandTrigger('OnDemandTrigger', { + actions: [{ job }], + }); + + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Workflow', { + Description: 'MyWorkflow', + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'ON_DEMAND', + WorkflowName: workflowReference, + Actions: [actionReference], + }); + + expect(workflowReference.asObject()).toEqual( + { + Ref: 'Workflow193EF7C1', + }, + ); + + expect(actionReference.asObject()).toEqual( + { + JobName: { + Ref: 'JobB9D00F9F', + }, + }, + ); + + }); + + test('creates a workflow with conditional trigger', () => { + workflow.addconditionalTrigger('ConditionalTrigger', { + actions: [{ job }], + predicate: { + conditions: [ + { + job, + state: glue.JobState.SUCCEEDED, + }, + ], + }, + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + const predicateReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'CONDITIONAL', + WorkflowName: workflowReference, + Actions: [actionReference], + Predicate: predicateReference, + }); + + expect(workflowReference.asObject()).toEqual( + expect.objectContaining({ + Ref: 'Workflow193EF7C1', + }), + ); + + expect(actionReference.asObject()).toEqual( + expect.objectContaining({ + JobName: { + Ref: 'JobB9D00F9F', + }, + }), + ); + + expect(predicateReference.asObject()).toEqual( + expect.objectContaining({ + Conditions: [ + { + JobName: { + Ref: 'JobB9D00F9F', + }, + LogicalOperator: 'EQUALS', + State: 'SUCCEEDED', + }, + ], + }), + ); + }); + + test('creates a workflow with daily scheduled trigger', () => { + workflow.addDailyScheduledTrigger('DailyScheduledTrigger', { + actions: [{ job }], + startOnCreation: true, + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'SCHEDULED', + WorkflowName: workflowReference, + Schedule: 'cron(0 0 * * ? *)', + StartOnCreation: true, + Actions: [actionReference], + }); + + expect(workflowReference.asObject()).toEqual( + expect.objectContaining({ + Ref: 'Workflow193EF7C1', + }), + ); + + expect(actionReference.asObject()).toEqual( + expect.objectContaining({ + JobName: { + Ref: 'JobB9D00F9F', + }, + }), + ); + }); + + test('creates a workflow with weekly scheduled trigger', () => { + workflow.addWeeklyScheduledTrigger('WeeklyScheduledTrigger', { + actions: [{ job }], + startOnCreation: false, + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'SCHEDULED', + WorkflowName: workflowReference, + Schedule: 'cron(0 0 ? * SUN *)', + StartOnCreation: false, + Actions: [actionReference], + }); + + expect(workflowReference.asObject()).toEqual( + expect.objectContaining({ + Ref: 'Workflow193EF7C1', + }), + ); + + expect(actionReference.asObject()).toEqual( + expect.objectContaining({ + JobName: { + Ref: 'JobB9D00F9F', + }, + }), + ); + }); + + test('creates a workflow with custom scheduled trigger', () => { + const customSchedule = TriggerSchedule.cron({ + minute: '0', + hour: '20', + weekDay: 'THU', + }); + + workflow.addCustomScheduledTrigger('CustomScheduledTrigger', { + actions: [{ job }], + schedule: customSchedule, + startOnCreation: true, + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'SCHEDULED', + WorkflowName: workflowReference, + Schedule: 'cron(0 20 ? * THU *)', + StartOnCreation: true, + Actions: [actionReference], + }); + + expect(workflowReference.asObject()).toEqual( + expect.objectContaining({ + Ref: 'Workflow193EF7C1', + }), + ); + + expect(actionReference.asObject()).toEqual( + expect.objectContaining({ + JobName: { + Ref: 'JobB9D00F9F', + }, + }), + ); + }); + + test('creates a workflow with notify event trigger', () => { + workflow.addNotifyEventTrigger('NotifyEventTrigger', { + actions: [{ job }], + eventBatchingCondition: { + batchSize: 10, + batchWindow: cdk.Duration.minutes(5), + }, + }); + + Template.fromStack(stack).resourceCountIs('AWS::Glue::Trigger', 1); + + const workflowReference = new Capture(); + const actionReference = new Capture(); + const eventBatchingConditionReference = new Capture(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Trigger', { + Type: 'EVENT', + WorkflowName: workflowReference, + Actions: [actionReference], + EventBatchingCondition: eventBatchingConditionReference, + }); + + expect(workflowReference.asObject()).toEqual( + expect.objectContaining({ + Ref: 'Workflow193EF7C1', + }), + ); + + expect(actionReference.asObject()).toEqual( + expect.objectContaining({ + JobName: { + Ref: 'JobB9D00F9F', + }, + }), + ); + + expect(eventBatchingConditionReference.asObject()).toEqual( + expect.objectContaining({ + BatchSize: 10, + BatchWindow: 300, + }), + ); + }); +}); + +describe('.fromWorkflowAttributes()', () => { + let stack: cdk.Stack; + + beforeEach(() => { + stack = new cdk.Stack(); + }); + + test('with required attrs only', () => { + const workflowName = 'my-existing-workflow'; + const importedWorkflow = glue.Workflow.fromWorkflowAttributes(stack, 'ImportedWorkflow', { workflowName }); + + expect(importedWorkflow.workflowName).toEqual(workflowName); + expect(importedWorkflow.workflowArn).toEqual(stack.formatArn({ + service: 'glue', + resource: 'workflow', + resourceName: workflowName, + })); + }); +}); \ No newline at end of file