Skip to content

Commit

Permalink
Adding wk size to message size calculation along with a buffer (#476)
Browse files Browse the repository at this point in the history
* Adding wk size to message size calculation along with a buffer
  • Loading branch information
MichaelGHSeg authored Mar 20, 2024
1 parent d3e0d86 commit ffbaca4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public int messageSizeInBytes(Message message) {
private Boolean isBackPressuredAfterSize(int incomingSize) {
int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE);
// Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
}

public boolean offer(Message message) {
Expand All @@ -176,8 +177,6 @@ public void enqueue(Message message) {
// @jorgen25 check if message is below 32kb limit for individual messages, no need to check
// for extra characters
if (messageByteSize <= MSG_MAX_SIZE) {
// messageQueue.put(message);

if (isBackPressuredAfterSize(messageByteSize)) {
this.currentQueueSizeInBytes = messageByteSize;
messageQueue.put(FlushMessage.POISON);
Expand Down Expand Up @@ -443,13 +442,13 @@ public static class BatchUtility {
* "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
* "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM",
* "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}],
* "sentAt":"Nov 18, 2021, 2:45:07
* PM","context":{"library":{"name":"analytics-java","version":"3.1.3"}},"sequence":1}
* "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java",
* "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
*
* <p>total size of batch : 886
* <p>total size of batch : 932
*
* <p>BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss
* tt","context":CONTEXT,"sequence":1}
* tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
*
* <p>so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in
* this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) +
Expand All @@ -463,12 +462,14 @@ public static class BatchUtility {
* <p>so formulae to determine the expected default size of the batch is
*
* @return: defaultSize = messages size + context size + metadata size + comma number + sequence
* digits
* digits + writekey + buffer
* @return
*/
private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) {
// sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1} - 73
int metadataExtraCharsSize = 73;
// sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1,
// "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119
// Don't need to squeeze everything possible into a batch, adding a buffer
int metadataExtraCharsSize = 119 + 1024;
int commaNumber = currentMessageNumber - 1;

return contextSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu
new Gson());

Map<String, String> properties = new HashMap<String, String>();
properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true));
properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true));

for (int i = 0; i < 15; i++) {
TrackMessage bigMessage =
Expand Down Expand Up @@ -929,13 +929,7 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu
client.shutdown();
while (!isShutDown.get()) {}

/**
* modified from expected 8 to expected 7 times, since we removed the inner loop. The inner loop
* was forcing to message list created from the queue to keep making batches even if its a 1
* message batch until the message list is empty, that was forcing the code to make one last
* batch of 1 msg in size bumping the number of times a batch would be submitted from 7 to 8
*/
verify(networkExecutor, times(7)).submit(any(Runnable.class));
verify(networkExecutor, times(8)).submit(any(Runnable.class));
}

@Test
Expand Down Expand Up @@ -970,6 +964,6 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep
client.shutdown();
while (!isShutDown.get()) {}

verify(networkExecutor, times(19)).submit(any(Runnable.class));
verify(networkExecutor, times(21)).submit(any(Runnable.class));
}
}

0 comments on commit ffbaca4

Please sign in to comment.