diff --git a/src/main/java/com/amplitude/Amplitude.java b/src/main/java/com/amplitude/Amplitude.java index c84a042..060f3b9 100644 --- a/src/main/java/com/amplitude/Amplitude.java +++ b/src/main/java/com/amplitude/Amplitude.java @@ -3,6 +3,7 @@ import java.net.Proxy; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; public class Amplitude { private static Map instances = new HashMap<>(); @@ -206,6 +207,26 @@ public Amplitude setFlushTimeout(long timeout) { return this; } + /** + * Set the thread pool for sending events via {@link HttpTransport} + * + * @param sendThreadPool the thread pool for sending events + */ + public Amplitude setSendThreadPool(ExecutorService sendThreadPool) { + this.httpTransport.setSendThreadPool(sendThreadPool); + return this; + } + + /** + * Set the thread pool for retrying events via {@link HttpTransport} + * + * @param retryThreadPool the thread pool for retrying events + */ + public Amplitude setRetryThreadPool(ExecutorService retryThreadPool) { + this.httpTransport.setRetryThreadPool(retryThreadPool); + return this; + } + /** Add middleware to the middleware runner */ public synchronized void addEventMiddleware(Middleware middleware) { middlewareRunner.add(middleware); diff --git a/src/main/java/com/amplitude/HttpTransport.java b/src/main/java/com/amplitude/HttpTransport.java index 127f8ec..690283e 100644 --- a/src/main/java/com/amplitude/HttpTransport.java +++ b/src/main/java/com/amplitude/HttpTransport.java @@ -38,22 +38,29 @@ class HttpTransport { private int eventsInRetry = 0; private Object bufferLock = new Object(); private Object counterLock = new Object(); - private ExecutorService retryThreadPool; - private ExecutorService sendThreadPool; private HttpCall httpCall; private AmplitudeLog logger; private AmplitudeCallbacks callbacks; private long flushTimeout; + // Managed by setters + private ExecutorService retryThreadPool = Executors.newFixedThreadPool(10); + + // The supplyAsyncPool is only used within the sendThreadPool so only when + // the sendThreadPool is increased will the supplyAsyncPool be more utilized. + // We are using the supplyAsyncPool rather than the default fork join common + // pool because the fork join common pool scales with cpu... and we do not + // want to perform network requests in that small pool. + private ExecutorService sendThreadPool = Executors.newFixedThreadPool(20); + private ExecutorService supplyAsyncPool = Executors.newCachedThreadPool(); + HttpTransport( HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger, long flushTimeout) { this.httpCall = httpCall; this.callbacks = callbacks; this.logger = logger; this.flushTimeout = flushTimeout; - retryThreadPool = Executors.newFixedThreadPool(10); - sendThreadPool = Executors.newFixedThreadPool(20); } public void sendEventsWithRetry(List events) { @@ -98,6 +105,14 @@ public void setFlushTimeout(long timeout) { flushTimeout = timeout; } + public void setSendThreadPool(ExecutorService sendThreadPool) { + this.sendThreadPool = sendThreadPool; + } + + public void setRetryThreadPool(ExecutorService retryThreadPool) { + this.retryThreadPool = retryThreadPool; + } + public void setCallbacks(AmplitudeCallbacks callbacks) { this.callbacks = callbacks; } @@ -118,7 +133,7 @@ private CompletableFuture sendEvents(List events) { throw new CompletionException(e); } return response; - }); + }, supplyAsyncPool); } // Call this function if event not in current Retry list.