From 81b533c4769d3f8c8fa433767df3268cedcca46a Mon Sep 17 00:00:00 2001 From: Rico Huijbers Date: Tue, 16 Oct 2018 16:36:15 +0200 Subject: [PATCH] feat: add support for Step Functions (#827) Adds a construct library to model state machines in AWS Step Functions. Integrates with AWS Lambda to make it easy to build serverless workflows. --- .../@aws-cdk/aws-lambda/lib/lambda-ref.ts | 16 +- packages/@aws-cdk/aws-lambda/package.json | 1 + packages/@aws-cdk/aws-stepfunctions/README.md | 376 ++++++++- .../aws-stepfunctions/lib/activity.ts | 146 ++++ .../@aws-cdk/aws-stepfunctions/lib/chain.ts | 82 ++ .../aws-stepfunctions/lib/condition.ts | 227 ++++++ .../@aws-cdk/aws-stepfunctions/lib/index.ts | 18 + .../aws-stepfunctions/lib/state-graph.ts | 157 ++++ .../lib/state-machine-fragment.ts | 77 ++ .../aws-stepfunctions/lib/state-machine.ts | 228 ++++++ .../lib/state-transition-metrics.ts | 58 ++ .../aws-stepfunctions/lib/states/choice.ts | 136 ++++ .../aws-stepfunctions/lib/states/fail.ts | 58 ++ .../aws-stepfunctions/lib/states/parallel.ts | 131 ++++ .../aws-stepfunctions/lib/states/pass.ts | 95 +++ .../aws-stepfunctions/lib/states/state.ts | 487 ++++++++++++ .../aws-stepfunctions/lib/states/succeed.ts | 60 ++ .../aws-stepfunctions/lib/states/task.ts | 303 ++++++++ .../aws-stepfunctions/lib/states/wait.ts | 98 +++ .../@aws-cdk/aws-stepfunctions/lib/types.ts | 141 ++++ .../@aws-cdk/aws-stepfunctions/package.json | 6 +- .../test/integ.job-poller.expected.json | 75 ++ .../test/integ.job-poller.ts | 49 ++ .../aws-stepfunctions/test/test.activity.ts | 77 ++ .../aws-stepfunctions/test/test.condition.ts | 12 + .../aws-stepfunctions/test/test.metrics.ts | 44 ++ .../test/test.state-machine-resources.ts | 111 +++ .../test/test.states-language.ts | 733 ++++++++++++++++++ .../test/test.stepfunctions.ts | 8 - .../lib/cloudformation/cloudformation-json.ts | 2 + 30 files changed, 4000 insertions(+), 12 deletions(-) create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/activity.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/chain.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/condition.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/state-machine-fragment.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/state-transition-metrics.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/choice.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/fail.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/pass.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/succeed.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/states/wait.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/lib/types.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.expected.json create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.activity.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.condition.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.metrics.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.state-machine-resources.ts create mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.states-language.ts delete mode 100644 packages/@aws-cdk/aws-stepfunctions/test/test.stepfunctions.ts diff --git a/packages/@aws-cdk/aws-lambda/lib/lambda-ref.ts b/packages/@aws-cdk/aws-lambda/lib/lambda-ref.ts index 4fcdda6e09d2e..92869fb73aeb3 100644 --- a/packages/@aws-cdk/aws-lambda/lib/lambda-ref.ts +++ b/packages/@aws-cdk/aws-lambda/lib/lambda-ref.ts @@ -5,6 +5,7 @@ import events = require('@aws-cdk/aws-events'); import iam = require('@aws-cdk/aws-iam'); import logs = require('@aws-cdk/aws-logs'); import s3n = require('@aws-cdk/aws-s3-notifications'); +import stepfunctions = require('@aws-cdk/aws-stepfunctions'); import cdk = require('@aws-cdk/cdk'); import { cloudformation } from './lambda.generated'; import { Permission } from './permission'; @@ -39,7 +40,7 @@ export interface FunctionRefProps { export abstract class FunctionRef extends cdk.Construct implements events.IEventRuleTarget, logs.ILogSubscriptionDestination, s3n.IBucketNotificationDestination, - ec2.IConnectable { + ec2.IConnectable, stepfunctions.IStepFunctionsTaskResource { /** * Creates a Lambda function object which represents a function not defined @@ -352,6 +353,19 @@ export abstract class FunctionRef extends cdk.Construct }; } + public asStepFunctionsTaskResource(_callingTask: stepfunctions.Task): stepfunctions.StepFunctionsTaskResourceProps { + return { + resourceArn: this.functionArn, + metricPrefixSingular: 'LambdaFunction', + metricPrefixPlural: 'LambdaFunctions', + metricDimensions: { LambdaFunctionArn: this.functionArn }, + policyStatements: [new iam.PolicyStatement() + .addResource(this.functionArn) + .addActions("lambda:InvokeFunction") + ] + }; + } + private parsePermissionPrincipal(principal?: iam.PolicyPrincipal) { if (!principal) { return undefined; diff --git a/packages/@aws-cdk/aws-lambda/package.json b/packages/@aws-cdk/aws-lambda/package.json index a24be53a3561a..daac9c5ad34b2 100644 --- a/packages/@aws-cdk/aws-lambda/package.json +++ b/packages/@aws-cdk/aws-lambda/package.json @@ -72,6 +72,7 @@ "@aws-cdk/aws-s3": "^0.12.0", "@aws-cdk/aws-s3-notifications": "^0.12.0", "@aws-cdk/aws-sqs": "^0.12.0", + "@aws-cdk/aws-stepfunctions": "^0.12.0", "@aws-cdk/cdk": "^0.12.0", "@aws-cdk/cx-api": "^0.12.0" }, diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index 30aed40788ef4..a0527c883b83c 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -1,2 +1,374 @@ -## The CDK Construct Library for AWS Step Functions -This module is part of the [AWS Cloud Development Kit](https://github.com/awslabs/aws-cdk) project. +## AWS Step Functions Construct Library + +The `@aws-cdk/aws-stepfunctions` package contains constructs for building +serverless workflows. Using objects. Defining a workflow looks like this +(for the [Step Functions Job Poller +example](https://docs.aws.amazon.com/step-functions/latest/dg/job-status-poller-sample.html)): + +```ts +const submitLambda = new lambda.Function(this, 'SubmitLambda', { ... }); +const getStatusLambda = new lambda.Function(this, 'CheckLambda', { ... }); + +const submitJob = new stepfunctions.Task(this, 'Submit Job', { + resource: submitLambda, + // Put Lambda's result here in the execution's state object + resultPath: '$.guid', +}); + +const waitX = new stepfunctions.Wait(this, 'Wait X Seconds', { secondsPath: '$.wait_time' }); + +const getStatus = new stepfunctions.Task(this, 'Get Job Status', { + resource: getStatusLambda, + // Pass just the field named "guid" into the Lambda, put the + // Lambda's result in a field called "status" + inputPath: '$.guid', + resultPath: '$.status', +}); + +const jobFailed = new stepfunctions.Fail(this, 'Job Failed', { + cause: 'AWS Batch Job Failed', + error: 'DescribeJob returned FAILED', +}); + +const finalStatus = new stepfunctions.Task(this, 'Get Final Job Status', { + resource: getStatusLambda, + // Use "guid" field as input, output of the Lambda becomes the + // entire state machine output. + inputPath: '$.guid', +}); + +const definition = submitJob + .next(waitX) + .next(getStatus) + .next(new stepfunctions.Choice(this, 'Job Complete?') + // Look at the "status" field + .when(stepfunctions.Condition.stringEquals('$.status', 'FAILED'), jobFailed) + .when(stepfunctions.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus) + .otherwise(waitX)); + +new stepfunctions.StateMachine(this, 'StateMachine', { + definition, + timeoutSec: 300 +}); +``` + +## State Machine + +A `stepfunctions.StateMachine` is a resource that takes a state machine +definition. The definition is specified by its start state, and encompasses +all states reachable from the start state: + +```ts +const startState = new stepfunctions.Pass(this, 'StartState'); + +new stepfunctions.StateMachine(this, 'StateMachine', { + definition: startState +}); +``` + +State machines execute using an IAM Role, which will automatically have all +permissions added that are required to make all state machine tasks execute +properly (for example, permissions to invoke any Lambda functions you add to +your workflow). A role will be created by default, but you can supply an +existing one as well. + +## Amazon States Language + +This library comes with a set of classes that model the [Amazon States +Language](https://states-language.net/spec.html). The following State classes +are supported: + +* `Task` +* `Pass` +* `Wait` +* `Choice` +* `Parallel` +* `Succeed` +* `Fail` + +An arbitrary JSON object (specified at execution start) is passed from state to +state and transformed during the execution of the workflow. For more +information, see the States Language spec. + +### Task + +A `Task` represents some work that needs to be done. It takes a `resource` +property that is either a Lambda `Function` or a Step Functions `Activity` +(A Lambda Function runs your task's code on AWS Lambda, whereas an `Activity` +is used to run your task's code on an arbitrary compute fleet you manage). + +```ts +const task = new stepfunctions.Task(this, 'Invoke The Lambda', { + resource: myLambda, + inputPath: '$.input', + timeoutSeconds: 300, +}); + +// Add a retry policy +task.addRetry({ + intervalSeconds: 5, + maxAttempts: 10 +}); + +// Add an error handler +task.addCatch(errorHandlerState); + +// Set the next state +task.next(nextState); +``` + +### Pass + +A `Pass` state does no work, but it can optionally transform the execution's +JSON state. + +```ts +// Makes the current JSON state { ..., "subObject": { "hello": "world" } } +const pass = new stepfunctions.Pass(this, 'Add Hello World', { + result: { hello: "world" }, + resultPath: '$.subObject', +}); + +// Set the next state +pass.next(nextState); +``` + +### Wait + +A `Wait` state waits for a given number of seconds, or until the current time +hits a particular time. The time to wait may be taken from the execution's JSON +state. + +```ts +// Wait until it's the time mentioned in the the state object's "triggerTime" +// field. +const wait = new stepfunctions.Wait(this, 'Wait For Trigger Time', { + timestampPath: '$.triggerTime', +}); + +// Set the next state +wait.next(startTheWork); +``` + +### Choice + +A `Choice` state can take a differen path through the workflow based on the +values in the execution's JSON state: + +```ts +const choice = new stepfunctions.Choice(this, 'Did it work?'); + +// Add conditions with .when() +choice.when(stepfunctions.Condition.stringEqual('$.status', 'SUCCESS'), successState); +choice.when(stepfunctions.Condition.numberGreaterThan('$.attempts', 5), failureState); + +// Use .otherwise() to indicate what should be done if none of the conditions match +choice.otherwise(tryAgainState); +``` + +If you want to temporarily branch your workflow based on a condition, but have +all branches come together and continuing as one (similar to how an `if ... +then ... else` works in a programming language), use the `.afterwards()` method: + +```ts +const choice = new stepfunctions.Choice(this, 'What color is it?'); +choice.when(stepfunctions.Condition.stringEqual('$.color', 'BLUE'), handleBlueItem); +choice.when(stepfunctions.Condition.stringEqual('$.color', 'RED'), handleRedItem); +choice.otherwise(handleOtherItemColor); + +// Use .afterwards() to join all possible paths back together and continue +choice.afterwards().next(shipTheItem); +``` + +If your `Choice` doesn't have an `otherwise()` and none of the conditions match +the JSON state, a `NoChoiceMatched` error will be thrown. Wrap the state machine +in a `Parallel` state if you want to catch and recover from this. + +### Parallel + +A `Parallel` state executes one or more subworkflows in parallel. It can also +be used to catch and recover from errors in subworkflows. + +```ts +const parallel = new stepfunctions.Parallel(this, 'Do the work in parallel'); + +// Add branches to be executed in parallel +parallel.branch(shipItem); +parallel.branch(sendInvoice); +parallel.branch(restock); + +// Retry the whole workflow if something goes wrong +parallel.addRetry({ maxAttempts: 1 }); + +// How to recover from errors +parallel.addCatch(sendFailureNotification); + +// What to do in case everything succeeded +parallel.next(closeOrder); +``` + +### Succeed + +Reaching a `Succeed` state terminates the state machine execution with a +succesful status. + +```ts +const success = new stepfunctions.Succeed(this, 'We did it!'); +``` + +### Fail + +Reaching a `Fail` state terminates the state machine execution with a +failure status. The fail state should report the reason for the failure. +Failures can be caught by encompassing `Parallel` states. + +```ts +const success = new stepfunctions.Fail(this, 'Fail', { + error: 'WorkflowFailure', + cause: "Something went wrong" +}); +``` + +## Task Chaining + +To make defining work flows as convenient (and readable in a top-to-bottom way) +as writing regular programs, it is possible to chain most methods invocations. +In particular, the `.next()` method can be repeated. The result of a series of +`.next()` calls is called a **Chain**, and can be used when defining the jump +targets of `Choice.on` or `Parallel.branch`: + +```ts +const definition = step1 + .next(step2) + .next(choice + .when(condition1, step3.next(step4).next(step5)) + .otherwise(step6) + .afterwards()) + .next(parallel + .branch(step7.next(step8)) + .branch(step9.next(step10))) + .next(finish); + +new stepfunctions.StateMachine(this, 'StateMachine', { + definition, +}); +``` + +If you don't like the visual look of starting a chain directly off the first +step, you can use `Chain.start`: + +```ts +const definition = stepfunctions.Chain + .start(step1) + .next(step2) + .next(step3) + // ... +``` + +## State Machine Fragments + +It is possible to define reusable (or abstracted) mini-state machines by +defining a construct that implements `IChainable`, which requires you to define +two fields: + +* `startState: State`, representing the entry point into this state machine. +* `endStates: INextable[]`, representing the (one or more) states that outgoing + transitions will be added to if you chain onto the fragment. + +Since states will be named after their construct IDs, you may need to prefix the +IDs of states if you plan to instantiate the same state machine fragment +multiples times (otherwise all states in every instantiation would have the same +name). + +The class `StateMachineFragment` contains some helper functions (like +`prefixStates()`) to make it easier for you to do this. If you define your state +machine as a subclass of this, it will be convenient to use: + +```ts +interface MyJobProps { + jobFlavor: string; +} + +class MyJob extends stepfunctions.StateMachineFragment { + public readonly startState: State; + public readonly endStates: INextable[]; + + constructor(parent: cdk.Construct, id: string, props: MyJobProps) { + super(parent, id); + + const first = new stepfunctions.Task(this, 'First', { ... }); + // ... + const last = new stepfunctions.Task(this, 'Last', { ... }); + + this.startState = first; + this.endStates = [last]; + } +} + +// Do 3 different variants of MyJob in parallel +new stepfunctions.Parallel(this, 'All jobs') + .branch(new MyJob(this, 'Quick', { jobFlavor: 'quick' }).prefixStates()) + .branch(new MyJob(this, 'Medium', { jobFlavor: 'medium' }).prefixStates()) + .branch(new MyJob(this, 'Slow', { jobFlavor: 'slow' }).prefixStates()); +``` + +## Activity + +**Activities** represent work that is done on some non-Lambda worker pool. The +Step Functions workflow will submit work to this Activity, and a worker pool +that you run yourself, probably on EC2, will pull jobs from the Activity and +submit the results of individual jobs back. + +You need the ARN to do so, so if you use Activities be sure to pass the Activity +ARN into your worker pool: + +```ts +const activity = new stepfunctions.Activity(this, 'Activity'); + +// Read this Output from your application and use it to poll for work on +// the activity. +new cdk.Output(this, 'ActivityArn', { value: activity.activityArn }); +``` + +## Metrics + +`Task` object expose various metrics on the execution of that particular task. For example, +to create an alarm on a particular task failing: + +```ts +new cloudwatch.Alarm(this, 'TaskAlarm', { + metric: task.metricFailed(), + threshold: 1, + evaluationPeriods: 1, +}); +``` + +There are also metrics on the complete state machine: + +```ts +new cloudwatch.Alarm(this, 'StateMachineAlarm', { + metric: stateMachine.metricFailed(), + threshold: 1, + evaluationPeriods: 1, +}); +``` + +And there are metrics on the capacity of all state machines in your account: + +```ts +new cloudwatch.Alarm(this, 'ThrottledAlarm', { + metric: StateTransitionMetrics.metricThrottledEvents(), + threshold: 10, + evaluationPeriods: 2, +}); +``` + + +## Future work + +Contributions welcome: + +- [ ] A single `LambdaTask` class that is both a `Lambda` and a `Task` in one + might make for a nice API. +- [ ] Expression parser for Conditions. +- [ ] Simulate state machines in unit tests. diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/activity.ts b/packages/@aws-cdk/aws-stepfunctions/lib/activity.ts new file mode 100644 index 0000000000000..a35422ac352b3 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/activity.ts @@ -0,0 +1,146 @@ +import cloudwatch = require('@aws-cdk/aws-cloudwatch'); +import cdk = require('@aws-cdk/cdk'); +import { IStepFunctionsTaskResource, StepFunctionsTaskResourceProps, Task } from './states/task'; +import { cloudformation } from './stepfunctions.generated'; + +export interface ActivityProps { + /** + * The name for this activity. + * + * @default If not supplied, a name is generated + */ + activityName?: string; +} + +/** + * Define a new StepFunctions activity + */ +export class Activity extends cdk.Construct implements IStepFunctionsTaskResource { + public readonly activityArn: string; + public readonly activityName: string; + + constructor(parent: cdk.Construct, id: string, props: ActivityProps = {}) { + super(parent, id); + + const resource = new cloudformation.ActivityResource(this, 'Resource', { + activityName: props.activityName || this.generateName() + }); + + this.activityArn = resource.activityArn; + this.activityName = resource.activityName; + } + + public asStepFunctionsTaskResource(_callingTask: Task): StepFunctionsTaskResourceProps { + // No IAM permissions necessary, execution role implicitly has Activity permissions. + return { + resourceArn: this.activityArn, + metricPrefixSingular: 'Activity', + metricPrefixPlural: 'Activities', + metricDimensions: { ActivityArn: this.activityArn }, + }; + } + + /** + * Return the given named metric for this Activity + * + * @default sum over 5 minutes + */ + public metric(metricName: string, props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return new cloudwatch.Metric({ + namespace: 'AWS/States', + metricName, + dimensions: { ActivityArn: this.activityArn }, + statistic: 'sum', + ...props + }); + } + + /** + * The interval, in milliseconds, between the time the activity starts and the time it closes. + * + * @default average over 5 minutes + */ + public metricRunTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivityRunTime', { statistic: 'avg', ...props }); + } + + /** + * The interval, in milliseconds, for which the activity stays in the schedule state. + * + * @default average over 5 minutes + */ + public metricScheduleTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivityScheduleTime', { statistic: 'avg', ...props }); + } + + /** + * The interval, in milliseconds, between the time the activity is scheduled and the time it closes. + * + * @default average over 5 minutes + */ + public metricTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivityTime', { statistic: 'avg', ...props }); + } + + /** + * Metric for the number of times this activity is scheduled + * + * @default sum over 5 minutes + */ + public metricScheduled(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesScheduled', props); + } + + /** + * Metric for the number of times this activity times out + * + * @default sum over 5 minutes + */ + public metricTimedOut(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesTimedOut', props); + } + + /** + * Metric for the number of times this activity is started + * + * @default sum over 5 minutes + */ + public metricStarted(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesStarted', props); + } + + /** + * Metric for the number of times this activity succeeds + * + * @default sum over 5 minutes + */ + public metricSucceeded(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesSucceeded', props); + } + + /** + * Metric for the number of times this activity fails + * + * @default sum over 5 minutes + */ + public metricFailed(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesFailed', props); + } + + /** + * Metric for the number of times the heartbeat times out for this activity + * + * @default sum over 5 minutes + */ + public metricHeartbeatTimedOut(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ActivitiesHeartbeatTimedOut', props); + } + + private generateName(): string { + const name = this.uniqueId; + if (name.length > 80) { + return name.substring(0, 40) + name.substring(name.length - 40); + } + return name; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/chain.ts b/packages/@aws-cdk/aws-stepfunctions/lib/chain.ts new file mode 100644 index 0000000000000..f5db908bab42c --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/chain.ts @@ -0,0 +1,82 @@ +import { Parallel, ParallelProps } from "./states/parallel"; +import { State } from "./states/state"; +import { IChainable, INextable } from "./types"; + +/** + * A collection of states to chain onto + * + * A Chain has a start and zero or more chainable ends. If there are + * zero ends, calling next() on the Chain will fail. + */ +export class Chain implements IChainable { + /** + * Begin a new Chain from one chainable + */ + public static start(state: IChainable) { + return new Chain(state.startState, state.endStates, state); + } + + /** + * Make a Chain with the start from one chain and the ends from another + */ + public static sequence(start: IChainable, next: IChainable) { + return new Chain(start.startState, next.endStates, next); + } + + /** + * Make a Chain with specific start and end states, and a last-added Chainable + */ + public static custom(startState: State, endStates: INextable[], lastAdded: IChainable) { + return new Chain(startState, endStates, lastAdded); + } + + /** + * Identify this Chain + */ + public readonly id: string; + + /** + * The start state of this chain + */ + public readonly startState: State; + + /** + * The chainable end state(s) of this chain + */ + public readonly endStates: INextable[]; + + private constructor(startState: State, endStates: INextable[], private readonly lastAdded: IChainable) { + this.id = lastAdded.id; + this.startState = startState; + this.endStates = endStates; + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable): Chain { + if (this.endStates.length === 0) { + throw new Error(`Cannot add to chain: last state in chain (${this.lastAdded.id}) does not allow it`); + } + + for (const endState of this.endStates) { + endState.next(next); + } + + return new Chain(this.startState, next.endStates, next); + } + + /** + * Return a single state that encompasses all states in the chain + * + * This can be used to add error handling to a sequence of states. + * + * Be aware that this changes the result of the inner state machine + * to be an array with the result of the state machine in it. Adjust + * your paths accordingly. For example, change 'outputPath' to + * '$[0]'. + */ + public toSingleState(id: string, props: ParallelProps = {}): Parallel { + return new Parallel(this.startState, id, props).branch(this); + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/condition.ts b/packages/@aws-cdk/aws-stepfunctions/lib/condition.ts new file mode 100644 index 0000000000000..d7b1d80dad20a --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/condition.ts @@ -0,0 +1,227 @@ +/** + * A Condition for use in a Choice state branch + */ +export abstract class Condition { + /** + * Matches if a boolean field has the given value + */ + public static booleanEquals(variable: string, value: boolean): Condition { + return new VariableComparison(variable, ComparisonOperator.BooleanEquals, value); + } + + /** + * Matches if a string field has the given value + */ + public static stringEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.StringEquals, value); + } + + /** + * Matches if a string field sorts before a given value + */ + public static stringLessThan(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.StringLessThan, value); + } + + /** + * Matches if a string field sorts equal to or before a given value + */ + public static stringLessThanEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.StringLessThanEquals, value); + } + + /** + * Matches if a string field sorts after a given value + */ + public static stringGreaterThan(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.StringGreaterThan, value); + } + + /** + * Matches if a string field sorts after or equal to a given value + */ + public static stringGreaterThanEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.StringGreaterThanEquals, value); + } + + /** + * Matches if a numeric field has the given value + */ + public static numberEquals(variable: string, value: number): Condition { + return new VariableComparison(variable, ComparisonOperator.NumericEquals, value); + } + + /** + * Matches if a numeric field is less than the given value + */ + public static numberLessThan(variable: string, value: number): Condition { + return new VariableComparison(variable, ComparisonOperator.NumericLessThan, value); + } + + /** + * Matches if a numeric field is less than or equal to the given value + */ + public static numberLessThanEquals(variable: string, value: number): Condition { + return new VariableComparison(variable, ComparisonOperator.NumericLessThanEquals, value); + } + + /** + * Matches if a numeric field is greater than the given value + */ + public static numberGreaterThan(variable: string, value: number): Condition { + return new VariableComparison(variable, ComparisonOperator.NumericGreaterThan, value); + } + + /** + * Matches if a numeric field is greater than or equal to the given value + */ + public static numberGreaterThanEquals(variable: string, value: number): Condition { + return new VariableComparison(variable, ComparisonOperator.NumericGreaterThanEquals, value); + } + + /** + * Matches if a timestamp field is the same time as the given timestamp + */ + public static timestampEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.TimestampEquals, value); + } + + /** + * Matches if a timestamp field is before the given timestamp + */ + public static timestampLessThan(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.TimestampLessThan, value); + } + + /** + * Matches if a timestamp field is before or equal to the given timestamp + */ + public static timestampLessThanEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.TimestampLessThanEquals, value); + } + + /** + * Matches if a timestamp field is after the given timestamp + */ + public static timestampGreaterThan(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.TimestampGreaterThan, value); + } + + /** + * Matches if a timestamp field is after or equal to the given timestamp + */ + public static timestampGreaterThanEquals(variable: string, value: string): Condition { + return new VariableComparison(variable, ComparisonOperator.TimestampGreaterThanEquals, value); + } + + /** + * Combine two or more conditions with a logical AND + */ + public static and(...conditions: Condition[]): Condition { + return new CompoundCondition(CompoundOperator.And, ...conditions); + } + + /** + * Combine two or more conditions with a logical OR + */ + public static or(...conditions: Condition[]): Condition { + return new CompoundCondition(CompoundOperator.Or, ...conditions); + } + + /** + * Negate a condition + */ + public static not(condition: Condition): Condition { + return new NotCondition(condition); + } + + /** + * Render Amazon States Language JSON for the condition + */ + public abstract renderCondition(): any; +} + +/** + * Comparison Operator types + */ +enum ComparisonOperator { + StringEquals, + StringLessThan, + StringGreaterThan, + StringLessThanEquals, + StringGreaterThanEquals, + NumericEquals, + NumericLessThan, + NumericGreaterThan, + NumericLessThanEquals, + NumericGreaterThanEquals, + BooleanEquals, + TimestampEquals, + TimestampLessThan, + TimestampGreaterThan, + TimestampLessThanEquals, + TimestampGreaterThanEquals, +} + +/** + * Compound Operator types + */ +enum CompoundOperator { + And, + Or, +} + +/** + * Scalar comparison + */ +class VariableComparison extends Condition { + constructor(private readonly variable: string, private readonly comparisonOperator: ComparisonOperator, private readonly value: any) { + super(); + if (!variable.startsWith('$.')) { + throw new Error(`Variable reference must start with '$.', got '${variable}'`); + } + } + + public renderCondition(): any { + return { + Variable: this.variable, + [ComparisonOperator[this.comparisonOperator]]: this.value + }; + } +} + +/** + * Logical compound condition + */ +class CompoundCondition extends Condition { + private readonly conditions: Condition[]; + + constructor(private readonly operator: CompoundOperator, ...conditions: Condition[]) { + super(); + this.conditions = conditions; + if (conditions.length === 0) { + throw new Error('Must supply at least one inner condition for a logical combination'); + } + } + + public renderCondition(): any { + return { + [CompoundOperator[this.operator]]: this.conditions.map(c => c.renderCondition()) + }; + } +} + +/** + * Logical unary condition + */ +class NotCondition extends Condition { + constructor(private readonly comparisonOperation: Condition) { + super(); + } + + public renderCondition(): any { + return { + Not: this.comparisonOperation + }; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/index.ts b/packages/@aws-cdk/aws-stepfunctions/lib/index.ts index fd4033059ad29..8cf9837545852 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/index.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/index.ts @@ -1,2 +1,20 @@ +export * from './activity'; +export * from './types'; +export * from './condition'; +export * from './state-machine'; +export * from './state-machine-fragment'; +export * from './state-transition-metrics'; +export * from './chain'; +export * from './state-graph'; + +export * from './states/choice'; +export * from './states/fail'; +export * from './states/parallel'; +export * from './states/pass'; +export * from './states/state'; +export * from './states/succeed'; +export * from './states/task'; +export * from './states/wait'; + // AWS::StepFunctions CloudFormation Resources: export * from './stepfunctions.generated'; diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts new file mode 100644 index 0000000000000..50d368493ce63 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-graph.ts @@ -0,0 +1,157 @@ +import iam = require('@aws-cdk/aws-iam'); +import { State } from "./states/state"; + +/** + * A collection of connected states + * + * A StateGraph is used to keep track of all states that are connected (have + * transitions between them). It does not include the substatemachines in + * a Parallel's branches: those are their own StateGraphs, but the graphs + * themselves have a hierarchical relationship as well. + * + * By assigning states to a definintive StateGraph, we verify that no state + * machines are constructed. In particular: + * + * - Every state object can only ever be in 1 StateGraph, and not inadvertently + * be used in two graphs. + * - Every stateId must be unique across all states in the entire state + * machine. + * + * All policy statements in all states in all substatemachines are bubbled so + * that the top-level StateMachine instantiation can read them all and add + * them to the IAM Role. + * + * You do not need to instantiate this class; it is used internally. + */ +export class StateGraph { + /** + * Set a timeout to render into the graph JSON. + * + * Read/write. Only makes sense on the top-level graph, subgraphs + * do not support this feature. + * + * @default No timeout + */ + public timeoutSeconds?: number; + + /** + * The accumulated policy statements + */ + public readonly policyStatements = new Array(); + + /** + * All states in this graph + */ + private readonly allStates = new Set(); + + /** + * A mapping of stateId -> Graph for all states in this graph and subgraphs + */ + private readonly allContainedStates = new Map(); + + /** + * Containing graph of this graph + */ + private superGraph?: StateGraph; + + constructor(public readonly startState: State, private readonly graphDescription: string) { + this.allStates.add(startState); + startState.bindToGraph(this); + } + + /** + * Register a state as part of this graph + * + * Called by State.bindToGraph(). + */ + public registerState(state: State) { + this.registerContainedState(state.stateId, this); + this.allStates.add(state); + } + + /** + * Register a Policy Statement used by states in this graph + */ + public registerPolicyStatement(statement: iam.PolicyStatement) { + if (this.superGraph) { + this.superGraph.registerPolicyStatement(statement); + } else { + this.policyStatements.push(statement); + } + } + + /** + * Register this graph as a child of the given graph + * + * Resource changes will be bubbled up to the given graph. + */ + public registerSuperGraph(graph: StateGraph) { + if (this.superGraph === graph) { return; } + if (this.superGraph) { + throw new Error('Every StateGraph can only be registered into one other StateGraph'); + } + this.superGraph = graph; + this.pushContainedStatesUp(graph); + this.pushPolicyStatementsUp(graph); + } + + /** + * Return the Amazon States Language JSON for this graph + */ + public toGraphJson(): object { + const states: any = {}; + for (const state of this.allStates) { + states[state.stateId] = state.toStateJson(); + } + + return { + StartAt: this.startState.stateId, + States: states, + TimeoutSeconds: this.timeoutSeconds + }; + } + + /** + * Return a string description of this graph + */ + public toString() { + const someNodes = Array.from(this.allStates).slice(0, 3).map(x => x.stateId); + if (this.allStates.size > 3) { someNodes.push('...'); } + return `${this.graphDescription} (${someNodes.join(', ')})`; + } + + /** + * Register a stateId and graph where it was registered + */ + private registerContainedState(stateId: string, graph: StateGraph) { + if (this.superGraph) { + this.superGraph.registerContainedState(stateId, graph); + } else { + const existingGraph = this.allContainedStates.get(stateId); + if (existingGraph) { + throw new Error(`State with name '${stateId}' occurs in both ${graph} and ${existingGraph}. All states must have unique names.`); + } + + this.allContainedStates.set(stateId, graph); + } + } + + /** + * Push all contained state info up to the given super graph + */ + private pushContainedStatesUp(superGraph: StateGraph) { + for (const [stateId, graph] of this.allContainedStates) { + superGraph.registerContainedState(stateId, graph); + } + } + + /** + * Push all policy statements to into the given super graph + */ + private pushPolicyStatementsUp(superGraph: StateGraph) { + for (const policyStatement of this.policyStatements) { + superGraph.registerPolicyStatement(policyStatement); + } + } + +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine-fragment.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine-fragment.ts new file mode 100644 index 0000000000000..56177dcbbb57c --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine-fragment.ts @@ -0,0 +1,77 @@ +import cdk = require('@aws-cdk/cdk'); +import { Chain } from './chain'; +import { Parallel, ParallelProps } from './states/parallel'; +import { State } from './states/state'; +import { IChainable, INextable } from "./types"; + +/** + * Base class for reusable state machine fragments + */ +export abstract class StateMachineFragment extends cdk.Construct implements IChainable { + /** + * The start state of this state machine fragment + */ + public abstract readonly startState: State; + + /** + * The states to chain onto if this fragment is used + */ + public abstract readonly endStates: INextable[]; + + /** + * Prefix the IDs of all states in this state machine fragment + * + * Use this to avoid multiple copies of the state machine all having the + * same state IDs. + * + * @param prefix The prefix to add. Will use construct ID by default. + */ + public prefixStates(prefix?: string): StateMachineFragment { + State.prefixStates(this, prefix || `${this.id}: `); + return this; + } + + /** + * Wrap all states in this state machine fragment up into a single state. + * + * This can be used to add retry or error handling onto this state + * machine fragment. + * + * Be aware that this changes the result of the inner state machine + * to be an array with the result of the state machine in it. Adjust + * your paths accordingly. For example, change 'outputPath' to + * '$[0]'. + */ + public toSingleState(options: SingleStateOptions = {}): Parallel { + const stateId = options.stateId || this.id; + this.prefixStates(options.prefixStates || `${stateId}: `); + + return new Parallel(this, stateId, options).branch(this); + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable) { + return Chain.start(this).next(next); + } +} + +/** + * Options for creating a single state + */ +export interface SingleStateOptions extends ParallelProps { + /** + * ID of newly created containing state + * + * @default Construct ID of the StateMachineFragment + */ + stateId?: string; + + /** + * String to prefix all stateIds in the state machine with + * + * @default stateId + */ + prefixStates?: string; +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts new file mode 100644 index 0000000000000..9e7b552126e52 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts @@ -0,0 +1,228 @@ +import cloudwatch = require('@aws-cdk/aws-cloudwatch'); +import events = require('@aws-cdk/aws-events'); +import iam = require('@aws-cdk/aws-iam'); +import cdk = require('@aws-cdk/cdk'); +import { StateGraph } from './state-graph'; +import { cloudformation } from './stepfunctions.generated'; +import { IChainable } from './types'; + +/** + * Properties for defining a State Machine + */ +export interface StateMachineProps { + /** + * A name for the state machine + * + * @default A name is automatically generated + */ + stateMachineName?: string; + + /** + * Definition for this state machine + */ + definition: IChainable; + + /** + * The execution role for the state machine service + * + * @default A role is automatically created + */ + role?: iam.Role; + + /** + * Maximum run time for this state machine + * + * @default No timeout + */ + timeoutSec?: number; +} + +/** + * Define a StepFunctions State Machine + */ +export class StateMachine extends cdk.Construct implements IStateMachine { + /** + * Import a state machine + */ + public static import(parent: cdk.Construct, id: string, props: ImportedStateMachineProps): IStateMachine { + return new ImportedStateMachine(parent, id, props); + } + + /** + * Execution role of this state machine + */ + public readonly role: iam.Role; + + /** + * The name of the state machine + */ + public readonly stateMachineName: string; + + /** + * The ARN of the state machine + */ + public readonly stateMachineArn: string; + + /** + * A role used by CloudWatch events to start the State Machine + */ + private eventsRole?: iam.Role; + + constructor(parent: cdk.Construct, id: string, props: StateMachineProps) { + super(parent, id); + + this.role = props.role || new iam.Role(this, 'Role', { + assumedBy: new iam.ServicePrincipal(`states.${new cdk.AwsRegion()}.amazonaws.com`), + }); + + const graph = new StateGraph(props.definition.startState, `State Machine ${id} definition`); + graph.timeoutSeconds = props.timeoutSec; + + const resource = new cloudformation.StateMachineResource(this, 'Resource', { + stateMachineName: props.stateMachineName, + roleArn: this.role.roleArn, + definitionString: cdk.CloudFormationJSON.stringify(graph.toGraphJson()), + }); + + for (const statement of graph.policyStatements) { + this.addToRolePolicy(statement); + } + + this.stateMachineName = resource.stateMachineName; + this.stateMachineArn = resource.stateMachineArn; + } + + /** + * Add the given statement to the role's policy + */ + public addToRolePolicy(statement: iam.PolicyStatement) { + this.role.addToPolicy(statement); + } + + /** + * Allows using state machines as event rule targets. + */ + public asEventRuleTarget(_ruleArn: string, _ruleId: string): events.EventRuleTargetProps { + if (!this.eventsRole) { + this.eventsRole = new iam.Role(this, 'EventsRole', { + assumedBy: new iam.ServicePrincipal('events.amazonaws.com') + }); + + this.eventsRole.addToPolicy(new iam.PolicyStatement() + .addAction('states:StartExecution') + .addResource(this.stateMachineArn)); + } + + return { + id: this.id, + arn: this.stateMachineArn, + roleArn: this.eventsRole.roleArn, + }; + } + + /** + * Return the given named metric for this State Machine's executions + * + * @default sum over 5 minutes + */ + public metric(metricName: string, props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return new cloudwatch.Metric({ + namespace: 'AWS/States', + metricName, + dimensions: { StateMachineArn: this.stateMachineArn }, + statistic: 'sum', + ...props + }); + } + + /** + * Metric for the number of executions that failed + * + * @default sum over 5 minutes + */ + public metricFailed(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionsFailed', props); + } + + /** + * Metric for the number of executions that were throttled + * + * @default sum over 5 minutes + */ + public metricThrottled(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionThrottled', props); + } + + /** + * Metric for the number of executions that were aborted + * + * @default sum over 5 minutes + */ + public metricAborted(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionsAborted', props); + } + + /** + * Metric for the number of executions that succeeded + * + * @default sum over 5 minutes + */ + public metricSucceeded(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionsSucceeded', props); + } + + /** + * Metric for the number of executions that succeeded + * + * @default sum over 5 minutes + */ + public metricTimedOut(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionsTimedOut', props); + } + + /** + * Metric for the number of executions that were started + * + * @default sum over 5 minutes + */ + public metricStarted(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.metric('ExecutionsStarted', props); + } + + /** + * Export this state machine + */ + public export(): ImportedStateMachineProps { + return { + stateMachineArn: new cdk.Output(this, 'StateMachineArn', { value: this.stateMachineArn }).makeImportValue().toString(), + }; + } +} + +/** + * A State Machine + */ +export interface IStateMachine { + /** + * The ARN of the state machine + */ + readonly stateMachineArn: string; +} + +/** + * Properties for an imported state machine + */ +export interface ImportedStateMachineProps { + /** + * The ARN of the state machine + */ + stateMachineArn: string; +} + +class ImportedStateMachine extends cdk.Construct implements IStateMachine { + public readonly stateMachineArn: string; + constructor(parent: cdk.Construct, id: string, props: ImportedStateMachineProps) { + super(parent, id); + this.stateMachineArn = props.stateMachineArn; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-transition-metrics.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-transition-metrics.ts new file mode 100644 index 0000000000000..d29817a305c89 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-transition-metrics.ts @@ -0,0 +1,58 @@ +import cloudwatch = require('@aws-cdk/aws-cloudwatch'); + +/** + * Metrics on the rate limiting performed on state machine execution. + * + * These rate limits are shared across all state machines. + */ +export class StateTransitionMetric { + /** + * Return the given named metric for the service's state transition metrics + * + * @default average over 5 minutes + */ + public static metric(metricName: string, props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return new cloudwatch.Metric({ + namespace: 'AWS/States', + metricName, + dimensions: { ServiceMetric: 'StateTransition' }, + ...props + }); + } + + /** + * Metric for the number of available state transitions. + * + * @default average over 5 minutes + */ + public static metricProvisionedBucketSize(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return StateTransitionMetric.metric("ProvisionedBucketSize", props); + } + + /** + * Metric for the provisioned steady-state execution rate + * + * @default average over 5 minutes + */ + public static metricProvisionedRefillRate(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return StateTransitionMetric.metric("ProvisionedRefillRate", props); + } + + /** + * Metric for the number of available state transitions per second + * + * @default average over 5 minutes + */ + public static metricConsumedCapacity(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return StateTransitionMetric.metric("ConsumedCapacity", props); + } + + /** + * Metric for the number of throttled state transitions + * + * @default sum over 5 minutes + */ + public static metricThrottledEvents(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return StateTransitionMetric.metric("ThrottledEvents", { statistic: 'sum', ...props }); + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/choice.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/choice.ts new file mode 100644 index 0000000000000..54cbf54cb4919 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/choice.ts @@ -0,0 +1,136 @@ +import cdk = require('@aws-cdk/cdk'); +import { Chain } from '../chain'; +import { Condition } from '../condition'; +import { IChainable, INextable } from '../types'; +import { State, StateType } from './state'; + +/** + * Properties for defining a Choice state + */ +export interface ChoiceProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; +} + +/** + * Define a Choice in the state machine + * + * A choice state can be used to make decisions based on the execution + * state. + */ +export class Choice extends State { + public readonly endStates: INextable[] = []; + + constructor(parent: cdk.Construct, id: string, props: ChoiceProps = {}) { + super(parent, id, props); + } + + /** + * If the given condition matches, continue execution with the given state + */ + public when(condition: Condition, next: IChainable): Choice { + super.addChoice(condition, next.startState); + return this; + } + + /** + * If none of the given conditions match, continue execution with the given state + * + * If no conditions match and no otherwise() has been given, an execution + * error will be raised. + */ + public otherwise(def: IChainable): Choice { + super.makeDefault(def.startState); + return this; + } + + /** + * Return a Chain that contains all reachable end states from this Choice + * + * Use this to combine all possible choice paths back. + */ + public afterwards(options: AfterwardsOptions = {}): Chain { + const endStates = State.filterNextables(State.findReachableEndStates(this, { includeErrorHandlers: options.includeErrorHandlers })); + if (options.includeOtherwise && this.defaultChoice) { + throw new Error(`'includeOtherwise' set but Choice state ${this.stateId} already has an 'otherwise' transition`); + } + if (options.includeOtherwise) { + endStates.push(new DefaultAsNext(this)); + } + return Chain.custom(this, endStates, this); + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Choice, + Comment: this.comment, + ...this.renderInputOutput(), + ...this.renderChoices(), + }; + } +} + +/** + * Options for selecting the choice paths + */ +export interface AfterwardsOptions { + /** + * Whether to include error handling states + * + * If this is true, all states which are error handlers (added through 'onError') + * and states reachable via error handlers will be included as well. + * + * @default false + */ + includeErrorHandlers?: boolean; + + /** + * Whether to include the default/otherwise transition for the current Choice state + * + * If this is true and the current Choice does not have a default outgoing + * transition, one will be added included when .next() is called on the chain. + * + * @default false + */ + includeOtherwise?: boolean; +} + +/** + * Adapter to make the .otherwise() transition settable through .next() + */ +class DefaultAsNext implements INextable { + constructor(private readonly choice: Choice) { + } + + public next(state: IChainable): Chain { + this.choice.otherwise(state); + return Chain.sequence(this.choice, state); + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/fail.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/fail.ts new file mode 100644 index 0000000000000..4dd5cbd5fd261 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/fail.ts @@ -0,0 +1,58 @@ +import cdk = require('@aws-cdk/cdk'); +import { INextable } from '../types'; +import { State, StateType } from './state'; + +/** + * Properties for defining a Fail state + */ +export interface FailProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * Error code used to represent this failure + */ + error: string; + + /** + * A description for the cause of the failure + * + * @default No description + */ + cause?: string; +} + +/** + * Define a Fail state in the state machine + * + * Reaching a Fail state terminates the state execution in failure. + */ +export class Fail extends State { + public readonly endStates: INextable[] = []; + + private readonly error: string; + private readonly cause?: string; + + constructor(parent: cdk.Construct, id: string, props: FailProps) { + super(parent, id, props); + + this.error = props.error; + this.cause = props.cause; + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Fail, + Comment: this.comment, + Error: this.error, + Cause: this.cause, + }; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts new file mode 100644 index 0000000000000..5891fa0033b32 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts @@ -0,0 +1,131 @@ +import cdk = require('@aws-cdk/cdk'); +import { Chain } from '../chain'; +import { StateGraph } from '../state-graph'; +import { CatchProps, IChainable, INextable, RetryProps } from '../types'; +import { renderJsonPath, State, StateType } from './state'; + +/** + * Properties for defining a Parallel state + */ +export interface ParallelProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; + + /** + * JSONPath expression to indicate where to inject the state's output + * + * May also be the special value DISCARD, which will cause the state's + * input to become its output. + * + * @default $ + */ + resultPath?: string; +} + +/** + * Define a Parallel state in the state machine + * + * A Parallel state can be used to run one or more state machines at the same + * time. + * + * The Result of a Parallel state is an array of the results of its substatemachines. + */ +export class Parallel extends State implements INextable { + public readonly endStates: INextable[]; + + constructor(parent: cdk.Construct, id: string, props: ParallelProps = {}) { + super(parent, id, props); + + this.endStates = [this]; + } + + /** + * Add retry configuration for this state + * + * This controls if and how the execution will be retried if a particular + * error occurs. + */ + public addRetry(props: RetryProps = {}): Parallel { + super._addRetry(props); + return this; + } + + /** + * Add a recovery handler for this state + * + * When a particular error occurs, execution will continue at the error + * handler instead of failing the state machine execution. + */ + public addCatch(handler: IChainable, props: CatchProps = {}): Parallel { + super._addCatch(handler.startState, props); + return this; + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable): Chain { + super.makeNext(next.startState); + return Chain.sequence(this, next); + } + + /** + * Define one or more branches to run in parallel + */ + public branch(...branches: IChainable[]): Parallel { + for (const branch of branches) { + const name = `Parallel '${this.stateId}' branch ${this.branches.length + 1}`; + super.addBranch(new StateGraph(branch.startState, name)); + } + return this; + } + + /** + * Validate this state + */ + public validate(): string[] { + if (this.branches.length === 0) { + return ['Parallel must have at least one branch']; + } + return []; + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Parallel, + Comment: this.comment, + ResultPath: renderJsonPath(this.resultPath), + ...this.renderNextEnd(), + ...this.renderInputOutput(), + ...this.renderRetryCatch(), + ...this.renderBranches(), + }; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/pass.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/pass.ts new file mode 100644 index 0000000000000..65ab9561ca742 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/pass.ts @@ -0,0 +1,95 @@ +import cdk = require('@aws-cdk/cdk'); +import { Chain } from '../chain'; +import { IChainable, INextable } from '../types'; +import { renderJsonPath, State, StateType } from './state'; + +/** + * Properties for defining a Pass state + */ +export interface PassProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; + + /** + * JSONPath expression to indicate where to inject the state's output + * + * May also be the special value DISCARD, which will cause the state's + * input to become its output. + * + * @default $ + */ + resultPath?: string; + + /** + * If given, treat as the result of this operation + * + * Can be used to inject or replace the current execution state. + * + * @default No injected result + */ + result?: any; +} + +/** + * Define a Pass in the state machine + * + * A Pass state can be used to transform the current exeuction's state. + */ +export class Pass extends State implements INextable { + public readonly endStates: INextable[]; + + private readonly result?: any; + + constructor(parent: cdk.Construct, id: string, props: PassProps = {}) { + super(parent, id, props); + + this.result = props.result; + this.endStates = [this]; + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable): Chain { + super.makeNext(next.startState); + return Chain.sequence(this, next); + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Pass, + Comment: this.comment, + Result: this.result, + ResultPath: renderJsonPath(this.resultPath), + ...this.renderInputOutput(), + ...this.renderNextEnd(), + }; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts new file mode 100644 index 0000000000000..3ef8b28d27746 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts @@ -0,0 +1,487 @@ +import cdk = require('@aws-cdk/cdk'); +import { Condition } from '../condition'; +import { StateGraph } from '../state-graph'; +import { CatchProps, DISCARD, Errors, IChainable, INextable, RetryProps } from '../types'; + +/** + * Properties shared by all states + */ +export interface StateProps { + /** + * A comment describing this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; + + /** + * JSONPath expression to indicate where to inject the state's output + * + * May also be the special value DISCARD, which will cause the state's + * input to become its output. + * + * @default $ + */ + resultPath?: string; +} + +/** + * Base class for all other state classes + */ +export abstract class State extends cdk.Construct implements IChainable { + /** + * Add a prefix to the stateId of all States found in a construct tree + */ + public static prefixStates(root: cdk.Construct, prefix: string) { + const queue = [root]; + while (queue.length > 0) { + const el = queue.splice(0, 1)[0]!; + if (isPrefixable(el)) { + el.addPrefix(prefix); + } + queue.push(...el.children); + } + } + + /** + * Find the set of end states states reachable through transitions from the given start state + */ + public static findReachableEndStates(start: State, options: FindStateOptions = {}) { + const visited = new Set(); + const ret = new Set(); + const queue = [start]; + while (queue.length > 0) { + const state = queue.splice(0, 1)[0]!; + if (visited.has(state)) { continue; } + visited.add(state); + + const outgoing = state.outgoingTransitions(options); + + if (outgoing.length > 0) { + // We can continue + queue.push(...outgoing); + } else { + // Terminal state + ret.add(state); + } + } + return Array.from(ret); + } + + /** + * Return only the states that allow chaining from an array of states + */ + public static filterNextables(states: State[]): INextable[] { + return states.filter(isNextable) as any; + } + + /** + * First state of this Chainable + */ + public readonly startState: State; + + /** + * Continuable states of this Chainable + */ + public abstract readonly endStates: INextable[]; + + // This class has a superset of most of the features of the other states, + // and the subclasses decide which part of the features to expose. Most + // features are shared by a couple of states, and it becomes cumbersome to + // slice it out across all states. This is not great design, but it is + // pragmatic! + protected readonly comment?: string; + protected readonly inputPath?: string; + protected readonly outputPath?: string; + protected readonly resultPath?: string; + protected readonly branches: StateGraph[] = []; + protected defaultChoice?: State; + protected _next?: State; + + private readonly retries: RetryProps[] = []; + private readonly catches: CatchTransition[] = []; + private readonly choices: ChoiceTransition[] = []; + private readonly prefixes: string[] = []; + + /** + * The graph that this state is part of. + * + * Used for guaranteeing consistency between graphs and graph components. + */ + private containingGraph?: StateGraph; + + /** + * States with references to this state. + * + * Used for finding complete connected graph that a state is part of. + */ + private readonly incomingStates: State[] = []; + + constructor(parent: cdk.Construct, id: string, props: StateProps) { + super(parent, id); + + this.startState = this; + + this.comment = props.comment; + this.inputPath = props.inputPath; + this.outputPath = props.outputPath; + this.resultPath = props.resultPath; + } + + /** + * Tokenized string that evaluates to the state's ID + */ + public get stateId(): string { + return this.prefixes.concat(this.id).join(''); + } + + /** + * Add a prefix to the stateId of this state + */ + public addPrefix(x: string) { + if (x !== '') { + this.prefixes.splice(0, 0, x); + } + } + + /** + * Register this state as part of the given graph + * + * Don't call this. It will be called automatically when you work + * states normally. + */ + public bindToGraph(graph: StateGraph) { + if (this.containingGraph === graph) { return; } + + if (this.containingGraph) { + // tslint:disable-next-line:max-line-length + throw new Error(`Trying to use state '${this.stateId}' in ${graph}, but is already in ${this.containingGraph}. Every state can only be used in one graph.`); + } + + this.containingGraph = graph; + this.onBindToGraph(graph); + + for (const incoming of this.incomingStates) { + incoming.bindToGraph(graph); + } + for (const outgoing of this.outgoingTransitions({ includeErrorHandlers: true })) { + outgoing.bindToGraph(graph); + } + for (const branch of this.branches) { + branch.registerSuperGraph(this.containingGraph); + } + } + + /** + * Render the state as JSON + */ + public abstract toStateJson(): object; + + /** + * Add a retrier to the retry list of this state + */ + protected _addRetry(props: RetryProps = {}) { + this.retries.push({ + ...props, + errors: props.errors ? props.errors : [Errors.All], + }); + } + + /** + * Add an error handler to the catch list of this state + */ + protected _addCatch(handler: State, props: CatchProps = {}) { + this.catches.push({ + next: handler, + props: { + errors: props.errors ? props.errors : [Errors.All], + resultPath: props.resultPath + } + }); + handler.addIncoming(this); + if (this.containingGraph) { + handler.bindToGraph(this.containingGraph); + } + } + + /** + * Make the indicated state the default transition of this state + */ + protected makeNext(next: State) { + // Can't be called 'setNext' because of JSII + if (this._next) { + throw new Error(`State '${this.id}' already has a next state`); + } + this._next = next; + next.addIncoming(this); + if (this.containingGraph) { + next.bindToGraph(this.containingGraph); + } + } + + /** + * Add a choice branch to this state + */ + protected addChoice(condition: Condition, next: State) { + this.choices.push({ condition, next }); + next.startState.addIncoming(this); + if (this.containingGraph) { + next.startState.bindToGraph(this.containingGraph); + } + } + + /** + * Add a paralle branch to this state + */ + protected addBranch(branch: StateGraph) { + this.branches.push(branch); + if (this.containingGraph) { + branch.registerSuperGraph(this.containingGraph); + } + } + + /** + * Make the indicated state the default choice transition of this state + */ + protected makeDefault(def: State) { + // Can't be called 'setDefault' because of JSII + if (this.defaultChoice) { + throw new Error(`Choice '${this.id}' already has a default next state`); + } + this.defaultChoice = def; + } + + /** + * Render the default next state in ASL JSON format + */ + protected renderNextEnd(): any { + if (this._next) { + return { Next: this._next.stateId }; + } else { + return { End: true }; + } + } + + /** + * Render the choices in ASL JSON format + */ + protected renderChoices(): any { + return { + Choices: renderList(this.choices, renderChoice), + Default: this.defaultChoice ? this.defaultChoice.stateId : undefined, + }; + } + + /** + * Render InputPath/OutputPath in ASL JSON format + */ + protected renderInputOutput(): any { + return { + InputPath: renderJsonPath(this.inputPath), + OutputPath: renderJsonPath(this.outputPath), + }; + } + + /** + * Render parallel branches in ASL JSON format + */ + protected renderBranches(): any { + return { + Branches: this.branches.map(b => b.toGraphJson()) + }; + } + + /** + * Render error recovery options in ASL JSON format + */ + protected renderRetryCatch(): any { + return { + Retry: renderList(this.retries, renderRetry), + Catch: renderList(this.catches, renderCatch), + }; + } + + /** + * Called whenever this state is bound to a graph + * + * Can be overridden by subclasses. + */ + protected onBindToGraph(graph: StateGraph) { + graph.registerState(this); + } + + /** + * Add a state to the incoming list + */ + private addIncoming(source: State) { + this.incomingStates.push(source); + } + + /** + * Return all states this state can transition to + */ + private outgoingTransitions(options: FindStateOptions): State[] { + const ret = new Array(); + if (this._next) { ret.push(this._next); } + if (this.defaultChoice) { ret.push(this.defaultChoice); } + for (const c of this.choices) { + ret.push(c.next); + } + if (options.includeErrorHandlers) { + for (const c of this.catches) { + ret.push(c.next); + } + } + return ret; + } +} + +/** + * Options for finding reachable states + */ +export interface FindStateOptions { + /** + * Whether or not to follow error-handling transitions + * + * @default false + */ + includeErrorHandlers?: boolean; +} + +/** + * A Choice Transition + */ +interface ChoiceTransition { + /** + * State to transition to + */ + next: State; + + /** + * Condition for this transition + */ + condition: Condition; +} + +/** + * Render a choice transition + */ +function renderChoice(c: ChoiceTransition) { + return { + ...c.condition.renderCondition(), + Next: c.next.stateId + }; +} + +/** + * A Catch Transition + */ +interface CatchTransition { + /** + * State to transition to + */ + next: State; + + /** + * Additional properties for this transition + */ + props: CatchProps; +} + +/** + * Render a Retry object to ASL + */ +function renderRetry(retry: RetryProps) { + return { + ErrorEquals: retry.errors, + IntervalSeconds: retry.intervalSeconds, + MaxAttempts: retry.maxAttempts, + BackoffRate: retry.backoffRate + }; +} + +/** + * Render a Catch object to ASL + */ +function renderCatch(c: CatchTransition) { + return { + ErrorEquals: c.props.errors, + ResultPath: renderJsonPath(c.props.resultPath), + Next: c.next.stateId, + }; +} + +/** + * Render a list or return undefined for an empty list + */ +export function renderList(xs: T[], fn: (x: T) => any): any { + if (xs.length === 0) { return undefined; } + return xs.map(fn); +} + +/** + * Render JSON path, respecting the special value DISCARD + */ +export function renderJsonPath(jsonPath?: string): undefined | null | string { + if (jsonPath === undefined) { return undefined; } + if (jsonPath === DISCARD) { return null; } + + if (!jsonPath.startsWith('$')) { + throw new Error(`Expected JSON path to start with '$', got: ${jsonPath}`); + } + return jsonPath; +} + +/** + * Interface for structural feature testing (to make TypeScript happy) + */ +interface Prefixable { + addPrefix(x: string): void; +} + +/** + * Whether an object is a Prefixable + */ +function isPrefixable(x: any): x is Prefixable { + return typeof(x) === 'object' && x.addPrefix; +} + +/** + * Whether an object is INextable + */ +function isNextable(x: any): x is INextable { + return typeof(x) === 'object' && x.next; +} + +/** + * State types + */ +export enum StateType { + Pass = 'Pass', + Task = 'Task', + Choice = 'Choice', + Wait = 'Wait', + Succeed = 'Succeed', + Fail = 'Fail', + Parallel = 'Parallel' +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/succeed.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/succeed.ts new file mode 100644 index 0000000000000..f19d63156e99d --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/succeed.ts @@ -0,0 +1,60 @@ +import cdk = require('@aws-cdk/cdk'); +import { INextable } from '../types'; +import { State, StateType } from './state'; + +/** + * Properties for defining a Succeed state + */ +export interface SucceedProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; + +} + +/** + * Define a Succeed state in the state machine + * + * Reaching a Succeed state terminates the state execution in success. + */ +export class Succeed extends State { + public readonly endStates: INextable[] = []; + + constructor(parent: cdk.Construct, id: string, props: SucceedProps = {}) { + super(parent, id, props); + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Succeed, + Comment: this.comment, + ...this.renderInputOutput(), + }; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts new file mode 100644 index 0000000000000..2612ce33b305b --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/task.ts @@ -0,0 +1,303 @@ +import cloudwatch = require('@aws-cdk/aws-cloudwatch'); +import iam = require('@aws-cdk/aws-iam'); +import cdk = require('@aws-cdk/cdk'); +import { Chain } from '../chain'; +import { StateGraph } from '../state-graph'; +import { CatchProps, IChainable, INextable, RetryProps } from '../types'; +import { renderJsonPath, State, StateType } from './state'; + +/** + * Properties for defining a Task state + */ +export interface TaskProps { + /** + * The resource that represents the work to be executed + * + * Can be either a Lambda Function or an Activity. + */ + resource: IStepFunctionsTaskResource; + + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * JSONPath expression to select part of the state to be the input to this state. + * + * May also be the special value DISCARD, which will cause the effective + * input to be the empty object {}. + * + * @default $ + */ + inputPath?: string; + + /** + * JSONPath expression to select part of the state to be the output to this state. + * + * May also be the special value DISCARD, which will cause the effective + * output to be the empty object {}. + * + * @default $ + */ + outputPath?: string; + + /** + * JSONPath expression to indicate where to inject the state's output + * + * May also be the special value DISCARD, which will cause the state's + * input to become its output. + * + * @default $ + */ + resultPath?: string; + + /** + * Maximum run time of this state + * + * If the state takes longer than this amount of time to complete, a 'Timeout' error is raised. + * + * @default 60 + */ + timeoutSeconds?: number; + + /** + * Maximum time between heart beats + * + * If the time between heart beats takes longer than this, a 'Timeout' error is raised. + * + * This is only relevant when using an Activity type as resource. + * + * @default No heart beat timeout + */ + heartbeatSeconds?: number; +} + +/** + * Define a Task state in the state machine + * + * Reaching a Task state causes some work to be executed, represented + * by the Task's resource property. + */ +export class Task extends State implements INextable { + public readonly endStates: INextable[]; + private readonly resourceProps: StepFunctionsTaskResourceProps; + private readonly timeoutSeconds?: number; + private readonly heartbeatSeconds?: number; + + constructor(parent: cdk.Construct, id: string, props: TaskProps) { + super(parent, id, props); + + this.timeoutSeconds = props.timeoutSeconds; + this.heartbeatSeconds = props.heartbeatSeconds; + this.resourceProps = props.resource.asStepFunctionsTaskResource(this); + this.endStates = [this]; + } + + /** + * Add retry configuration for this state + * + * This controls if and how the execution will be retried if a particular + * error occurs. + */ + public addRetry(props: RetryProps = {}): Task { + super._addRetry(props); + return this; + } + + /** + * Add a recovery handler for this state + * + * When a particular error occurs, execution will continue at the error + * handler instead of failing the state machine execution. + */ + public addCatch(handler: IChainable, props: CatchProps = {}): Task { + super._addCatch(handler.startState, props); + return this; + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable): Chain { + super.makeNext(next.startState); + return Chain.sequence(this, next); + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + ...this.renderNextEnd(), + ...this.renderRetryCatch(), + ...this.renderInputOutput(), + Type: StateType.Task, + Comment: this.comment, + Resource: this.resourceProps.resourceArn, + ResultPath: renderJsonPath(this.resultPath), + TimeoutSeconds: this.timeoutSeconds, + HeartbeatSeconds: this.heartbeatSeconds, + }; + } + + /** + * Return the given named metric for this Task + * + * @default sum over 5 minutes + */ + public metric(metricName: string, props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return new cloudwatch.Metric({ + namespace: 'AWS/States', + metricName, + dimensions: this.resourceProps.metricDimensions, + statistic: 'sum', + ...props + }); + } + + /** + * The interval, in milliseconds, between the time the Task starts and the time it closes. + * + * @default average over 5 minutes + */ + public metricRunTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixSingular, 'RunTime', { statistic: 'avg', ...props }); + } + + /** + * The interval, in milliseconds, for which the activity stays in the schedule state. + * + * @default average over 5 minutes + */ + public metricScheduleTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixSingular, 'ScheduleTime', { statistic: 'avg', ...props }); + } + + /** + * The interval, in milliseconds, between the time the activity is scheduled and the time it closes. + * + * @default average over 5 minutes + */ + public metricTime(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixSingular, 'Time', { statistic: 'avg', ...props }); + } + + /** + * Metric for the number of times this activity is scheduled + * + * @default sum over 5 minutes + */ + public metricScheduled(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'Scheduled', props); + } + + /** + * Metric for the number of times this activity times out + * + * @default sum over 5 minutes + */ + public metricTimedOut(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'TimedOut', props); + } + + /** + * Metric for the number of times this activity is started + * + * @default sum over 5 minutes + */ + public metricStarted(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'Started', props); + } + + /** + * Metric for the number of times this activity succeeds + * + * @default sum over 5 minutes + */ + public metricSucceeded(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'Succeeded', props); + } + + /** + * Metric for the number of times this activity fails + * + * @default sum over 5 minutes + */ + public metricFailed(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'Failed', props); + } + + /** + * Metric for the number of times the heartbeat times out for this activity + * + * @default sum over 5 minutes + */ + public metricHeartbeatTimedOut(props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + return this.taskMetric(this.resourceProps.metricPrefixPlural, 'HeartbeatTimedOut', props); + } + + protected onBindToGraph(graph: StateGraph) { + super.onBindToGraph(graph); + for (const policyStatement of this.resourceProps.policyStatements || []) { + graph.registerPolicyStatement(policyStatement); + } + } + + private taskMetric(prefix: string | undefined, suffix: string, props?: cloudwatch.MetricCustomization): cloudwatch.Metric { + if (prefix === undefined) { + throw new Error('This Task Resource does not expose metrics'); + } + return this.metric(prefix + suffix, props); + } +} + +/** + * Interface for objects that can be invoked in a Task state + */ +export interface IStepFunctionsTaskResource { + /** + * Return the properties required for using this object as a Task resource + */ + asStepFunctionsTaskResource(callingTask: Task): StepFunctionsTaskResourceProps; +} + +/** + * Properties that define how to refer to a TaskResource + */ +export interface StepFunctionsTaskResourceProps { + /** + * The ARN of the resource + */ + resourceArn: string; + + /** + * Additional policy statements to add to the execution role + * + * @default No policy roles + */ + policyStatements?: iam.PolicyStatement[]; + + /** + * Prefix for singular metric names of activity actions + * + * @default No such metrics + */ + metricPrefixSingular?: string; + + /** + * Prefix for plural metric names of activity actions + * + * @default No such metrics + */ + metricPrefixPlural?: string; + + /** + * The dimensions to attach to metrics + * + * @default No metrics + */ + metricDimensions?: cloudwatch.DimensionHash; +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/wait.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/wait.ts new file mode 100644 index 0000000000000..6dfc327fbe600 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/wait.ts @@ -0,0 +1,98 @@ +import cdk = require('@aws-cdk/cdk'); +import { Chain } from '../chain'; +import { IChainable, INextable } from '../types'; +import { State, StateType } from './state'; + +/** + * Properties for defining a Wait state + */ +export interface WaitProps { + /** + * An optional description for this state + * + * @default No comment + */ + comment?: string; + + /** + * Wait a fixed number of seconds + * + * Exactly one of seconds, secondsPath, timestamp, timestampPath must be supplied. + */ + seconds?: number; + + /** + * Wait until the given ISO8601 timestamp + * + * Exactly one of seconds, secondsPath, timestamp, timestampPath must be supplied. + * + * @example 2016-03-14T01:59:00Z + */ + timestamp?: string; + + /** + * Wait for a number of seconds stored in the state object. + * + * Exactly one of seconds, secondsPath, timestamp, timestampPath must be supplied. + * + * @example $.waitSeconds + */ + secondsPath?: string; + + /** + * Wait until a timestamp found in the state object. + * + * Exactly one of seconds, secondsPath, timestamp, timestampPath must be supplied. + * + * @example $.waitTimestamp + */ + timestampPath?: string; +} + +/** + * Define a Wait state in the state machine + * + * A Wait state can be used to delay execution of the state machine for a while. + */ +export class Wait extends State implements INextable { + public readonly endStates: INextable[]; + + private readonly seconds?: number; + private readonly timestamp?: string; + private readonly secondsPath?: string; + private readonly timestampPath?: string; + + constructor(parent: cdk.Construct, id: string, props: WaitProps) { + super(parent, id, props); + + this.seconds = props.seconds; + this.timestamp = props.timestamp; + this.secondsPath = props.secondsPath; + this.timestampPath = props.timestampPath; + + this.endStates = [this]; + } + + /** + * Continue normal execution with the given state + */ + public next(next: IChainable): Chain { + super.makeNext(next.startState); + return Chain.sequence(this, next); + } + + /** + * Return the Amazon States Language object for this state + */ + public toStateJson(): object { + return { + Type: StateType.Wait, + Comment: this.comment, + Seconds: this.seconds, + Timestamp: this.timestamp, + SecondsPath: this.secondsPath, + TimestampPath: this.timestampPath, + ...this.renderNextEnd(), + }; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/types.ts b/packages/@aws-cdk/aws-stepfunctions/lib/types.ts new file mode 100644 index 0000000000000..8af16b439bbff --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/lib/types.ts @@ -0,0 +1,141 @@ +import { Chain } from './chain'; +import { State } from './states/state'; + +/** + * Interface for states that can have 'next' states + */ +export interface INextable { + next(state: IChainable): Chain; +} + +/** + * Interface for objects that can be used in a Chain + */ +export interface IChainable { + /** + * Descriptive identifier for this chainable + */ + readonly id: string; + + /** + * The start state of this chainable + */ + readonly startState: State; + + /** + * The chainable end state(s) of this chainable + */ + readonly endStates: INextable[]; +} + +/** + * Predefined error strings + */ +export class Errors { + /** + * Matches any Error. + */ + public static readonly All = 'States.ALL'; + + /** + * A Task State either ran longer than the “TimeoutSeconds” value, or + * failed to heartbeat for a time longer than the “HeartbeatSeconds” value. + */ + public static readonly Timeout = 'States.Timeout'; + + /** + * A Task State failed during the execution. + */ + public static readonly TaskFailed = 'States.TaskFailed'; + + /** + * A Task State failed because it had insufficient privileges to execute + * the specified code. + */ + public static readonly Permissions = 'States.Permissions'; + + /** + * A Task State’s “ResultPath” field cannot be applied to the input the state received. + */ + public static readonly ResultPathMatchFailure = 'States.ResultPathMatchFailure'; + + /** + * A branch of a Parallel state failed. + */ + public static readonly BranchFailed = 'States.BranchFailed'; + + /** + * A Choice state failed to find a match for the condition field extracted + * from its input. + */ + public static readonly NoChoiceMatched = 'States.NoChoiceMatched'; +} + +/** + * Retry details + */ +export interface RetryProps { + /** + * Errors to retry + * + * A list of error strings to retry, which can be either predefined errors + * (for example Errors.NoChoiceMatched) or a self-defined error. + * + * @default All errors + */ + errors?: string[]; + + /** + * How many seconds to wait initially before retrying + * + * @default 1 + */ + intervalSeconds?: number; + + /** + * How many times to retry this particular error. + * + * May be 0 to disable retry for specific errors (in case you have + * a catch-all retry policy). + * + * @default 3 + */ + maxAttempts?: number; + + /** + * Multiplication for how much longer the wait interval gets on every retry + * + * @default 2 + */ + backoffRate?: number; +} + +/** + * Error handler details + */ +export interface CatchProps { + /** + * Errors to recover from by going to the given state + * + * A list of error strings to retry, which can be either predefined errors + * (for example Errors.NoChoiceMatched) or a self-defined error. + * + * @default All errors + */ + errors?: string[]; + + /** + * JSONPath expression to indicate where to inject the error data + * + * May also be the special value DISCARD, which will cause the error + * data to be discarded. + * + * @default $ + */ + resultPath?: string; +} + +/** + * Special string value to discard state input, output or result + */ +export const DISCARD = 'DISCARD'; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/package.json b/packages/@aws-cdk/aws-stepfunctions/package.json index e760671439311..d6203bbae7bf8 100644 --- a/packages/@aws-cdk/aws-stepfunctions/package.json +++ b/packages/@aws-cdk/aws-stepfunctions/package.json @@ -54,11 +54,15 @@ "devDependencies": { "@aws-cdk/assert": "^0.12.0", "cdk-build-tools": "^0.12.0", + "cdk-integ-tools": "^0.12.0", "cfn2ts": "^0.12.0", "pkglint": "^0.12.0" }, "dependencies": { - "@aws-cdk/cdk": "^0.12.0" + "@aws-cdk/cdk": "^0.12.0", + "@aws-cdk/aws-cloudwatch": "^0.12.0", + "@aws-cdk/aws-events": "^0.12.0", + "@aws-cdk/aws-iam": "^0.12.0" }, "homepage": "https://github.com/awslabs/aws-cdk" } diff --git a/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.expected.json b/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.expected.json new file mode 100644 index 0000000000000..00bbd00975308 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.expected.json @@ -0,0 +1,75 @@ +{ + "Resources": { + "SubmitJobFB773A16": { + "Type": "AWS::StepFunctions::Activity", + "Properties": { + "Name": "awsstepfunctionsintegSubmitJobA2508960" + } + }, + "CheckJob5FFC1D6F": { + "Type": "AWS::StepFunctions::Activity", + "Properties": { + "Name": "awsstepfunctionsintegCheckJobC4AC762D" + } + }, + "StateMachineRoleB840431D": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "states.", + { + "Ref": "AWS::Region" + }, + ".amazonaws.com" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "StateMachine2E01A3A5": { + "Type": "AWS::StepFunctions::StateMachine", + "Properties": { + "DefinitionString": { + "Fn::Join": [ + "", + [ + "{\"StartAt\":\"Submit Job\",\"States\":{\"Submit Job\":{\"Next\":\"Wait X Seconds\",\"Type\":\"Task\",\"Resource\":\"", + { + "Ref": "SubmitJobFB773A16" + }, + "\",\"ResultPath\":\"$.guid\"},\"Wait X Seconds\":{\"Type\":\"Wait\",\"SecondsPath\":\"$.wait_time\",\"Next\":\"Get Job Status\"},\"Get Job Status\":{\"Next\":\"Job Complete?\",\"InputPath\":\"$.guid\",\"Type\":\"Task\",\"Resource\":\"", + { + "Ref": "CheckJob5FFC1D6F" + }, + "\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Get Final Job Status\"}],\"Default\":\"Wait X Seconds\"},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Get Final Job Status\":{\"End\":true,\"InputPath\":\"$.guid\",\"Type\":\"Task\",\"Resource\":\"", + { + "Ref": "CheckJob5FFC1D6F" + }, + "\"}},\"TimeoutSeconds\":30}" + ] + ] + }, + "RoleArn": { + "Fn::GetAtt": [ + "StateMachineRoleB840431D", + "Arn" + ] + } + } + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.ts b/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.ts new file mode 100644 index 0000000000000..8110a39431c17 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/integ.job-poller.ts @@ -0,0 +1,49 @@ +import cdk = require( '@aws-cdk/cdk'); +import stepfunctions = require('../lib'); + +class JobPollerStack extends cdk.Stack { + constructor(parent: cdk.App, id: string, props: cdk.StackProps = {}) { + super(parent, id, props); + + const submitJobActivity = new stepfunctions.Activity(this, 'SubmitJob'); + const checkJobActivity = new stepfunctions.Activity(this, 'CheckJob'); + + const submitJob = new stepfunctions.Task(this, 'Submit Job', { + resource: submitJobActivity, + resultPath: '$.guid', + }); + const waitX = new stepfunctions.Wait(this, 'Wait X Seconds', { secondsPath: '$.wait_time' }); + const getStatus = new stepfunctions.Task(this, 'Get Job Status', { + resource: checkJobActivity, + inputPath: '$.guid', + resultPath: '$.status', + }); + const isComplete = new stepfunctions.Choice(this, 'Job Complete?'); + const jobFailed = new stepfunctions.Fail(this, 'Job Failed', { + cause: 'AWS Batch Job Failed', + error: 'DescribeJob returned FAILED', + }); + const finalStatus = new stepfunctions.Task(this, 'Get Final Job Status', { + resource: checkJobActivity, + inputPath: '$.guid', + }); + + const chain = stepfunctions.Chain + .start(submitJob) + .next(waitX) + .next(getStatus) + .next(isComplete + .when(stepfunctions.Condition.stringEquals('$.status', 'FAILED'), jobFailed) + .when(stepfunctions.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus) + .otherwise(waitX)); + + new stepfunctions.StateMachine(this, 'StateMachine', { + definition: chain, + timeoutSec: 30 + }); + } +} + +const app = new cdk.App(); +new JobPollerStack(app, 'aws-stepfunctions-integ'); +app.run(); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.activity.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.activity.ts new file mode 100644 index 0000000000000..aed33b0050006 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.activity.ts @@ -0,0 +1,77 @@ +import { expect, haveResource } from '@aws-cdk/assert'; +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import stepfunctions = require('../lib'); + +export = { + 'instantiate Activity'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + new stepfunctions.Activity(stack, 'Activity'); + + // THEN + expect(stack).to(haveResource('AWS::StepFunctions::Activity', { + Name: 'Activity' + })); + + test.done(); + }, + + 'Activity can be used in a Task'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const activity = new stepfunctions.Activity(stack, 'Activity'); + const task = new stepfunctions.Task(stack, 'Task', { + resource: activity + }); + new stepfunctions.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).to(haveResource('AWS::StepFunctions::StateMachine', { + DefinitionString: { + "Fn::Join": ["", [ + "{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"", + { Ref: "Activity04690B0A" }, + "\"}}}" + + ]] + }, + })); + + test.done(); + }, + + 'Activity exposes metrics'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const activity = new stepfunctions.Activity(stack, 'Activity'); + + // THEN + const sharedMetric = { + periodSec: 300, + namespace: 'AWS/States', + dimensions: { ActivityArn: { Ref: 'Activity04690B0A' }}, + }; + test.deepEqual(cdk.resolve(activity.metricRunTime()), { + ...sharedMetric, + metricName: 'ActivityRunTime', + statistic: 'avg' + }); + + test.deepEqual(cdk.resolve(activity.metricFailed()), { + ...sharedMetric, + metricName: 'ActivitiesFailed', + statistic: 'sum' + }); + + test.done(); + } +}; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.condition.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.condition.ts new file mode 100644 index 0000000000000..f0577272715b8 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.condition.ts @@ -0,0 +1,12 @@ +import { Test } from 'nodeunit'; +import stepfunctions = require('../lib'); + +export = { + 'Condition variables must start with $.'(test: Test) { + test.throws(() => { + stepfunctions.Condition.stringEquals('a', 'b'); + }); + + test.done(); + } +}; diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.metrics.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.metrics.ts new file mode 100644 index 0000000000000..cd2752d396218 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.metrics.ts @@ -0,0 +1,44 @@ +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import stepfunctions = require('../lib'); + +export = { + 'Activity Task metrics and Activity metrics are the same'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const activity = new stepfunctions.Activity(stack, 'Activity'); + const task = new stepfunctions.Task(stack, 'Task', { resource: activity }); + + // WHEN + const activityMetrics = [ + activity.metricFailed(), + activity.metricHeartbeatTimedOut(), + activity.metricRunTime(), + activity.metricScheduled(), + activity.metricScheduleTime(), + activity.metricStarted(), + activity.metricSucceeded(), + activity.metricTime(), + activity.metricTimedOut() + ]; + + const taskMetrics = [ + task.metricFailed(), + task.metricHeartbeatTimedOut(), + task.metricRunTime(), + task.metricScheduled(), + task.metricScheduleTime(), + task.metricStarted(), + task.metricSucceeded(), + task.metricTime(), + task.metricTimedOut(), + ]; + + // THEN + for (let i = 0; i < activityMetrics.length; i++) { + test.deepEqual(activityMetrics[i], taskMetrics[i]); + } + + test.done(); + } +}; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.state-machine-resources.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.state-machine-resources.ts new file mode 100644 index 0000000000000..d3482e31d960b --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.state-machine-resources.ts @@ -0,0 +1,111 @@ +import { expect, haveResource } from '@aws-cdk/assert'; +import iam = require('@aws-cdk/aws-iam'); +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import stepfunctions = require('../lib'); + +export = { + 'Tasks can add permissions to the execution role'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const task = new stepfunctions.Task(stack, 'Task', { + resource: new FakeResource(), + }); + + // WHEN + new stepfunctions.StateMachine(stack, 'SM', { + definition: task + }); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: "resource:Everything", + Effect: "Allow", + Resource: "resource" + } + ], + } + })); + + test.done(); + }, + + 'Tasks hidden inside a Parallel state are also included'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const task = new stepfunctions.Task(stack, 'Task', { + resource: new FakeResource(), + }); + + const para = new stepfunctions.Parallel(stack, 'Para'); + para.branch(task); + + // WHEN + new stepfunctions.StateMachine(stack, 'SM', { + definition: para + }); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: "resource:Everything", + Effect: "Allow", + Resource: "resource" + } + ], + } + })); + + test.done(); + }, + + 'Task metrics use values returned from resource'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const task = new stepfunctions.Task(stack, 'Task', { resource: new FakeResource() }); + + // THEN + const sharedMetric = { + periodSec: 300, + namespace: 'AWS/States', + dimensions: { ResourceArn: 'resource' }, + }; + test.deepEqual(cdk.resolve(task.metricRunTime()), { + ...sharedMetric, + metricName: 'FakeResourceRunTime', + statistic: 'avg' + }); + + test.deepEqual(cdk.resolve(task.metricFailed()), { + ...sharedMetric, + metricName: 'FakeResourcesFailed', + statistic: 'sum' + }); + + test.done(); + } +}; + +class FakeResource implements stepfunctions.IStepFunctionsTaskResource { + public asStepFunctionsTaskResource(_callingTask: stepfunctions.Task): stepfunctions.StepFunctionsTaskResourceProps { + const resourceArn = 'resource'; + + return { + resourceArn, + policyStatements: [new iam.PolicyStatement() + .addAction('resource:Everything') + .addResource('resource') + ], + metricPrefixSingular: 'FakeResource', + metricPrefixPlural: 'FakeResources', + metricDimensions: { ResourceArn: resourceArn }, + }; + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.states-language.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.states-language.ts new file mode 100644 index 0000000000000..beeb6dc04d857 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions/test/test.states-language.ts @@ -0,0 +1,733 @@ +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import stepfunctions = require('../lib'); + +export = { + 'Basic composition': { + 'A single task is a State Machine'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const chain = new stepfunctions.Pass(stack, 'Some State'); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Some State', + States: { + 'Some State': { Type: 'Pass', End: true } + } + }); + + test.done(); + }, + + 'A sequence of two tasks is a State Machine'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const task1 = new stepfunctions.Pass(stack, 'State One'); + const task2 = new stepfunctions.Pass(stack, 'State Two'); + + const chain = stepfunctions.Chain + .start(task1) + .next(task2); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'State One', + States: { + 'State One': { Type: 'Pass', Next: 'State Two' }, + 'State Two': { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'You dont need to hold on to the state to render the entire state machine correctly'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const task1 = new stepfunctions.Pass(stack, 'State One'); + const task2 = new stepfunctions.Pass(stack, 'State Two'); + + task1.next(task2); + + // THEN + test.deepEqual(render(task1), { + StartAt: 'State One', + States: { + 'State One': { Type: 'Pass', Next: 'State Two' }, + 'State Two': { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'A chain can be appended to'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Pass(stack, 'State One'); + const task2 = new stepfunctions.Pass(stack, 'State Two'); + const task3 = new stepfunctions.Pass(stack, 'State Three'); + + // WHEN + const chain = stepfunctions.Chain + .start(task1) + .next(task2) + .next(task3); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'State One', + States: { + 'State One': { Type: 'Pass', Next: 'State Two' }, + 'State Two': { Type: 'Pass', Next: 'State Three' }, + 'State Three': { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'A state machine can be appended to another state machine'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Pass(stack, 'State One'); + const task2 = new stepfunctions.Pass(stack, 'State Two'); + const task3 = new stepfunctions.Wait(stack, 'State Three', { seconds: 10 }); + + // WHEN + const chain = stepfunctions.Chain + .start(task1) + .next(stepfunctions.Chain.start(task2).next(task3)); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'State One', + States: { + 'State One': { Type: 'Pass', Next: 'State Two' }, + 'State Two': { Type: 'Pass', Next: 'State Three' }, + 'State Three': { Type: 'Wait', End: true, Seconds: 10 }, + } + }); + + test.done(); + }, + + 'A state machine definition can be instantiated and chained'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const before = new stepfunctions.Pass(stack, 'Before'); + const after = new stepfunctions.Pass(stack, 'After'); + + // WHEN + const chain = before.next(new ReusableStateMachine(stack, 'Reusable')).next(after); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Before', + States: { + 'Before': { Type: 'Pass', Next: 'Choice' }, + 'Choice': { + Type: 'Choice', + Choices: [ + { Variable: '$.branch', StringEquals: 'left', Next: 'Left Branch' }, + { Variable: '$.branch', StringEquals: 'right', Next: 'Right Branch' }, + ] + }, + 'Left Branch': { Type: 'Pass', Next: 'After' }, + 'Right Branch': { Type: 'Pass', Next: 'After' }, + 'After': { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'A success state cannot be chained onto'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const succeed = new stepfunctions.Succeed(stack, 'Succeed'); + const pass = new stepfunctions.Pass(stack, 'Pass'); + + // WHEN + test.throws(() => { + pass.next(succeed).next(pass); + }); + + test.done(); + }, + + 'A failure state cannot be chained onto'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fail = new stepfunctions.Fail(stack, 'Fail', { error: 'X', cause: 'Y' }); + const pass = new stepfunctions.Pass(stack, 'Pass'); + + // WHEN + test.throws(() => { + pass.next(fail).next(pass); + }); + + test.done(); + }, + + 'Parallels can contain direct states'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const one = new stepfunctions.Pass(stack, 'One'); + const two = new stepfunctions.Pass(stack, 'Two'); + const three = new stepfunctions.Pass(stack, 'Three'); + + // WHEN + const para = new stepfunctions.Parallel(stack, 'Parallel'); + para.branch(one.next(two)); + para.branch(three); + + // THEN + test.deepEqual(render(para), { + StartAt: 'Parallel', + States: { + Parallel: { + Type: 'Parallel', + End: true, + Branches: [ + { + StartAt: 'One', + States: { + One: { Type: 'Pass', Next: 'Two' }, + Two: { Type: 'Pass', End: true }, + } + }, + { + StartAt: 'Three', + States: { + Three: { Type: 'Pass', End: true } + } + } + ] + } + } + }); + + test.done(); + }, + + 'Parallels can contain instantiated reusable definitions'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const para = new stepfunctions.Parallel(stack, 'Parallel'); + para.branch(new ReusableStateMachine(stack, 'Reusable1').prefixStates('Reusable1/')); + para.branch(new ReusableStateMachine(stack, 'Reusable2').prefixStates('Reusable2/')); + + // THEN + test.deepEqual(render(para), { + StartAt: 'Parallel', + States: { + Parallel: { + Type: 'Parallel', + End: true, + Branches: [ + { + StartAt: 'Reusable1/Choice', + States: { + 'Reusable1/Choice': { + Type: 'Choice', + Choices: [ + { Variable: '$.branch', StringEquals: 'left', Next: 'Reusable1/Left Branch' }, + { Variable: '$.branch', StringEquals: 'right', Next: 'Reusable1/Right Branch' }, + ] + }, + 'Reusable1/Left Branch': { Type: 'Pass', End: true }, + 'Reusable1/Right Branch': { Type: 'Pass', End: true }, + } + }, + { + StartAt: 'Reusable2/Choice', + States: { + 'Reusable2/Choice': { + Type: 'Choice', + Choices: [ + { Variable: '$.branch', StringEquals: 'left', Next: 'Reusable2/Left Branch' }, + { Variable: '$.branch', StringEquals: 'right', Next: 'Reusable2/Right Branch' }, + ] + }, + 'Reusable2/Left Branch': { Type: 'Pass', End: true }, + 'Reusable2/Right Branch': { Type: 'Pass', End: true }, + } + }, + ] + } + } + }); + + test.done(); + }, + + 'State Machine Fragments can be wrapped in a single state'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const reusable = new SimpleChain(stack, 'Hello'); + const state = reusable.toSingleState(); + + test.deepEqual(render(state), { + StartAt: 'Hello', + States: { + Hello: { + Type: 'Parallel', + End: true, + Branches: [ + { + StartAt: 'Hello: Task1', + States: { + 'Hello: Task1': { Type: 'Task', Next: 'Hello: Task2', Resource: 'resource' }, + 'Hello: Task2': { Type: 'Task', End: true, Resource: 'resource' }, + } + } + ], + }, + } + }); + + test.done(); + }, + + 'Chaining onto branched failure state ignores failure state'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const yes = new stepfunctions.Pass(stack, 'Yes'); + const no = new stepfunctions.Fail(stack, 'No', { error: 'Failure', cause: 'Wrong branch' }); + const enfin = new stepfunctions.Pass(stack, 'Finally'); + const choice = new stepfunctions.Choice(stack, 'Choice') + .when(stepfunctions.Condition.stringEquals('$.foo', 'bar'), yes) + .otherwise(no); + + // WHEN + choice.afterwards().next(enfin); + + // THEN + test.deepEqual(render(choice), { + StartAt: 'Choice', + States: { + Choice: { + Type: 'Choice', + Choices: [ + { Variable: '$.foo', StringEquals: 'bar', Next: 'Yes' }, + ], + Default: 'No', + }, + Yes: { Type: 'Pass', Next: 'Finally' }, + No: { Type: 'Fail', Error: 'Failure', Cause: 'Wrong branch' }, + Finally: { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'Can include OTHERWISE transition for Choice in afterwards()'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + // WHEN + const chain = new stepfunctions.Choice(stack, 'Choice') + .when(stepfunctions.Condition.stringEquals('$.foo', 'bar'), + new stepfunctions.Pass(stack, 'Yes')) + .afterwards({ includeOtherwise: true }) + .next(new stepfunctions.Pass(stack, 'Finally')); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Choice', + States: { + Choice: { + Type: 'Choice', + Choices: [ + { Variable: '$.foo', StringEquals: 'bar', Next: 'Yes' }, + ], + Default: 'Finally', + }, + Yes: { Type: 'Pass', Next: 'Finally' }, + Finally: { Type: 'Pass', End: true }, + } + }); + + test.done(); + } + }, + + 'Goto support': { + 'State machines can have unconstrainted gotos'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const one = new stepfunctions.Pass(stack, 'One'); + const two = new stepfunctions.Pass(stack, 'Two'); + + // WHEN + const chain = one.next(two).next(one); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'One', + States: { + One: { Type: 'Pass', Next: 'Two' }, + Two: { Type: 'Pass', Next: 'One' }, + } + }); + + test.done(); + }, + }, + + 'Catches': { + 'States can have error branches'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const failure = new stepfunctions.Fail(stack, 'Failed', { error: 'DidNotWork', cause: 'We got stuck' }); + + // WHEN + const chain = task1.addCatch(failure); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Task1', + States: { + Task1: { + Type: 'Task', + Resource: 'resource', + End: true, + Catch: [ + { ErrorEquals: ['States.ALL'], Next: 'Failed' }, + ] + }, + Failed: { + Type: 'Fail', + Error: 'DidNotWork', + Cause: 'We got stuck', + } + } + }); + + test.done(); + }, + + 'Retries and errors with a result path'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const failure = new stepfunctions.Fail(stack, 'Failed', { error: 'DidNotWork', cause: 'We got stuck' }); + + // WHEN + const chain = task1.addRetry({ errors: ['HTTPError'], maxAttempts: 2 }).addCatch(failure, { resultPath: '$.some_error' }).next(failure); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Task1', + States: { + Task1: { + Type: 'Task', + Resource: 'resource', + Catch: [ { ErrorEquals: ['States.ALL'], Next: 'Failed', ResultPath: '$.some_error' } ], + Retry: [ { ErrorEquals: ['HTTPError'], MaxAttempts: 2 } ], + Next: 'Failed', + }, + Failed: { + Type: 'Fail', + Error: 'DidNotWork', + Cause: 'We got stuck', + } + } + }); + + test.done(); + + }, + + 'Can wrap chain and attach error handler'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const task2 = new stepfunctions.Task(stack, 'Task2', { resource: new FakeResource() }); + const errorHandler = new stepfunctions.Pass(stack, 'ErrorHandler'); + + // WHEN + const chain = task1.next(task2).toSingleState('Wrapped').addCatch(errorHandler); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Wrapped', + States: { + Wrapped: { + Type: 'Parallel', + Branches: [ + { + StartAt: 'Task1', + States: { + Task1: { + Type: 'Task', + Resource: 'resource', + Next: 'Task2', + }, + Task2: { + Type: 'Task', + Resource: 'resource', + End: true, + }, + } + } + ], + Catch: [ + { ErrorEquals: ['States.ALL'], Next: 'ErrorHandler' }, + ], + End: true + }, + ErrorHandler: { Type: 'Pass', End: true } + }, + }); + + test.done(); + }, + + 'Chaining does not chain onto error handler state'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const task2 = new stepfunctions.Task(stack, 'Task2', { resource: new FakeResource() }); + const errorHandler = new stepfunctions.Pass(stack, 'ErrorHandler'); + + // WHEN + const chain = task1.addCatch(errorHandler).next(task2); + + // THEN + test.deepEqual(render(chain), { + StartAt: 'Task1', + States: { + Task1: { + Type: 'Task', + Resource: 'resource', + Next: 'Task2', + Catch: [ + { ErrorEquals: ['States.ALL'], Next: 'ErrorHandler' }, + ] + }, + Task2: { Type: 'Task', Resource: 'resource', End: true }, + ErrorHandler: { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'Chaining does not chain onto error handler, extended'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const task2 = new stepfunctions.Task(stack, 'Task2', { resource: new FakeResource() }); + const task3 = new stepfunctions.Task(stack, 'Task3', { resource: new FakeResource() }); + const errorHandler = new stepfunctions.Pass(stack, 'ErrorHandler'); + + // WHEN + const chain = task1.addCatch(errorHandler) + .next(task2.addCatch(errorHandler)) + .next(task3.addCatch(errorHandler)); + + // THEN + const sharedTaskProps = { Type: 'Task', Resource: 'resource', Catch: [ { ErrorEquals: ['States.ALL'], Next: 'ErrorHandler' } ] }; + test.deepEqual(render(chain), { + StartAt: 'Task1', + States: { + Task1: { Next: 'Task2', ...sharedTaskProps }, + Task2: { Next: 'Task3', ...sharedTaskProps }, + Task3: { End: true, ...sharedTaskProps }, + ErrorHandler: { Type: 'Pass', End: true }, + } + }); + + test.done(); + }, + + 'Error handler with a fragment'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const task2 = new stepfunctions.Task(stack, 'Task2', { resource: new FakeResource() }); + const errorHandler = new stepfunctions.Pass(stack, 'ErrorHandler'); + + // WHEN + task1.addCatch(errorHandler) + .next(new SimpleChain(stack, 'Chain').catch(errorHandler)) + .next(task2.addCatch(errorHandler)); + + test.done(); + }, + + 'Can merge state machines with shared states'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + + const task1 = new stepfunctions.Task(stack, 'Task1', { resource: new FakeResource() }); + const task2 = new stepfunctions.Task(stack, 'Task2', { resource: new FakeResource() }); + const failure = new stepfunctions.Fail(stack, 'Failed', { error: 'DidNotWork', cause: 'We got stuck' }); + + // WHEN + task1.addCatch(failure); + task2.addCatch(failure); + + task1.next(task2); + + // THEN + test.deepEqual(render(task1), { + StartAt: 'Task1', + States: { + Task1: { + Type: 'Task', + Resource: 'resource', + Next: 'Task2', + Catch: [ + { ErrorEquals: ['States.ALL'], Next: 'Failed' }, + ] + }, + Task2: { + Type: 'Task', + Resource: 'resource', + End: true, + Catch: [ + { ErrorEquals: ['States.ALL'], Next: 'Failed' }, + ] + }, + Failed: { + Type: 'Fail', + Error: 'DidNotWork', + Cause: 'We got stuck', + } + } + }); + + test.done(); + } + }, + + 'State machine validation': { + 'No duplicate state IDs'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const intermediateParent = new cdk.Construct(stack, 'Parent'); + + const state1 = new stepfunctions.Pass(stack, 'State'); + const state2 = new stepfunctions.Pass(intermediateParent, 'State'); + + state1.next(state2); + + // WHEN + test.throws(() => { + render(state1); + }); + + test.done(); + }, + + 'No duplicate state IDs even across Parallel branches'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const intermediateParent = new cdk.Construct(stack, 'Parent'); + + const state1 = new stepfunctions.Pass(stack, 'State'); + const state2 = new stepfunctions.Pass(intermediateParent, 'State'); + + const parallel = new stepfunctions.Parallel(stack, 'Parallel') + .branch(state1) + .branch(state2); + + // WHEN + test.throws(() => { + render(parallel); + }); + + test.done(); + }, + + 'No cross-parallel jumps'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const state1 = new stepfunctions.Pass(stack, 'State1'); + const state2 = new stepfunctions.Pass(stack, 'State2'); + + test.throws(() => { + new stepfunctions.Parallel(stack, 'Parallel') + .branch(state1.next(state2)) + .branch(state2); + }); + + test.done(); + }, + }, +}; + +class ReusableStateMachine extends stepfunctions.StateMachineFragment { + public readonly startState: stepfunctions.State; + public readonly endStates: stepfunctions.INextable[]; + constructor(parent: cdk.Construct, id: string) { + super(parent, id); + + const choice = new stepfunctions.Choice(this, 'Choice') + .when(stepfunctions.Condition.stringEquals('$.branch', 'left'), new stepfunctions.Pass(this, 'Left Branch')) + .when(stepfunctions.Condition.stringEquals('$.branch', 'right'), new stepfunctions.Pass(this, 'Right Branch')); + + this.startState = choice; + this.endStates = choice.afterwards().endStates; + } +} + +class SimpleChain extends stepfunctions.StateMachineFragment { + public readonly startState: stepfunctions.State; + public readonly endStates: stepfunctions.INextable[]; + + private readonly task2: stepfunctions.Task; + constructor(parent: cdk.Construct, id: string) { + super(parent, id); + + const task1 = new stepfunctions.Task(this, 'Task1', { resource: new FakeResource() }); + this.task2 = new stepfunctions.Task(this, 'Task2', { resource: new FakeResource() }); + + task1.next(this.task2); + + this.startState = task1; + this.endStates = [this.task2]; + } + + public catch(state: stepfunctions.IChainable, props?: stepfunctions.CatchProps): SimpleChain { + this.task2.addCatch(state, props); + return this; + } +} + +class FakeResource implements stepfunctions.IStepFunctionsTaskResource { + public asStepFunctionsTaskResource(_callingTask: stepfunctions.Task): stepfunctions.StepFunctionsTaskResourceProps { + return { + resourceArn: 'resource' + }; + } +} + +function render(sm: stepfunctions.IChainable) { + return cdk.resolve(new stepfunctions.StateGraph(sm.startState, 'Test Graph').toGraphJson()); +} diff --git a/packages/@aws-cdk/aws-stepfunctions/test/test.stepfunctions.ts b/packages/@aws-cdk/aws-stepfunctions/test/test.stepfunctions.ts deleted file mode 100644 index 820f6b467f38f..0000000000000 --- a/packages/@aws-cdk/aws-stepfunctions/test/test.stepfunctions.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Test, testCase } from 'nodeunit'; - -export = testCase({ - notTested(test: Test) { - test.ok(true, 'No tests are specified for this package.'); - test.done(); - } -}); diff --git a/packages/@aws-cdk/cdk/lib/cloudformation/cloudformation-json.ts b/packages/@aws-cdk/cdk/lib/cloudformation/cloudformation-json.ts index 84e99051b18b6..6075f5650352a 100644 --- a/packages/@aws-cdk/cdk/lib/cloudformation/cloudformation-json.ts +++ b/packages/@aws-cdk/cdk/lib/cloudformation/cloudformation-json.ts @@ -37,6 +37,8 @@ export class CloudFormationJSON { * Recurse into a structure, replace all intrinsics with IntrinsicTokens. */ function deepReplaceIntrinsics(x: any): any { + if (x == null) { return x; } + if (isIntrinsic(x)) { return wrapIntrinsic(x); }