Skip to content

Commit

Permalink
Rename function and address comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Sep 19, 2024
1 parent 92a797b commit 806f78d
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.arn.Arn;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
Expand Down Expand Up @@ -44,34 +45,29 @@ public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final Kinesis
public List<StreamConfig> streamConfigList() {
List<StreamConfig> streamConfigList = new ArrayList<>();
for (KinesisStreamConfig kinesisStreamConfig : sourceConfig.getStreams()) {
StreamConfig streamConfig;
try {
streamConfig = getStreamConfig(kinesisStreamConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
StreamConfig streamConfig = getStreamConfig(kinesisStreamConfig);
streamConfigList.add(streamConfig);
}
return streamConfigList;
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) throws Exception {
private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) throws Exception {
private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).get();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).join();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = streamDescription.streamARN().split(COLON)[4];
String accountId = Arn.fromString(streamDescription.streamARN()).getAccountId();
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public synchronized void markSequenceNumberForCheckpoint(final ExtendedSequenceN
checkpointerRecordList.get(extendedSequenceNumber).setReadyToCheckpoint(true);
}

public synchronized Optional<KinesisCheckpointerRecord> getLatestAvailableCheckpointRecord() {
public synchronized Optional<KinesisCheckpointerRecord> popLatestReadyToCheckpointRecord() {
Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = Optional.empty();
List<ExtendedSequenceNumber> toRemoveRecords = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,42 @@

public class KinesisRecordProcessor implements ShardRecordProcessor {
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class);

private static final int DEFAULT_MONITOR_WAIT_TIME_MS = 15_000;
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20);

private final StreamIdentifier streamIdentifier;
private final KinesisStreamConfig kinesisStreamConfig;
private final Duration checkpointInterval;
private final KinesisSourceConfig kinesisSourceConfig;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final KinesisRecordConverter kinesisRecordConverter;
private final KinesisCheckpointerTracker kinesisCheckpointerTracker;
private final ExecutorService executorService;
private String kinesisShardId;
private long lastCheckpointTimeInMillis;
private final int bufferTimeoutMillis;
private final AcknowledgementSetManager acknowledgementSetManager;

private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final Counter recordsProcessed;
private final Counter recordProcessingErrors;
private final Counter checkpointFailures;
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20);
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
public static final String KINESIS_RECORD_PROCESSED = "recordProcessed";
public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors";
public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures";
public static final String KINESIS_STREAM_TAG_KEY = "stream";
private KinesisCheckpointerTracker kinesisCheckpointerTracker;
private final ExecutorService executorService;
private AtomicBoolean isStopRequested;

public KinesisRecordProcessor(final BufferAccumulator<Record<Event>> bufferAccumulator,
final KinesisSourceConfig kinesisSourceConfig,
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics,
final KinesisRecordConverter kinesisRecordConverter,
final KinesisCheckpointerTracker kinesisCheckpointerTracker,
final StreamIdentifier streamIdentifier) {
this.bufferTimeoutMillis = (int) kinesisSourceConfig.getBufferTimeout().toMillis();
this.streamIdentifier = streamIdentifier;
Expand All @@ -95,7 +99,7 @@ public KinesisRecordProcessor(final BufferAccumulator<Record<Event>> bufferAccum
this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval();
this.bufferAccumulator = bufferAccumulator;
this.kinesisCheckpointerTracker = new KinesisCheckpointerTracker();
this.kinesisCheckpointerTracker = kinesisCheckpointerTracker;
this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("kinesis-ack-monitor"));
this.isStopRequested = new AtomicBoolean(false);
}
Expand All @@ -120,16 +124,7 @@ public void initialize(InitializationInput initializationInput) {
private void monitorCheckpoint(final ExecutorService executorService) {
while (!isStopRequested.get()) {
if (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis()) {
LOG.debug("Regular checkpointing for shard {}", kinesisShardId);

Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
if (kinesisCheckpointerRecordOptional.isPresent()) {
RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer();
String sequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().sequenceNumber();
Long subSequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber().subSequenceNumber();
checkpoint(recordProcessorCheckpointer, sequenceNumber, subSequenceNumber);
lastCheckpointTimeInMillis = System.currentTimeMillis();
}
doCheckpoint();
}
try {
Thread.sleep(DEFAULT_MONITOR_WAIT_TIME_MS);
Expand Down Expand Up @@ -195,14 +190,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

// Checkpoint for shard
if (!acknowledgementsEnabled && (System.currentTimeMillis() - lastCheckpointTimeInMillis >= checkpointInterval.toMillis())) {
LOG.debug("Regular checkpointing for shard {}", kinesisShardId);

Optional<KinesisCheckpointerRecord> KinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
if (KinesisCheckpointerRecordOptional.isPresent()) {
ExtendedSequenceNumber lastExtendedSequenceNumber = KinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber();
checkpoint(processRecordsInput.checkpointer(), lastExtendedSequenceNumber.sequenceNumber(), lastExtendedSequenceNumber.subSequenceNumber());
lastCheckpointTimeInMillis = System.currentTimeMillis();
}
doCheckpoint();
}
} catch (Exception ex) {
recordProcessingErrors.increment();
Expand Down Expand Up @@ -242,6 +230,17 @@ public void checkpoint(RecordProcessorCheckpointer checkpointer, String sequence
}
}

private void doCheckpoint() {
LOG.debug("Regular checkpointing for shard {}", kinesisShardId);
Optional<KinesisCheckpointerRecord> kinesisCheckpointerRecordOptional = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord();
if (kinesisCheckpointerRecordOptional.isPresent()) {
ExtendedSequenceNumber lastExtendedSequenceNumber = kinesisCheckpointerRecordOptional.get().getExtendedSequenceNumber();
RecordProcessorCheckpointer recordProcessorCheckpointer = kinesisCheckpointerRecordOptional.get().getCheckpointer();
checkpoint(recordProcessorCheckpointer, lastExtendedSequenceNumber.sequenceNumber(), lastExtendedSequenceNumber.subSequenceNumber());
lastCheckpointTimeInMillis = System.currentTimeMillis();
}
}

private void checkpoint(RecordProcessorCheckpointer checkpointer) {
try {
String kinesisStream = streamIdentifier.streamName();
Expand All @@ -268,9 +267,4 @@ private ExtendedSequenceNumber getLatestSequenceNumberFromInput(final ProcessRec
}
return largestExtendedSequenceNumber;
}

@VisibleForTesting
public void setKinesisCheckpointerTracker(final KinesisCheckpointerTracker kinesisCheckpointerTracker) {
this.kinesisCheckpointerTracker = kinesisCheckpointerTracker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public ShardRecordProcessor shardRecordProcessor() {
public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) {
BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer,
kinesisSourceConfig.getNumberOfRecordsToAccumulate(), kinesisSourceConfig.getBufferTimeout());
return new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager, pluginMetrics, kinesisRecordConverter, streamIdentifier);
KinesisCheckpointerTracker kinesisCheckpointerTracker = new KinesisCheckpointerTracker();
return new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, acknowledgementSetManager,
pluginMetrics, kinesisRecordConverter, kinesisCheckpointerTracker, streamIdentifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ void testCheckPointerAddAndGet() {
ExtendedSequenceNumber last = extendedSequenceNumberList.get(extendedSequenceNumberList.size()-1);
kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(last);

Optional<KinesisCheckpointerRecord> checkpointRecord = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
Optional<KinesisCheckpointerRecord> checkpointRecord = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord();
assertTrue(checkpointRecord.isEmpty());
assertEquals(kinesisCheckpointerTracker.size(), numRecords);

int idx = random.nextInt(numRecords);
ExtendedSequenceNumber extendedSequenceNumber1 = extendedSequenceNumberList.get(idx);
kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber1);

Optional<KinesisCheckpointerRecord> firstcheckpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
Optional<KinesisCheckpointerRecord> firstcheckpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord();
if (idx != 0) {
assertTrue(firstcheckpointer.isEmpty());
assertEquals(kinesisCheckpointerTracker.size(), numRecords);
Expand All @@ -79,7 +79,7 @@ void testGetLastCheckpointerAndStoreIsEmpty() {
kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber);
}

Optional<KinesisCheckpointerRecord> checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
Optional<KinesisCheckpointerRecord> checkpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord();
assertTrue(checkpointer.isPresent());
assertEquals(0, kinesisCheckpointerTracker.size());
}
Expand All @@ -92,7 +92,7 @@ public void testMarkCheckpointerReadyForCheckpoint() {
ExtendedSequenceNumber extendedSequenceNumber = mock(ExtendedSequenceNumber.class);
assertThrows(IllegalArgumentException.class, () -> kinesisCheckpointerTracker.markSequenceNumberForCheckpoint(extendedSequenceNumber));

Optional<KinesisCheckpointerRecord> checkpointer = kinesisCheckpointerTracker.getLatestAvailableCheckpointRecord();
Optional<KinesisCheckpointerRecord> checkpointer = kinesisCheckpointerTracker.popLatestReadyToCheckpointRecord();
assertTrue(checkpointer.isEmpty());
}
}
Loading

0 comments on commit 806f78d

Please sign in to comment.