Skip to content

Commit

Permalink
Fix error when ActiveWorkRefresher processed empty heartbeat map. (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
scwhittle authored Aug 5, 2024
1 parent ca744ae commit bfc64d5
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ private void refreshActiveWork() {
Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));
Map<HeartbeatSender, Heartbeats> heartbeatsBySender =
aggregateHeartbeatsBySender(refreshDeadline);
if (heartbeatsBySender.isEmpty()) {
return;
}

List<CompletableFuture<Void>> fanOutRefreshActiveWork = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -194,10 +195,13 @@ public void testActiveWorkRefresh() throws InterruptedException {
assertThat(heartbeatRequests)
.comparingElementsUsing(
Correspondence.from(
(Windmill.HeartbeatRequest h, Work w) ->
h.getWorkToken() == w.getWorkItem().getWorkToken()
&& h.getCacheToken() == w.getWorkItem().getWorkToken()
&& h.getShardingKey() == w.getWorkItem().getShardingKey(),
(Windmill.HeartbeatRequest h, Work w) -> {
assert h != null;
assert w != null;
return h.getWorkToken() == w.getWorkItem().getWorkToken()
&& h.getCacheToken() == w.getWorkItem().getWorkToken()
&& h.getShardingKey() == w.getWorkItem().getShardingKey();
},
"heartbeatRequest's and Work's workTokens, cacheTokens, and shardingKeys should be equal."))
.containsExactlyElementsIn(work);
}
Expand All @@ -207,6 +211,32 @@ public void testActiveWorkRefresh() throws InterruptedException {
workIsProcessed.countDown();
}

@Test
public void testEmptyActiveWorkRefresh() throws InterruptedException {
int activeWorkRefreshPeriodMillis = 100;

List<ComputationState> computations = new ArrayList<>();
for (int i = 0; i < 5; i++) {
ComputationState computationState = createComputationState(i);
computations.add(computationState);
}

CountDownLatch heartbeatsSent = new CountDownLatch(1);
TestClock fakeClock = new TestClock(Instant.now());
ActiveWorkRefresher activeWorkRefresher =
createActiveWorkRefresher(
fakeClock::now,
activeWorkRefreshPeriodMillis,
0,
() -> computations,
heartbeats -> heartbeatsSent::countDown);

activeWorkRefresher.start();
fakeClock.advance(Duration.millis(activeWorkRefreshPeriodMillis * 2));
assertFalse(heartbeatsSent.await(500, TimeUnit.MILLISECONDS));
activeWorkRefresher.stop();
}

@Test
public void testInvalidateStuckCommits() throws InterruptedException {
int stuckCommitDurationMillis = 100;
Expand Down

0 comments on commit bfc64d5

Please sign in to comment.