diff --git a/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/ProducerImpl.java index dcc9bf3e6a..f9cef8be62 100644 --- a/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -19,10 +19,7 @@ import com.webank.eventmesh.connector.rocketmq.promise.DefaultPromise; import com.webank.eventmesh.connector.rocketmq.utils.OMSUtil; import com.webank.eventmesh.api.SendCallback; -import io.openmessaging.BytesMessage; -import io.openmessaging.KeyValue; -import io.openmessaging.Message; -import io.openmessaging.Promise; +import io.openmessaging.*; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.interceptor.ProducerInterceptor; import io.openmessaging.producer.BatchMessageSender; @@ -38,8 +35,6 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { public static final int proxyServerAsyncAccumulationThreshold = 1000; - protected SendCallback sendCallback; - public ProducerImpl(final KeyValue properties) { super(properties); } @@ -66,6 +61,16 @@ public SendResult send(Message message, LocalTransactionExecutor branchExecutor, return null; } + @Override + public Future sendAsync(Message message) { + return null; + } + + @Override + public Future sendAsync(Message message, KeyValue attributes) { + return null; + } + private SendResult send(final Message message, long timeout) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); @@ -83,19 +88,11 @@ private SendResult send(final Message message, long timeout) { } } - @Override - public Promise sendAsync(final Message message) { - return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); + public Promise sendAsync(final Message message, SendCallback sendCallback) { + return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout(), sendCallback); } - @Override - public Promise sendAsync(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) - ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); - return sendAsync(message, timeout); - } - - private Promise sendAsync(final Message message, long timeout) { + private Promise sendAsync(final Message message, long timeout, SendCallback sendCallback) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); final Promise promise = new DefaultPromise<>(); @@ -106,13 +103,13 @@ public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqRe message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); SendResult omsSendResult = OMSUtil.sendResultConvert(rmqResult); promise.set(omsSendResult); - ProducerImpl.this.sendCallback.onSuccess(omsSendResult); + sendCallback.onSuccess(omsSendResult); } @Override public void onException(final Throwable e) { promise.setFailure(e); - ProducerImpl.this.sendCallback.onException(e); + sendCallback.onException(e); } }, timeout); } catch (Exception e) { @@ -161,11 +158,4 @@ public void setExtFields(){ super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10); } - public void setSendCallback(SendCallback sendCallback) { - this.sendCallback = sendCallback; - } - - public SendCallback getSendCallback() { - return sendCallback; - } } diff --git a/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java b/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java index 1a4cbba3aa..bd3b5df010 100644 --- a/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java +++ b/eventmesh-connector-rocketmq/src/main/java/com/webank/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java @@ -82,10 +82,7 @@ public synchronized void shutdown() { @Override public void send(Message message, SendCallback sendCallback) throws Exception { - if (producer.getSendCallback() == null){ - producer.setSendCallback(sendCallback); - } - producer.sendAsync(message); + producer.sendAsync(message, sendCallback); } @Override