Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AWS Kinesis Data Streams as a Source #4836

Merged
merged 8 commits into from
Sep 19, 2024

Conversation

sb2k16
Copy link
Member

@sb2k16 sb2k16 commented Aug 14, 2024

Description

Implementation for a source plugin in data-prepper to support ingestion of Kinesis Data Streams. It uses KCL (Kinesls Client Library) under the hood for ingestion purposes.

KCL features used:

  • multi-stream support
  • lease coordination using a single dynamoDB table per pipeline
  • enhanced fan-out reads

Pipeline configuration for source:

  source:
    kinesis:
      streams:
        - stream_name: "stream-a"
        - stream_name: "stream-b"
          initial_position: "TRIM_HORIZON" # Default = "LATEST"
      aws:
          ...
      acknowledgments: true
      consumer_strategy: "fan-out" or "polling" # Default = "fan-out"
      codec:
        ndjson:

Issues Resolved

Resolves #1082

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -0,0 +1,50 @@
package org.opensearch.dataprepper.plugins.kinesis.source;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we aren't following it consistently but I think it is better to name this as KinesisClientFactory.java. There are too many "ClientFactory.java" and it becomes search more difficult.

@sb2k16 sb2k16 force-pushed the kinesis-code-checkin branch from 1ca0077 to 09212fa Compare August 28, 2024 16:52
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sb2k16 , Thank you for this contribution. This initial PR review has a number of high-level comments.

data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
data-prepper-plugins/kinesis-source/build.gradle Outdated Show resolved Hide resolved
}
}

private void processRecord(KinesisClientRecord record, Consumer<Record<Event>> eventConsumer) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should split the logic for processing an individual record out of this class to make the class easier to extend and test over time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sb2k16 , Let's make this a separate class. Perhaps KinesisRecordConverter. It can be responsible for all the work for taking the record and using the codec. You can have some good tests on this as well.

We've had difficulty expanding our tests in other similar locations because we combine too many things in a single class.


