Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions): Add support for ResultSelector #14648

Merged
merged 8 commits into from
May 17, 2021
Merged
38 changes: 32 additions & 6 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ const submitJob = new tasks.LambdaInvoke(this, 'Invoke Handler', {
});
```

### ResultSelector

You can use [`ResultSelector`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector)
to manipulate the raw result of a Task, Map or Parallel state before it is
passed to [`ResultPath`](###ResultPath). For service integrations, the raw
result contains metadata in addition to the response payload. You can use
ResultSelector to construct a JSON payload that becomes the effective result
using static values or references to the raw result or context object.

The following example extracts the output payload of a Lambda function Task and combines
it with some static values and the state name from the context object.

```ts
new tasks.LambdaInvoke(this, 'Invoke Handler', {
lambdaFunction: fn,
resultSelector: {
lambdaOutput: sfn.JsonPath.stringAt('$.Payload'),
invokeRequestId: sfn.JsonPath.stringAt('$.SdkResponseMetadata.RequestId'),
staticValue: {
foo: 'bar',
},
stateName: sfn.JsonPath.stringAt('$$.State.Name'),
},
})
```

### ResultPath

The output of a state can be a copy of its input, the result it produces (for
Expand Down Expand Up @@ -226,8 +252,8 @@ of the Node.js family are supported.

Step Functions supports [API Gateway](https://docs.aws.amazon.com/step-functions/latest/dg/connect-api-gateway.html) through the service integration pattern.

HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.
HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.
HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.
HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.
Previous-generation REST APIs currently offer more features. More details can be found [here](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-vs-rest.html).

### Call REST API Endpoint
Expand Down Expand Up @@ -507,8 +533,8 @@ isolation by design. Learn more about [Fargate](https://aws.amazon.com/fargate/)

The Fargate launch type allows you to run your containerized applications without the need
to provision and manage the backend infrastructure. Just register your task definition and
Fargate launches the container for you. The latest ACTIVE revision of the passed
task definition is used for running the task. Learn more about
Fargate launches the container for you. The latest ACTIVE revision of the passed
task definition is used for running the task. Learn more about
[Fargate Versioning](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_DescribeTaskDefinition.html)

The following example runs a job from a task definition on Fargate
Expand Down Expand Up @@ -718,7 +744,7 @@ You can call the [`StartJobRun`](https://docs.aws.amazon.com/glue/latest/dg/aws-
new tasks.GlueStartJobRun(this, 'Task', {
glueJobName: 'my-glue-job',
arguments: sfn.TaskInput.fromObject({
key: 'value',
key: 'value',
}),
timeout: cdk.Duration.minutes(30),
notifyDelayAfter: cdk.Duration.minutes(5),
Expand Down Expand Up @@ -1020,7 +1046,7 @@ a specific task in a state machine.

When Step Functions reaches an activity task state, the workflow waits for an
activity worker to poll for a task. An activity worker polls Step Functions by
using GetActivityTask, and sending the ARN for the related activity.
using GetActivityTask, and sending the ARN for the related activity.

After the activity worker completes its work, it can provide a report of its
success or failure by using `SendTaskSuccess` or `SendTaskFailure`. These two
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
"Arn"
]
},
"\",\"Payload.$\":\"$\"}},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
"\",\"Payload.$\":\"$\"}},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"ResultSelector\":{\"status.$\":\"$.Payload.status\"},\"Resource\":\"arn:",
wong-a marked this conversation as resolved.
Show resolved Hide resolved
{
"Ref": "AWS::Partition"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ const checkJobStateLambda = new Function(stack, 'checkJobStateLambda', {

const checkJobState = new LambdaInvoke(stack, 'Check the job state', {
lambdaFunction: checkJobStateLambda,
outputPath: '$.Payload',
resultSelector: {
status: sfn.JsonPath.stringAt('$.Payload.status'),
},
wong-a marked this conversation as resolved.
Show resolved Hide resolved
});

const isComplete = new sfn.Choice(stack, 'Job Complete?');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,58 @@ describe('LambdaInvoke', () => {
}));
});

test('resultSelector', () => {
// WHEN
const task = new LambdaInvoke(stack, 'Task', {
lambdaFunction,
resultSelector: {
Result: sfn.JsonPath.stringAt('$.output.Payload'),
},
});

// THEN
expect(stack.resolve(task.toStateJson())).toEqual(expect.objectContaining({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::lambda:invoke',
],
],
},
End: true,
Parameters: {
FunctionName: {
'Fn::GetAtt': [
'Fn9270CBC0',
'Arn',
],
},
'Payload.$': '$',
},
ResultSelector: {
'Result.$': '$.output.Payload',
},
Retry: [
{
ErrorEquals: [
'Lambda.ServiceException',
'Lambda.AWSLambdaException',
'Lambda.SdkClientException',
],
IntervalSeconds: 2,
MaxAttempts: 6,
BackoffRate: 2,
},
],
}));
});

test('invoke Lambda function and wait for task token', () => {
// GIVEN
const task = new LambdaInvoke(stack, 'Task', {
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export interface MapProps {
*/
readonly parameters?: { [key: string]: any };

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* MaxConcurrency
*
Expand Down Expand Up @@ -158,6 +172,7 @@ export class Map extends State implements INextable {
...this.renderNextEnd(),
...this.renderInputOutput(),
...this.renderParameters(),
...this.renderResultSelector(),
...this.renderRetryCatch(),
...this.renderIterator(),
...this.renderItemsPath(),
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ export interface ParallelProps {
* @default $
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };
}

/**
Expand Down Expand Up @@ -117,6 +131,7 @@ export class Parallel extends State implements INextable {
...this.renderInputOutput(),
...this.renderRetryCatch(),
...this.renderBranches(),
...this.renderResultSelector(),
};
}

Expand Down
27 changes: 26 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IConstruct, Construct, Node } from 'constructs';
import { Condition } from '../condition';
import { JsonPath } from '../fields';
import { FieldUtils, JsonPath } from '../fields';
import { StateGraph } from '../state-graph';
import { CatchProps, Errors, IChainable, INextable, RetryProps } from '../types';

Expand Down Expand Up @@ -58,6 +58,20 @@ export interface StateProps {
* @default $
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };
}

/**
Expand Down Expand Up @@ -149,6 +163,7 @@ export abstract class State extends CoreConstruct implements IChainable {
protected readonly parameters?: object;
protected readonly outputPath?: string;
protected readonly resultPath?: string;
protected readonly resultSelector?: object;
protected readonly branches: StateGraph[] = [];
protected iteration?: StateGraph;
protected defaultChoice?: State;
Expand Down Expand Up @@ -187,6 +202,7 @@ export abstract class State extends CoreConstruct implements IChainable {
this.parameters = props.parameters;
this.outputPath = props.outputPath;
this.resultPath = props.resultPath;
this.resultSelector = props.resultSelector;
}

public get id() {
Expand Down Expand Up @@ -398,6 +414,15 @@ export abstract class State extends CoreConstruct implements IChainable {
};
}

/**
* Render ResultSelector in ASL JSON format
*/
protected renderResultSelector(): any {
return FieldUtils.renderObject({
ResultSelector: this.resultSelector,
});
}

/**
* Called whenever this state is bound to a graph
*
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ export interface TaskStateBaseProps {
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* Timeout for the state machine
*
Expand Down Expand Up @@ -269,6 +283,7 @@ export abstract class TaskStateBase extends State implements INextable {
InputPath: renderJsonPath(this.inputPath),
OutputPath: renderJsonPath(this.outputPath),
ResultPath: renderJsonPath(this.resultPath),
...this.renderResultSelector(),
};
}
}
Expand Down
41 changes: 41 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/test/map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,47 @@ describe('Map State', () => {
},
});
}),
test('State Machine With Map State and ResultSelector', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const map = new stepfunctions.Map(stack, 'Map State', {
maxConcurrency: 1,
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
resultSelector: {
buz: 'buz',
baz: stepfunctions.JsonPath.stringAt('$.baz'),
},
});
map.iterator(new stepfunctions.Pass(stack, 'Pass State'));

// THEN
expect(render(map)).toStrictEqual({
StartAt: 'Map State',
States: {
'Map State': {
Type: 'Map',
End: true,
Iterator: {
StartAt: 'Pass State',
States: {
'Pass State': {
Type: 'Pass',
End: true,
},
},
},
ItemsPath: '$.inputForMap',
MaxConcurrency: 1,
ResultSelector: {
'buz': 'buz',
'baz.$': '$.baz',
},
},
},
});
}),
test('synth is successful', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
Expand Down
32 changes: 32 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/test/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,38 @@ describe('Parallel State', () => {
},
});
});

test('State Machine With Parallel State and ResultSelector', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const parallel = new stepfunctions.Parallel(stack, 'Parallel State', {
resultSelector: {
buz: 'buz',
baz: stepfunctions.JsonPath.stringAt('$.baz'),
},
});
parallel.branch(new stepfunctions.Pass(stack, 'Branch 1'));

// THEN
expect(render(parallel)).toStrictEqual({
StartAt: 'Parallel State',
States: {
'Parallel State': {
Type: 'Parallel',
End: true,
Branches: [
{ StartAt: 'Branch 1', States: { 'Branch 1': { Type: 'Pass', End: true } } },
],
ResultSelector: {
'buz': 'buz',
'baz.$': '$.baz',
},
},
},
});
});
});

function render(sm: stepfunctions.IChainable) {
Expand Down
Loading