Skip to content

Commit

Permalink
Reject Personalize processor in ad hoc pipeline (#183)
Browse files Browse the repository at this point in the history
If we create a search pipeline as part of a search request, each
processor gets created as part of the search request. Since the
Amazon Personalize ranking processor creates a client (with a connection
pool and credential provider), this could get expensive if someone sends
a flurry of search requests with inline (ad hoc) pipelines.

Also, when a pipeline is created, we create each processor as a way of
validating the configuration. At that point, we don't need to initialize
the Personalize client, since that processor will just get
garbage-collected.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh authored Jul 26, 2023
1 parent 83c8301 commit 64355c7
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public Factory(PersonalizeClientSettings settings) {
}

@Override
public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories, String tag, String description, boolean ignoreFailure, Map<String, Object> config, PipelineContext pipelineContext) throws Exception {
public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories, String tag, String description, boolean ignoreFailure, Map<String, Object> config, PipelineContext pipelineContext) {
String personalizeCampaign = ConfigurationUtils.readStringProperty(TYPE, tag, config, CAMPAIGN_ARN_CONFIG_NAME);
String iamRoleArn = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, IAM_ROLE_ARN_CONFIG_NAME);
String recipe = ConfigurationUtils.readStringProperty(TYPE, tag, config, RECIPE_CONFIG_NAME);
Expand All @@ -165,9 +165,25 @@ public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<
PersonalizeIntelligentRankerConfiguration rankerConfig =
new PersonalizeIntelligentRankerConfiguration(personalizeCampaign, iamRoleArn, recipe, itemIdField, awsRegion, weight);
ValidationUtil.validatePersonalizeIntelligentRankerConfiguration(rankerConfig, TYPE, tag);
AWSCredentialsProvider credentialsProvider = PersonalizeCredentialsProviderFactory.getCredentialsProvider(personalizeClientSettings, iamRoleArn, awsRegion);
PersonalizeClient personalizeClient = clientBuilder.apply(credentialsProvider, awsRegion);

final PersonalizeClient personalizeClient;
switch (pipelineContext.getPipelineSource()) {
case SEARCH_REQUEST:
throw new IllegalStateException(TYPE + " processor may not be instantiated as part of a search request. Create a named search pipeline instead.");
case UPDATE_PIPELINE:
AWSCredentialsProvider credentialsProvider = PersonalizeCredentialsProviderFactory.getCredentialsProvider(personalizeClientSettings, iamRoleArn, awsRegion);
personalizeClient = clientBuilder.apply(credentialsProvider, awsRegion);
break;
case VALIDATE_PIPELINE:
default:
personalizeClient = null; // Do not instantiate client on validation
}
return new PersonalizeRankingResponseProcessor(tag, description, ignoreFailure, rankerConfig, personalizeClient);
}
}

PersonalizeClient getPersonalizeClient() {
// Visible for testing
return personalizeClient;
}
}
Loading

0 comments on commit 64355c7

Please sign in to comment.