Skip to content

Commit

Permalink
feat: Receipt modack (googleapis#1540)
Browse files Browse the repository at this point in the history
* receipt-modack for exactly once

* changing setup

* changing the pendingReceipt List

* using scheduled fixed rate

* using blocked queues

* using blocked queues

* using blocked queues

* adding null safety

* adding null safety

* removing list

* adding list back

* if permanent failure, remove outstandingmsg from queue

* adding snippet of test

* adding method to streaming subscriber

* adding method to streaming subscriber

* adding notifyAcks

* changing notifyAckFailed calls

* addressing some comments

* changed logic to use one datastructure

* fixing notifyFailed

* fixing notifyFailed

* changing Pair to custom class

* removing the not needed data structure

* Fixing test

* Fixing test

* Fixing test

* Fixing test

* fixing format

* fixing test to call receiveMessage

* testing test failure

* testing test failure

* testing test failure

* increasing timestamp to test

* increasing timestamp to test

* adding log statement for testing

* Fixing lint

* Adding more logs

* batch size log

* changing method to syncronized

* fixing for loop to not remove as we are iterating

* trying a concurrent map

* fix: syncronizing notifyFailed

* fix: removing unused import

* fix: reformat

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: removing System.out.println statements

* fix: reviewign comments

* fix: lint

* adding another ordering key test example

* fix: trying to run this test again

* fix: trying to run this test again

* fix: removing commented code

* fix: removing commented code

* resolving the comments from review

* adding custom matcher

* adding custom matcher

* adding custom matcher

* adding custom matcher

* adding custom matcher correcting the matching statement

* lint

* removing comments

* removing comments

* removing comments

* changing messageMatcher to messageDataMatcher, and fixing other nit things

* lint

* addressing review comments

* addressing review comments

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
maitrimangal and gcf-owl-bot[bot] authored Aug 24, 2023
1 parent 9634a48 commit 74d8da9
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -89,7 +92,8 @@ class MessageDispatcher {
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();

private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
new LinkedHashMap<String, ReceiptCompleteData>();
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
Expand Down Expand Up @@ -350,6 +354,28 @@ private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandle
}
}

private static class ReceiptCompleteData {
private OutstandingMessage outstandingMessage;
private Boolean receiptComplete;

private ReceiptCompleteData(OutstandingMessage outstandingMessage) {
this.outstandingMessage = outstandingMessage;
this.receiptComplete = false;
}

private OutstandingMessage getOutstandingMessage() {
return this.outstandingMessage;
}

private Boolean isReceiptComplete() {
return this.receiptComplete;
}

private void notifyReceiptComplete() {
this.receiptComplete = true;
}
}

void processReceivedMessages(List<ReceivedMessage> messages) {
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
Expand All @@ -361,7 +387,13 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);

if (this.exactlyOnceDeliveryEnabled.get()) {
// For exactly once deliveries we don't add to outstanding batch because we first
// process the receipt modack. If that is successful then we process the message.
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
} else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
// previously-mapped element.
// If the previous element is not null, we already have the message and the new one is
Expand All @@ -371,14 +403,44 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
// we want to eventually
// totally expire so that pubsub service sends us the message again.
continue;
} else {
outstandingBatch.add(outstandingMessage);
}
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
pendingReceipts.add(ackRequestData);
}

processBatch(outstandingBatch);
}

synchronized void notifyAckSuccess(AckRequestData ackRequestData) {

if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
List<OutstandingMessage> outstandingBatch = new ArrayList<>();

for (Iterator<Entry<String, ReceiptCompleteData>> it =
outstandingReceipts.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
// If receipt is complete then add to outstandingBatch to process the batch
if (receipt.getValue().isReceiptComplete()) {
it.remove();
if (pendingMessages.putIfAbsent(
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
== null) {
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
}
} else {
break;
}
}
processBatch(outstandingBatch);
}
}

synchronized void notifyAckFailed(AckRequestData ackRequestData) {
outstandingReceipts.remove(ackRequestData.getAckId());
}

