Skip to content

Commit

Permalink
Merge pull request apache#1168 from RSTdefg/dashboard-registry
Browse files Browse the repository at this point in the history
  • Loading branch information
qqeasonchen authored Aug 30, 2022
2 parents 86f963d + 370f9b6 commit 52e74fa
Show file tree
Hide file tree
Showing 54 changed files with 800 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,42 @@

import io.cloudevents.CloudEvent;

/**
* Admin API.
*/
@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR)
public interface Admin extends LifeCycle {
/**
* Initializes admin api service.
*/
void init(Properties keyValue) throws Exception;

/**
* Get the list of topics.
*
* @return List of topics.
*/
List<TopicProperties> getTopic() throws Exception;

/**
* Create one topic.
*/
void createTopic(String topicName) throws Exception;

/**
* Delete one topic.
*/
void deleteTopic(String topicName) throws Exception;

/**
* Get the list of all events.
*
* @return List of events.
*/
List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception;

/**
* Publish an event.
*/
void publish(CloudEvent cloudEvent) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class TopicProperties {

@JsonCreator
public TopicProperties(
@JsonProperty("name") String name,
@JsonProperty("messageCount") long messageCount
@JsonProperty("name") String name,
@JsonProperty("messageCount") long messageCount
) {
super();
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ configurations {
}

List rocketmq = [
"org.apache.rocketmq:rocketmq-client:$rocketmq_version",
"org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
"org.apache.rocketmq:rocketmq-common:$rocketmq_version",
"org.apache.rocketmq:rocketmq-store:$rocketmq_version",
"org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
"org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
"org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
"org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
"org.apache.rocketmq:rocketmq-test:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
"org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
"org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
"org.apache.rocketmq:rocketmq-client:$rocketmq_version",
"org.apache.rocketmq:rocketmq-broker:$rocketmq_version",
"org.apache.rocketmq:rocketmq-common:$rocketmq_version",
"org.apache.rocketmq:rocketmq-store:$rocketmq_version",
"org.apache.rocketmq:rocketmq-namesrv:$rocketmq_version",
"org.apache.rocketmq:rocketmq-tools:$rocketmq_version",
"org.apache.rocketmq:rocketmq-remoting:$rocketmq_version",
"org.apache.rocketmq:rocketmq-logging:$rocketmq_version",
"org.apache.rocketmq:rocketmq-test:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",
"org.apache.rocketmq:rocketmq-filter:$rocketmq_version",
"org.apache.rocketmq:rocketmq-acl:$rocketmq_version",
"org.apache.rocketmq:rocketmq-srvutil:$rocketmq_version",

]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

rocketmq_version=4.9.3

pluginType=connector
pluginName=rocketmq
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public List<TopicProperties> getTopic() throws Exception {
messageCount += topicOffset.getMaxOffset() - topicOffset.getMinOffset();
}
result.add(new TopicProperties(
topic, messageCount
topic, messageCount
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public static MessageReader createReader(final Message message) throws CloudEven

public static MessageReader createReader(final Map<String, String> props,
@Nullable final byte[] body)
throws CloudEventRWException {
throws CloudEventRWException {

return MessageUtils.parseStructuredOrBinaryMessage(
() -> null,
format -> null,
() -> props.get(RocketMQHeaders.SPEC_VERSION),
sv -> new RocketMQBinaryMessageReader(sv, props, body)
() -> null,
format -> null,
() -> props.get(RocketMQHeaders.SPEC_VERSION),
sv -> new RocketMQBinaryMessageReader(sv, props, body)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;

public class RocketMQBinaryMessageReader
extends BaseGenericBinaryMessageReaderImpl<String, String> {
extends BaseGenericBinaryMessageReaderImpl<String, String> {

private final Map<String, String> headers;

public RocketMQBinaryMessageReader(SpecVersion version, Map<String, String> headers,
byte[] payload) {
super(version,
payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);

Objects.requireNonNull(headers);
this.headers = headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public class RocketMQHeaders {
public static final String CE_PREFIX = "CE_";

protected static final Map<String, String> ATTRIBUTES_TO_HEADERS =
MessageUtils.generateAttributesToHeadersMapping(v -> v);
MessageUtils.generateAttributesToHeadersMapping(v -> v);

public static final String CONTENT_TYPE =
ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);
ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);

public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


public final class RocketMQMessageWriter<R>
implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {
implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {

private Message message;

Expand Down Expand Up @@ -66,7 +66,7 @@ public RocketMQMessageWriter(String topic, String keys, String tags) {

@Override
public CloudEventContextWriter withContextAttribute(String name, String value)
throws CloudEventRWException {
throws CloudEventRWException {
message.putUserProperty(name, value);
return this;
}
Expand All @@ -79,7 +79,7 @@ public RocketMQMessageWriter<R> create(final SpecVersion version) {

@Override
public Message setEvent(final EventFormat format, final byte[] value)
throws CloudEventRWException {
throws CloudEventRWException {
message.putUserProperty(RocketMQHeaders.CONTENT_TYPE, format.serializedContentType());
message.setBody(value);
return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,86 +54,86 @@ public void init() {

String namesrvAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR);
Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_NAMESRV_ADDR));
namesrvAddr = StringUtils.trim(namesrvAddrStr);

String consumeThreadPoolMinStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN);
if (StringUtils.isNotEmpty(consumeThreadPoolMinStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMinStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN));
consumeThreadMin = Integer.valueOf(consumeThreadPoolMinStr);
}

String consumeThreadPoolMaxStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX);
if (StringUtils.isNotEmpty(consumeThreadPoolMaxStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeThreadPoolMaxStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX));
consumeThreadMax = Integer.valueOf(consumeThreadPoolMaxStr);
}

String consumerThreadPoolQueueSizeStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE);
if (StringUtils.isNotEmpty(consumerThreadPoolQueueSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumerThreadPoolQueueSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE));
consumeQueueSize = Integer.valueOf(consumerThreadPoolQueueSizeStr);
}

String clientAckWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW);
if (StringUtils.isNotEmpty(clientAckWindowStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientAckWindowStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW));
ackWindow = Integer.valueOf(clientAckWindowStr);
}

String clientPubWindowStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW);
if (StringUtils.isNotEmpty(clientPubWindowStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPubWindowStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW));
pubWindow = Integer.valueOf(clientPubWindowStr);
}

String consumeTimeoutStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT);
if (StringUtils.isNotBlank(consumeTimeoutStr)) {
Preconditions.checkState(StringUtils.isNumeric(consumeTimeoutStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT));
consumeTimeout = Long.parseLong(consumeTimeoutStr);
}

String clientPullBatchSizeStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE);
if (StringUtils.isNotEmpty(clientPullBatchSizeStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPullBatchSizeStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE));
pullBatchSize = Integer.valueOf(clientPullBatchSizeStr);
}

String clientPollNamesrvIntervalStr =
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL);
if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL));
pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr);
}

