Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Receipt modack #1540

Merged
merged 69 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
76539e8
receipt-modack for exactly once
maitrimangal Mar 27, 2023
a06a4b7
changing setup
maitrimangal Mar 27, 2023
d8a1779
changing the pendingReceipt List
maitrimangal Apr 3, 2023
680ac2d
using scheduled fixed rate
maitrimangal Apr 18, 2023
ec652a6
using blocked queues
maitrimangal Apr 18, 2023
4236363
using blocked queues
maitrimangal Apr 18, 2023
88dfc84
using blocked queues
maitrimangal Apr 18, 2023
1741342
adding null safety
maitrimangal Apr 18, 2023
b5d9706
adding null safety
maitrimangal Apr 18, 2023
baa470b
removing list
maitrimangal Apr 18, 2023
d7adf96
adding list back
maitrimangal Apr 18, 2023
349d0c4
if permanent failure, remove outstandingmsg from queue
maitrimangal Apr 18, 2023
3f877d5
adding snippet of test
maitrimangal May 19, 2023
1423c01
adding method to streaming subscriber
maitrimangal May 19, 2023
2f8a408
adding method to streaming subscriber
maitrimangal May 19, 2023
aa35b97
adding notifyAcks
maitrimangal May 19, 2023
71a90df
changing notifyAckFailed calls
maitrimangal May 24, 2023
4dfa1fd
addressing some comments
maitrimangal May 30, 2023
a893dd6
changed logic to use one datastructure
maitrimangal May 30, 2023
3193b81
fixing notifyFailed
maitrimangal Jun 5, 2023
bb0c2a9
fixing notifyFailed
maitrimangal Jun 5, 2023
60de4e1
changing Pair to custom class
maitrimangal Jun 15, 2023
0d9d41b
removing the not needed data structure
maitrimangal Jun 26, 2023
19c51db
Fixing test
maitrimangal Jun 26, 2023
ba5f30b
Fixing test
maitrimangal Jun 26, 2023
9d808eb
Fixing test
maitrimangal Jun 26, 2023
1a16d9a
Fixing test
maitrimangal Jun 26, 2023
e5718ab
fixing format
maitrimangal Jun 26, 2023
3a520ed
fixing test to call receiveMessage
maitrimangal Jun 26, 2023
6ca0337
testing test failure
maitrimangal Jun 26, 2023
5553ded
testing test failure
maitrimangal Jun 26, 2023
85bb43d
testing test failure
maitrimangal Jun 26, 2023
c8c4b63
increasing timestamp to test
maitrimangal Jun 26, 2023
aed52a1
increasing timestamp to test
maitrimangal Jun 26, 2023
d8fb0fa
adding log statement for testing
maitrimangal Jun 27, 2023
b90a6b6
Fixing lint
maitrimangal Jun 27, 2023
0681eca
Adding more logs
maitrimangal Jun 29, 2023
6ebd401
batch size log
maitrimangal Jul 17, 2023
10a43f3
changing method to syncronized
maitrimangal Jul 17, 2023
ead3c51
fixing for loop to not remove as we are iterating
maitrimangal Jul 20, 2023
bb35726
trying a concurrent map
maitrimangal Jul 20, 2023
f9abf69
fix: syncronizing notifyFailed
maitrimangal Aug 17, 2023
551528b
fix: removing unused import
maitrimangal Aug 17, 2023
04124fe
fix: reformat
maitrimangal Aug 17, 2023
7dcc8bc
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 18, 2023
f4ca8a5
fix: removing System.out.println statements
maitrimangal Aug 18, 2023
e1c251a
Merge branch 'main' of https://github.com/googleapis/java-pubsub into…
maitrimangal Aug 18, 2023
f156c4a
Merge branch 'ReceiptModack' of https://github.com/maitrimangal/java-…
maitrimangal Aug 18, 2023
fc9b8a8
fix: reviewign comments
maitrimangal Aug 18, 2023
3091823
fix: lint
maitrimangal Aug 18, 2023
1cc0365
adding another ordering key test example
maitrimangal Aug 18, 2023
d906e11
fix: trying to run this test again
maitrimangal Aug 18, 2023
46c2511
fix: trying to run this test again
maitrimangal Aug 19, 2023
b8edb54
fix: removing commented code
maitrimangal Aug 21, 2023
14e7292
fix: removing commented code
maitrimangal Aug 21, 2023
eb780ca
resolving the comments from review
maitrimangal Aug 22, 2023
aa5848b
adding custom matcher
maitrimangal Aug 22, 2023
ac3840e
adding custom matcher
maitrimangal Aug 22, 2023
e201ae4
adding custom matcher
maitrimangal Aug 22, 2023
1c94e57
adding custom matcher
maitrimangal Aug 22, 2023
3e9c83f
adding custom matcher correcting the matching statement
maitrimangal Aug 22, 2023
3aef16b
lint
maitrimangal Aug 22, 2023
f8db409
removing comments
maitrimangal Aug 22, 2023
11af2a0
removing comments
maitrimangal Aug 22, 2023
3200173
removing comments
maitrimangal Aug 22, 2023
e06b968
changing messageMatcher to messageDataMatcher, and fixing other nit t…
maitrimangal Aug 23, 2023
1846eb6
lint
maitrimangal Aug 23, 2023
3e45ca5
addressing review comments
maitrimangal Aug 24, 2023
a2336bd
addressing review comments
maitrimangal Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.opencensus.trace.Link;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -89,7 +90,9 @@ class MessageDispatcher {
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();

private final ConcurrentMap<String, OutstandingMessage> outstandingReceipts = new ConcurrentHashMap<>();
private final List<OutstandingMessage> exactlyOnceOutstandingBatch = new ArrayList<>();
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
private final LinkedBlockingQueue<OutstandingMessage> exactlyOncePendingBatch = new LinkedBlockingQueue<>();
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
Expand Down Expand Up @@ -261,6 +264,16 @@ public void run() {
TimeUnit.SECONDS);
}
processOutstandingOperations();
// List<OutstandingMessage> outstandingBatch = new ArrayList<>();
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
// for(OutstandingMessage message: exactlyOnceOutstandingBatch){
// if(!exactlyOncePendingBatch.isEmpty() &&
// message.receivedMessage.getAckId() == exactlyOncePendingBatch.peek().receivedMessage.getAckId()){
// outstandingBatch.add(message);
// exactlyOncePendingBatch.poll();
// exactlyOnceOutstandingBatch.remove(message);
// }
// }
// processBatch(outstandingBatch);
} catch (Throwable t) {
// Catch everything so that one run failing doesn't prevent subsequent runs.
logger.log(Level.WARNING, "failed to run periodic job", t);
Expand Down Expand Up @@ -348,6 +361,7 @@ private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandle
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}

}

