diff --git a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java index 4e79a56d92..c259485758 100644 --- a/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java @@ -26,12 +26,15 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @Slf4j public class OpenFunctionSourceConnector implements Source { + private static final int DEFAULT_BATCH_SIZE = 10; + private OpenFunctionSourceConfig sourceConfig; private BlockingQueue queue; @@ -74,10 +77,20 @@ public BlockingQueue queue() { @Override public List poll() { - List connectRecords = new ArrayList<>(); - ConnectRecord connectRecord = queue.poll(); - if (connectRecord != null) { - connectRecords.add(connectRecord); + + List connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE); + + for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) { + try { + ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS); + if (connectRecord == null) { + break; + } + connectRecords.add(connectRecord); + } catch (InterruptedException e) { + // nothing to do + break; + } } return connectRecords; } diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 91e494a337..0f0c2f4784 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -29,6 +29,8 @@ import org.apache.eventmesh.openconnect.api.data.ConnectRecord; import org.apache.eventmesh.openconnect.api.source.Source; +import org.apache.commons.collections4.CollectionUtils; + import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; @@ -120,7 +122,7 @@ public void startPoll() { try { connectRecord = queue.poll(5, TimeUnit.SECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); log.error("poll connect record error", e); } if (connectRecord == null) { @@ -136,8 +138,11 @@ private void startConnector() throws Exception { source.start(); while (isRunning) { List connectorRecordList = source.poll(); - for (ConnectRecord connectRecord : connectorRecordList) { - queue.put(connectRecord); + if (CollectionUtils.isEmpty(connectorRecordList)) { + continue; + } + for (ConnectRecord record : connectorRecordList) { + queue.put(record); } } } @@ -175,7 +180,6 @@ public void stop() { try { eventMeshTCPClient.close(); } catch (Exception e) { - e.printStackTrace(); log.error("event mesh client close error", e); } log.info("source worker stopped");