Skip to content

Commit

Permalink
bump guava version (#2)
Browse files Browse the repository at this point in the history
* Bump Kafka to 2.5.1

* Revert "Bump Kafka to 2.5.1"

This reverts commit 01ab670.

* Bump to Kafka 2.5.1

* update version number

* Update distribution version

* remove dependency from parent pom

* update parent version in children pom

* Fix wrong k version number

* bump distribution xml version

* bump guava to 24.1.1; change error handling

* use getCause instead of hardcode message

* unwrap execution exception

* centralise exception handling

* refactor exception catching

* fix a wrong if condition

* make exception test more valuable

* address pr comment
  • Loading branch information
GordonCai authored Aug 14, 2020
1 parent 4085a41 commit fe628e6
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 20 deletions.
2 changes: 1 addition & 1 deletion s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
<version>24.1.1-jre</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.amazonaws.AmazonClientException;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.instaclustr.kafka.connect.s3.AwsConnectorStringFormats;
import com.instaclustr.kafka.connect.s3.AwsStorageConnectorCommonConfig;
Expand All @@ -16,7 +17,9 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class AwsStorageSourceTask extends SourceTask {
Expand Down Expand Up @@ -139,20 +142,24 @@ public List<SourceRecord> poll() {
} catch (InterruptedException e) {
log.info("Thread interrupted in poll. Shutting down", e);
Thread.currentThread().interrupt();
} catch (AmazonClientException exception) {
if (!exception.isRetryable()) {
throw exception;
} else {
log.warn("Retryable S3 service exception while reading from s3", exception);
} catch (UncheckedExecutionException | ExecutionException | TimeoutException e) {
Throwable exceptionCause = e.getCause();
if (exceptionCause instanceof AmazonClientException) {
AmazonClientException amazonClientException = (AmazonClientException) exceptionCause;
if (!amazonClientException.isRetryable()) {
throw amazonClientException;
} else {
log.warn("Retryable S3 service exception while reading from s3", e);
if (topicPartition != null) {
awsSourceReader.revertAwsReadPositionMarker(topicPartition);
}
}
} else if (exceptionCause instanceof IOException || e instanceof TimeoutException) {
log.warn("Retryable exception while reading from s3", e);
if (topicPartition != null) {
awsSourceReader.revertAwsReadPositionMarker(topicPartition);
}
}
} catch (IOException | UncheckedTimeoutException exception) {
log.warn("Retryable exception while reading from s3", exception);
if (topicPartition != null) {
awsSourceReader.revertAwsReadPositionMarker(topicPartition);
}
} catch (RuntimeException e){
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.instaclustr.kafka.connect.s3.source;

import com.amazonaws.AmazonClientException;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.instaclustr.kafka.connect.s3.AwsConnectorStringFormats;
Expand All @@ -12,9 +13,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.regex.Matcher;

/**
Expand Down Expand Up @@ -62,7 +61,7 @@ public TopicPartitionSegmentParser(final InputStream s3ObjectInputStream, final
this.topicPrefix = topicPrefix;
this.targetTopic = AwsConnectorStringFormats.generateTargetTopic(topicPrefix, topic);
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
this.timeLimiter = new SimpleTimeLimiter(this.singleThreadExecutor);
this.timeLimiter = SimpleTimeLimiter.create(this.singleThreadExecutor);
}

public void closeResources() throws IOException, InterruptedException {
Expand Down Expand Up @@ -99,7 +98,7 @@ private SourceRecord getNextRecord() throws IOException { //blocking call

public SourceRecord getNextRecord(Long time, TimeUnit units) throws Exception {
try {
return this.timeLimiter.callWithTimeout(this::getNextRecord, time, units, true);
return this.timeLimiter.callWithTimeout(this::getNextRecord, time, units);
} catch (Exception e) {
this.closeResources(); //not possible to read from this stream after a timeout as read positions gets messed up
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -54,7 +56,7 @@ public void givenNonResponsiveObjectStreamResetReadPosition() throws Exception {
doReturn("test").when(mockTopicPartitionSegmentParser).getTopic();
doReturn(0).when(mockTopicPartitionSegmentParser).getPartition();

doThrow(new UncheckedTimeoutException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any());
doThrow(new TimeoutException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any());

AwsStorageSourceTask awsStorageSourceTask = new AwsStorageSourceTask(mockTransferManagerProvider, mockAwsSourceReader);
awsStorageSourceTask.poll();
Expand All @@ -71,7 +73,7 @@ public void givenObjectStreamThatThrowsIOExceptionResetReadPosition() throws Exc
doReturn(mockTopicPartitionSegmentParser).when(mockAwsSourceReader).getNextTopicPartitionSegmentParser();
doReturn("test").when(mockTopicPartitionSegmentParser).getTopic();
doReturn(0).when(mockTopicPartitionSegmentParser).getPartition();
doThrow(new IOException()).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any());
doThrow(new ExecutionException(new IOException())).when(mockTopicPartitionSegmentParser).getNextRecord(any(), any());

AwsStorageSourceTask awsStorageSourceTask = new AwsStorageSourceTask(mockTransferManagerProvider, mockAwsSourceReader);
awsStorageSourceTask.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.testng.annotations.Test;

import java.io.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;


public class TopicPartitionSegmentParserTest {
Expand Down Expand Up @@ -72,7 +74,7 @@ public void givenNonResponsiveStreamTriggerTimeoutOnDefinedTimePeriod() throws I
try (PipedInputStream empty = new PipedInputStream(1)) {
pipedOutputStream = new PipedOutputStream(empty); //making sure we just have an unresponsive stream and not throwing an ioexception
TopicPartitionSegmentParser topicPartitionSegmentParser = new TopicPartitionSegmentParser(empty, s3ObjectKey, "");
Assert.expectThrows(UncheckedTimeoutException.class, () -> topicPartitionSegmentParser.getNextRecord(100L, TimeUnit.MILLISECONDS));
Assert.expectThrows(TimeoutException.class, () -> topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.MILLISECONDS));
} finally {
if (pipedOutputStream != null) {
pipedOutputStream.close();
Expand Down Expand Up @@ -103,7 +105,12 @@ public void givenClosedStreamThrowIoException() throws IOException {
InputStream nullInputStream = InputStream.nullInputStream();
TopicPartitionSegmentParser topicPartitionSegmentParser = new TopicPartitionSegmentParser(nullInputStream, s3ObjectKey, "");
nullInputStream.close();
Assert.expectThrows(IOException.class, () -> topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.SECONDS));
try {
topicPartitionSegmentParser.getNextRecord(5L, TimeUnit.SECONDS);
} catch (Exception e) {
Assert.assertTrue(e instanceof ExecutionException);
Assert.assertTrue(e.getCause() instanceof IOException);
}
}

@Test
Expand Down

0 comments on commit fe628e6

Please sign in to comment.