Kinesis Data Streams ingestion functionality check-in
Signed-off-by: Souvik Bose <souvbose@amazon.com>
sbose2k21 committed Aug 14, 2024
1 parent 642db0d commit 50f02c8
Showing 25 changed files with 2,284 additions and 1 deletion.
43 changes: 43 additions & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:buffer-common')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation project(':data-prepper-plugins:blocking-buffer')
implementation 'software.amazon.awssdk:kinesis'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
implementation("software.amazon.awssdk:dynamodb")
implementation("com.amazonaws:aws-java-sdk:1.12.394")
implementation project(path: ':data-prepper-plugins:aws-plugin-api')

testImplementation 'org.yaml:snakeyaml:2.2'
testImplementation project(':data-prepper-test-common')
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation project(':data-prepper-test-event')
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { // in addition to the core project's rule
limit {
minimum = 1.0
}
}
}
}
48 changes: 48 additions & 0 deletions data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/ClientFactory.java
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.KinesisClientUtil;

public class ClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
private final AwsAuthenticationConfig awsAuthenticationConfig;

public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
final AwsAuthenticationConfig awsAuthenticationConfig) {
awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(awsAuthenticationConfig.getAwsRegion())
.withStsRoleArn(awsAuthenticationConfig.getAwsStsRoleArn())
.withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides())
.build());
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient() {
return DynamoDbAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
.build();
}

public KinesisAsyncClient buildKinesisAsyncClient() {
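// KinesisClientUtil applies the KCL-recommended HTTP client settings (e.g. a higher max concurrency) to the supplied builder before building the client.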
return KinesisClientUtil.createKinesisAsyncClient(
KinesisAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient() {
return CloudWatchAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
.build();
}
}
82 changes: 82 additions & 0 deletions data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisMultiStreamTracker.java
@@ -0,0 +1,82 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.MultiStreamTracker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;


public class KinesisMultiStreamTracker implements MultiStreamTracker {
private static final String COLON = ":";

private final KinesisAsyncClient kinesisClient;
private final KinesisSourceConfig sourceConfig;
private final String applicationName;

public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
this.kinesisClient = kinesisClient;
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
}

@Override
public List<StreamConfig> streamConfigList() {
List<StreamConfig> streamConfigList = new ArrayList<>();
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
StreamConfig streamConfig;
try {
streamConfig = getStreamConfig(kinesisStreamConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
streamConfigList.add(streamConfig);
}
return streamConfigList;
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
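// Builds the serialized multi-stream identifier ("accountId:streamName:creationEpochSecond") that StreamIdentifier.multiStreamInstance expects; the account id is the fifth component of the stream ARN.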
String accountId = streamDescription.streamARN().split(COLON)[4];
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}

/**
* Former streams are auto-detected, and their shard leases are deleted after a deferred wait of 10 seconds.
*/
@Override
public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofSeconds(10);
}
};

}
}
140 changes: 140 additions & 0 deletions data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisService.java
@@ -0,0 +1,140 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

import io.micrometer.core.instrument.util.StringUtils;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.source.kinesis.processor.KinesisShardRecordProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class KinesisService {
private static final Logger LOG = LoggerFactory.getLogger(KinesisService.class);
private static final int GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS = 20;

private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;

private final String applicationName;
private final String tableName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig sourceConfig;
private final KinesisAsyncClient kinesisClient;
private final DynamoDbAsyncClient dynamoDbClient;
private final CloudWatchAsyncClient cloudWatchClient;

@Setter
private Scheduler scheduler;

private final ExecutorService executorService;

public KinesisService(final KinesisSourceConfig sourceConfig,
final ClientFactory clientFactory,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final AcknowledgementSetManager acknowledgementSetManager
){
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
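// DynamoDB backs the KCL lease/checkpoint table; CloudWatch receives the KCL metrics.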
this.dynamoDbClient = clientFactory.buildDynamoDBClient();
this.kinesisClient = clientFactory.buildKinesisAsyncClient();
this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient();
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.tableName = StringUtils.isNotEmpty(sourceConfig.getLeaseCoordinationTable()) ?
sourceConfig.getLeaseCoordinationTable() : applicationName;
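// A single thread is sufficient here: the KCL Scheduler runs on it and manages its own worker threads for shard processing.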
this.executorService = Executors.newFixedThreadPool(1);
}

public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}

if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) {
throw new IllegalStateException("Streams are empty!");
}

scheduler = getScheduler(buffer);
executorService.execute(scheduler);
}

public void shutDown() {
LOG.info("Stop request received for Kinesis Source");

Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
LOG.info("Waiting up to {} seconds for shutdown to complete.", GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS);
try {
gracefulShutdownFuture.get(GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
LOG.error("Exception while executing kinesis consumer graceful shutdown, doing force shutdown", ex);
scheduler.shutdown();
}
LOG.info("Completed, shutting down now.");
}

public Scheduler getScheduler(final Buffer<Record<Event>> buffer) {
if (scheduler == null) {
return createScheduler(buffer);
}
return scheduler;
}

public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
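// Each leased shard gets its own record processor instance, which writes incoming records into the Data Prepper buffer.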
final ShardRecordProcessorFactory processorFactory = new KinesisShardRecordProcessorFactory(
buffer, sourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory);

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
new WorkerIdentifierGenerator().generate(), processorFactory
).tableName(tableName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
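// KCL 2.x defaults to enhanced fan-out retrieval; the polling strategy switches to GetRecords polling with the configured batch size and idle time between reads.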
if (consumerStrategy == ConsumerStrategy.POLLING) {
configsBuilder.retrievalConfig().retrievalSpecificConfig(
new PollingConfig(kinesisClient)
.maxRecords(sourceConfig.getPollingConfig().getMaxPollingRecords())
.idleTimeBetweenReadsInMillis(
sourceConfig.getPollingConfig().getIdleTimeBetweenReadsInMillis()));
}

return new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.billingMode(BillingMode.PAY_PER_REQUEST),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
}
}
56 changes: 56 additions & 0 deletions data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/source/kinesis/KinesisSource.java
@@ -0,0 +1,56 @@
package org.opensearch.dataprepper.plugins.source.kinesis;

import lombok.Setter;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.source.kinesis.configuration.KinesisSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "kinesis", pluginType = Source.class, pluginConfigurationType = KinesisSourceConfig.class)
public class KinesisSource implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
private final KinesisSourceConfig kinesisSourceConfig;

@Setter
private KinesisService kinesisService;

@DataPrepperPluginConstructor
public KinesisSource(final KinesisSourceConfig kinesisSourceConfig,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.kinesisSourceConfig = kinesisSourceConfig;
ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, kinesisSourceConfig.getAwsAuthenticationConfig());
this.kinesisService = new KinesisService(kinesisSourceConfig, clientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager);
}
@Override
public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}

kinesisService.start(buffer);
}

@Override
public void stop() {
kinesisService.shutDown();
}

@Override
public boolean areAcknowledgementsEnabled() {
return kinesisSourceConfig.isAcknowledgments();
}
}
(Remaining 20 changed files not shown.)