void processReceivedMessages(List<ReceivedMessage> messages) {
Expand All @@ -361,7 +375,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
if (!this.exactlyOnceDeliveryEnabled.get() && pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
// previously-mapped element.
// If the previous element is not null, we already have the message and the new one is
Expand All @@ -372,13 +386,48 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
// totally expire so that pubsub service sends us the message again.
continue;
}
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
if (this.exactlyOnceDeliveryEnabled.get()) {
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
exactlyOncePendingBatch.add(outstandingMessage);
outstandingReceipts.put(message.getAckId(), outstandingMessage);
} else {
outstandingBatch.add(outstandingMessage);
}
pendingReceipts.add(ackRequestData);
}

processBatch(outstandingBatch);
}

void notifyAckSuccess(AckRequestData ackRequestData) {

if(outstandingReceipts.containsKey(ackRequestData.getAckId())) {
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
OutstandingMessage outstandingMessage = outstandingReceipts.get(ackRequestData.getAckId());
pendingMessages.putIfAbsent(outstandingMessage.receivedMessage.getAckId(),
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
outstandingMessage.ackHandler);

exactlyOnceOutstandingBatch.add(outstandingMessage);
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
List<OutstandingMessage> outstandingBatch = new ArrayList<>();
for (OutstandingMessage message : exactlyOnceOutstandingBatch) {
if (!exactlyOncePendingBatch.isEmpty() &&
message.receivedMessage.getAckId()
== exactlyOncePendingBatch.peek().receivedMessage.getAckId()) {
outstandingBatch.add(message);
exactlyOncePendingBatch.poll();
exactlyOnceOutstandingBatch.remove(message);
}
}
processBatch(outstandingBatch);
}
}

void notifyAckFailed(AckRequestData ackRequestData) {
if(outstandingReceipts.containsKey(ackRequestData.getAckId())){
outstandingReceipts.remove(ackRequestData);
OutstandingMessage outstandingMessage = outstandingReceipts.get(ackRequestData.getAckId());
exactlyOncePendingBatch.remove(outstandingMessage);
}
}

private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingCount(batch.size());
for (OutstandingMessage message : batch) {
Expand Down Expand Up @@ -519,6 +568,7 @@ void extendDeadlines() {

@InternalApi
void processOutstandingOperations() {

List<ModackRequestData> modackRequestData = new ArrayList<ModackRequestData>();

// Nacks are modacks with an expiration of 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ public void onSuccess(Empty empty) {
for (AckRequestData ackRequestData : ackRequestDataList) {
// This will check if a response is needed, and if it has already been set
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
// Remove from our pending operations
pendingRequests.remove(ackRequestData);
}
Expand All @@ -541,6 +542,7 @@ public void onFailure(Throwable t) {
Map<String, String> metadataMap = getMetadataMapFromThrowable(t);
ackRequestDataList.forEach(
ackRequestData -> {
messageDispatcher.notifyAckFailed(ackRequestData);
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
String ackId = ackRequestData.getAckId();
if (metadataMap.containsKey(ackId)) {
// An error occured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ public void testReceiptMessageReceiver() {
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class));
}

// @Test
maitrimangal marked this conversation as resolved.
Show resolved Hide resolved
// public void testBatchSizeofExactlyOnceDelivered() {
// MessageReceiver mockMessageReceiver = mock(MessageReceiver.class);
// MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiver);
// List<ReceivedMessage> receivedMessageList = new ArrayList<ReceivedMessage>();
// Collections.addAll(receivedMessageList, TEST_MESSAGE, TEST_MESSAGE, TEST_MESSAGE, TEST_MESSAGE, TEST_MESSAGE);
// messageDispatcher.processReceivedMessages(receivedMessageList);
//
// }

@Test
public void testReceiptMessageReceiverWithAckResponse() {
MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
Expand Down