Skip to content

Commit

Permalink
Merge pull request #259 from WeBankFinTech/1.2.1
Browse files Browse the repository at this point in the history
[ISSUE #256]Upgrade Openmessaging-api to 2.2.1-pubsub version
close #256
  • Loading branch information
xwm1992 authored Apr 1, 2021
2 parents 16c8a1d + 636dcaa commit 56e8e2a
Show file tree
Hide file tree
Showing 60 changed files with 1,407 additions and 2,364 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ subprojects {
exclude 'eventmesh-connector-api*.jar'
exclude 'eventmesh-registry*.jar'
exclude 'eventmesh-starter*.jar'
exclude 'eventmesh-test*.jar'
exclude 'eventmesh-sdk*.jar'
}
copy {
into '../dist/lib'
Expand Down
2 changes: 1 addition & 1 deletion docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs

Windows

- Windows系统下运行示例可以参考[这里](https://github.com/WeBankFinTech/EventMesh/blob/develop/docs/en/instructions/eventmesh-sdk-java-quickstart.md)
- Windows系统下运行示例可以参考[这里](https://github.com/WeBankFinTech/EventMesh/blob/develop/docs/cn/instructions/eventmesh-sdk-java-quickstart.zh-CN.md)

Linux

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,24 @@ public class Constants {

public static final String IDC_SEPERATER = "-";

public static final String PROPERTY_MESSAGE_TIMEOUT = "TIMEOUT";

public static final String PROPERTY_MESSAGE_SEARCH_KEYS = "SEARCH_KEYS";

public static final String PROPERTY_MESSAGE_QUEUE_ID = "QUEUE_ID";

public static final String PROPERTY_MESSAGE_QUEUE_OFFSET = "QUEUE_OFFSET";

public static final String PROPERTY_MESSAGE_DESTINATION = "DESTINATION";

public static final String PROPERTY_MESSAGE_MESSAGE_ID = "MESSAGE_ID";

public static final String PROPERTY_MESSAGE_BORN_HOST = "BORN_HOST";

public static final String PROPERTY_MESSAGE_BORN_TIMESTAMP = "BORN_TIMESTAMP";

public static final String PROPERTY_MESSAGE_STORE_HOST = "STORE_HOST";

public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "STORE_TIMESTAMP";

}
2 changes: 1 addition & 1 deletion eventmesh-connector-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ repositories {
}

List open_message = [
"io.openmessaging:openmessaging-api:0.3.1-alpha"
"io.openmessaging:openmessaging-api:2.2.1-pubsub"
]

dependencies {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.webank.eventmesh.api;

import io.openmessaging.Message;
import io.openmessaging.api.Message;

public interface RRCallback {

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
package com.webank.eventmesh.api.consumer;

import com.webank.eventmesh.api.AbstractContext;
import io.openmessaging.KeyValue;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.Message;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.Message;
import io.openmessaging.api.Consumer;

import java.util.List;
import java.util.Properties;

public interface MeshMQPushConsumer extends PushConsumer {
public interface MeshMQPushConsumer extends Consumer {

void init(KeyValue keyValue) throws Exception;
void init(Properties keyValue) throws Exception;

void start() throws Exception;
@Override
void start();

// void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context);

void updateOffset(List<Message> msgs, AbstractContext context);

// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);

void subscribe(String topic, final MessageListener listener) throws Exception;
void subscribe(String topic, final AsyncMessageListener listener) throws Exception;

void unsubscribe(String topic) throws Exception;

boolean isPause();

void pause();
@Override
void unsubscribe(String topic);

AbstractContext getContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
package com.webank.eventmesh.api.producer;

import com.webank.eventmesh.api.RRCallback;
import com.webank.eventmesh.api.SendCallback;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.api.Message;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendCallback;

import java.util.Properties;

public interface MeshMQProducer extends Producer {

void init(KeyValue keyValue) throws Exception;
void init(Properties properties) throws Exception;

void start() throws Exception;
@Override
void start();

void send(Message message, SendCallback sendCallback) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,88 +17,75 @@
package com.webank.eventmesh.connector.rocketmq;

import com.webank.eventmesh.connector.rocketmq.producer.ProducerImpl;
import com.webank.eventmesh.connector.rocketmq.consumer.PullConsumerImpl;
import com.webank.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
import com.webank.eventmesh.connector.rocketmq.utils.OMSUtil;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ResourceManager;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.producer.Producer;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.Producer;
import io.openmessaging.api.PullConsumer;
import io.openmessaging.api.batch.BatchConsumer;
import io.openmessaging.api.order.OrderConsumer;
import io.openmessaging.api.order.OrderProducer;
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.TransactionProducer;

import java.util.Properties;

public class MessagingAccessPointImpl implements MessagingAccessPoint {

private KeyValue accessPointProperties;
private Properties accessPointProperties;

public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
public MessagingAccessPointImpl(final Properties accessPointProperties) {
this.accessPointProperties = accessPointProperties;
}

@Override
public KeyValue attributes() {
return accessPointProperties;
public String version() {
return null;
}

@Override
public String implVersion() {
return "0.3.0";
public Properties attributes() {
return accessPointProperties;
}

@Override
public Producer createProducer() {
public Producer createProducer(Properties properties) {
return new ProducerImpl(this.accessPointProperties);
}

@Override
public Producer createProducer(KeyValue properties) {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}

@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
public OrderProducer createOrderProducer(Properties properties) {
return null;
}

@Override
public PushConsumer createPushConsumer(KeyValue properties) {
return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) {
return null;
}

@Override
public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
public TransactionProducer createTransactionProducer(Properties properties) {
return null;
}

@Override
public PullConsumer createPullConsumer(KeyValue attributes) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
public Consumer createConsumer(Properties properties) {
return new PushConsumerImpl(properties);
}

@Override
public StreamingConsumer createStreamingConsumer() {
public PullConsumer createPullConsumer(Properties properties) {
return null;
}

@Override
public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
public BatchConsumer createBatchConsumer(Properties properties) {
return null;
}

@Override
public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
}

@Override
public void startup() {
//Ignore
public OrderConsumer createOrderedConsumer(Properties properties) {
return null;
}

@Override
public void shutdown() {
//Ignore
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@ public class Constants {

public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";

public static final String PROPERTY_MESSAGE_TTL = "TTL";
public static final String PROPERTY_MESSAGE_TIMEOUT = "TIMEOUT";

public static final String PROPERTY_MESSAGE_KEYS = "KEYS";

public static final String PROPERTY_MESSAGE_QUEUE_ID = "QUEUE_ID";

public static final String PROPERTY_MESSAGE_QUEUE_OFFSET = "QUEUE_OFFSET";

public static final String PROPERTY_MESSAGE_DESTINATION = "DESTINATION";

public static final String PROPERTY_MESSAGE_MESSAGE_ID = "MESSAGE_ID";

public static final String PROPERTY_MESSAGE_BORN_HOST = "BORN_HOST";

public static final String PROPERTY_MESSAGE_BORN_TIMESTAMP = "BORN_TIMESTAMP";

public static final String PROPERTY_MESSAGE_STORE_HOST = "STORE_HOST";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.webank.eventmesh.connector.rocketmq.config;

import com.webank.eventmesh.connector.rocketmq.domain.NonStandardKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.api.OMSBuiltinKeys;

public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private String driverImpl;
Expand Down
Loading

0 comments on commit 56e8e2a

Please sign in to comment.