diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 24fc0a2a28f..ace8af1b994 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -40,12 +40,12 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.utils.ProxyUtils; import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; @@ -96,7 +96,7 @@ public CompletableFuture popMessage( } return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis); - } catch (Throwable t) { + } catch (Throwable t) { future.completeExceptionally(t); } return future; @@ -287,7 +287,8 @@ public CompletableFuture> batchAckMessage( return FutureUtils.addExecutor(future, this.executor); } - protected CompletableFuture> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List handleMessageList, long timeoutMillis) { + protected CompletableFuture> processBrokerHandle(ProxyContext ctx, String consumerGroup, + String topic, List handleMessageList, long timeoutMillis) { return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis) .thenApply(result -> { List results = new ArrayList<>(); @@ -393,6 +394,24 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQue return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue, + String consumerGroup, long commitOffset, long timeoutMillis) { + CompletableFuture future = new CompletableFuture<>(); + try { + AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() + .buildAddressableMessageQueue(ctx, messageQueue); + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setTopic(addressableMessageQueue.getTopic()); + requestHeader.setQueueId(addressableMessageQueue.getQueueId()); + requestHeader.setCommitOffset(commitOffset); + future = serviceManager.getMessageService().updateConsumerOffsetAsync(ctx, addressableMessageQueue, requestHeader, timeoutMillis); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + public CompletableFuture queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, String consumerGroup, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); @@ -501,9 +520,9 @@ public CompletableFuture getMinOffset(ProxyContext ctx, MessageQueue messa protected Set buildAddressableSet(ProxyContext ctx, Set mqSet) { Set addressableMessageQueueSet = new HashSet<>(mqSet.size()); - for (MessageQueue mq:mqSet) { + for (MessageQueue mq : mqSet) { try { - addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ; + addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)); } catch (Exception e) { log.error("build addressable message queue fail, messageQueue = {}", mq, e); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 48a732c284b..9c494d7a451 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -75,7 +75,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen protected ThreadPoolExecutor producerProcessorExecutor; protected ThreadPoolExecutor consumerProcessorExecutor; protected static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, - System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); protected DefaultMessagingProcessor(ServiceManager serviceManager) { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); @@ -167,7 +167,8 @@ public CompletableFuture forwardMessageToDeadLetterQueue(ProxyC } @Override - public CompletableFuture endTransaction(ProxyContext ctx, String topic, String transactionId, String messageId, String producerGroup, + public CompletableFuture endTransaction(ProxyContext ctx, String topic, String transactionId, + String messageId, String producerGroup, TransactionStatus transactionStatus, boolean fromTransactionCheck, long timeoutMillis) { return this.transactionProcessor.endTransaction(ctx, topic, transactionId, messageId, producerGroup, transactionStatus, fromTransactionCheck, timeoutMillis); @@ -225,6 +226,12 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, MessageQue return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis); } + @Override + public CompletableFuture updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue, + String consumerGroup, long commitOffset, long timeoutMillis) { + return this.consumerProcessor.updateConsumerOffsetAsync(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis); + } + @Override public CompletableFuture queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, String consumerGroup, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 213d2beeeac..03d28262d73 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -33,10 +33,10 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; @@ -217,6 +217,14 @@ CompletableFuture updateConsumerOffset( long timeoutMillis ); + CompletableFuture updateConsumerOffsetAsync( + ProxyContext ctx, + MessageQueue messageQueue, + String consumerGroup, + long commitOffset, + long timeoutMillis + ); + CompletableFuture queryConsumerOffset( ProxyContext ctx, MessageQueue messageQueue, @@ -321,7 +329,9 @@ void addTransactionSubscription( MetadataService getMetadataService(); - void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); + void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + MessageReceiptHandle messageReceiptHandle); - MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle); + MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + String receiptHandle); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index ba7d5ad8e28..f9eb94fcfce 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -29,10 +29,10 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -139,7 +139,8 @@ public CompletableFuture ackMessage(ProxyContext ctx, ReceiptHandle h } @Override - public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, String consumerGroup, + public CompletableFuture batchAckMessage(ProxyContext ctx, List handleList, + String consumerGroup, String topic, long timeoutMillis) { List extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList()); return this.mqClientAPIFactory.getClient().batchAckMessageAsync( @@ -181,6 +182,16 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, Addressabl ); } + @Override + public CompletableFuture updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().updateConsumerOffsetAsync( + messageQueue.getBrokerAddr(), + requestHeader, + timeoutMillis + ); + } + @Override public CompletableFuture> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue, LockBatchRequestBody requestBody, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index aaa688fee64..6b2ba02f7c9 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -440,6 +440,12 @@ public CompletableFuture updateConsumerOffset(ProxyContext ctx, Addressabl throw new NotImplementedException("updateConsumerOffset is not implemented in LocalMessageService"); } + @Override + public CompletableFuture updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { + throw new NotImplementedException("updateConsumerOffsetAsync is not implemented in LocalMessageService"); + } + @Override public CompletableFuture> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue, LockBatchRequestBody requestBody, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 58a835adb46..61accbc0412 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -120,6 +120,13 @@ CompletableFuture updateConsumerOffset( long timeoutMillis ); + CompletableFuture updateConsumerOffsetAsync( + ProxyContext ctx, + AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, + long timeoutMillis + ); + CompletableFuture> lockBatchMQ( ProxyContext ctx, AddressableMessageQueue messageQueue,