Skip to content

Commit

Permalink
fix: use sendThreadPool in sendEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
kevink-sq committed Aug 28, 2024
1 parent 6dfc6bb commit 9069886
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 6 deletions.
27 changes: 26 additions & 1 deletion src/main/java/com/amplitude/Amplitude.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.Proxy;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

public class Amplitude {
private static Map<String, Amplitude> instances = new HashMap<>();
Expand All @@ -24,6 +25,8 @@ public class Amplitude {
private Plan plan;
private IngestionMetadata ingestionMetadata;
private long flushTimeout;
private ExecutorService retryThreadPool;
private ExecutorService sendThreadPool;

/**
* A dictionary of key-value pairs that represent additional instructions for server save
Expand All @@ -46,7 +49,7 @@ private Amplitude(String name) {
logger = new AmplitudeLog();
eventsToSend = new ConcurrentLinkedQueue<>();
aboutToStartFlushing = false;
httpTransport = new HttpTransport(httpCall, null, logger, flushTimeout);
httpTransport = new HttpTransport(httpCall, null, logger, flushTimeout, sendThreadPool, retryThreadPool);
}

/**
Expand Down Expand Up @@ -206,6 +209,28 @@ public Amplitude setFlushTimeout(long timeout) {
return this;
}

/**
* Set the thread pool for sending events via {@link HttpTransport}
*
* @param sendThreadPool the thread pool for sending events
*/
public Amplitude setSendThreadPool(ExecutorService sendThreadPool) {
this.sendThreadPool = sendThreadPool;
this.httpTransport.setSendThreadPool(sendThreadPool);
return this;
}

/**
* Set the thread pool for retrying events via {@link HttpTransport}
*
* @param retryThreadPool the thread pool for retrying events
*/
public Amplitude setRetryThreadPool(ExecutorService retryThreadPool) {
this.retryThreadPool = retryThreadPool;
this.httpTransport.setRetryThreadPool(retryThreadPool);
return this;
}

/** Add middleware to the middleware runner */
public synchronized void addEventMiddleware(Middleware middleware) {
middlewareRunner.add(middleware);
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/com/amplitude/HttpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ class HttpTransport {
private long flushTimeout;

HttpTransport(
HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger, long flushTimeout) {
HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger,
long flushTimeout, ExecutorService sendThreadPool, ExecutorService retryThreadPool) {
this.httpCall = httpCall;
this.callbacks = callbacks;
this.logger = logger;
this.flushTimeout = flushTimeout;
retryThreadPool = Executors.newFixedThreadPool(10);
sendThreadPool = Executors.newFixedThreadPool(20);
this.retryThreadPool = (retryThreadPool == null) ? Executors.newFixedThreadPool(10) : retryThreadPool;
this.sendThreadPool = (sendThreadPool == null) ? Executors.newFixedThreadPool(40) : sendThreadPool;
}

public void sendEventsWithRetry(List<Event> events) {
Expand Down Expand Up @@ -98,6 +99,14 @@ public void setFlushTimeout(long timeout) {
flushTimeout = timeout;
}

public void setSendThreadPool(ExecutorService sendThreadPool) {
this.sendThreadPool = sendThreadPool;
}

public void setRetryThreadPool(ExecutorService retryThreadPool) {
this.retryThreadPool = retryThreadPool;
}

public void setCallbacks(AmplitudeCallbacks callbacks) {
this.callbacks = callbacks;
}
Expand All @@ -118,7 +127,7 @@ private CompletableFuture<Response> sendEvents(List<Event> events) {
throw new CompletionException(e);
}
return response;
});
}, sendThreadPool);
}

// Call this function if event not in current Retry list.
Expand Down
33 changes: 32 additions & 1 deletion src/test/java/com/amplitude/HttpTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -13,17 +16,22 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.BDDMockito;
import org.mockito.junit.jupiter.MockitoExtension;

import com.amplitude.exception.AmplitudeInvalidAPIKeyException;
import com.amplitude.util.EventsGenerator;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
public class HttpTransportTest {
Expand All @@ -32,7 +40,30 @@ public class HttpTransportTest {

@BeforeEach
public void setUp() {
httpTransport = new HttpTransport(null, null, new AmplitudeLog(), 0);
httpTransport = new HttpTransport(null, null, new AmplitudeLog(), 0, null, null);
}

/**
* This test is to make sure the same thread pool is used in both sendEventsWithRetry -> sendEvents.
* If the thread pool is not piped to sendEvents, then the default ForkJoinPool is used which can
* become a performance bottleneck.
*/
@Test
@MockitoSettings(strictness = Strictness.LENIENT)
public void testSentEventsThreadpool() throws AmplitudeInvalidAPIKeyException, InterruptedException{
CountDownLatch latch = new CountDownLatch(2);
HttpCall httpCall = mock(HttpCall.class);
when(httpCall.makeRequest(anyList())).thenReturn(ResponseUtil.getSuccessResponse());
ExecutorService sendThreadPool = spy(Executors.newFixedThreadPool(2));
doAnswer((invocation) -> {
latch.countDown();
invocation.callRealMethod();
return null;
}).when(sendThreadPool).execute(any());
httpTransport.setSendThreadPool(sendThreadPool);
httpTransport.setHttpCall(httpCall);
httpTransport.sendEventsWithRetry(List.of());
assertTrue(latch.await(2L, TimeUnit.SECONDS));
}

@ParameterizedTest
Expand Down

0 comments on commit 9069886

Please sign in to comment.