Skip to content

Commit

Permalink
[ISSUE #8365] add remoting client non-oneway updateConsumerOffset fun…
Browse files Browse the repository at this point in the history
…ction (#8391)

* add non-oneway updateConsumerOffset
  • Loading branch information
qianye1001 authored Jul 17, 2024
1 parent 259efdb commit 1588d65
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
Expand Down Expand Up @@ -96,7 +96,7 @@ public CompletableFuture<PopResult> popMessage(
}
return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode,
subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis);
} catch (Throwable t) {
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
Expand Down Expand Up @@ -287,7 +287,8 @@ public CompletableFuture<List<BatchAckResult>> batchAckMessage(
return FutureUtils.addExecutor(future, this.executor);
}

protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx, String consumerGroup,
String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis)
.thenApply(result -> {
List<BatchAckResult> results = new ArrayList<>();
Expand Down Expand Up @@ -393,6 +394,24 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, MessageQue
return FutureUtils.addExecutor(future, this.executor);
}

public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue,
String consumerGroup, long commitOffset, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(ctx, messageQueue);
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
requestHeader.setQueueId(addressableMessageQueue.getQueueId());
requestHeader.setCommitOffset(commitOffset);
future = serviceManager.getMessageService().updateConsumerOffsetAsync(ctx, addressableMessageQueue, requestHeader, timeoutMillis);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return FutureUtils.addExecutor(future, this.executor);
}

public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue,
String consumerGroup, long timeoutMillis) {
CompletableFuture<Long> future = new CompletableFuture<>();
Expand Down Expand Up @@ -501,9 +520,9 @@ public CompletableFuture<Long> getMinOffset(ProxyContext ctx, MessageQueue messa

protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) {
Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size());
for (MessageQueue mq:mqSet) {
for (MessageQueue mq : mqSet) {
try {
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ;
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq));
} catch (Exception e) {
log.error("build addressable message queue fail, messageQueue = {}", mq, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
protected ThreadPoolExecutor producerProcessorExecutor;
protected ThreadPoolExecutor consumerProcessorExecutor;
protected static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
System.getenv(MixAll.ROCKETMQ_HOME_ENV));

protected DefaultMessagingProcessor(ServiceManager serviceManager) {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
Expand Down Expand Up @@ -167,7 +167,8 @@ public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyC
}

@Override
public CompletableFuture<Void> endTransaction(ProxyContext ctx, String topic, String transactionId, String messageId, String producerGroup,
public CompletableFuture<Void> endTransaction(ProxyContext ctx, String topic, String transactionId,
String messageId, String producerGroup,
TransactionStatus transactionStatus, boolean fromTransactionCheck,
long timeoutMillis) {
return this.transactionProcessor.endTransaction(ctx, topic, transactionId, messageId, producerGroup, transactionStatus, fromTransactionCheck, timeoutMillis);
Expand Down Expand Up @@ -225,6 +226,12 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, MessageQue
return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis);
}

@Override
public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue,
String consumerGroup, long commitOffset, long timeoutMillis) {
return this.consumerProcessor.updateConsumerOffsetAsync(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis);
}

@Override
public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue,
String consumerGroup, long timeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
Expand Down Expand Up @@ -217,6 +217,14 @@ CompletableFuture<Void> updateConsumerOffset(
long timeoutMillis
);

CompletableFuture<Void> updateConsumerOffsetAsync(
ProxyContext ctx,
MessageQueue messageQueue,
String consumerGroup,
long commitOffset,
long timeoutMillis
);

CompletableFuture<Long> queryConsumerOffset(
ProxyContext ctx,
MessageQueue messageQueue,
Expand Down Expand Up @@ -321,7 +329,9 @@ void addTransactionSubscription(

MetadataService getMetadataService();

void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle);
void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID,
MessageReceiptHandle messageReceiptHandle);

MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle);
MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID,
String receiptHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
Expand Down Expand Up @@ -139,7 +139,8 @@ public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, ReceiptHandle h
}

@Override
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, String consumerGroup,
public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
String consumerGroup,
String topic, long timeoutMillis) {
List<String> extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
Expand Down Expand Up @@ -181,6 +182,16 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, Addressabl
);
}

@Override
public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue,
UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
return this.mqClientAPIFactory.getClient().updateConsumerOffsetAsync(
messageQueue.getBrokerAddr(),
requestHeader,
timeoutMillis
);
}

@Override
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue,
LockBatchRequestBody requestBody, long timeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, Addressabl
throw new NotImplementedException("updateConsumerOffset is not implemented in LocalMessageService");
}

@Override
public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue,
UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
throw new NotImplementedException("updateConsumerOffsetAsync is not implemented in LocalMessageService");
}

@Override
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue,
LockBatchRequestBody requestBody, long timeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ CompletableFuture<Void> updateConsumerOffset(
long timeoutMillis
);

CompletableFuture<Void> updateConsumerOffsetAsync(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
UpdateConsumerOffsetRequestHeader requestHeader,
long timeoutMillis
);

CompletableFuture<Set<MessageQueue>> lockBatchMQ(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
Expand Down

0 comments on commit 1588d65

Please sign in to comment.