Add new metrics for tracking bytes received and processed by KDS #5237
@@ -177,6 +191,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
// Flush buffer at the end
bufferAccumulator.flush();
recordsProcessed.increment(eventCount);
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());
These two metrics are the same. We should remove the bytesReceived and keep only bytesProcessed.
@@ -177,6 +191,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
// Flush buffer at the end
bufferAccumulator.flush();
recordsProcessed.increment(eventCount);
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());
I think we should record each record independently. This will also let us track the size of individual records.
processRecordsInput.records()
    .stream()
    .map(kinesisClientRecord -> kinesisClientRecord.data().remaining())
    .map(Integer::longValue)
    .forEach(bytesProcessedSummary::record);
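To illustrate the difference this suggestion makes, here is a minimal sketch comparing one sample per record against one sample per batch. It assumes `java.util.LongSummaryStatistics` as a dependency-free stand-in for the Micrometer summary used in the PR; the class and method names are hypothetical and not part of the plugin.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;

// Sketch only: LongSummaryStatistics stands in for the distribution summary
// in the PR so that the example runs without any external dependency.
final class PerRecordBytesSketch {

    // One sample per payload: count, max and mean describe individual records.
    static LongSummaryStatistics recordEach(List<ByteBuffer> payloads) {
        LongSummaryStatistics summary = new LongSummaryStatistics();
        payloads.stream()
                .mapToLong(ByteBuffer::remaining)
                .forEach(summary::accept);
        return summary;
    }

    // One sample for the whole batch: only the total survives.
    static LongSummaryStatistics recordBatchTotal(List<ByteBuffer> payloads) {
        LongSummaryStatistics summary = new LongSummaryStatistics();
        summary.accept(payloads.stream().mapToLong(ByteBuffer::remaining).sum());
        return summary;
    }

    // Hypothetical helper that wraps a string as an unread payload.
    static ByteBuffer payload(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<ByteBuffer> batch = Arrays.asList(payload("a"), payload("bcd"), payload("efghij"));
        System.out.println("per record:  " + recordEach(batch));
        System.out.println("batch total: " + recordBatchTotal(batch));
    }
}
```

Both approaches report the same total, but only the per-record version keeps the number of records and the largest record size.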
@AllArgsConstructor
public class KinesisInputOutputRecord {
private Record<Event> dataPrepperRecord;
private KinesisClientRecord kinesisClientRecord;
Rather than passing this entire record I think you could just provide it as the byte size.
private long recordBytesCount;
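One way to read this suggestion is a holder that captures the byte count when it is built, so the size no longer depends on the buffer's state later on. The sketch below is an assumption about how that could look, not the class from the PR: `SizedRecordSketch` and its factory method are hypothetical, and a generic type parameter replaces `Record<Event>` to keep the example self-contained.

```java
import java.nio.ByteBuffer;

// Hypothetical holder: keeps the converted record plus the payload size,
// instead of a reference to the whole source record.
final class SizedRecordSketch<T> {
    private final T dataPrepperRecord;
    private final long recordBytesCount;

    SizedRecordSketch(T dataPrepperRecord, long recordBytesCount) {
        this.dataPrepperRecord = dataPrepperRecord;
        this.recordBytesCount = recordBytesCount;
    }

    // Captures the unread byte count now, before anything consumes the buffer.
    static <T> SizedRecordSketch<T> of(T record, ByteBuffer data) {
        return new SizedRecordSketch<>(record, data.remaining());
    }

    T getDataPrepperRecord() {
        return dataPrepperRecord;
    }

    long getRecordBytesCount() {
        return recordBytesCount;
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.wrap(new byte[7]);
        SizedRecordSketch<String> sized = SizedRecordSketch.of("event", data);
        data.get(new byte[7]); // consuming the buffer does not change the stored size
        System.out.println(sized.getRecordBytesCount());
    }
}
```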
Event event = record.getData();
for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) {
Record<Event> dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord();
int incomingRecordSizeBytes = kinesisInputOutputRecord.getKinesisClientRecord().data().position();
Can you use ByteBuffer's capacity instead?
https://docs.oracle.com/javase/8/docs/api/java/nio/Buffer.html#capacity--
Per my comment above, you could calculate this elsewhere.
@dlvenable - It looks like capacity is the total size of the buffer, and the KCL record might not use the entire capacity. position, however, returns the offset up to which the data has been read.
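For reference, a small standalone sketch of how `capacity()`, `position()` and `remaining()` behave on a `java.nio.ByteBuffer`. The 16-byte buffer holding a 5-byte payload is made up for illustration; whether the buffer handed over by KCL is in the same state when the metric is recorded is an assumption this example does not verify.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ByteBufferSizeSketch {

    // Builds a 16-byte buffer holding a 5-byte payload, flipped for reading.
    static ByteBuffer sampleBuffer() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII));
        buffer.flip(); // position = 0, limit = 5, capacity = 16
        return buffer;
    }

    // Reads every remaining byte, which advances position up to limit.
    static void drain(ByteBuffer buffer) {
        while (buffer.hasRemaining()) {
            buffer.get();
        }
    }

    static void print(String label, ByteBuffer buffer) {
        System.out.printf("%s: capacity=%d position=%d remaining=%d%n",
                label, buffer.capacity(), buffer.position(), buffer.remaining());
    }

    public static void main(String[] args) {
        ByteBuffer buffer = sampleBuffer();
        print("before read", buffer); // capacity=16 position=0 remaining=5
        drain(buffer);
        print("after read", buffer);  // capacity=16 position=5 remaining=0
    }
}
```

In this sketch `capacity()` overstates the payload, `position()` equals the payload size only after the data has been fully read, and `remaining()` equals it only before reading starts.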
Thank you!
Signed-off-by: Souvik Bose <souvbose@amazon.com>
e7138ac to d271864
acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event));

bufferAccumulator.add(record);
bufferAccumulator.add(dataPrepperRecord);
bytesProcessedSummary.record(incomingRecordSizeBytes);
You have this line repeated at line 180 as well. Is that intentional?
Description
This PR adds two new metrics to the KDS source plugin, bytesReceived and bytesProcessed, to track the volume of data ingested from Kinesis Data Streams.
Issues Resolved
Resolves #1082
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.