Skip to content

Commit

Permalink
Clean up redundant attributes / modifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
kevink-sq committed Sep 4, 2024
1 parent 9069886 commit a081734
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 46 deletions.
33 changes: 11 additions & 22 deletions src/main/java/com/amplitude/Amplitude.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,23 @@
import java.util.concurrent.ExecutorService;

public class Amplitude {
private static Map<String, Amplitude> instances = new HashMap<>();
private final static Map<String, Amplitude> instances = new HashMap<>();
private final String instanceName;
private String apiKey;
private String serverUrl;

private AmplitudeLog logger;
private AmplitudeLog logger = new AmplitudeLog();

private Queue<Event> eventsToSend;
private boolean aboutToStartFlushing;
private final Queue<Event> eventsToSend = new ConcurrentLinkedQueue<>();
private boolean aboutToStartFlushing = false;

private HttpCallMode httpCallMode;
private HttpCall httpCall;
private HttpTransport httpTransport;
private final HttpTransport httpTransport;
private int eventUploadThreshold = Constants.EVENT_BUF_COUNT;
private int eventUploadPeriodMillis = Constants.EVENT_BUF_TIME_MILLIS;
private Object eventQueueLock = new Object();
private final Object eventQueueLock = new Object();
private Plan plan;
private IngestionMetadata ingestionMetadata;
private long flushTimeout;
private ExecutorService retryThreadPool;
private ExecutorService sendThreadPool;

/**
* A dictionary of key-value pairs that represent additional instructions for server save
Expand All @@ -46,10 +42,7 @@ public class Amplitude {
*/
private Amplitude(String name) {
instanceName = name;
logger = new AmplitudeLog();
eventsToSend = new ConcurrentLinkedQueue<>();
aboutToStartFlushing = false;
httpTransport = new HttpTransport(httpCall, null, logger, flushTimeout, sendThreadPool, retryThreadPool);
httpTransport = new HttpTransport(logger);
}

/**
Expand Down Expand Up @@ -204,7 +197,6 @@ public Amplitude setLogger(AmplitudeLog logger) {
* @param timeout the flush events threads timeout millis
*/
public Amplitude setFlushTimeout(long timeout) {
flushTimeout = timeout;
this.httpTransport.setFlushTimeout(timeout);
return this;
}
Expand All @@ -215,7 +207,6 @@ public Amplitude setFlushTimeout(long timeout) {
* @param sendThreadPool the thread pool for sending events
*/
public Amplitude setSendThreadPool(ExecutorService sendThreadPool) {
this.sendThreadPool = sendThreadPool;
this.httpTransport.setSendThreadPool(sendThreadPool);
return this;
}
Expand All @@ -226,7 +217,6 @@ public Amplitude setSendThreadPool(ExecutorService sendThreadPool) {
* @param retryThreadPool the thread pool for retrying events
*/
public Amplitude setRetryThreadPool(ExecutorService retryThreadPool) {
this.retryThreadPool = retryThreadPool;
this.httpTransport.setRetryThreadPool(retryThreadPool);
return this;
}
Expand Down Expand Up @@ -356,14 +346,13 @@ private void updateHttpCall(HttpCallMode updatedHttpCallMode) {
httpCallMode = updatedHttpCallMode;

if (updatedHttpCallMode == HttpCallMode.BATCH) {
httpCall =
httpTransport.setHttpCall(
new HttpCall(
apiKey, serverUrl != null ? serverUrl : Constants.BATCH_API_URL, options, proxy);
apiKey, serverUrl != null ? serverUrl : Constants.BATCH_API_URL, options, proxy));
} else {
httpCall =
new HttpCall(apiKey, serverUrl != null ? serverUrl : Constants.API_URL, options, proxy);
httpTransport.setHttpCall(
new HttpCall(apiKey, serverUrl != null ? serverUrl : Constants.API_URL, options, proxy));
}
httpTransport.setHttpCall(httpCall);
}

private synchronized void scheduleFlushEvents() {
Expand Down
38 changes: 16 additions & 22 deletions src/main/java/com/amplitude/HttpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,25 @@ protected EventsRetryResult(

class HttpTransport {
// Use map to record the events are currently in retry queue.
private Object throttleLock = new Object();
private Map<String, Integer> throttledUserId = new HashMap<>();
private Map<String, Integer> throttledDeviceId = new HashMap<>();
private final Object throttleLock = new Object();
private final Map<String, Integer> throttledUserId = new HashMap<>();
private final Map<String, Integer> throttledDeviceId = new HashMap<>();
private boolean recordThrottledId = false;
private Map<String, Map<String, List<Event>>> idToBuffer = new HashMap<>();
private final Map<String, Map<String, List<Event>>> idToBuffer = new HashMap<>();
private int eventsInRetry = 0;
private Object bufferLock = new Object();
private Object counterLock = new Object();
private ExecutorService retryThreadPool;
private ExecutorService sendThreadPool;

private HttpCall httpCall;
private AmplitudeLog logger;
private AmplitudeCallbacks callbacks;
private long flushTimeout;

HttpTransport(
HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger,
long flushTimeout, ExecutorService sendThreadPool, ExecutorService retryThreadPool) {
this.httpCall = httpCall;
this.callbacks = callbacks;
private final Object bufferLock = new Object();
private final Object counterLock = new Object();

// Managed by setters
private ExecutorService retryThreadPool = Executors.newFixedThreadPool(10);
private ExecutorService sendThreadPool = Executors.newFixedThreadPool(40);
private HttpCall httpCall = null;
private long flushTimeout = 0;
private AmplitudeCallbacks callbacks = null;
private final AmplitudeLog logger;

HttpTransport(AmplitudeLog logger) {
this.logger = logger;
this.flushTimeout = flushTimeout;
this.retryThreadPool = (retryThreadPool == null) ? Executors.newFixedThreadPool(10) : retryThreadPool;
this.sendThreadPool = (sendThreadPool == null) ? Executors.newFixedThreadPool(40) : sendThreadPool;
}

public void sendEventsWithRetry(List<Event> events) {
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/amplitude/HttpTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.BDDMockito;
import org.mockito.junit.jupiter.MockitoExtension;

import com.amplitude.exception.AmplitudeInvalidAPIKeyException;
Expand All @@ -40,7 +39,7 @@ public class HttpTransportTest {

@BeforeEach
public void setUp() {
httpTransport = new HttpTransport(null, null, new AmplitudeLog(), 0, null, null);
httpTransport = new HttpTransport(new AmplitudeLog());
}

/**
Expand Down

0 comments on commit a081734

Please sign in to comment.