Skip to content

Commit

Permalink
Add protocol producer in java sdk (#600)
Browse files Browse the repository at this point in the history
* Add protocol producer in sdk

* Refactor callback handler

* Add tcp protocol client
  • Loading branch information
ruanwenjun authored Nov 21, 2021
1 parent 902c4a8 commit 367c6a1
Show file tree
Hide file tree
Showing 112 changed files with 2,491 additions and 2,192 deletions.
6 changes: 3 additions & 3 deletions docs/cn/features/https.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ config env varible

```
//创建producer
LiteClientConfig liteClientConfig = new liteClientConfig();
LiteClientConfig eventMeshHttpClientConfig = new eventMeshHttpClientConfig();
...
//设置开启TLS
liteClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(liteClientConfig);
eventMeshHttpClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(eventMeshHttpClientConfig);
//配置环境变量
Expand Down
6 changes: 3 additions & 3 deletions docs/en/features/https.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ config env varible

```
// create producer
LiteClientConfig liteClientConfig = new liteClientConfig();
LiteClientConfig eventMeshHttpClientConfig = new eventMeshHttpClientConfig();
...
// enable TLS
liteClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(liteClientConfig);
eventMeshHttpClientConfig.setUseTls(true);
LiteProducer producer = new LiteProducer(eventMeshHttpClientConfig);
config env varible
Expand Down
27 changes: 4 additions & 23 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,13 @@ dependencies {

implementation "com.github.stefanbirkner:system-rules"

testImplementation "org.apache.commons:commons-lang3"
testImplementation "org.apache.commons:commons-collections4"
testImplementation "commons-io:commons-io"
testImplementation "org.apache.commons:commons-text"
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'

testImplementation "com.google.guava:guava"

testImplementation "org.slf4j:slf4j-api"
testImplementation "org.apache.logging.log4j:log4j-api"
testImplementation "org.apache.logging.log4j:log4j-core"
testImplementation "org.apache.logging.log4j:log4j-core"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl"
testImplementation "org.apache.logging.log4j:log4j-web"

testImplementation "com.lmax:disruptor"

testImplementation "com.fasterxml.jackson.core:jackson-databind"
testImplementation "com.fasterxml.jackson.core:jackson-core"
testImplementation "com.fasterxml.jackson.core:jackson-annotations"

testImplementation "org.apache.httpcomponents:httpclient"

testImplementation "io.netty:netty-all"
testCompileOnly 'org.projectlombok:lombok:1.18.22'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'

testImplementation "junit:junit"
testImplementation "com.github.stefanbirkner:system-rules"
testImplementation "org.assertj:assertj-core"

testImplementation "org.mockito:mockito-core"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common;

import java.util.HashMap;
import java.util.Map;

import lombok.Builder;
import lombok.Data;

/**
* EventMesh message.
*/
@Builder
@Data
public class EventMeshMessage {

private String bizSeqNo;

private String uniqueId;

private String topic;

private String content;

private Map<String, String> prop;

@Builder.Default
private final long createTime = System.currentTimeMillis();

public EventMeshMessage addProp(String key, String val) {
if (prop == null) {
prop = new HashMap<>();
}
prop.put(key, val);
return this;
}

public String getProp(String key) {
if (prop == null) {
return null;
}
return prop.get(key);
}

public EventMeshMessage removePropIfPresent(String key) {
if (prop == null) {

return this;
}
prop.remove(key);
return this;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,32 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ThreadPoolFactory {

public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName) {
return createThreadPoolExecutor(core, max, threadName, true);
}

public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName, final boolean isDaemon) {
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, final String threadName,
final boolean isDaemon) {
return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<Runnable>(1000), threadName, isDaemon);
}

public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue, final String threadName, final boolean isDaemon) {
return new ThreadPoolExecutor(core, max,
10 * 1000, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactory() {

private AtomicInteger seq = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
seq.incrementAndGet();
Thread t = new Thread(r, threadName + seq.get());
t.setDaemon(isDaemon);
return t;
}
});
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
final String threadName, final boolean isDaemon) {
return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue,
new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(isDaemon).build()
);
}

public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, ThreadFactory threadFactory) {
return createThreadPoolExecutor(core, max, new LinkedBlockingQueue<Runnable>(1000), threadFactory);
}

public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory) {
public static ThreadPoolExecutor createThreadPoolExecutor(int core, int max, BlockingQueue<Runnable> blockingQueue,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(core, max, 10 * 1000, TimeUnit.MILLISECONDS, blockingQueue, threadFactory);
}

Expand All @@ -77,7 +72,8 @@ public static ScheduledExecutorService createScheduledExecutor(int core, final S
return createScheduledExecutor(core, threadName, true);
}

public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName, final boolean isDaemon) {
public static ScheduledExecutorService createScheduledExecutor(int core, final String threadName,
final boolean isDaemon) {
return Executors.newScheduledThreadPool(core, new ThreadFactory() {
private AtomicInteger ai = new AtomicInteger(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.common.base.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.utils.IPUtils;

public class CommonConfiguration {
public String eventMeshEnv = "P";
Expand Down Expand Up @@ -75,7 +75,7 @@ public void init() {

eventMeshServerIp = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
eventMeshServerIp = IPUtil.getLocalAddress();
eventMeshServerIp = IPUtils.getLocalAddress();
}

eventMeshConnectorPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
* limitations under the License.
*/

package org.apache.eventmesh.common;
package org.apache.eventmesh.common.exception;

public class EventMeshException extends Exception {

public EventMeshException() {
}

public EventMeshException(String message) {
super(message);
}
Expand All @@ -34,8 +31,7 @@ public EventMeshException(Throwable cause) {
super(cause);
}

public EventMeshException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
public EventMeshException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.common.EventMeshException;
import org.apache.eventmesh.common.exception.EventMeshException;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.apache.eventmesh.common;
package org.apache.eventmesh.common.protocol;

import org.apache.eventmesh.common.protocol.http.HttpCommand;

import java.io.Serializable;

/**
* <ul>
* <li>Tcp transport object{@link org.apache.eventmesh.common.protocol.tcp.Package}</li>
* <li>Http transport object{@link org.apache.eventmesh.common.command.HttpCommand}</li>
* <li>Http transport object{@link HttpCommand}</li>
* </ul>
*/
public interface ProtocolTransportObject extends Serializable {
Expand Down
Loading

0 comments on commit 367c6a1

Please sign in to comment.