diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java index d5ea9a5eb0..fedbbe046e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java @@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; import org.apache.eventmesh.common.protocol.http.header.Header; import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader; @@ -41,33 +42,36 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV1.class); event = CloudEventBuilder.from(event) - .withExtension("code", code) - .withExtension("env", env) - .withExtension("idc", idc) - .withExtension("ip", ip) - .withExtension("pid", pid) - .withExtension("sys", sys) - .withExtension("username", username) - .withExtension("passwd", passwd) - .withExtension("version", version.getVersion()) - .withExtension("language", language) - .withExtension("protocolType", protocolType) - .withExtension("protocolDesc", protocolDesc) - .withExtension("protocolVersion", protocolVersion) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) .build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV03.class); event = CloudEventBuilder.from(event) - .withExtension("code", code) - .withExtension("env", env) - .withExtension("idc", idc) - .withExtension("ip", ip) - .withExtension("pid", pid) - .withExtension("sys", sys) - .withExtension("username", username) - .withExtension("passwd", passwd) - .withExtension("version", version.getVersion()) - .withExtension("language", language) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) .build(); } return event; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java index 8055625ca7..7ef619a66e 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java @@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.http.body.Body; import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; import org.apache.eventmesh.common.protocol.http.header.Header; import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader; @@ -42,33 +43,36 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV1.class); event = CloudEventBuilder.from(event) - .withExtension("code", code) - .withExtension("env", env) - .withExtension("idc", idc) - .withExtension("ip", ip) - .withExtension("pid", pid) - .withExtension("sys", sys) - .withExtension("username", username) - .withExtension("passwd", passwd) - .withExtension("version", version.getVersion()) - .withExtension("language", language) - .withExtension("protocolType", protocolType) - .withExtension("protocolDesc", protocolDesc) - .withExtension("protocolVersion", protocolVersion) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) .build(); } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { event = JsonUtils.deserialize(content, CloudEventV03.class); event = CloudEventBuilder.from(event) - .withExtension("code", code) - .withExtension("env", env) - .withExtension("idc", idc) - .withExtension("ip", ip) - .withExtension("pid", pid) - .withExtension("sys", sys) - .withExtension("username", username) - .withExtension("passwd", passwd) - .withExtension("version", version.getVersion()) - .withExtension("language", language) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) .build(); } return event; diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java index 3403fb5593..e5fa7f218c 100644 --- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java +++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java @@ -7,6 +7,7 @@ import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.tcp.Header; import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant; public class TcpMessageProtocolResolver { @@ -24,7 +25,7 @@ public static CloudEvent buildEvent(Header header, Object body) throws ProtocolH protocolType, protocolVersion, protocolDesc)); } - if (!StringUtils.equals("cloudevents", protocolType)) { + if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME, protocolType)) { throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType)); } if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolAdaptor.java new file mode 100644 index 0000000000..4b8eeefa23 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolAdaptor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage; + +import io.cloudevents.CloudEvent; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.command.HttpCommand; +import org.apache.eventmesh.common.protocol.http.body.Body; +import org.apache.eventmesh.common.protocol.http.common.RequestCode; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.protocol.api.ProtocolAdaptor; +import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchProtocolResolver; +import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageBatchV2ProtocolResolver; +import org.apache.eventmesh.protocol.eventmeshmessage.resolver.http.SendMessageRequestProtocolResolver; +import org.apache.eventmesh.protocol.eventmeshmessage.resolver.tcp.TcpMessageProtocolResolver; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class EventMeshMessageProtocolAdaptor implements ProtocolAdaptor { + + @Override + public CloudEvent toCloudEvent(T protocol) throws ProtocolHandleException { + if (protocol instanceof Package) { + Header header = ((Package) protocol).getHeader(); + Object body = ((Package) protocol).getBody(); + + return deserializeTcpProtocol(header, body); + + } else if (protocol instanceof HttpCommand) { + org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) protocol).getHeader(); + Body body = ((HttpCommand) protocol).getBody(); + String requestCode = ((HttpCommand) protocol).getRequestCode(); + + return deserializeHttpProtocol(requestCode, header, body); + } else { + throw new ProtocolHandleException(String.format("protocol class: %s", protocol.getClass())); + } + } + + private CloudEvent deserializeTcpProtocol(Header header, Object body) throws ProtocolHandleException { + return TcpMessageProtocolResolver.buildEvent(header, body); + } + + private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException { + + if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) { + return SendMessageBatchProtocolResolver.buildEvent(header, body); + } else if (String.valueOf(RequestCode.MSG_BATCH_SEND_V2.getRequestCode()).equals(requestCode)) { + return SendMessageBatchV2ProtocolResolver.buildEvent(header, body); + } else if (String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()).equals(requestCode)) { + return SendMessageRequestProtocolResolver.buildEvent(header, body); + } else if (String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()).equals(requestCode)) { + return SendMessageRequestProtocolResolver.buildEvent(header, body); + } else { + throw new ProtocolHandleException(String.format("unsupported requestCode: %s", requestCode)); + } + + } + + @Override + public List toBatchCloudEvent(T protocol) throws ProtocolHandleException { + return null; + } + + @Override + public Object fromCloudEvent(CloudEvent cloudEvent) throws ProtocolHandleException { + String protocolDesc = cloudEvent.getExtension(Constants.PROTOCOL_DESC).toString(); + + if (StringUtils.equals("http", protocolDesc)) { + return new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8); + } else if (StringUtils.equals("tcp", protocolDesc)) { + return TcpMessageProtocolResolver.buildEventMeshMessage(cloudEvent); + } else { + throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc)); + } + } + + @Override + public String getProtocolType() { + return EventMeshMessageProtocolConstant.PROTOCOL_NAME; + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolConstant.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolConstant.java new file mode 100644 index 0000000000..f1c744fe24 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/EventMeshMessageProtocolConstant.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage; + +public enum EventMeshMessageProtocolConstant { + ; + public static final String PROTOCOL_NAME = "eventmeshmessage"; +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchProtocolResolver.java new file mode 100644 index 0000000000..9c8203837c --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchProtocolResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http; + +import io.cloudevents.CloudEvent; +import org.apache.eventmesh.common.protocol.http.body.Body; +import org.apache.eventmesh.common.protocol.http.header.Header; + +public class SendMessageBatchProtocolResolver { + public static CloudEvent buildEvent(Header header, Body body) { + return null; + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchV2ProtocolResolver.java new file mode 100644 index 0000000000..adc1e7e0b9 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageBatchV2ProtocolResolver.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.core.v1.CloudEventV1; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.http.body.Body; +import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; +import org.apache.eventmesh.common.protocol.http.header.Header; +import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; + +import java.nio.charset.StandardCharsets; + +public class SendMessageBatchV2ProtocolResolver { + public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHandleException { + try { + SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader = (SendMessageBatchV2RequestHeader) header; + SendMessageBatchV2RequestBody sendMessageBatchV2RequestBody = (SendMessageBatchV2RequestBody) body; + + String protocolType = sendMessageBatchV2RequestHeader.getProtocolType(); + String protocolDesc = sendMessageBatchV2RequestHeader.getProtocolDesc(); + String protocolVersion = sendMessageBatchV2RequestHeader.getProtocolVersion(); + + String code = sendMessageBatchV2RequestHeader.getCode(); + String env = sendMessageBatchV2RequestHeader.getEnv(); + String idc = sendMessageBatchV2RequestHeader.getIdc(); + String ip = sendMessageBatchV2RequestHeader.getIp(); + String pid = sendMessageBatchV2RequestHeader.getPid(); + String sys = sendMessageBatchV2RequestHeader.getSys(); + String username = sendMessageBatchV2RequestHeader.getUsername(); + String passwd = sendMessageBatchV2RequestHeader.getPasswd(); + ProtocolVersion version = sendMessageBatchV2RequestHeader.getVersion(); + String language = sendMessageBatchV2RequestHeader.getLanguage(); + + String content = sendMessageBatchV2RequestBody.getMsg(); + + CloudEvent event = null; + CloudEventBuilder cloudEventBuilder; + if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v1(); + + event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo()) + .withSubject(sendMessageBatchV2RequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageBatchV2RequestBody.getTag()) + .build(); + } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v03(); + event = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo()) + .withSubject(sendMessageBatchV2RequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageBatchV2RequestBody.getTag()) + .build(); + } + return event; + } catch (Exception e) { + throw new ProtocolHandleException(e.getMessage(), e.getCause()); + } + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageRequestProtocolResolver.java new file mode 100644 index 0000000000..40e2e9f5cb --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/http/SendMessageRequestProtocolResolver.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage.resolver.http; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.core.v1.CloudEventV1; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.protocol.http.body.Body; +import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody; +import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion; +import org.apache.eventmesh.common.protocol.http.header.Header; +import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader; +import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; + +import java.nio.charset.StandardCharsets; + +public class SendMessageRequestProtocolResolver { + + public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHandleException { + try { + SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) header; + SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) body; + + String protocolType = sendMessageRequestHeader.getProtocolType(); + String protocolDesc = sendMessageRequestHeader.getProtocolDesc(); + String protocolVersion = sendMessageRequestHeader.getProtocolVersion(); + + String code = sendMessageRequestHeader.getCode(); + String env = sendMessageRequestHeader.getEnv(); + String idc = sendMessageRequestHeader.getIdc(); + String ip = sendMessageRequestHeader.getIp(); + String pid = sendMessageRequestHeader.getPid(); + String sys = sendMessageRequestHeader.getSys(); + String username = sendMessageRequestHeader.getUsername(); + String passwd = sendMessageRequestHeader.getPasswd(); + ProtocolVersion version = sendMessageRequestHeader.getVersion(); + String language = sendMessageRequestHeader.getLanguage(); + + String content = sendMessageRequestBody.getContent(); + + CloudEvent event = null; + CloudEventBuilder cloudEventBuilder; + if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v1(); + + event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) + .withSubject(sendMessageRequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) + .build(); + } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v03(); + event = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo()) + .withSubject(sendMessageRequestBody.getTopic()) + .withData(content.getBytes(StandardCharsets.UTF_8)) + .withExtension(ProtocolKey.REQUEST_CODE, code) + .withExtension(ProtocolKey.ClientInstanceKey.ENV, env) + .withExtension(ProtocolKey.ClientInstanceKey.IDC, idc) + .withExtension(ProtocolKey.ClientInstanceKey.IP, ip) + .withExtension(ProtocolKey.ClientInstanceKey.PID, pid) + .withExtension(ProtocolKey.ClientInstanceKey.SYS, sys) + .withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username) + .withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd) + .withExtension(ProtocolKey.VERSION, version.getVersion()) + .withExtension(ProtocolKey.LANGUAGE, language) + .withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType) + .withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc) + .withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion) + .withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo()) + .withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageRequestBody.getProducerGroup()) + .withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageRequestBody.getTtl()) + .withExtension(SendMessageBatchV2RequestBody.TAG, sendMessageRequestBody.getTag()) + .build(); + } + return event; + } catch (Exception e) { + throw new ProtocolHandleException(e.getMessage(), e.getCause()); + } + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/tcp/TcpMessageProtocolResolver.java new file mode 100644 index 0000000000..38faf38718 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/java/org/apache/eventmesh/protocol/eventmeshmessage/resolver/tcp/TcpMessageProtocolResolver.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.protocol.eventmeshmessage.resolver.tcp; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; +import org.apache.eventmesh.common.protocol.tcp.Header; +import org.apache.eventmesh.common.protocol.tcp.Package; +import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException; +import org.apache.eventmesh.protocol.eventmeshmessage.EventMeshMessageProtocolConstant; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TcpMessageProtocolResolver { + + + public static CloudEvent buildEvent(Header header, Object body) throws ProtocolHandleException { + + CloudEventBuilder cloudEventBuilder; + + String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString(); + String protocolVersion = header.getProperty(Constants.PROTOCOL_VERSION).toString(); + String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString(); + + if (StringUtils.isBlank(protocolType) + || StringUtils.isBlank(protocolVersion) + || StringUtils.isBlank(protocolDesc)) { + throw new ProtocolHandleException(String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s", + protocolType, protocolVersion, protocolDesc)); + } + + if (!StringUtils.equals(EventMeshMessageProtocolConstant.PROTOCOL_NAME, protocolType)) { + throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType)); + } + + EventMeshMessage message = (EventMeshMessage) body; + + String topic = message.getTopic(); + + String content = message.getBody(); + + if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v1(); + + } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) { + cloudEventBuilder = CloudEventBuilder.v03(); + + } else { + throw new ProtocolHandleException(String.format("Unsupported protocolVersion: %s", protocolVersion)); + } + + cloudEventBuilder = cloudEventBuilder + .withId(header.getSeq()) + .withSubject(topic) + .withData(content.getBytes(StandardCharsets.UTF_8)); + + for (String propKey : header.getProperties().keySet()) { + cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString()); + } + + for (String propKey : message.getProperties().keySet()) { + cloudEventBuilder.withExtension(propKey, message.getProperties().get(propKey)); + } + + return cloudEventBuilder.build(); + + } + + public static Package buildEventMeshMessage(CloudEvent cloudEvent) { + Package pkg = new Package(); + EventMeshMessage eventMeshMessage = new EventMeshMessage(); + eventMeshMessage.setTopic(cloudEvent.getSubject()); + eventMeshMessage.setBody(new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8)); + + Map prop = new HashMap<>(); + for (String extKey : cloudEvent.getExtensionNames()) { + prop.put(extKey, cloudEvent.getExtension(extKey).toString()); + } + eventMeshMessage.setProperties(prop); + + pkg.setBody(eventMeshMessage); + + return pkg; + } +} diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor new file mode 100644 index 0000000000..9be39ede56 --- /dev/null +++ b/eventmesh-protocol-plugin/eventmesh-protocol-eventmeshmessage/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.protocol.api.ProtocolAdaptor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +eventmeshmessage=org.apache.eventmesh.protocol.eventmeshmessage.EventMeshMessageProtocolAdaptor \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 6a1bd59d5e..176fe5db9c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -26,6 +26,7 @@ import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.tcp.*; import org.apache.eventmesh.protocol.api.ProtocolAdaptor; @@ -63,8 +64,8 @@ public void run() { Command cmd = pkg.getHeader().getCommand(); Command replyCmd = getReplyCmd(cmd); String protocolType = "EventMeshMessage"; - if (pkg.getHeader().getProperties() != null && pkg.getHeader().getProperty("message_protocol") != null) { - protocolType = (String) pkg.getHeader().getProperty("message_protocol"); + if (pkg.getHeader().getProperties() != null && pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE) != null) { + protocolType = (String) pkg.getHeader().getProperty(Constants.PROTOCOL_TYPE); } ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType); Package msg = new Package(); diff --git a/settings.gradle b/settings.gradle index a87e06858c..787313b998 100644 --- a/settings.gradle +++ b/settings.gradle @@ -37,4 +37,5 @@ include 'eventmesh-protocol-plugin' include 'eventmesh-protocol-plugin:eventmesh-protocol-api' include 'eventmesh-protocol-plugin:eventmesh-protocol-openmessage' include 'eventmesh-protocol-plugin:eventmesh-protocol-cloudevents' +include 'eventmesh-protocol-plugin:eventmesh-protocol-eventmeshmessage'