Skip to content

Commit

Permalink
fix cpu load problem
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed May 25, 2023
1 parent 7d61ae9 commit c4a57ec
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {

private final Meter sourceReceivedQPS;

private volatile long rowCountThisPollNext;

public SeaTunnelSourceCollector(
Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs,
Expand All @@ -54,6 +56,7 @@ public SeaTunnelSourceCollector(
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
rowCountThisPollNext++;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
Expand All @@ -66,6 +69,16 @@ public Object getCheckpointLock() {
return checkpointLock;
}

@Override
public long getRowCountThisPollNext() {
return this.rowCountThisPollNext;
}

@Override
public void resetRowCountThisPollNext() {
this.rowCountThisPollNext = 0;
}

public void sendRecordToNext(Record<?> record) throws IOException {
synchronized (checkpointLock) {
for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public void close() throws IOException {
public void collect() throws Exception {
if (!prepareClose) {
reader.pollNext(collector);
if (collector.getRowCountThisPollNext() == 0) {
Thread.sleep(100);
} else {
collector.resetRowCountThisPollNext();
}
}
}

Expand Down

0 comments on commit c4a57ec

Please sign in to comment.