private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingCount(batch.size());
for (OutstandingMessage message : batch) {
Expand Down Expand Up @@ -519,6 +581,7 @@ void extendDeadlines() {

@InternalApi
void processOutstandingOperations() {

List<ModackRequestData> modackRequestData = new ArrayList<ModackRequestData>();

// Nacks are modacks with an expiration of 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public void onSuccess(Empty empty) {
for (AckRequestData ackRequestData : ackRequestDataList) {
// This will check if a response is needed, and if it has already been set
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
// Remove from our pending operations
pendingRequests.remove(ackRequestData);
}
Expand Down Expand Up @@ -564,12 +565,15 @@ public void onFailure(Throwable t) {
"Permanent error invalid ack id message, will not resend",
errorMessage);
ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
} else {
logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage);
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
}
} else {
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
}
// Remove from our pending
pendingRequests.remove(ackRequestData);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub.v1;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.mockito.ArgumentMatcher;

public class MessageDataMatcher implements ArgumentMatcher<PubsubMessage> {

private ByteString expectedData;

public MessageDataMatcher(ByteString expectedData) {
this.expectedData = expectedData;
}

@Override
public boolean matches(PubsubMessage message2) {
return (expectedData.equals(message2.getData()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,100 @@ public void testReceiptMessageReceiver() {
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class));
}

@Test
public void testReceiptModackWithOrderingForExactlyOnceDelivered() {

MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
mock(MessageReceiverWithAckResponse.class);
MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse);
messageDispatcher.setExactlyOnceDeliveryEnabled(true);

ReceivedMessage TEST_MESSAGE1 =
ReceivedMessage.newBuilder()
.setAckId("ACK_ID1")
.setMessage(
PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8("message-data1"))
.build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();
ReceivedMessage TEST_MESSAGE2 =
ReceivedMessage.newBuilder()
.setAckId("ACK_ID2")
.setMessage(
PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8("message-data2"))
.build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();
ReceivedMessage TEST_MESSAGE3 =
ReceivedMessage.newBuilder()
.setAckId("ACK_ID3")
.setMessage(
PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8("message-data3"))
.build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();

messageDispatcher.processReceivedMessages(
Arrays.asList(TEST_MESSAGE3, TEST_MESSAGE2, TEST_MESSAGE1));

messageDispatcher.processOutstandingOperations();
verify(mockMessageReceiverWithAckResponse, never())
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class));

AckRequestData ackRequestData1 = AckRequestData.newBuilder(TEST_MESSAGE1.getAckId()).build();
AckRequestData ackRequestData2 = AckRequestData.newBuilder(TEST_MESSAGE2.getAckId()).build();
AckRequestData ackRequestData3 = AckRequestData.newBuilder(TEST_MESSAGE3.getAckId()).build();
messageDispatcher.notifyAckSuccess(ackRequestData2);
messageDispatcher.processOutstandingOperations();

messageDispatcher.notifyAckSuccess(ackRequestData1);
messageDispatcher.notifyAckSuccess(ackRequestData3);
messageDispatcher.processOutstandingOperations();

verify(mockMessageReceiverWithAckResponse, times(1))
.receiveMessage(
argThat(new MessageDataMatcher(TEST_MESSAGE3.getMessage().getData())),
any(AckReplyConsumerWithResponse.class));
verify(mockMessageReceiverWithAckResponse, times(1))
.receiveMessage(
argThat(new MessageDataMatcher(TEST_MESSAGE2.getMessage().getData())),
any(AckReplyConsumerWithResponse.class));
verify(mockMessageReceiverWithAckResponse, times(1))
.receiveMessage(
argThat(new MessageDataMatcher(TEST_MESSAGE1.getMessage().getData())),
any(AckReplyConsumerWithResponse.class));
}

@Test
public void testReceiptModackForExactlyOnceDelivered() {

MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
mock(MessageReceiverWithAckResponse.class);
MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse);
messageDispatcher.setExactlyOnceDeliveryEnabled(true);

messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));

messageDispatcher.processOutstandingOperations();
verify(mockMessageReceiverWithAckResponse, never())
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class));

AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build();
messageDispatcher.notifyAckSuccess(ackRequestData);
messageDispatcher.processOutstandingOperations();

List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();
modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData));

verify(mockMessageReceiverWithAckResponse, times(1))
.receiveMessage(
argThat(new MessageDataMatcher(TEST_MESSAGE.getMessage().getData())),
any(AckReplyConsumerWithResponse.class));
}

@Test
public void testReceiptMessageReceiverWithAckResponse() {
MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
Expand Down

0 comments on commit 74d8da9

Please sign in to comment.