diff --git a/src/main/java/com/amplitude/Amplitude.java b/src/main/java/com/amplitude/Amplitude.java index c84a042..4074977 100644 --- a/src/main/java/com/amplitude/Amplitude.java +++ b/src/main/java/com/amplitude/Amplitude.java @@ -3,6 +3,7 @@ import java.net.Proxy; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; public class Amplitude { private static Map instances = new HashMap<>(); @@ -24,6 +25,8 @@ public class Amplitude { private Plan plan; private IngestionMetadata ingestionMetadata; private long flushTimeout; + private ExecutorService retryThreadPool; + private ExecutorService sendThreadPool; /** * A dictionary of key-value pairs that represent additional instructions for server save @@ -46,7 +49,7 @@ private Amplitude(String name) { logger = new AmplitudeLog(); eventsToSend = new ConcurrentLinkedQueue<>(); aboutToStartFlushing = false; - httpTransport = new HttpTransport(httpCall, null, logger, flushTimeout); + httpTransport = new HttpTransport(httpCall, null, logger, flushTimeout, sendThreadPool, retryThreadPool); } /** @@ -206,6 +209,28 @@ public Amplitude setFlushTimeout(long timeout) { return this; } + /** + * Set the thread pool for sending events via {@link HttpTransport} + * + * @param sendThreadPool the thread pool for sending events + */ + public Amplitude setSendThreadPool(ExecutorService sendThreadPool) { + this.sendThreadPool = sendThreadPool; + this.httpTransport.setSendThreadPool(sendThreadPool); + return this; + } + + /** + * Set the thread pool for retrying events via {@link HttpTransport} + * + * @param retryThreadPool the thread pool for retrying events + */ + public Amplitude setRetryThreadPool(ExecutorService retryThreadPool) { + this.retryThreadPool = retryThreadPool; + this.httpTransport.setRetryThreadPool(retryThreadPool); + return this; + } + /** Add middleware to the middleware runner */ public synchronized void addEventMiddleware(Middleware middleware) { middlewareRunner.add(middleware); diff --git a/src/main/java/com/amplitude/HttpTransport.java b/src/main/java/com/amplitude/HttpTransport.java index 127f8ec..fd78ee9 100644 --- a/src/main/java/com/amplitude/HttpTransport.java +++ b/src/main/java/com/amplitude/HttpTransport.java @@ -47,13 +47,14 @@ class HttpTransport { private long flushTimeout; HttpTransport( - HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger, long flushTimeout) { + HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger, + long flushTimeout, ExecutorService sendThreadPool, ExecutorService retryThreadPool) { this.httpCall = httpCall; this.callbacks = callbacks; this.logger = logger; this.flushTimeout = flushTimeout; - retryThreadPool = Executors.newFixedThreadPool(10); - sendThreadPool = Executors.newFixedThreadPool(20); + this.retryThreadPool = (retryThreadPool == null) ? Executors.newFixedThreadPool(10) : retryThreadPool; + this.sendThreadPool = (sendThreadPool == null) ? Executors.newFixedThreadPool(40) : sendThreadPool; } public void sendEventsWithRetry(List events) { @@ -98,6 +99,14 @@ public void setFlushTimeout(long timeout) { flushTimeout = timeout; } + public void setSendThreadPool(ExecutorService sendThreadPool) { + this.sendThreadPool = sendThreadPool; + } + + public void setRetryThreadPool(ExecutorService retryThreadPool) { + this.retryThreadPool = retryThreadPool; + } + public void setCallbacks(AmplitudeCallbacks callbacks) { this.callbacks = callbacks; } @@ -118,7 +127,7 @@ private CompletableFuture sendEvents(List events) { throw new CompletionException(e); } return response; - }); + }, sendThreadPool); } // Call this function if event not in current Retry list. diff --git a/src/test/java/com/amplitude/HttpTransportTest.java b/src/test/java/com/amplitude/HttpTransportTest.java index b93958b..6ef5981 100644 --- a/src/test/java/com/amplitude/HttpTransportTest.java +++ b/src/test/java/com/amplitude/HttpTransportTest.java @@ -2,8 +2,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -13,6 +16,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; @@ -20,10 +25,13 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.BDDMockito; import org.mockito.junit.jupiter.MockitoExtension; import com.amplitude.exception.AmplitudeInvalidAPIKeyException; import com.amplitude.util.EventsGenerator; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; @ExtendWith(MockitoExtension.class) public class HttpTransportTest { @@ -32,7 +40,30 @@ public class HttpTransportTest { @BeforeEach public void setUp() { - httpTransport = new HttpTransport(null, null, new AmplitudeLog(), 0); + httpTransport = new HttpTransport(null, null, new AmplitudeLog(), 0, null, null); + } + + /** + * This test is to make sure the same thread pool is used in both sendEventsWithRetry -> sendEvents. + * If the thread pool is not piped to sendEvents, then the default ForkJoinPool is used which can + * become a performance bottleneck. + */ + @Test + @MockitoSettings(strictness = Strictness.LENIENT) + public void testSentEventsThreadpool() throws AmplitudeInvalidAPIKeyException, InterruptedException{ + CountDownLatch latch = new CountDownLatch(2); + HttpCall httpCall = mock(HttpCall.class); + when(httpCall.makeRequest(anyList())).thenReturn(ResponseUtil.getSuccessResponse()); + ExecutorService sendThreadPool = spy(Executors.newFixedThreadPool(2)); + doAnswer((invocation) -> { + latch.countDown(); + invocation.callRealMethod(); + return null; + }).when(sendThreadPool).execute(any()); + httpTransport.setSendThreadPool(sendThreadPool); + httpTransport.setHttpCall(httpCall); + httpTransport.sendEventsWithRetry(List.of()); + assertTrue(latch.await(2L, TimeUnit.SECONDS)); } @ParameterizedTest