Skip to content

Commit

Permalink
Merge pull request #311 from wqliang/develop
Browse files Browse the repository at this point in the history
[ISSUE #293]Lack of licenses in each source file under the eventmesh-connector-api module
  • Loading branch information
xwm1992 authored Apr 23, 2021
2 parents cc2e98d + 5c4f40f commit 7bf7b06
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

public interface AbstractContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import io.openmessaging.api.Message;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.consumer;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,12 @@ public class EventMeshConstants {

public static final String PROPERTY_MESSAGE_KEYS = "KEYS";

public static final String PROPERTY_MESSAGE_REPLY_TO = "REPLY_TO"; //requester clientId

public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID";

public static final String LEAVE_TIME = "LEAVE_TIME"; //leaveBrokerTime
public static final String ARRIVE_TIME = "ARRIVE_TIME";
public static final String STORE_TIME = "STORE_TIME";

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.DeFiBusConstant;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
Expand Down Expand Up @@ -166,11 +165,11 @@ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
boolean flag = false;
long ttl = Long.parseLong(downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL));
//TODO 关注是否能取到
long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.getUserProperties(DeFiBusConstant.STORE_TIME));
String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(DeFiBusConstant.LEAVE_TIME);
long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.STORE_TIME));
String leaveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.LEAVE_TIME);
long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;

String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(DeFiBusConstant.ARRIVE_TIME);
String arriveTimeStr = downStreamMsgContext.msgExt.getUserProperties(EventMeshConstants.ARRIVE_TIME);
long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
double elapseTime = brokerCost + accessCost;
if (elapseTime >= ttl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.DeFiBusConstant;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
Expand Down Expand Up @@ -81,13 +80,13 @@ public EventMeshTcpSendResult send(Header header, Message msg, SendCallback send
UpStreamMsgContext upStreamMsgContext = null;
Command cmd = header.getCommand();
if (Command.REQUEST_TO_SERVER == cmd) {
long ttl = msg.getSystemProperties(DeFiBusConstant.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(DeFiBusConstant.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
session.getClientGroupWrapper().get().request(upStreamMsgContext, sendCallback, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
} else if (Command.RESPONSE_TO_SERVER == cmd) {
String cluster = msg.getUserProperties(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
String cluster = msg.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
if (!StringUtils.isEmpty(cluster)) {
String replyTopic = DeFiBusConstant.RR_REPLY_TOPIC;
String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;
replyTopic = cluster + "-" + replyTopic;
msg.getSystemProperties().put(Constants.PROPERTY_MESSAGE_DESTINATION, replyTopic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.DeFiBusConstant;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
long sendTime = System.currentTimeMillis();
addTimestamp(eventMeshMessage, cmd, sendTime);
if (cmd.equals(Command.REQUEST_TO_SERVER)) {
eventMeshMessage.getProperties().put(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
eventMeshMessage.getProperties().put(EventMeshConstants.PROPERTY_MESSAGE_REPLY_TO, session.getClientGroupWrapper()
.get().getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
}

Expand Down

0 comments on commit 7bf7b06

Please sign in to comment.