String clientHeartbeatBrokerIntervalStr =
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
ConfigurationWrapper.getProp(
ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL);
if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL));
heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr);
}

String clientRebalanceIntervalIntervalStr =
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL);
ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL);
if (StringUtils.isNotEmpty(clientRebalanceIntervalIntervalStr)) {
Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL));
rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr);
}

Expand Down Expand Up @@ -162,40 +162,40 @@ static class ConfKeys {
public static final String KEYS_EVENTMESH_ROCKETMQ_PASSWORD = "eventMesh.server.rocketmq.password";

public static final String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MIN =
"eventMesh.server.rocketmq.client.consumeThreadMin";
"eventMesh.server.rocketmq.client.consumeThreadMin";

public static final String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_MAX =
"eventMesh.server.rocketmq.client.consumeThreadMax";
"eventMesh.server.rocketmq.client.consumeThreadMax";

public static final String KEYS_EVENTMESH_ROCKETMQ_CONSUME_THREADPOOL_QUEUESIZE =
"eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize";
"eventMesh.server.rocketmq.client.consumeThreadPoolQueueSize";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_ACK_WINDOW = "eventMesh.server.rocketmq.client.ackwindow";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PUB_WINDOW = "eventMesh.server.rocketmq.client.pubwindow";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_CONSUME_TIMEOUT =
"eventMesh.server.rocketmq.client.comsumeTimeoutInMin";
"eventMesh.server.rocketmq.client.comsumeTimeoutInMin";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_PULL_BATCHSIZE =
"eventMesh.server.rocketmq.client.pullBatchSize";
"eventMesh.server.rocketmq.client.pullBatchSize";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL =
"eventMesh.server.rocketmq.client.pollNameServerInterval";
"eventMesh.server.rocketmq.client.pollNameServerInterval";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL =
"eventMesh.server.rocketmq.client.heartbeatBrokerInterval";
"eventMesh.server.rocketmq.client.heartbeatBrokerInterval";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL =
"eventMesh.server.rocketmq.client.rebalanceInterval";
"eventMesh.server.rocketmq.client.rebalanceInterval";

public static final String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster";

public static final String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY =
"eventMesh.server.rocketmq.accessKey";
"eventMesh.server.rocketmq.accessKey";

public static final String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY =
"eventMesh.server.rocketmq.secretKey";
"eventMesh.server.rocketmq.secretKey";

}
}
Loading

0 comments on commit 52e74fa

Please sign in to comment.