// Checkpoint for shard
if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointIntervalMilliSeconds) {
LOG.info("Regular checkpointing for shard " + kinesisShardId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.info("Regular checkpointing for shard " + kinesisShardId);
LOG.debug("Regular checkpointing for shard {}", kinesisShardId);

Use Sl4fj string interpolation.

Also, this is probably more of a debug log.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable. I have fixed this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the checkpoint synchronously is a pattern that we should move away from, as it can lead to this issue (#4422). You can create a separate thread to do this checkpointing. To handle it for acknowledgments, you can add a progress check (example here: #4918). Unless KCL handles the ownership aspect and the checkpointing is just to show how far it has gone in a shard

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbose2k21 , I believe you missed this one.

@sb2k16 sb2k16 force-pushed the kinesis-code-checkin branch 3 times, most recently from 7da25c7 to ba2532d Compare August 29, 2024 18:45
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it recommended to use the Async client? If so why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0 for your review. This is required to construct ConfigsBuilder class in KCL

}

if (sourceConfig.getStreams() == null || sourceConfig.getStreams().isEmpty()) {
throw new IllegalStateException("Streams are empty!");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw InvalidPluginConfiugrationException here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also make a clearer and less excited message.

No Kinesis streams provided.


// Checkpoint for shard
if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointIntervalMilliSeconds) {
LOG.info("Regular checkpointing for shard " + kinesisShardId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the checkpoint synchronously is a pattern that we should move away from, as it can lead to this issue (#4422). You can create a separate thread to do this checkpointing. To handle it for acknowledgments, you can add a progress check (example here: #4918). Unless KCL handles the ownership aspect and the checkpointing is just to show how far it has gone in a shard

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @sb2k16 for this change! There are a few outstanding comments from the last review. And I have a few other comments as well.


@Getter
@JsonProperty("enable_checkpoint")
private boolean enableCheckPoint = DEFAULT_ENABLE_CHECKPOINT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would somebody disable checkpointing? Also, the default is false here. Why is that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, we will be enabling acknowledgements, this is set to false by default.

// Checkpoint for shard
if (kinesisStreamConfig.isEnableCheckPoint() && System.currentTimeMillis() - lastCheckpointTimeInMillis > checkpointInterval.toMillis()) {
LOG.info("Regular checkpointing for shard " + kinesisShardId);
checkpoint(processRecordsInput.checkpointer());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should not call this when end-to-end acknowledgements are enabled. Otherwise, we will checkpoint prematurely.

return acknowledgementSetManager.create((result) -> {
acknowledgementSetCallbackCounter.increment();
if (result) {
LOG.info("acknowledgements received");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a debug level log.

LOG.info("acknowledgements received");
checkpoint(processRecordsInput.checkpointer());
} else {
LOG.info("acknowledgements received with false");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a debug level log.

this.kinesisStreamConfig = getStreamConfig(kinesisSourceConfig);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
this.codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might KCL create multiple of these processors? We can create the InputCodec once, either in KinesisService (probably the ideal location) or KinesisSharedRecordProcessorFactory. Then pass it in here. This way, multiple instances of KinesisRecordProcessor all share one instance of InputCodec.

This also prepares us to use #4838 once implemented.

Copy link
Member Author

@sb2k16 sb2k16 Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable. I have passed the KinesisRecordConverter object to each KinesisRecordProcessor which is instantiated in KinesisSharedRecordProcessorFactory.

kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier);
kinesisRecordProcessor.initialize(initializationInput);

Thread.sleep(2000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Awaitility instead of a sleep.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test to remove this.

kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier);
kinesisRecordProcessor.initialize(initializationInput);

Thread.sleep(2000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Awaitility instead of a sleep.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test to remove this.

kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier);
kinesisRecordProcessor.initialize(initializationInput);

Thread.sleep(2000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Awaitility instead of a sleep.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test to remove this.

kinesisRecordProcessor = new KinesisRecordProcessor(buffer, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, pluginFactory, streamIdentifier);
kinesisRecordProcessor.initialize(initializationInput);

Thread.sleep(2000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Awaitility instead of a sleep.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test to remove this.

@@ -0,0 +1,10 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every class needs a license header. Please add to all files.

See: https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers

Hint: If you use IntelliJ you can configure it to automatically add this. And you can have it add the headers to all the files in this Gradle module (since they are all new).

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
@sb2k16 sb2k16 force-pushed the kinesis-code-checkin branch 3 times, most recently from 82aa510 to 57f5d42 Compare September 13, 2024 16:35
Signed-off-by: Souvik Bose <souvbose@amazon.com>
@sb2k16 sb2k16 force-pushed the kinesis-code-checkin branch from 57f5d42 to 92a797b Compare September 13, 2024 23:34

@JsonProperty("acknowledgments")
@Getter
private boolean acknowledgments = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we start making acknowledgements as true by default to all pull sources? If yes, we should start with this. @dlvenable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep the behavior consistent for now. I do think we should revisit in Data Prepper 3.0. It may make sense to have a disable_acknowledgements: true in that case. But, let's do it consistently then.

this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval();
this.buffer = buffer;
this.bufferAccumulator = bufferAccumulator;
this.kinesisCheckpointerTracker = new KinesisCheckpointerTracker();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than include setKinesisCheckpointTracker, just pass this in the constructor. Then you can provide the mocks in the test using that. You can then remove the setter as this can lead to code bugs down the line. And you can make the field final.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable . I have removed the setter method and passed in the checkpoint tracker through the constructor.

checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true);
}

public synchronized Optional<KinesisCheckpointerRecord> getLatestAvailableCheckpointRecord() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename this to popLatestAvailableCheckpointRecord or something similar. It is mutating the structure and a get method doesn't convey that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed this method to popLatestReadyToCheckpointRecord


Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
if (kinesisCheckpointerRecordOptional.isPresent()) {
RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you combine these lines with 201-205?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dlvenable. I have refactored these lines and added a new method doCheckpoint.

dlvenable
dlvenable previously approved these changes Sep 18, 2024
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @sb2k16 for this great contribution!

try {
streamConfig = getStreamConfig(kinesisStreamConfig);
} catch (Exception e) {
throw new RuntimeException(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of exceptions get thrown here? If it's a config error we should throw InvalidPluginConfigurationException and provide context with the message where we can

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal StreamConfig class which is passed to KCL. It is not data prepper kinesis stream config.

}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = streamDescription.streamARN().split(COLON)[4];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an actual ARN, or just an identifier? If Arn we should use the Arn class to get the account id

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0. I will fix this.

return new FormerStreamsLeasesDeletionStrategy.AutoDetectionAndDeferredDeletionStrategy() {
@Override
public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofSeconds(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are trade offs for having this higher or lower?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the strategy to perform cleaning up of the leases for streams that just has been deleted. The wait period is to allow for any pending processing of records before cleaning up the lease from the database.

this.applicationName = pipelineName;
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this codec the type of data they send to Kinesis? It's not just JSON records from Kinesis streams? Or is it always provided from Kinesis as just bytes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we read from Kinesis, it always comes in as bytes. Reference Record


@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
LOG.debug("Lease Lost");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth having a metric for or does kinesis already track it? Why does this happen?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KCL already tracks this through a metric. This can happen if the current worker holding the lease is evicted by another worker which acquires the lease for the specific shard.

Signed-off-by: Souvik Bose <souvbose@amazon.com>
@dlvenable dlvenable merged commit a3bd538 into opensearch-project:main Sep 19, 2024
71 of 74 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support AWS Kinesis Data Streams as a Source
5 participants