From fb84d90429cced2220fabf2f89cc7adacf365642 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Thu, 28 Oct 2021 15:33:34 +0800 Subject: [PATCH 1/7] [Feature #564] Support CloudEvents protocols for pub/sub in EventMesh-feature design --- .../eventmesh-cloudevents-sdk-binding.md | 80 +++++++++++++++---- 1 file changed, 66 insertions(+), 14 deletions(-) diff --git a/docs/en/features/eventmesh-cloudevents-sdk-binding.md b/docs/en/features/eventmesh-cloudevents-sdk-binding.md index 3c1224f413..af1b25835d 100644 --- a/docs/en/features/eventmesh-cloudevents-sdk-binding.md +++ b/docs/en/features/eventmesh-cloudevents-sdk-binding.md @@ -1,30 +1,25 @@ -# Lightweight EventMesh SDK (CloudEvents) +# EventMesh pluggable protocols (CloudEvents) ## Introduction -[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic -cloud-native eventing infrastructure. +[EventMesh(incubating)](https://github.com/apache/incubator-eventmesh) is a dynamic cloud-native eventing infrastructure. -[CloudEvents](https://github.com/cloudevents/spec) is a specification for describing -event data in common formats to provide interoperability across services, platforms and systems. +[CloudEvents](https://github.com/cloudevents/spec) is a specification for describing event data in common formats to provide interoperability across services, platforms and systems. -As of May 2021, EventMesh contains the following -major components: `eventmesh-runtime`, `eventmesh-sdk-java` and `eventmesh-connector-rocketmq`. +As of May 2021, EventMesh contains the following major components: `eventmesh-runtime`, `eventmesh-sdk-java` and `eventmesh-connector-rocketmq`. For a customer to use EventMesh, `eventmesh-runtime` can be deployed as microservices to transmit customer's events between event producers and consumers. Customer's applications can then interact with `eventmesh-runtime` using `eventmesh-sdk-java` to publish/subscribe for events on given topics. CloudEvents support has been a highly desired feature by EventMesh users. There are many reasons for users to prefer using a SDK with CloudEvents support: + - CloudEvents is a more widely accepted and supported way to describe events. `eventmesh-sdk-java` currently uses the `LiteMessage` structure to describe events, which is less standardized. - CloudEvents's Java SDK has a wider range of distribution methods. For example, EventMesh users currently need to use the SDK tarball or build from source for every EventMesh release. With - CloudEvents support, it's easier for users to take a dependency on EventMesh's SDK using CloudEvents's - public distributions (e.g. through a Maven configuration). -- CloudEvents's SDK supports multiple languages. Although EventMesh currently only supports a Java SDK, - in future if more languages need to be supported, the extensions can be easier with experience on - binding Java SDK with CloudEvents. + CloudEvents support, it's easier for users to take a dependency on EventMesh's SDK using CloudEvents's public distributions (e.g. through a Maven configuration). +- CloudEvents's SDK supports multiple languages. Although EventMesh currently only supports a Java SDK, in future if more languages need to be supported, the extensions can be easier with experience on binding Java SDK with CloudEvents. ## Requirements @@ -35,6 +30,8 @@ for users to prefer using a SDK with CloudEvents support: | F-1 | EventMesh users should be able to depend on a public SDK to publish/subscribe events in CloudEvents format | Functionality | | F-2 | EventMesh users should continue to have access to existing EventMesh client features (e.g. load balancing) with an SDK that supports CloudEvent | Feature Parity | | F-3 | EventMesh developers should be able to sync `eventmesh-sdk-java` and an SDK with CloudEvents support without much effort/pain | Maintainability | +| F-4 | EventMesh support pluggable protocols for developers integrate other protocols (e.g. CloudEvents\EventMesh Message\OpenMessage\MQTT ...) | Functionality | +| F-5 | EventMesh support the unified api for publish/subscribe events to/from event store | Functionality | ### Performance Requirements @@ -47,9 +44,64 @@ for users to prefer using a SDK with CloudEvents support: Binding with the CloudEvents Java SDK (similar to what Kafka already did, see Reference for more details) should be an easy way to achieve the requirements. -Design details TBD. +### Pluggable Protocols + +![pluggable-protocols](../../images/features/pluggable-protocols.png) + +### Process of CloudEvents under EventMesh + +#### For TCP + +##### SDK side for publish + +- add the CloudEvents identifier in `package` header +- use `CloudEventBuilder` build the CloudEvent and put it into the `package` body + +##### SDK side for subscribe + +- add `convert` function under the `ReceiveMsgHook` interface, for converting the `package` body to the specific protocol with the identifier in `package` header +- different protocols should implement the `ReceiveMsgHook` interface + +##### Server side for publish + +- design the protocol convert api contains `decodeMessage` interface which convert the package's body to CloudEvent +- update `Session.upstreamMsg()` in `MessageTransferTask` change the input parameter Message to CloudEvent, the CloudEvent use the last step `decodeMessage` api convert +- update `SessionSender.send()` change the input parameter `Message` to `CloudEvent` +- update `MeshMQProducer` api support send `CloudEvents` in runtime +- support the implementation in `connector-plugin` for send `CloudEvents` to EventStore + +##### Server side for subscribe + +- support change the `RocketMessage` to `CloudEvent` in connector-plugin + +- overwrite the `AsyncMessageListener.consume()` function, change the input parameter `Message` to `CloudEvent` + +- update the `MeshMQPushConsumer.updateOffset()` implementation change the the input parameter `Message` to `CloudEvent` + +- update `DownStreamMsgContext` , change the input parameter `Message` to `CloudEvent`, update the `DownStreamMsgContext.ackMsg` + +#### For HTTP + +##### SDK side for publish + +- support `LiteProducer.publish(cloudEvent)` +- add the CloudEvents identifier in http request header + +##### SDK side for subscribe + +##### Server side for publish + +- support build the `HttpCommand.body` by pluggable protocol plugins according the protocol type in `HttpCommand` header +- support publish the CloudEvent in message processors + +##### Server side for subscribe + +- update the `EventMeshConsumer.subscribe()` + +- update `HandleMsgContext` , change the input parameter `Message` to `CloudEvent` +- update `AsyncHttpPushRequest.tryHTTPRequest()` ## Appendix ### References -- https://cloudevents.github.io/sdk-java/kafka +- https://cloudevents.github.io/sdk-java/kafka \ No newline at end of file From 4e55912570cbe148fae17f2c684bbfbaa9001d28 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 16:05:43 +0800 Subject: [PATCH 2/7] support cloudevents api in eventmesh-connector-api module --- .../eventmesh-connector-api/build.gradle | 1 + .../eventmesh/api/AsyncConsumeContext.java | 24 ++++++++ .../apache/eventmesh/api/EventListener.java | 26 +++++++++ .../apache/eventmesh/api/SendCallback.java | 27 +++++++++ .../org/apache/eventmesh/api/SendResult.java | 44 +++++++++++++++ .../eventmesh/api/consumer/EMConsumer.java | 35 ++++++++++++ .../exception/ConnectorRuntimeException.java | 37 ++++++++++++ .../api/exception/OnExceptionContext.java | 56 +++++++++++++++++++ .../eventmesh/api/producer/EMProducer.java | 48 ++++++++++++++++ 9 files changed, 298 insertions(+) create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle index 19cb54c18b..a7211893e1 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-plugin/eventmesh-connector-api/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation project(":eventmesh-spi") implementation project(":eventmesh-common") + api 'io.cloudevents:cloudevents-core' api 'io.openmessaging:openmessaging-api' api 'io.dropwizard.metrics:metrics-core' api "io.dropwizard.metrics:metrics-healthchecks" diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java new file mode 100644 index 0000000000..3a3e99e4c7 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api; + + +public abstract class AsyncConsumeContext { + + public abstract void commit(EventMeshAction action); + +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java new file mode 100644 index 0000000000..6e76d6de17 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api; + + +import io.cloudevents.CloudEvent; + +public interface EventListener { + + void consume(final CloudEvent cloudEvent, final AsyncConsumeContext context); + +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java new file mode 100644 index 0000000000..0fd1582868 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api; + + +import org.apache.eventmesh.api.exception.OnExceptionContext; + +public interface SendCallback { + + void onSuccess(final SendResult sendResult); + + void onException(final OnExceptionContext context); +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java new file mode 100644 index 0000000000..216257d31a --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api; + +public class SendResult { + private String messageId; + + private String topic; + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + @Override + public String toString() { + return "SendResult[topic=" + topic + ", messageId=" + messageId + ']'; + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java new file mode 100644 index 0000000000..d8ff411ca8 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.consumer; + +import io.cloudevents.CloudEvent; +import org.apache.eventmesh.api.AbstractContext; +import org.apache.eventmesh.api.EventListener; + +import java.util.List; +import java.util.Properties; + +public interface EMConsumer { + + void init(Properties keyValue) throws Exception; + + void updateOffset(List cloudEvents, AbstractContext context); + + void subscribe(String topic, final EventListener listener) throws Exception; + + void unsubscribe(String topic); +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java new file mode 100644 index 0000000000..e1c8ab5739 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.exception; + +public class ConnectorRuntimeException extends RuntimeException{ + + public ConnectorRuntimeException(){ + + } + + public ConnectorRuntimeException(String message){ + super(message); + } + + public ConnectorRuntimeException(Throwable throwable){ + super(throwable); + } + + public ConnectorRuntimeException(String message, Throwable throwable){ + super(message, throwable); + } + +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java new file mode 100644 index 0000000000..2da46a5c1a --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.exception; + + +public class OnExceptionContext { + + private String messageId; + + private String topic; + + /** + * Detailed exception stack information. + */ + private ConnectorRuntimeException exception; + + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + + public ConnectorRuntimeException getException() { + return exception; + } + + public void setException(ConnectorRuntimeException exception) { + this.exception = exception; + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java new file mode 100644 index 0000000000..40435bab8c --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eventmesh.api.producer; + +import io.cloudevents.CloudEvent; +import org.apache.eventmesh.api.RRCallback; +import org.apache.eventmesh.api.SendCallback; +import org.apache.eventmesh.api.SendResult; +import org.apache.eventmesh.spi.EventMeshExtensionType; +import org.apache.eventmesh.spi.EventMeshSPI; + +import java.util.Properties; + +@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) +public interface EMProducer { + + void init(Properties properties) throws Exception; + + SendResult publish(final CloudEvent cloudEvent); + + void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception; + + void sendOneway(final CloudEvent cloudEvent); + + void sendAsync(final CloudEvent cloudEvent, final SendCallback sendCallback); + + void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; + + boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallback) throws Exception; + + void checkTopicExist(String topic) throws Exception; + + void setExtFields(); +} From 670c2f21774d8e5acc9b3641e297a64130f103ba Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 16:44:00 +0800 Subject: [PATCH 3/7] fix checkStyle --- .../org/apache/eventmesh/api/AsyncConsumeContext.java | 1 + .../java/org/apache/eventmesh/api/EventListener.java | 11 +++++++++++ .../org/apache/eventmesh/api/EventMeshAction.java | 1 + .../eventmesh/api/EventMeshAsyncConsumeContext.java | 1 + .../java/org/apache/eventmesh/api/SendCallback.java | 1 + .../java/org/apache/eventmesh/api/SendResult.java | 1 + .../org/apache/eventmesh/api/consumer/EMConsumer.java | 7 +++++++ .../api/exception/ConnectorRuntimeException.java | 1 + .../eventmesh/api/exception/OnExceptionContext.java | 1 + .../org/apache/eventmesh/api/producer/EMProducer.java | 5 +++++ 10 files changed, 30 insertions(+) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java index 3a3e99e4c7..7c1c739cd5 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/AsyncConsumeContext.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java index 6e76d6de17..16bf231028 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java @@ -14,11 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; import io.cloudevents.CloudEvent; +/** + * Event listener, registered for consume messages by consumer. + * + *

+ * + * Thread safe requirements: this interface will be invoked by multi threads, so users should keep thread safe during + * the consume process. + * + *

+ */ public interface EventListener { void consume(final CloudEvent cloudEvent, final AsyncConsumeContext context); diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java index 4fda6d05b6..f783201086 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAction.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; public enum EventMeshAction { diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java index c7e4e7fb73..c123ca23d2 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventMeshAsyncConsumeContext.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; import io.openmessaging.api.Action; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java index 0fd1582868..6e763fd607 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java index 216257d31a..1a68132c6b 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendResult.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api; public class SendResult { diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java index d8ff411ca8..cebd624658 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java @@ -14,15 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api.consumer; import io.cloudevents.CloudEvent; import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; +import org.apache.eventmesh.spi.EventMeshExtensionType; +import org.apache.eventmesh.spi.EventMeshSPI; import java.util.List; import java.util.Properties; +/** + * Consumer Interface + */ +@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) public interface EMConsumer { void init(Properties keyValue) throws Exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java index e1c8ab5739..52b351557c 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api.exception; public class ConnectorRuntimeException extends RuntimeException{ diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java index 2da46a5c1a..de178e0c74 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api.exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java index 40435bab8c..9d11ec18cf 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.eventmesh.api.producer; import io.cloudevents.CloudEvent; + import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -25,6 +27,9 @@ import java.util.Properties; +/** + * Producer Interface + */ @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) public interface EMProducer { From 691bcb3882ac625b656b61205de1c39d03455864 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 17:06:26 +0800 Subject: [PATCH 4/7] fix checkStyle --- .../java/org/apache/eventmesh/api/EventListener.java | 4 ++-- .../java/org/apache/eventmesh/api/SendCallback.java | 5 +++++ .../api/consumer/{EMConsumer.java => Consumer.java} | 5 +++-- .../api/exception/ConnectorRuntimeException.java | 10 +++++----- .../api/producer/{EMProducer.java => Producer.java} | 4 ++-- 5 files changed, 17 insertions(+), 11 deletions(-) rename eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/{EMConsumer.java => Consumer.java} (96%) rename eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/{EMProducer.java => Producer.java} (97%) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java index 16bf231028..eede41ca09 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/EventListener.java @@ -25,8 +25,8 @@ * *

* - * Thread safe requirements: this interface will be invoked by multi threads, so users should keep thread safe during - * the consume process. + * Thread safe requirements: this interface will be invoked by multi threads, + * so users should keep thread safe during the consume process. * *

*/ diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java index 6e763fd607..68d1d0c65e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java @@ -18,8 +18,13 @@ package org.apache.eventmesh.api; +import io.cloudevents.CloudEvent; import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.api.producer.Producer; +/** + * Call back interface used in {@link Producer#sendAsync(CloudEvent, SendCallback)}. + */ public interface SendCallback { void onSuccess(final SendResult sendResult); diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java similarity index 96% rename from eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java index cebd624658..312ae92ec8 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/EMConsumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.api.consumer; import io.cloudevents.CloudEvent; + import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.spi.EventMeshExtensionType; @@ -27,10 +28,10 @@ import java.util.Properties; /** - * Consumer Interface + * Consumer Interface. */ @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface EMConsumer { +public interface Consumer { void init(Properties keyValue) throws Exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java index 52b351557c..0efe513652 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/ConnectorRuntimeException.java @@ -17,21 +17,21 @@ package org.apache.eventmesh.api.exception; -public class ConnectorRuntimeException extends RuntimeException{ +public class ConnectorRuntimeException extends RuntimeException { - public ConnectorRuntimeException(){ + public ConnectorRuntimeException() { } - public ConnectorRuntimeException(String message){ + public ConnectorRuntimeException(String message) { super(message); } - public ConnectorRuntimeException(Throwable throwable){ + public ConnectorRuntimeException(Throwable throwable) { super(throwable); } - public ConnectorRuntimeException(String message, Throwable throwable){ + public ConnectorRuntimeException(String message, Throwable throwable) { super(message, throwable); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java similarity index 97% rename from eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java rename to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index 9d11ec18cf..b157b5a42b 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/EMProducer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -28,10 +28,10 @@ import java.util.Properties; /** - * Producer Interface + * Producer Interface. */ @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface EMProducer { +public interface Producer { void init(Properties properties) throws Exception; From b6a46132e0c0921b4ee97dcd10e68f1c80ac83ff Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 19:39:35 +0800 Subject: [PATCH 5/7] fix checkStyle --- .../main/java/org/apache/eventmesh/api/SendCallback.java | 4 ++-- .../java/org/apache/eventmesh/api/consumer/Consumer.java | 6 ++++-- .../java/org/apache/eventmesh/api/producer/Producer.java | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java index 68d1d0c65e..c955f5d3aa 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/SendCallback.java @@ -17,11 +17,11 @@ package org.apache.eventmesh.api; - -import io.cloudevents.CloudEvent; import org.apache.eventmesh.api.exception.OnExceptionContext; import org.apache.eventmesh.api.producer.Producer; +import io.cloudevents.CloudEvent; + /** * Call back interface used in {@link Producer#sendAsync(CloudEvent, SendCallback)}. */ diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java index 312ae92ec8..8738d9de20 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java @@ -17,8 +17,6 @@ package org.apache.eventmesh.api.consumer; -import io.cloudevents.CloudEvent; - import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; import org.apache.eventmesh.spi.EventMeshExtensionType; @@ -27,6 +25,10 @@ import java.util.List; import java.util.Properties; +import io.cloudevents.CloudEvent; + + + /** * Consumer Interface. */ diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index b157b5a42b..60c8515a7f 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -17,8 +17,6 @@ package org.apache.eventmesh.api.producer; -import io.cloudevents.CloudEvent; - import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -27,6 +25,8 @@ import java.util.Properties; +import io.cloudevents.CloudEvent; + /** * Producer Interface. */ From 0ac6d4fae60f61dd4eee4b3aab7f53ebfb22a787 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 20:34:05 +0800 Subject: [PATCH 6/7] 1.support LifeCycle.java 2.update Consumer and Producer --- .../org/apache/eventmesh/api/LifeCycle.java | 36 +++++++++++++++++++ .../eventmesh/api/consumer/Consumer.java | 3 +- .../eventmesh/api/producer/Producer.java | 3 +- 3 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/LifeCycle.java diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/LifeCycle.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/LifeCycle.java new file mode 100644 index 0000000000..f5d61de333 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/LifeCycle.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.api; + +import org.apache.eventmesh.api.consumer.Consumer; +import org.apache.eventmesh.api.producer.Producer; + +/** + * The {@code LifeCycle} defines a lifecycle interface for a OMS related service endpoint, + * like {@link Producer}, {@link Consumer}, and so on. + */ +public interface LifeCycle { + + boolean isStarted(); + + boolean isClosed(); + + void start(); + + void shutdown(); +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java index 8738d9de20..a6fafe023b 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/Consumer.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.api.AbstractContext; import org.apache.eventmesh.api.EventListener; +import org.apache.eventmesh.api.LifeCycle; import org.apache.eventmesh.spi.EventMeshExtensionType; import org.apache.eventmesh.spi.EventMeshSPI; @@ -33,7 +34,7 @@ * Consumer Interface. */ @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface Consumer { +public interface Consumer extends LifeCycle { void init(Properties keyValue) throws Exception; diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index 60c8515a7f..08d8998972 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.api.producer; +import org.apache.eventmesh.api.LifeCycle; import org.apache.eventmesh.api.RRCallback; import org.apache.eventmesh.api.SendCallback; import org.apache.eventmesh.api.SendResult; @@ -31,7 +32,7 @@ * Producer Interface. */ @EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -public interface Producer { +public interface Producer extends LifeCycle { void init(Properties properties) throws Exception; From 2239a655ce1fe2d1fc374ef0bd2df81ec75bb6eb Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 1 Nov 2021 21:23:18 +0800 Subject: [PATCH 7/7] fix remove the extra blank line --- .../org/apache/eventmesh/api/exception/OnExceptionContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java index de178e0c74..1237e5fe2f 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/exception/OnExceptionContext.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.api.exception; - public class OnExceptionContext { private String messageId;