Skip to content

Commit

Permalink
Merge 140bebd into 991cbda
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jun 25, 2023
2 parents 991cbda + 140bebd commit cf4334d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class OpenFunctionSourceConnector implements Source {

private static final int DEFAULT_BATCH_SIZE = 10;

private OpenFunctionSourceConfig sourceConfig;

private BlockingQueue<ConnectRecord> queue;
Expand Down Expand Up @@ -74,10 +77,20 @@ public BlockingQueue<ConnectRecord> queue() {

@Override
public List<ConnectRecord> poll() {
List<ConnectRecord> connectRecords = new ArrayList<>();
ConnectRecord connectRecord = queue.poll();
if (connectRecord != null) {
connectRecords.add(connectRecord);

List<ConnectRecord> connectRecords = new ArrayList<>(DEFAULT_BATCH_SIZE);

for (int count = 0; count < DEFAULT_BATCH_SIZE; ++count) {
try {
ConnectRecord connectRecord = queue.poll(3, TimeUnit.SECONDS);
if (connectRecord == null) {
break;
}
connectRecords.add(connectRecord);
} catch (InterruptedException e) {
// nothing to do
break;
}
}
return connectRecords;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.api.source.Source;

import org.apache.commons.collections4.CollectionUtils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
Expand Down Expand Up @@ -120,7 +122,7 @@ public void startPoll() {
try {
connectRecord = queue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
log.error("poll connect record error", e);
}
if (connectRecord == null) {
Expand All @@ -136,8 +138,11 @@ private void startConnector() throws Exception {
source.start();
while (isRunning) {
List<ConnectRecord> connectorRecordList = source.poll();
for (ConnectRecord connectRecord : connectorRecordList) {
queue.put(connectRecord);
if (CollectionUtils.isEmpty(connectorRecordList)) {
continue;
}
for (ConnectRecord record : connectorRecordList) {
queue.put(record);
}
}
}
Expand Down Expand Up @@ -175,7 +180,6 @@ public void stop() {
try {
eventMeshTCPClient.close();
} catch (Exception e) {
e.printStackTrace();
log.error("event mesh client close error", e);
}
log.info("source worker stopped");
Expand Down

0 comments on commit cf4334d

Please sign in to comment.