Skip to content

Commit

Permalink
Fix incorrect Work.java cast and logging (#31528)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Jun 11, 2024
1 parent 814cc8d commit 0d037a9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
*/
@NotThreadSafe
@Internal
public class Work {
public final class Work {
private final ShardedKey shardedKey;
private final WorkItem workItem;
private final ProcessingContext processingContext;
Expand Down Expand Up @@ -196,8 +196,7 @@ public String getLatencyTrackingId() {
return latencyTrackingId;
}

public final void queueCommit(
WorkItemCommitRequest commitRequest, ComputationState computationState) {
public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState computationState) {
setState(State.COMMIT_QUEUED);
processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

Expand Down Expand Up @@ -224,9 +225,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
() -> {
String threadName = Thread.currentThread().getName();
try {
if (work instanceof Work) {
if (work instanceof ExecutableWork) {
String workToken =
String.format("%016x", ((Work) work).getWorkItem().getWorkToken());
debugFormattedWorkToken(
((ExecutableWork) work).work().getWorkItem().getWorkToken());
Thread.currentThread().setName(threadName + ":" + workToken);
}
work.run();
Expand All @@ -242,6 +244,11 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}
}

@VisibleForTesting
public static String debugFormattedWorkToken(long workToken) {
return String.format("%016x", workToken);
}

private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -39,13 +47,31 @@
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {
@Rule public transient Timeout globalTimeout = Timeout.seconds(300);
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;

@Rule public transient Timeout globalTimeout = Timeout.seconds(300);
private BoundedQueueExecutor executor;

private static ExecutableWork createWork(Consumer<Work> executeWorkFn) {
return ExecutableWork.create(
Work.create(
Windmill.WorkItem.newBuilder()
.setKey(ByteString.EMPTY)
.setShardingKey(1)
.setWorkToken(33)
.setCacheToken(1)
.build(),
Watermarks.builder().setInputDataWatermark(Instant.now()).build(),
Work.createProcessingContext(
"computationId",
(a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(),
ignored -> {}),
Instant::now,
Collections.emptyList()),
executeWorkFn);
}

private Runnable createSleepProcessWorkFn(CountDownLatch start, CountDownLatch stop) {
Runnable runnable =
() -> {
Expand Down Expand Up @@ -203,14 +229,14 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
executor.execute(m3, 1);
assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));

assertEquals(0l, executor.allThreadsActiveTime());
assertEquals(0L, executor.allThreadsActiveTime());
stop.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}
// Max pool size was reached so the allThreadsActiveTime() was updated.
assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
assertThat(executor.allThreadsActiveTime(), greaterThan(0L));

executor.shutdown();
}
Expand Down Expand Up @@ -241,7 +267,7 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated()
executor.execute(m3, 1);
assertFalse(processStart3.await(1000, TimeUnit.MILLISECONDS));

assertEquals(0l, executor.allThreadsActiveTime());
assertEquals(0L, executor.allThreadsActiveTime());
// Increase the max thread count
executor.setMaximumPoolSize(5, 105);
stop.countDown();
Expand All @@ -251,18 +277,54 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated()
}
// Max pool size was updated during execution but allThreadsActiveTime() was still recorded
// for the thread which reached the old max pool size.
assertThat(executor.allThreadsActiveTime(), greaterThan(0l));
assertThat(executor.allThreadsActiveTime(), greaterThan(0L));

executor.shutdown();
}

@Test
public void testRenderSummaryHtml() throws Exception {
public void testRenderSummaryHtml() {
String expectedSummaryHtml =
"Worker Threads: 0/2<br>/n"
+ "Active Threads: 0<br>/n"
+ "Work Queue Size: 0/102<br>/n"
+ "Work Queue Bytes: 0/10000000<br>/n";
assertEquals(expectedSummaryHtml, executor.summaryHtml());
}

@Test
public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
CountDownLatch waitForWorkExecution = new CountDownLatch(1);
ExecutableWork executableWork =
createWork(
work -> {
assertTrue(
Thread.currentThread()
.getName()
.contains(
BoundedQueueExecutor.debugFormattedWorkToken(
work.getWorkItem().getWorkToken())));
waitForWorkExecution.countDown();
});
executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize());
waitForWorkExecution.await();
}

@Test
public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
CountDownLatch waitForWorkExecution = new CountDownLatch(1);
ExecutableWork executableWork =
createWork(
work -> {
assertTrue(
Thread.currentThread()
.getName()
.contains(
BoundedQueueExecutor.debugFormattedWorkToken(
work.getWorkItem().getWorkToken())));
waitForWorkExecution.countDown();
});
executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize());
waitForWorkExecution.await();
}
}

0 comments on commit 0d037a9

Please sign in to comment.