Skip to content

Commit

Permalink
Merge pull request #227 from xwm1992/1.2.0
Browse files Browse the repository at this point in the history
[ISSUE #226]eventMesh-rocketmq-connector tcp pub throw operation time out exception
  • Loading branch information
xwm1992 authored Feb 26, 2021
2 parents 7bf365d + f375636 commit b0bf19c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import com.webank.eventmesh.connector.rocketmq.promise.DefaultPromise;
import com.webank.eventmesh.connector.rocketmq.utils.OMSUtil;
import com.webank.eventmesh.api.SendCallback;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.Promise;
import io.openmessaging.*;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
Expand All @@ -38,8 +35,6 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {

public static final int proxyServerAsyncAccumulationThreshold = 1000;

protected SendCallback sendCallback;

public ProducerImpl(final KeyValue properties) {
super(properties);
}
Expand All @@ -66,6 +61,16 @@ public SendResult send(Message message, LocalTransactionExecutor branchExecutor,
return null;
}

@Override
public Future<SendResult> sendAsync(Message message) {
return null;
}

@Override
public Future<SendResult> sendAsync(Message message, KeyValue attributes) {
return null;
}

private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
Expand All @@ -83,19 +88,11 @@ private SendResult send(final Message message, long timeout) {
}
}

@Override
public Promise<SendResult> sendAsync(final Message message) {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
public Promise<SendResult> sendAsync(final Message message, SendCallback sendCallback) {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout(), sendCallback);
}

@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}

private Promise<SendResult> sendAsync(final Message message, long timeout) {
private Promise<SendResult> sendAsync(final Message message, long timeout, SendCallback sendCallback) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
final Promise<SendResult> promise = new DefaultPromise<>();
Expand All @@ -106,13 +103,13 @@ public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqRe
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
SendResult omsSendResult = OMSUtil.sendResultConvert(rmqResult);
promise.set(omsSendResult);
ProducerImpl.this.sendCallback.onSuccess(omsSendResult);
sendCallback.onSuccess(omsSendResult);
}

@Override
public void onException(final Throwable e) {
promise.setFailure(e);
ProducerImpl.this.sendCallback.onException(e);
sendCallback.onException(e);
}
}, timeout);
} catch (Exception e) {
Expand Down Expand Up @@ -161,11 +158,4 @@ public void setExtFields(){
super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
}

public void setSendCallback(SendCallback sendCallback) {
this.sendCallback = sendCallback;
}

public SendCallback getSendCallback() {
return sendCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ public synchronized void shutdown() {

@Override
public void send(Message message, SendCallback sendCallback) throws Exception {
if (producer.getSendCallback() == null){
producer.setSendCallback(sendCallback);
}
producer.sendAsync(message);
producer.sendAsync(message, sendCallback);
}

@Override
Expand Down

0 comments on commit b0bf19c

Please sign in to comment.