Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #226]eventMesh-rocketmq-connector tcp pub throw operation time out exception #227

Merged
merged 1 commit into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import com.webank.eventmesh.connector.rocketmq.promise.DefaultPromise;
import com.webank.eventmesh.connector.rocketmq.utils.OMSUtil;
import com.webank.eventmesh.api.SendCallback;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.Promise;
import io.openmessaging.*;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
Expand All @@ -38,8 +35,6 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {

public static final int proxyServerAsyncAccumulationThreshold = 1000;

protected SendCallback sendCallback;

public ProducerImpl(final KeyValue properties) {
super(properties);
}
Expand All @@ -66,6 +61,16 @@ public SendResult send(Message message, LocalTransactionExecutor branchExecutor,
return null;
}

@Override
public Future<SendResult> sendAsync(Message message) {
return null;
}

@Override
public Future<SendResult> sendAsync(Message message, KeyValue attributes) {
return null;
}

private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
Expand All @@ -83,19 +88,11 @@ private SendResult send(final Message message, long timeout) {
}
}

@Override
public Promise<SendResult> sendAsync(final Message message) {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
public Promise<SendResult> sendAsync(final Message message, SendCallback sendCallback) {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout(), sendCallback);
}

@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}

private Promise<SendResult> sendAsync(final Message message, long timeout) {
private Promise<SendResult> sendAsync(final Message message, long timeout, SendCallback sendCallback) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
final Promise<SendResult> promise = new DefaultPromise<>();
Expand All @@ -106,13 +103,13 @@ public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqRe
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
SendResult omsSendResult = OMSUtil.sendResultConvert(rmqResult);
promise.set(omsSendResult);
ProducerImpl.this.sendCallback.onSuccess(omsSendResult);
sendCallback.onSuccess(omsSendResult);
}

@Override
public void onException(final Throwable e) {
promise.setFailure(e);
ProducerImpl.this.sendCallback.onException(e);
sendCallback.onException(e);
}
}, timeout);
} catch (Exception e) {
Expand Down Expand Up @@ -161,11 +158,4 @@ public void setExtFields(){
super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
}

public void setSendCallback(SendCallback sendCallback) {
this.sendCallback = sendCallback;
}

public SendCallback getSendCallback() {
return sendCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ public synchronized void shutdown() {

@Override
public void send(Message message, SendCallback sendCallback) throws Exception {
if (producer.getSendCallback() == null){
producer.setSendCallback(sendCallback);
}
producer.sendAsync(message);
producer.sendAsync(message, sendCallback);
}

@Override
Expand Down