diff --git a/core/client/fs/src/main/java/alluxio/client/file/ConfigHashSync.java b/core/client/fs/src/main/java/alluxio/client/file/ConfigHashSync.java index 144be4e7f6f1..b94ffd6d0651 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/ConfigHashSync.java +++ b/core/client/fs/src/main/java/alluxio/client/file/ConfigHashSync.java @@ -71,7 +71,7 @@ public Optional getException() { } @Override - public synchronized void heartbeat() { + public synchronized void heartbeat(long timeLimitMs) { if (!mContext.getClientContext().getClusterConf().clusterDefaultsLoaded()) { // Wait until the initial cluster defaults are loaded. return; diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContextReinitializer.java b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContextReinitializer.java index 78ae526be8e6..ae7e9049e95c 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContextReinitializer.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContextReinitializer.java @@ -66,7 +66,7 @@ public FileSystemContextReinitializer(FileSystemContext context) { mExecutor = new ConfigHashSync(context); mFuture = REINIT_EXECUTOR.scheduleAtFixedRate(() -> { try { - mExecutor.heartbeat(); + mExecutor.heartbeat(Long.MAX_VALUE); } catch (Exception e) { LOG.error("Uncaught exception in config heartbeat executor, shutting down", e); } diff --git a/core/common/src/main/java/alluxio/heartbeat/CronExpressionIntervalSupplier.java b/core/common/src/main/java/alluxio/heartbeat/CronExpressionIntervalSupplier.java new file mode 100644 index 000000000000..e632e472dac6 --- /dev/null +++ b/core/common/src/main/java/alluxio/heartbeat/CronExpressionIntervalSupplier.java @@ -0,0 +1,59 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.heartbeat; + +import org.apache.logging.log4j.core.util.CronExpression; + +import java.time.Duration; +import java.time.Instant; +import java.util.Date; + +/** +* Calculate the next interval by given cron expression. +*/ +public class CronExpressionIntervalSupplier implements SleepIntervalSupplier { + private final long mInterval; + private final CronExpression mCron; + + /** + * Constructs a new {@link CronExpressionIntervalSupplier}. + * + * @param cronExpression the cron expression + * @param fixedInterval the fixed interval + */ + public CronExpressionIntervalSupplier(CronExpression cronExpression, long fixedInterval) { + mInterval = fixedInterval; + mCron = cronExpression; + } + + @Override + public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) { + long nextInterval = 0; + long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs; + if (executionTimeMs < mInterval) { + nextInterval = mInterval - executionTimeMs; + } + Date now = Date.from(Instant.ofEpochMilli(nowTimeStampMillis + nextInterval)); + if (mCron.isSatisfiedBy(now)) { + return nextInterval; + } + return nextInterval + Duration.between( + now.toInstant(), mCron.getNextValidTimeAfter(now).toInstant()).toMillis(); + } + + @Override + public long getRunLimit(long mPreviousTickedMs) { + Date now = Date.from(Instant.ofEpochMilli(mPreviousTickedMs)); + return Duration.between(now.toInstant(), + mCron.getNextInvalidTimeAfter(now).toInstant()).toMillis(); + } +} diff --git a/core/common/src/main/java/alluxio/heartbeat/FixedIntervalSupplier.java b/core/common/src/main/java/alluxio/heartbeat/FixedIntervalSupplier.java new file mode 100644 index 000000000000..1269f5996112 --- /dev/null +++ b/core/common/src/main/java/alluxio/heartbeat/FixedIntervalSupplier.java @@ -0,0 +1,63 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.heartbeat; + +import org.slf4j.Logger; +import org.slf4j.helpers.NOPLogger; + +/** + * Fixed interval supplier. + */ +public class FixedIntervalSupplier implements SleepIntervalSupplier { + + private final long mInterval; + protected final Logger mLogger; + + /** + * Constructs a new {@link FixedIntervalSupplier}. + * + * @param fixedInterval the fixed interval + * @param logger the logger + */ + public FixedIntervalSupplier(long fixedInterval, Logger logger) { + mInterval = fixedInterval; + mLogger = logger; + } + + /** + * Constructs a new {@link FixedIntervalSupplier}. + * + * @param fixedInterval the fixed interval + */ + public FixedIntervalSupplier(long fixedInterval) { + this(fixedInterval, NOPLogger.NOP_LOGGER); + } + + @Override + public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) { + if (mPreviousTickedMs == -1) { + return -1; + } + long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs; + if (executionTimeMs > mInterval) { + mLogger.warn("{} last execution took {} ms. Longer than the interval {}", + Thread.currentThread().getName(), executionTimeMs, mInterval); + return 0; + } + return mInterval - executionTimeMs; + } + + @Override + public long getRunLimit(long mPreviousTickedMs) { + return mInterval; + } +} diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatExecutor.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatExecutor.java index a10c4662c5c5..2b8e96ec7532 100644 --- a/core/common/src/main/java/alluxio/heartbeat/HeartbeatExecutor.java +++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatExecutor.java @@ -15,15 +15,17 @@ /** * An interface for a heartbeat execution. The {@link HeartbeatThread} calls the - * {@link #heartbeat()} method. + * {@link #heartbeat(long)} method. */ public interface HeartbeatExecutor extends Closeable { + /** * Implements the heartbeat logic. * + * @param timeLimitMs time limit in milliseconds this heartbeat should not exceed when running * @throws InterruptedException if the thread is interrupted */ - void heartbeat() throws InterruptedException; + void heartbeat(long timeLimitMs) throws InterruptedException; /** * Cleans up any resources used by the heartbeat executor. diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java index 2bb891d67c19..cc9b200bfe5f 100644 --- a/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java +++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java @@ -12,7 +12,6 @@ package alluxio.heartbeat; import alluxio.conf.AlluxioConfiguration; -import alluxio.conf.Reconfigurable; import alluxio.conf.ReconfigurableRegistry; import alluxio.security.authentication.AuthenticatedClientUser; import alluxio.security.user.UserState; @@ -21,11 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Clock; +import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; /** @@ -33,13 +33,12 @@ * the JVM from exiting. */ @NotThreadSafe -public final class HeartbeatThread implements Runnable, Reconfigurable { +public final class HeartbeatThread implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(HeartbeatThread.class); private final String mThreadName; private final HeartbeatExecutor mExecutor; private final UserState mUserState; - private final Supplier mIntervalSupplier; private HeartbeatTimer mTimer; private AlluxioConfiguration mConfiguration; private Status mStatus; @@ -73,26 +72,28 @@ public static String generateThreadName(String executorName, String threadId) { * @param intervalSupplier Sleep time between different heartbeat supplier * @param conf Alluxio configuration * @param userState the user state for this heartbeat thread + * @param clock the clock used to compute the current time */ public HeartbeatThread(String executorName, String threadId, HeartbeatExecutor executor, - Supplier intervalSupplier, AlluxioConfiguration conf, UserState userState) { + Supplier intervalSupplier, + AlluxioConfiguration conf, UserState userState, Clock clock) { mThreadName = generateThreadName(executorName, threadId); mExecutor = Preconditions.checkNotNull(executor, "executor"); Class timerClass = HeartbeatContext.getTimerClass(executorName); - mTimer = CommonUtils.createNewClassInstance(timerClass, new Class[] {String.class, long.class}, - new Object[] {mThreadName, intervalSupplier.get()}); + mTimer = CommonUtils.createNewClassInstance(timerClass, + new Class[] {String.class, Clock.class, Supplier.class}, + new Object[] {mThreadName, clock, intervalSupplier}); mConfiguration = conf; mUserState = userState; - mIntervalSupplier = intervalSupplier; mStatus = Status.INIT; - ReconfigurableRegistry.register(this); + ReconfigurableRegistry.register(mTimer); } /** * Convenience method for * {@link * #HeartbeatThread(String, String, HeartbeatExecutor, Supplier, AlluxioConfiguration, - * UserState)} where threadId is null. + * UserState, Clock)} where threadId is null. * * @param executorName the executor name that is one of those defined in {@link HeartbeatContext} * @param executor the heartbeat executor @@ -101,12 +102,34 @@ public HeartbeatThread(String executorName, String threadId, HeartbeatExecutor e * @param userState the user state for this heartbeat thread */ public HeartbeatThread(String executorName, HeartbeatExecutor executor, - Supplier intervalSupplier, AlluxioConfiguration conf, UserState userState) { - this(executorName, null, executor, intervalSupplier, conf, userState); + Supplier intervalSupplier, AlluxioConfiguration conf, + UserState userState) { + this(executorName, null, executor, intervalSupplier, conf, userState, Clock.systemUTC()); + } + + /** + * Convenience method for + * {@link + * #HeartbeatThread(String, String, HeartbeatExecutor, Supplier, AlluxioConfiguration, + * UserState, Clock)} where threadId is null. + * + * @param executorName the executor name that is one of those defined in {@link HeartbeatContext} + * @param executor the heartbeat executor + * @param intervalSupplier the interval between heartbeats supplier + * @param conf the Alluxio configuration + * @param userState the user state for this heartbeat thread + * @param clock the clock used to compute the current time + */ + public HeartbeatThread(String executorName, HeartbeatExecutor executor, + Supplier intervalSupplier, + AlluxioConfiguration conf, UserState userState, Clock clock) { + this(executorName, null, executor, intervalSupplier, + conf, userState, clock); } @Override public void run() { + long counter = 0L; try { if (SecurityUtils.isSecurityEnabled(mConfiguration) && AuthenticatedClientUser.get(mConfiguration) == null) { @@ -123,9 +146,10 @@ public void run() { while (!Thread.interrupted()) { // TODO(peis): Fix this. The current implementation consumes one thread even when ticking. mStatus = Status.WAITING; - mTimer.tick(); + long limitTime = mTimer.tick(); mStatus = Status.RUNNING; - mExecutor.heartbeat(); + LOG.debug("{} #{} will run limited in {}s", mThreadName, counter++, limitTime / 1000); + mExecutor.heartbeat(limitTime); } } catch (InterruptedException e) { // Allow thread to exit. @@ -133,19 +157,11 @@ public void run() { LOG.error("Uncaught exception in heartbeat executor, Heartbeat Thread shutting down", e); } finally { mStatus = Status.STOPPED; + ReconfigurableRegistry.unregister(mTimer); mExecutor.close(); } } - /** - * Updates the heartbeat interval. - * - * @param intervalMs the heartbeat interval in ms - */ - public void updateIntervalMs(long intervalMs) { - mTimer.setIntervalMs(intervalMs); - } - /** * @return the status of current heartbeat thread */ @@ -153,18 +169,6 @@ public Status getStatus() { return mStatus; } - @Override - public void update() { - if (mStatus == Status.STOPPED) { - ReconfigurableRegistry.unregister(this); - return; - } - long interval = mIntervalSupplier.get(); - if (interval != mTimer.getIntervalMs()) { - updateIntervalMs(interval); - } - } - /** * Enum representing the status of HeartbeatThread. */ diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatTimer.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatTimer.java index 96e9618af3ea..736037234edd 100644 --- a/core/common/src/main/java/alluxio/heartbeat/HeartbeatTimer.java +++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatTimer.java @@ -11,33 +11,27 @@ package alluxio.heartbeat; +import alluxio.conf.Reconfigurable; + /** * An interface for heartbeat timers. The {@link HeartbeatThread} calls the {@link #tick()} method. */ -public interface HeartbeatTimer { +public interface HeartbeatTimer extends Reconfigurable { /** - * Sets the heartbeat interval. - * - * @param intervalMs the heartbeat interval in ms - */ - default void setIntervalMs(long intervalMs) { - throw new UnsupportedOperationException("Setting interval is not supported"); - } - - /** - * Get the interval of HeartbeatTimer. - * - * @return the interval of this HeartbeatTimer + * When this object needs to be reconfigured + * due to external configuration change etc., + * this function will be invoked. */ - default long getIntervalMs() { - throw new UnsupportedOperationException("Getting interval is not supported"); + default void update() { } /** * Waits until next heartbeat should be executed. * + * @return time limit in milliseconds for this heartbeat action to run for before + * the next heartbeat is due. * @throws InterruptedException if the thread is interrupted while waiting */ - void tick() throws InterruptedException; + long tick() throws InterruptedException; } diff --git a/core/common/src/main/java/alluxio/heartbeat/ScheduledTimer.java b/core/common/src/main/java/alluxio/heartbeat/ScheduledTimer.java index 62b6d5667d83..cff75372105c 100644 --- a/core/common/src/main/java/alluxio/heartbeat/ScheduledTimer.java +++ b/core/common/src/main/java/alluxio/heartbeat/ScheduledTimer.java @@ -15,9 +15,11 @@ import com.google.common.base.Preconditions; +import java.time.Clock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; /** @@ -46,9 +48,11 @@ public final class ScheduledTimer implements HeartbeatTimer { * Creates a new instance of {@link ScheduledTimer}. * * @param threadName the thread name - * @param intervalMs the heartbeat interval (unused) + * @param clock for telling the current time (unused) + * @param intervalSupplierSupplier Sleep time between different heartbeat supplier */ - public ScheduledTimer(String threadName, long intervalMs) { + public ScheduledTimer(String threadName, Clock clock, + Supplier intervalSupplierSupplier) { mThreadName = threadName; mLock = new ReentrantLock(); mTickCondition = mLock.newCondition(); @@ -77,7 +81,7 @@ protected void schedule() { } @Override - public void tick() throws InterruptedException { + public long tick() throws InterruptedException { try (LockResource r = new LockResource(mLock)) { HeartbeatScheduler.addTimer(this); // Wait in a loop to handle spurious wakeups @@ -87,5 +91,6 @@ public void tick() throws InterruptedException { mScheduled = false; } + return Long.MAX_VALUE; } } diff --git a/core/common/src/main/java/alluxio/heartbeat/SleepIntervalSupplier.java b/core/common/src/main/java/alluxio/heartbeat/SleepIntervalSupplier.java new file mode 100644 index 000000000000..cde2ddd5ff3f --- /dev/null +++ b/core/common/src/main/java/alluxio/heartbeat/SleepIntervalSupplier.java @@ -0,0 +1,34 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.heartbeat; + +/** + * A policy to calculate the next interval to sleep. + */ +public interface SleepIntervalSupplier { + /** + * Gets the next interval for sleeping. + * + * @param mPreviousTickedMs previous ticked time stamp in millisecond + * @param nowTimeStampMillis current time stamp in millisecond + * @return the interval to sleep starting from now before next time the timer triggers + */ + long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis); + + /** + * Gets the run limit from previous ticked. + * + * @param mPreviousTickedMs previous ticked time stamp in millisecond + * @return the run limit + */ + long getRunLimit(long mPreviousTickedMs); +} diff --git a/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java b/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java index d6d4ad2589ab..2e444de5b892 100644 --- a/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java +++ b/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java @@ -11,7 +11,6 @@ package alluxio.heartbeat; -import alluxio.clock.SystemClock; import alluxio.time.Sleeper; import alluxio.time.ThreadSleeper; @@ -20,57 +19,52 @@ import java.time.Clock; import java.time.Duration; +import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; /** * This class can be used for executing heartbeats periodically. */ @NotThreadSafe -public final class SleepingTimer implements HeartbeatTimer { - private long mIntervalMs; - private long mPreviousTickMs; +public class SleepingTimer implements HeartbeatTimer { + protected long mPreviousTickedMs = -1; private final String mThreadName; - private final Logger mLogger; - private final Clock mClock; - private final Sleeper mSleeper; + protected final Logger mLogger; + protected final Clock mClock; + protected final Sleeper mSleeper; + protected final Supplier mIntervalSupplierSupplier; + protected SleepIntervalSupplier mIntervalSupplier; /** * Creates a new instance of {@link SleepingTimer}. * * @param threadName the thread name - * @param intervalMs the heartbeat interval + * @param clock for telling the current time + * @param intervalSupplierSupplier Sleep time between different heartbeat supplier */ - public SleepingTimer(String threadName, long intervalMs) { - this(threadName, intervalMs, LoggerFactory.getLogger(SleepingTimer.class), - new SystemClock(), ThreadSleeper.INSTANCE); + public SleepingTimer(String threadName, Clock clock, + Supplier intervalSupplierSupplier) { + this(threadName, LoggerFactory.getLogger(SleepingTimer.class), + clock, ThreadSleeper.INSTANCE, intervalSupplierSupplier); } /** * Creates a new instance of {@link SleepingTimer}. * * @param threadName the thread name - * @param intervalMs the heartbeat interval * @param logger the logger to log to * @param clock for telling the current time * @param sleeper the utility to use for sleeping + * @param intervalSupplierSupplier Sleep time between different heartbeat supplier */ - public SleepingTimer(String threadName, long intervalMs, Logger logger, Clock clock, - Sleeper sleeper) { - mIntervalMs = intervalMs; + public SleepingTimer(String threadName, Logger logger, Clock clock, Sleeper sleeper, + Supplier intervalSupplierSupplier) { mThreadName = threadName; mLogger = logger; mClock = clock; mSleeper = sleeper; - } - - @Override - public void setIntervalMs(long intervalMs) { - mIntervalMs = intervalMs; - } - - @Override - public long getIntervalMs() { - return mIntervalMs; + mIntervalSupplierSupplier = intervalSupplierSupplier; + mIntervalSupplier = intervalSupplierSupplier.get(); } /** @@ -79,16 +73,18 @@ public long getIntervalMs() { * @throws InterruptedException if the thread is interrupted while waiting */ @Override - public void tick() throws InterruptedException { - if (mPreviousTickMs != 0) { - long executionTimeMs = mClock.millis() - mPreviousTickMs; - if (executionTimeMs > mIntervalMs) { - mLogger.warn("{} last execution took {} ms. Longer than the interval {}", mThreadName, - executionTimeMs, mIntervalMs); - } else { - mSleeper.sleep(Duration.ofMillis(mIntervalMs - executionTimeMs)); - } + public long tick() throws InterruptedException { + long nextInterval = mIntervalSupplier.getNextInterval(mPreviousTickedMs, mClock.millis()); + if (nextInterval > 0) { + mSleeper.sleep(Duration.ofMillis(nextInterval)); } - mPreviousTickMs = mClock.millis(); + mPreviousTickedMs = mClock.millis(); + return mIntervalSupplier.getRunLimit(mPreviousTickedMs); + } + + @Override + public void update() { + mIntervalSupplier = mIntervalSupplierSupplier.get(); + mLogger.info("update {} interval supplier.", mThreadName); } } diff --git a/core/common/src/test/java/alluxio/heartbeat/HeartbeatContextTest.java b/core/common/src/test/java/alluxio/heartbeat/HeartbeatContextTest.java index f5c222739dc0..0c972baf44db 100644 --- a/core/common/src/test/java/alluxio/heartbeat/HeartbeatContextTest.java +++ b/core/common/src/test/java/alluxio/heartbeat/HeartbeatContextTest.java @@ -21,7 +21,7 @@ */ public final class HeartbeatContextTest { @Test - public void allThreadsUseSleepingTimer() { + public void allThreadsUseProductionTimer() { for (String threadName : HeartbeatContext.getTimerClasses().keySet()) { Class timerClass = HeartbeatContext.getTimerClass(threadName); assertTrue(timerClass.isAssignableFrom(SleepingTimer.class)); diff --git a/core/common/src/test/java/alluxio/heartbeat/HeartbeatThreadTest.java b/core/common/src/test/java/alluxio/heartbeat/HeartbeatThreadTest.java index 5d09135dc7ea..921e250984da 100644 --- a/core/common/src/test/java/alluxio/heartbeat/HeartbeatThreadTest.java +++ b/core/common/src/test/java/alluxio/heartbeat/HeartbeatThreadTest.java @@ -139,7 +139,8 @@ public Void call() throws Exception { try (ManuallyScheduleHeartbeat.Resource r = new ManuallyScheduleHeartbeat.Resource(Arrays.asList(mThreadName))) { DummyHeartbeatExecutor executor = new DummyHeartbeatExecutor(); - HeartbeatThread ht = new HeartbeatThread(mThreadName, executor, () -> 1L, + HeartbeatThread ht = new HeartbeatThread(mThreadName, executor, + () -> new FixedIntervalSupplier(1L), Configuration.global(), UserState.Factory.create(Configuration.global())); // Run the HeartbeatThread. @@ -166,7 +167,7 @@ private class DummyHeartbeatExecutor implements HeartbeatExecutor { private int mCounter = 0; @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { mCounter++; } diff --git a/core/common/src/test/java/alluxio/heartbeat/SleepingTimerForCronExpressionIntervalSupplierTest.java b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerForCronExpressionIntervalSupplierTest.java new file mode 100644 index 000000000000..81d9d5e4bc06 --- /dev/null +++ b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerForCronExpressionIntervalSupplierTest.java @@ -0,0 +1,121 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.heartbeat; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import alluxio.Constants; +import alluxio.clock.ManualClock; +import alluxio.time.Sleeper; + +import org.apache.logging.log4j.core.util.CronExpression; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Date; + +/** + * Unit tests for {@link SleepingTimer}. + */ +public final class SleepingTimerForCronExpressionIntervalSupplierTest { + private static final String THREAD_NAME = "cron-test-thread-name"; + private static final long INTERVAL_MS = 10 * Constants.MINUTE_MS; + private Logger mMockLogger; + private ManualClock mFakeClock; + private Sleeper mMockSleeper; + private long mAllSleepTimeMs; + + @Before + public void before() throws InterruptedException { + mMockLogger = mock(Logger.class); + mFakeClock = new ManualClock(); + mMockSleeper = mock(Sleeper.class); + doAnswer((invocation) -> { + Duration duration = invocation.getArgument(0); + mFakeClock.addTime(duration); + mAllSleepTimeMs += duration.toMillis(); + return null; + }).when(mMockSleeper).sleep(any(Duration.class)); + } + + /** + * Tests that the cron timer will attempt to run at the same interval, independently of how + * long the execution between ticks takes. For example, if the interval is 100ms and execution + * takes 80ms, the timer should sleep for only 20ms to maintain the regular interval of 100ms. + */ + @Test + public void maintainInterval() throws Exception { + SleepingTimer timer = + new SleepingTimer(THREAD_NAME, mMockLogger, mFakeClock, mMockSleeper, + () -> { + try { + return new CronExpressionIntervalSupplier( + new CronExpression("* 30-59 0-1,4-9,13-23 * * ? *"), INTERVAL_MS); + } catch (ParseException e) { + throw new RuntimeException(e); + } + }); + DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date startDate = formatter.parse("2022-01-01 00:00:00"); + Assert.assertEquals(-1, timer.mPreviousTickedMs); + mFakeClock.setTimeMs(startDate.getTime()); + long limitMs = timer.tick(); + long lastAllSleepTimeMs = mAllSleepTimeMs; + Assert.assertEquals(30 * Constants.MINUTE_MS, mAllSleepTimeMs); + Assert.assertEquals(30 * Constants.MINUTE_MS, limitMs); + Assert.assertEquals(formatter.parse("2022-01-01 00:30:00"), new Date(timer.mPreviousTickedMs)); + Assert.assertEquals(formatter.parse("2022-01-01 00:30:00"), new Date(mFakeClock.millis())); + // Mock heartbeat 1 minute + mFakeClock.addTime(Duration.ofMinutes(1)); + + limitMs = timer.tick(); + Assert.assertEquals(9 * Constants.MINUTE_MS, mAllSleepTimeMs - lastAllSleepTimeMs); + lastAllSleepTimeMs = mAllSleepTimeMs; + Assert.assertEquals(20 * Constants.MINUTE_MS, limitMs); + Assert.assertEquals(formatter.parse("2022-01-01 00:40:00"), new Date(timer.mPreviousTickedMs)); + Assert.assertEquals(formatter.parse("2022-01-01 00:40:00"), new Date(mFakeClock.millis())); + // Mock heartbeat 5 minute + mFakeClock.addTime(Duration.ofMinutes(5)); + + limitMs = timer.tick(); + Assert.assertEquals(5 * Constants.MINUTE_MS, mAllSleepTimeMs - lastAllSleepTimeMs); + lastAllSleepTimeMs = mAllSleepTimeMs; + Assert.assertEquals(10 * Constants.MINUTE_MS, limitMs); + Assert.assertEquals(formatter.parse("2022-01-01 00:50:00"), new Date(timer.mPreviousTickedMs)); + Assert.assertEquals(formatter.parse("2022-01-01 00:50:00"), new Date(mFakeClock.millis())); + // Mock heartbeat 5 minute + mFakeClock.addTime(Duration.ofMinutes(5)); + + limitMs = timer.tick(); + Assert.assertEquals(35 * Constants.MINUTE_MS, mAllSleepTimeMs - lastAllSleepTimeMs); + lastAllSleepTimeMs = mAllSleepTimeMs; + Assert.assertEquals(30 * Constants.MINUTE_MS, limitMs); + Assert.assertEquals(formatter.parse("2022-01-01 01:30:00"), new Date(timer.mPreviousTickedMs)); + Assert.assertEquals(formatter.parse("2022-01-01 01:30:00"), new Date(mFakeClock.millis())); + // Mock heartbeat 30 minute + mFakeClock.addTime(Duration.ofMinutes(30)); + + limitMs = timer.tick(); + Assert.assertEquals(150 * Constants.MINUTE_MS, mAllSleepTimeMs - lastAllSleepTimeMs); + Assert.assertEquals(30 * Constants.MINUTE_MS, limitMs); + Assert.assertEquals(formatter.parse("2022-01-01 04:30:00"), new Date(timer.mPreviousTickedMs)); + Assert.assertEquals(formatter.parse("2022-01-01 04:30:00"), new Date(mFakeClock.millis())); + } +} diff --git a/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java index ae8ef03d8aea..6a4f79447574 100644 --- a/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java +++ b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java @@ -47,7 +47,8 @@ public void before() { @Test public void warnWhenExecutionTakesLongerThanInterval() throws Exception { SleepingTimer timer = - new SleepingTimer(THREAD_NAME, INTERVAL_MS, mMockLogger, mFakeClock, mMockSleeper); + new SleepingTimer(THREAD_NAME, mMockLogger, mFakeClock, mMockSleeper, + () -> new FixedIntervalSupplier(INTERVAL_MS, mMockLogger)); timer.tick(); mFakeClock.addTimeMs(5 * INTERVAL_MS); @@ -60,7 +61,8 @@ public void warnWhenExecutionTakesLongerThanInterval() throws Exception { @Test public void sleepForSpecifiedInterval() throws Exception { final SleepingTimer timer = - new SleepingTimer(THREAD_NAME, INTERVAL_MS, mMockLogger, mFakeClock, mMockSleeper); + new SleepingTimer(THREAD_NAME, mMockLogger, mFakeClock, mMockSleeper, + () -> new FixedIntervalSupplier(INTERVAL_MS)); timer.tick(); // first tick won't sleep verify(mMockSleeper, times(0)).sleep(any(Duration.class)); timer.tick(); @@ -75,7 +77,8 @@ public void sleepForSpecifiedInterval() throws Exception { @Test public void maintainInterval() throws Exception { SleepingTimer stimer = - new SleepingTimer(THREAD_NAME, INTERVAL_MS, mMockLogger, mFakeClock, mMockSleeper); + new SleepingTimer(THREAD_NAME, mMockLogger, mFakeClock, mMockSleeper, + () -> new FixedIntervalSupplier(INTERVAL_MS)); stimer.tick(); mFakeClock.addTimeMs(INTERVAL_MS / 3); diff --git a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java index 733dcb29fc3d..c5f740d1fbc7 100644 --- a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java +++ b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java @@ -41,6 +41,7 @@ import alluxio.grpc.ServiceType; import alluxio.grpc.StorageList; import alluxio.grpc.WorkerLostStorageInfo; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -513,7 +514,7 @@ public class WorkerRegisterStreamGCExecutor implements HeartbeatExecutor { .getMs(PropertyKey.MASTER_WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT); @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { AtomicInteger removedSessions = new AtomicInteger(0); mActiveRegisterContexts.entrySet().removeIf((entry) -> { WorkerRegisterContext context = entry.getValue(); @@ -558,7 +559,8 @@ public void start(Boolean isLeader) throws IOException { if (isLeader || mWorkerRegisterToAllMasters) { getExecutorService().submit(new HeartbeatThread( HeartbeatContext.MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatExecutor(), - () -> Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_DETECTION_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_DETECTION_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } @@ -566,7 +568,8 @@ HeartbeatContext.MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatE getExecutorService().submit(new HeartbeatThread( HeartbeatContext.MASTER_WORKER_REGISTER_SESSION_CLEANER, new WorkerRegisterStreamGCExecutor(), - () -> Configuration.getMs(PropertyKey.MASTER_WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT), + () -> new FixedIntervalSupplier(Configuration.getMs( + PropertyKey.MASTER_WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT)), Configuration.global(), mMasterContext.getUserState())); } @@ -1759,7 +1762,7 @@ public final class LostWorkerDetectionHeartbeatExecutor implements HeartbeatExec public LostWorkerDetectionHeartbeatExecutor() {} @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { long masterWorkerTimeoutMs = Configuration.getMs(PropertyKey.MASTER_WORKER_TIMEOUT_MS); long masterWorkerDeleteTimeoutMs = Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_DELETION_TIMEOUT_MS); diff --git a/core/server/master/src/main/java/alluxio/master/block/meta/MasterWorkerInfo.java b/core/server/master/src/main/java/alluxio/master/block/meta/MasterWorkerInfo.java index 8974ce548176..b2d08a66fcb3 100644 --- a/core/server/master/src/main/java/alluxio/master/block/meta/MasterWorkerInfo.java +++ b/core/server/master/src/main/java/alluxio/master/block/meta/MasterWorkerInfo.java @@ -111,7 +111,7 @@ * and block removal/commit. * 2. In {@link alluxio.master.block.WorkerRegisterContext}, * to write locks are held throughout the lifecycle. - * 3. In {@link DefaultBlockMaster.LostWorkerDetectionHeartbeatExecutor#heartbeat()} + * 3. In {@link DefaultBlockMaster.LostWorkerDetectionHeartbeatExecutor#heartbeat(long)} */ @NotThreadSafe public final class MasterWorkerInfo { diff --git a/core/server/master/src/main/java/alluxio/master/file/BlockIntegrityChecker.java b/core/server/master/src/main/java/alluxio/master/file/BlockIntegrityChecker.java index 24334a592eb0..46370d2f91f2 100644 --- a/core/server/master/src/main/java/alluxio/master/file/BlockIntegrityChecker.java +++ b/core/server/master/src/main/java/alluxio/master/file/BlockIntegrityChecker.java @@ -39,7 +39,7 @@ public BlockIntegrityChecker(FileSystemMaster fsm) { } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { try { mFileSystemMaster.validateInodeBlocks(mRepair); } catch (Exception e) { diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index 5e2eb91255dc..eaf8193f483a 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -60,6 +60,7 @@ import alluxio.grpc.SetAclAction; import alluxio.grpc.SetAttributePOptions; import alluxio.grpc.TtlAction; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; import alluxio.job.plan.persist.PersistConfig; @@ -717,30 +718,35 @@ public void start(Boolean isPrimary) throws IOException { getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_BLOCK_INTEGRITY_CHECK, new BlockIntegrityChecker(this), () -> - Configuration.getMs(PropertyKey.MASTER_PERIODIC_BLOCK_INTEGRITY_CHECK_INTERVAL), + new FixedIntervalSupplier(Configuration.getMs( + PropertyKey.MASTER_PERIODIC_BLOCK_INTEGRITY_CHECK_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_TTL_CHECK, new InodeTtlChecker(this, mInodeTree), - () -> Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState())); getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_LOST_FILES_DETECTION, new LostFileDetector(this, mBlockMaster, mInodeTree), - () -> Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_FILE_DETECTION_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_FILE_DETECTION_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); mReplicationCheckHeartbeatThread = new HeartbeatThread( HeartbeatContext.MASTER_REPLICATION_CHECK, new alluxio.master.file.replication.ReplicationChecker(mInodeTree, mBlockMaster, mSafeModeManager, mJobMasterClientPool), - () -> Configuration.getMs(PropertyKey.MASTER_REPLICATION_CHECK_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_REPLICATION_CHECK_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState()); getExecutorService().submit(mReplicationCheckHeartbeatThread); getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_PERSISTENCE_SCHEDULER, new PersistenceScheduler(), - () -> Configuration.getMs(PropertyKey.MASTER_PERSISTENCE_SCHEDULER_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_PERSISTENCE_SCHEDULER_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState())); mPersistCheckerPool = new java.util.concurrent.ThreadPoolExecutor(PERSIST_CHECKER_POOL_THREADS, @@ -751,12 +757,14 @@ public void start(Boolean isPrimary) throws IOException { getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_PERSISTENCE_CHECKER, new PersistenceChecker(), - () -> Configuration.getMs(PropertyKey.MASTER_PERSISTENCE_CHECKER_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_PERSISTENCE_CHECKER_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState())); getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_METRICS_TIME_SERIES, new TimeSeriesRecorder(), - () -> Configuration.getMs(PropertyKey.MASTER_METRICS_TIME_SERIES_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_METRICS_TIME_SERIES_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) { mAsyncAuditLogWriter = new AsyncUserAccessAuditLogWriter("AUDIT_LOG"); @@ -769,7 +777,8 @@ public void start(Boolean isPrimary) throws IOException { if (Configuration.getBoolean(PropertyKey.UNDERFS_CLEANUP_ENABLED)) { getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_UFS_CLEANUP, new UfsCleaner(this), - () -> Configuration.getMs(PropertyKey.UNDERFS_CLEANUP_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.UNDERFS_CLEANUP_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } if (mAccessTimeUpdater != null) { @@ -4564,7 +4573,7 @@ private void handleReady(long fileId, JournalContext journalContext, AtomicInteg * @throws InterruptedException if the thread is interrupted */ @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { LOG.debug("Async Persist heartbeat start"); java.util.concurrent.TimeUnit.SECONDS.sleep(mQuietPeriodSeconds); AtomicInteger journalCounter = new AtomicInteger(0); @@ -4867,7 +4876,7 @@ private void createParentPath(List inodes, String ufsPath, } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { boolean queueEmpty = mPersistCheckerPool.getQueue().isEmpty(); // Check the progress of persist jobs. for (long fileId : mPersistJobs.keySet()) { @@ -4955,7 +4964,7 @@ public void heartbeat() throws InterruptedException { @NotThreadSafe private final class TimeSeriesRecorder implements alluxio.heartbeat.HeartbeatExecutor { @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { // TODO(calvin): Provide a better way to keep track of metrics collected as time series MetricRegistry registry = MetricsSystem.METRIC_REGISTRY; SortedMap gauges = registry.getGauges(); diff --git a/core/server/master/src/main/java/alluxio/master/file/InodeTtlChecker.java b/core/server/master/src/main/java/alluxio/master/file/InodeTtlChecker.java index 0c9cf4a76ab4..595322679c31 100644 --- a/core/server/master/src/main/java/alluxio/master/file/InodeTtlChecker.java +++ b/core/server/master/src/main/java/alluxio/master/file/InodeTtlChecker.java @@ -61,7 +61,7 @@ public InodeTtlChecker(FileSystemMaster fileSystemMaster, InodeTree inodeTree) { } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { Set expiredBuckets = mTtlBuckets.pollExpiredBuckets(System.currentTimeMillis()); Map failedInodesToRetryNum = new HashMap<>(); for (TtlBucket bucket : expiredBuckets) { diff --git a/core/server/master/src/main/java/alluxio/master/file/LostFileDetector.java b/core/server/master/src/main/java/alluxio/master/file/LostFileDetector.java index 535bec900ec9..9f25b8d8a857 100644 --- a/core/server/master/src/main/java/alluxio/master/file/LostFileDetector.java +++ b/core/server/master/src/main/java/alluxio/master/file/LostFileDetector.java @@ -59,7 +59,7 @@ public LostFileDetector(FileSystemMaster fileSystemMaster, BlockMaster blockMast } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { Iterator iter = mBlockMaster.getLostBlocksIterator(); Set toMarkFiles = new HashSet<>(); while (iter.hasNext()) { diff --git a/core/server/master/src/main/java/alluxio/master/file/UfsCleaner.java b/core/server/master/src/main/java/alluxio/master/file/UfsCleaner.java index bc9ab0ab6ef4..5d1261bff807 100644 --- a/core/server/master/src/main/java/alluxio/master/file/UfsCleaner.java +++ b/core/server/master/src/main/java/alluxio/master/file/UfsCleaner.java @@ -30,7 +30,7 @@ public UfsCleaner(FileSystemMaster fileSystemMaster) { } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { mFileSystemMaster.cleanupUfs(); } diff --git a/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncManager.java b/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncManager.java index 214c1ec72e67..6993b31027dd 100644 --- a/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncManager.java +++ b/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncManager.java @@ -17,6 +17,7 @@ import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; import alluxio.exception.InvalidPathException; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; import alluxio.master.file.FileSystemMaster; @@ -262,7 +263,8 @@ public void launchPollingThread(long mountId, long txId) { ActiveSyncer syncer = new ActiveSyncer(mFileSystemMaster, this, mMountTable, mountId); Future future = getExecutor().submit( new HeartbeatThread(HeartbeatContext.MASTER_ACTIVE_UFS_SYNC, - syncer, () -> Configuration.getMs(PropertyKey.MASTER_UFS_ACTIVE_SYNC_INTERVAL), + syncer, () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_UFS_ACTIVE_SYNC_INTERVAL)), Configuration.global(), ServerUserState.global())); mPollerMap.put(mountId, future); } diff --git a/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncer.java b/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncer.java index 666da9434682..e9ba8aebec3c 100644 --- a/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncer.java +++ b/core/server/master/src/main/java/alluxio/master/file/activesync/ActiveSyncer.java @@ -74,7 +74,7 @@ public ActiveSyncer(FileSystemMaster fileSystemMaster, ActiveSyncManager syncMan } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { LOG.debug("start sync heartbeat for {} with mount id {}", mMountUri, mMountId); // Remove any previously completed sync tasks mSyncTasks.removeIf(Future::isDone); diff --git a/core/server/master/src/main/java/alluxio/master/file/replication/ReplicationChecker.java b/core/server/master/src/main/java/alluxio/master/file/replication/ReplicationChecker.java index 44e801dc29d9..d669f182bbdb 100644 --- a/core/server/master/src/main/java/alluxio/master/file/replication/ReplicationChecker.java +++ b/core/server/master/src/main/java/alluxio/master/file/replication/ReplicationChecker.java @@ -148,7 +148,7 @@ private boolean shouldRun() { * (2) Is there any blocks over replicated, schedule evict jobs to reduce the replication level. */ @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { if (!shouldRun()) { return; } diff --git a/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java b/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java index 010e668acd5a..b1b8fe6bb393 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java +++ b/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java @@ -35,6 +35,7 @@ import alluxio.grpc.RegisterMasterPOptions; import alluxio.grpc.Scope; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -307,13 +308,14 @@ public void start(Boolean isPrimary) throws IOException { getExecutorService().submit(new HeartbeatThread( HeartbeatContext.MASTER_LOST_MASTER_DETECTION, new LostMasterDetectionHeartbeatExecutor(), - () -> Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_LOG_CONFIG_REPORT_SCHEDULING, new LogConfigReportHeartbeatExecutor(), - () -> Configuration - .getMs(PropertyKey.MASTER_LOG_CONFIG_REPORT_HEARTBEAT_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_LOG_CONFIG_REPORT_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); if (Configuration.getBoolean(PropertyKey.MASTER_DAILY_BACKUP_ENABLED)) { @@ -324,7 +326,8 @@ public void start(Boolean isPrimary) throws IOException { if (mJournalSpaceMonitor != null) { getExecutorService().submit(new HeartbeatThread( HeartbeatContext.MASTER_JOURNAL_SPACE_MONITOR, mJournalSpaceMonitor, - () -> Configuration.getMs(PropertyKey.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } if (mState.getClusterID().equals(INVALID_CLUSTER_ID)) { @@ -337,7 +340,8 @@ public void start(Boolean isPrimary) throws IOException { && !Configuration.getBoolean(PropertyKey.TEST_MODE)) { getExecutorService().submit(new HeartbeatThread(HeartbeatContext.MASTER_UPDATE_CHECK, new UpdateChecker(this), - () -> Configuration.getMs(PropertyKey.MASTER_UPDATE_CHECK_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_UPDATE_CHECK_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } } else { @@ -352,7 +356,8 @@ public void start(Boolean isPrimary) throws IOException { .newBuilder(ClientContext.create(Configuration.global())).build()); getExecutorService().submit(new HeartbeatThread(HeartbeatContext.META_MASTER_SYNC, new MetaMasterSync(mMasterAddress, metaMasterClient), - () -> Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); LOG.info("Standby master with address {} starts sending heartbeat to leader master.", mMasterAddress); @@ -714,7 +719,7 @@ public LostMasterDetectionHeartbeatExecutor() { } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { long masterTimeoutMs = Configuration.getMs(PropertyKey.MASTER_HEARTBEAT_TIMEOUT); for (MasterInfo master : mMasters) { synchronized (master) { @@ -743,7 +748,7 @@ private final class LogConfigReportHeartbeatExecutor implements HeartbeatExecuto private volatile boolean mFirst = true; @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { // Skip the first heartbeat since it happens before servers have time to register their // configurations. if (mFirst) { diff --git a/core/server/master/src/main/java/alluxio/master/meta/JournalSpaceMonitor.java b/core/server/master/src/main/java/alluxio/master/meta/JournalSpaceMonitor.java index 8b74f695e6a9..d917be9e348f 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/JournalSpaceMonitor.java +++ b/core/server/master/src/main/java/alluxio/master/meta/JournalSpaceMonitor.java @@ -169,7 +169,7 @@ public List getJournalDiskWarnings() { } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { getJournalDiskWarnings().forEach(LOG::warn); } diff --git a/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java b/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java index f793f2d7fa34..3b246cefae15 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java +++ b/core/server/master/src/main/java/alluxio/master/meta/MetaMasterSync.java @@ -62,7 +62,7 @@ public MetaMasterSync(Address masterAddress, RetryHandlingMetaMasterMasterClient * Heartbeats to the leader master node. */ @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { MetaCommand command = null; try { if (mMasterId.get() == UNINITIALIZED_MASTER_ID) { diff --git a/core/server/master/src/main/java/alluxio/master/meta/UpdateChecker.java b/core/server/master/src/main/java/alluxio/master/meta/UpdateChecker.java index d7d75f837014..7bfdfb6e77c2 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/UpdateChecker.java +++ b/core/server/master/src/main/java/alluxio/master/meta/UpdateChecker.java @@ -45,7 +45,7 @@ public UpdateChecker(DefaultMetaMaster metaMaster) { * Heartbeat for the periodic update check. */ @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { try { List additionalInfo = new ArrayList<>(); int clusterSize = mMetaMaster.getWorkerAddresses().size(); diff --git a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java index bf65ad6d2449..3ccbb8c7aba1 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java @@ -18,6 +18,7 @@ import alluxio.grpc.GrpcService; import alluxio.grpc.MetricValue; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -180,7 +181,8 @@ public void start(Boolean isLeader) throws IOException { if (isLeader) { getExecutorService().submit(new HeartbeatThread( HeartbeatContext.MASTER_CLUSTER_METRICS_UPDATER, new ClusterMetricsUpdater(), - () -> Configuration.getMs(PropertyKey.MASTER_CLUSTER_METRICS_UPDATE_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_CLUSTER_METRICS_UPDATE_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); } } @@ -215,7 +217,7 @@ public Map getMetrics() { */ private class ClusterMetricsUpdater implements HeartbeatExecutor { @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { updateMultiValueMasterMetrics(); } diff --git a/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java b/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java index 70ee98d0b85c..ef5eee6f489c 100644 --- a/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java +++ b/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java @@ -19,6 +19,7 @@ import alluxio.conf.PropertyKey; import alluxio.grpc.GrpcService; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -109,7 +110,8 @@ public void start(Boolean isLeader) throws IOException { LOG.info("Starting {}", getName()); mThrottleService = getExecutorService().submit( new HeartbeatThread(HeartbeatContext.MASTER_THROTTLE, mThrottleExecutor, - () -> Configuration.getMs(PropertyKey.MASTER_THROTTLE_HEARTBEAT_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_THROTTLE_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); LOG.info("{} is started", getName()); @@ -141,7 +143,7 @@ public ThrottleExecutor(MasterProcess masterProcess) { } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { mSystemMonitor.run(); } diff --git a/core/server/master/src/test/java/alluxio/master/file/replication/ReplicationCheckerTest.java b/core/server/master/src/test/java/alluxio/master/file/replication/ReplicationCheckerTest.java index 40bab536216f..f49c504db33c 100644 --- a/core/server/master/src/test/java/alluxio/master/file/replication/ReplicationCheckerTest.java +++ b/core/server/master/src/test/java/alluxio/master/file/replication/ReplicationCheckerTest.java @@ -281,7 +281,7 @@ private void heartbeatToAddLocationHelper(long blockId, long workerId) throws Ex @Test public void heartbeatWhenTreeIsEmpty() throws Exception { - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); } @@ -292,17 +292,17 @@ public void heartbeatFileWithinRange() throws Exception { createBlockHelper(TEST_FILE_1, mFileContext, ""); // One replica, meeting replication min addBlockLocationHelper(blockId, 1); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); // Two replicas, good heartbeatToAddLocationHelper(blockId, createWorkerHelper(1)); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); // Three replicas, meeting replication max, still good heartbeatToAddLocationHelper(blockId, createWorkerHelper(2)); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); } @@ -311,7 +311,7 @@ public void heartbeatFileUnderReplicatedBy1() throws Exception { mFileContext.getOptions().setReplicationMin(1); long blockId = createBlockHelper(TEST_FILE_1, mFileContext, ""); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId, 1); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -322,7 +322,7 @@ public void heartbeatFileNeedsMove() throws Exception { long blockId = createBlockHelper(TEST_FILE_1, mFileContext, Constants.MEDIUM_SSD); addBlockLocationHelper(blockId, 1); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map> expected = ImmutableMap.of(blockId, new Pair<>("host0", Constants.MEDIUM_SSD)); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); @@ -335,7 +335,7 @@ public void heartbeatFileDoesnotNeedMove() throws Exception { long blockId = createBlockHelper(TEST_FILE_1, mFileContext, Constants.MEDIUM_MEM); addBlockLocationHelper(blockId, 1); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); Assert.assertEquals(EMPTY, mMockReplicationHandler.getMigrateRequests()); } @@ -345,7 +345,7 @@ public void heartbeatFileUnderReplicatedBy10() throws Exception { mFileContext.getOptions().setReplicationMin(10); long blockId = createBlockHelper(TEST_FILE_1, mFileContext, ""); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId, 10); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -357,7 +357,7 @@ public void heartbeatMultipleFilesUnderReplicated() throws Exception { mFileContext.getOptions().setReplicationMin(2); long blockId2 = createBlockHelper(TEST_FILE_2, mFileContext, ""); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId1, 1, blockId2, 2); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -382,7 +382,7 @@ public void heartbeatFileUnderReplicatedAndLost() throws Exception { ImmutableMap.of(Constants.MEDIUM_MEM, 0L), ImmutableList.of(blockId), NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, NO_METRICS); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(EMPTY, mMockReplicationHandler.getSetReplicaRequests()); } @@ -392,7 +392,7 @@ public void heartbeatFileOverReplicatedBy1() throws Exception { long blockId = createBlockHelper(TEST_FILE_1, mFileContext, ""); addBlockLocationHelper(blockId, 2); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId, 1); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -403,7 +403,7 @@ public void heartbeatFileOverReplicatedBy10() throws Exception { long blockId = createBlockHelper(TEST_FILE_1, mFileContext, ""); addBlockLocationHelper(blockId, 11); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId, 1); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -417,7 +417,7 @@ public void heartbeatMultipleFilesOverReplicated() throws Exception { addBlockLocationHelper(blockId1, 2); addBlockLocationHelper(blockId2, 4); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected = ImmutableMap.of(blockId1, 1, blockId2, 2); Assert.assertEquals(expected, mMockReplicationHandler.getSetReplicaRequests()); } @@ -431,7 +431,7 @@ public void heartbeatFilesUnderAndOverReplicated() throws Exception { addBlockLocationHelper(blockId1, 1); addBlockLocationHelper(blockId2, 5); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Map expected1 = ImmutableMap.of(blockId1, 2, blockId2, 3); Assert.assertEquals(expected1, mMockReplicationHandler.getSetReplicaRequests()); } @@ -449,7 +449,7 @@ public void heartbeatPartial() throws Exception { addBlockLocationHelper(blockId2, 1); addBlockLocationHelper(blockId3, 1); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); final Map replicateRequests = mMockReplicationHandler.getSetReplicaRequests(); System.out.println(replicateRequests); Assert.assertEquals(2, replicateRequests.size()); @@ -459,11 +459,11 @@ public void heartbeatPartial() throws Exception { mMockReplicationHandler.setJobStatus(1, Status.RUNNING); mMockReplicationHandler.setJobStatus(2, Status.RUNNING); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(0, replicateRequests.size()); mMockReplicationHandler.setJobStatus(1, Status.FAILED); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(1, replicateRequests.size()); Assert.assertEquals(3, replicateRequests.values().toArray()[0]); @@ -473,7 +473,7 @@ public void heartbeatPartial() throws Exception { mMockReplicationHandler.setJobStatus(2, Status.COMPLETED); mMockReplicationHandler.setJobStatus(3, Status.COMPLETED); - mReplicationChecker.heartbeat(); + mReplicationChecker.heartbeat(Long.MAX_VALUE); Assert.assertEquals(1, replicateRequests.size()); Assert.assertTrue(replicateRequests.containsKey(blockId3)); Assert.assertEquals(3, replicateRequests.values().toArray()[0]); diff --git a/core/server/master/src/test/java/alluxio/master/meta/JournalSpaceMonitorTest.java b/core/server/master/src/test/java/alluxio/master/meta/JournalSpaceMonitorTest.java index eb638ae88800..8054599ee0a6 100644 --- a/core/server/master/src/test/java/alluxio/master/meta/JournalSpaceMonitorTest.java +++ b/core/server/master/src/test/java/alluxio/master/meta/JournalSpaceMonitorTest.java @@ -82,7 +82,7 @@ public void testLoggingPositive() throws IOException, InterruptedException { JournalSpaceMonitor monitor = Mockito.spy( new JournalSpaceMonitor(Paths.get(".").toAbsolutePath().toString(), 90)); doReturn(new CommandReturn(0, CMD_RETURN_MOCK)).when(monitor).getRawDiskInfo(); - monitor.heartbeat(); + monitor.heartbeat(Long.MAX_VALUE); assertTrue(mLogger.wasLoggedWithLevel("The journal disk /dev/nvme0n1p2 backing the journal " + "has only .* space left", Level.WARN)); } @@ -92,7 +92,7 @@ public void testLoggingNegative() throws IOException, InterruptedException { JournalSpaceMonitor monitor = Mockito.spy( new JournalSpaceMonitor(Paths.get(".").toAbsolutePath().toString(), 10)); doReturn(new CommandReturn(0, CMD_RETURN_MOCK)).when(monitor).getRawDiskInfo(); - monitor.heartbeat(); + monitor.heartbeat(Long.MAX_VALUE); assertFalse(mLogger.wasLoggedWithLevel("The journal disk /dev/nvme0n1p2 backing the journal " + "has only .* space left", Level.WARN)); } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java index b372c7f84ac8..3ac632238cc2 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSync.java @@ -117,7 +117,7 @@ private void registerWithMaster() throws IOException { * Heartbeats to the master node about the change in the worker's managed space. */ @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { boolean success = mBlockMasterSyncHelper.heartbeat( mWorkerId.get(), mBlockWorker.getReport(), mBlockWorker.getStoreMeta(), this::handleMasterCommand); diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockSyncMasterGroup.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockSyncMasterGroup.java index 6abc313fc1d1..ba9758da143a 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockSyncMasterGroup.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockSyncMasterGroup.java @@ -15,6 +15,7 @@ import alluxio.ProcessUtils; import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; import alluxio.master.MasterClientContext; @@ -91,7 +92,8 @@ public synchronized void start(ExecutorService executorService) { } mMasterSyncOperators.values().forEach(blockMasterSync -> executorService .submit(new HeartbeatThread(HeartbeatContext.WORKER_BLOCK_SYNC, blockMasterSync, - () -> Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global()))); } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index 0dd50a6978c5..fcba47cedbe6 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -38,6 +38,7 @@ import alluxio.grpc.GrpcService; import alluxio.grpc.ServiceType; import alluxio.grpc.UfsReadOptions; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -223,7 +224,8 @@ public void start(WorkerNetAddress address) throws IOException { new PinListSync(this, mFileSystemMasterClient)); getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.WORKER_PIN_LIST_SYNC, pinListSync, - () -> Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); // Setup session cleaner @@ -236,7 +238,8 @@ public void start(WorkerNetAddress address) throws IOException { StorageChecker storageChecker = mResourceCloser.register(new StorageChecker()); getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.WORKER_STORAGE_HEALTH, storageChecker, - () -> Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); } @@ -251,7 +254,8 @@ protected void setupBlockMasterSync() throws IOException { .register(new BlockMasterSync(this, mWorkerId, mAddress, mBlockMasterClientPool)); getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.WORKER_BLOCK_SYNC, blockMasterSync, - () -> Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); } @@ -568,7 +572,7 @@ private Metrics() {} // prevent instantiation public final class StorageChecker implements HeartbeatExecutor { @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { try { mBlockStore.removeInaccessibleStorage(); } catch (Exception e) { diff --git a/core/server/worker/src/main/java/alluxio/worker/block/PinListSync.java b/core/server/worker/src/main/java/alluxio/worker/block/PinListSync.java index a85a50092a3c..67ac89a7357d 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/PinListSync.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/PinListSync.java @@ -47,7 +47,7 @@ public PinListSync(BlockWorker blockWorker, FileSystemMasterClient masterClient) } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { // Send the sync try { Set pinList = mMasterClient.getPinList(); diff --git a/core/server/worker/src/main/java/alluxio/worker/block/SpecificMasterBlockSync.java b/core/server/worker/src/main/java/alluxio/worker/block/SpecificMasterBlockSync.java index 3c9aeea0b491..660e0735c785 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/SpecificMasterBlockSync.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/SpecificMasterBlockSync.java @@ -182,7 +182,7 @@ private RetryPolicy createEndlessRetry() { } @Override - public synchronized void heartbeat() throws InterruptedException { + public synchronized void heartbeat(long runLimit) throws InterruptedException { if (mWorkerState == WorkerMasterRegistrationState.NOT_REGISTERED) { // Not registered because: // 1. The worker just started, we kick off the 1st registration here. diff --git a/core/server/worker/src/test/java/alluxio/worker/block/PinListSyncTest.java b/core/server/worker/src/test/java/alluxio/worker/block/PinListSyncTest.java index 2e8b44920ef6..dae0717ffef1 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/PinListSyncTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/PinListSyncTest.java @@ -44,7 +44,7 @@ public Set getPinList() { }; PinListSync sync = new PinListSync(mBlockWorker, client); - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); // should receive the latest pin list assertEquals(testPinLists, mBlockWorker.getPinList()); @@ -62,7 +62,7 @@ public Set getPinList() throws IOException { PinListSync sync = new PinListSync(mBlockWorker, client); // should fail - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); // should not get any pin list update assertEquals(ImmutableSet.of(), mBlockWorker.getPinList()); diff --git a/core/server/worker/src/test/java/alluxio/worker/block/SpecificMasterBlockSyncTest.java b/core/server/worker/src/test/java/alluxio/worker/block/SpecificMasterBlockSyncTest.java index cf02f215f52a..e88385f2ae56 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/SpecificMasterBlockSyncTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/SpecificMasterBlockSyncTest.java @@ -63,24 +63,24 @@ public void heartbeatThread() throws Exception { assertFalse(sync.isRegistered()); // heartbeat registers the worker if it has not been registered. - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); assertTrue(sync.isRegistered()); // heartbeat returning register command resets the worker state. Configuration.set(PropertyKey.WORKER_REGISTER_STREAM_ENABLED, true); TestBlockMasterClient.INSTANCE.setReturnRegisterCommand(true); - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); TestBlockMasterClient.INSTANCE.setReturnRegisterCommand(false); assertFalse(sync.isRegistered()); Configuration.set(PropertyKey.WORKER_REGISTER_STREAM_ENABLED, false); TestBlockMasterClient.INSTANCE.setReturnRegisterCommand(true); - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); TestBlockMasterClient.INSTANCE.setReturnRegisterCommand(false); assertFalse(sync.isRegistered()); // heartbeat registers the worker if it has not been registered. - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); assertTrue(sync.isRegistered()); // TestBlockHeartbeatReporter generates the report with one more removed block id each time. @@ -88,7 +88,7 @@ public void heartbeatThread() throws Exception { // heartbeatReportCapacityThreshold is 3. TestBlockMasterClient.INSTANCE.mHeartbeatCallCount = 0; TestBlockMasterClient.INSTANCE.setHeartbeatError(true); - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); assertFalse(sync.isRegistered()); assertEquals( heartbeatReportCapacityThreshold, TestBlockMasterClient.INSTANCE.mHeartbeatCallCount); @@ -96,7 +96,7 @@ public void heartbeatThread() throws Exception { // registration should happen on the next heartbeat and the reporter should be cleared, // except the newly generated ones. TestBlockMasterClient.INSTANCE.setHeartbeatError(false); - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); assertTrue(sync.isRegistered()); assertEquals(1, blockHeartbeatReporter.generateReportAndClear().getBlockChangeCount()); diff --git a/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuse.java b/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuse.java index 8ec0aa9048c8..ab6bdb529444 100644 --- a/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuse.java +++ b/integration/fuse/src/main/java/alluxio/fuse/AlluxioFuse.java @@ -27,6 +27,7 @@ import alluxio.exception.runtime.InvalidArgumentRuntimeException; import alluxio.fuse.meta.UpdateChecker; import alluxio.fuse.options.FuseOptions; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; import alluxio.jnifuse.LibFuse; @@ -178,7 +179,7 @@ public static void main(String[] args) throws ParseException { if (fuseOptions.updateCheckEnabled()) { executor = Executors.newSingleThreadExecutor(); executor.submit(new HeartbeatThread(HeartbeatContext.FUSE_UPDATE_CHECK, - UpdateChecker.create(fuseOptions), () -> Long.valueOf(Constants.DAY_MS), + UpdateChecker.create(fuseOptions), () -> new FixedIntervalSupplier(Constants.DAY_MS), Configuration.global(), UserState.Factory.create(conf))); } try (FileSystem fs = FileSystem.Factory.create(fsContext, fuseOptions.getFileSystemOptions())) { diff --git a/integration/fuse/src/main/java/alluxio/fuse/meta/UpdateChecker.java b/integration/fuse/src/main/java/alluxio/fuse/meta/UpdateChecker.java index 802ebd1ef434..bfcc6ca93f13 100644 --- a/integration/fuse/src/main/java/alluxio/fuse/meta/UpdateChecker.java +++ b/integration/fuse/src/main/java/alluxio/fuse/meta/UpdateChecker.java @@ -79,7 +79,7 @@ private UpdateChecker(List unchangeableFuseInfo, Map fuseO * Heartbeat for the periodic update check. */ @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { try { String latestVersion = UpdateCheck.getLatestVersion(mInstanceId, getFuseCheckInfo(), diff --git a/job/server/src/main/java/alluxio/master/job/JobMaster.java b/job/server/src/main/java/alluxio/master/job/JobMaster.java index ae99321ca928..bc2782e01bfb 100644 --- a/job/server/src/main/java/alluxio/master/job/JobMaster.java +++ b/job/server/src/main/java/alluxio/master/job/JobMaster.java @@ -28,6 +28,7 @@ import alluxio.grpc.ListAllPOptions; import alluxio.grpc.RegisterCommand; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -199,7 +200,8 @@ public void start(Boolean isLeader) throws IOException { getExecutorService() .submit(new HeartbeatThread(HeartbeatContext.JOB_MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatExecutor(), - () -> Configuration.getMs(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) { mAsyncAuditLogWriter = new AsyncUserAccessAuditLogWriter("JOB_MASTER_AUDIT_LOG"); @@ -694,7 +696,7 @@ private final class LostWorkerDetectionHeartbeatExecutor implements HeartbeatExe public LostWorkerDetectionHeartbeatExecutor() {} @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { int masterWorkerTimeoutMs = (int) Configuration .getMs(PropertyKey.JOB_MASTER_WORKER_TIMEOUT); List lostWorkers = new ArrayList<>(); diff --git a/job/server/src/main/java/alluxio/worker/JobWorker.java b/job/server/src/main/java/alluxio/worker/JobWorker.java index aec996509b95..29a6cc054772 100644 --- a/job/server/src/main/java/alluxio/worker/JobWorker.java +++ b/job/server/src/main/java/alluxio/worker/JobWorker.java @@ -21,6 +21,7 @@ import alluxio.exception.ConnectionFailedException; import alluxio.grpc.GrpcService; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; import alluxio.job.JobServerContext; @@ -107,7 +108,8 @@ public void start(WorkerNetAddress address) throws IOException { new HeartbeatThread(HeartbeatContext.JOB_WORKER_COMMAND_HANDLING, new CommandHandlingExecutor(mJobServerContext, taskExecutorManager, mJobMasterClient, address), - () -> Configuration.getMs(PropertyKey.JOB_MASTER_WORKER_HEARTBEAT_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.JOB_MASTER_WORKER_HEARTBEAT_INTERVAL)), Configuration.global(), ServerUserState.global())); } diff --git a/job/server/src/main/java/alluxio/worker/job/command/CommandHandlingExecutor.java b/job/server/src/main/java/alluxio/worker/job/command/CommandHandlingExecutor.java index c52db6c0ff58..4d14e2418532 100644 --- a/job/server/src/main/java/alluxio/worker/job/command/CommandHandlingExecutor.java +++ b/job/server/src/main/java/alluxio/worker/job/command/CommandHandlingExecutor.java @@ -83,7 +83,7 @@ public CommandHandlingExecutor(JobServerContext jobServerContext, } @Override - public void heartbeat() { + public void heartbeat(long timeLimitMs) { JobWorkerHealthReporter.JobWorkerHealthReport jobWorkerHealthReport = mHealthReporter.getJobWorkerHealthReport(); diff --git a/job/server/src/test/java/alluxio/job/command/CommandHandlingExecutorTest.java b/job/server/src/test/java/alluxio/job/command/CommandHandlingExecutorTest.java index 95310ff7c92b..15c2d804e916 100644 --- a/job/server/src/test/java/alluxio/job/command/CommandHandlingExecutorTest.java +++ b/job/server/src/test/java/alluxio/job/command/CommandHandlingExecutorTest.java @@ -86,7 +86,7 @@ public void heartbeat() throws Exception { Mockito.when(mJobMasterClient.heartbeat(any(JobWorkerHealth.class), eq(Lists.newArrayList()))) .thenReturn(Lists.newArrayList(command.build())); - mCommandHandlingExecutor.heartbeat(); + mCommandHandlingExecutor.heartbeat(Long.MAX_VALUE); ExecutorService executorService = AlluxioMockUtil.getInternalState( mCommandHandlingExecutor, "mCommandHandlingService"); executorService.shutdown(); diff --git a/table/server/master/src/main/java/alluxio/master/table/transform/TransformManager.java b/table/server/master/src/main/java/alluxio/master/table/transform/TransformManager.java index e5a24c5715be..ba7b9bab3a65 100644 --- a/table/server/master/src/main/java/alluxio/master/table/transform/TransformManager.java +++ b/table/server/master/src/main/java/alluxio/master/table/transform/TransformManager.java @@ -18,6 +18,7 @@ import alluxio.exception.ExceptionMessage; import alluxio.exception.status.NotFoundException; import alluxio.exception.status.UnavailableException; +import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; @@ -135,7 +136,8 @@ public TransformManager( public void start(ExecutorService executorService, UserState userState) { executorService.submit( new HeartbeatThread(HeartbeatContext.MASTER_TABLE_TRANSFORMATION_MONITOR, new JobMonitor(), - () -> Configuration.getMs(PropertyKey.TABLE_TRANSFORM_MANAGER_JOB_MONITOR_INTERVAL), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.TABLE_TRANSFORM_MANAGER_JOB_MONITOR_INTERVAL)), Configuration.global(), userState)); } @@ -300,7 +302,7 @@ private void handleJobSuccess(TransformJobInfo job) { } @Override - public void heartbeat() throws InterruptedException { + public void heartbeat(long timeLimitMs) throws InterruptedException { for (TransformJobInfo job : mState.getRunningJobs()) { if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("TransformManager's heartbeat was interrupted"); diff --git a/tests/src/test/java/alluxio/client/fs/BlockMasterDeleteLostWorkerIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/BlockMasterDeleteLostWorkerIntegrationTest.java index d9aa14700b79..ef11c38eb6fe 100644 --- a/tests/src/test/java/alluxio/client/fs/BlockMasterDeleteLostWorkerIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/BlockMasterDeleteLostWorkerIntegrationTest.java @@ -91,14 +91,14 @@ public void lostWorkerDeletedAfterTimeout() throws Exception { // The worker will not be deleted, if the lost time is less than MASTER_WORKER_TIMEOUT_MS long newTimeMs = worker.getLastUpdatedTimeMs() + MASTER_WORKER_TIMEOUT_MS + 1; mClock.setTimeMs(newTimeMs); - lostWorkerDetector.heartbeat(); + lostWorkerDetector.heartbeat(Long.MAX_VALUE); assertEquals(0, mBlockMaster.getWorkerCount()); assertEquals(1, mBlockMaster.getLostWorkerCount()); // The worker will be deleted, if the lost time is greater than MASTER_WORKER_TIMEOUT_MS newTimeMs = newTimeMs + MASTER_WORKER_DELETE_TIMEOUT_MS + 1; mClock.setTimeMs(newTimeMs); - lostWorkerDetector.heartbeat(); + lostWorkerDetector.heartbeat(Long.MAX_VALUE); assertEquals(0, mBlockMaster.getWorkerCount()); assertEquals(0, mBlockMaster.getLostWorkerCount()); } diff --git a/tests/src/test/java/alluxio/client/fs/FileSystemContextReinitIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/FileSystemContextReinitIntegrationTest.java index 22472643ab13..43c92688ec24 100644 --- a/tests/src/test/java/alluxio/client/fs/FileSystemContextReinitIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/FileSystemContextReinitIntegrationTest.java @@ -139,7 +139,7 @@ public void configHashSyncWithOpenStream() throws Exception { ExecutorService service = Executors.newSingleThreadExecutor(); Future future = service.submit(() -> { - mExecutor.heartbeat(); + mExecutor.heartbeat(Long.MAX_VALUE); }); TimeUnit.SECONDS.sleep(1); // Stream is open, so reinitialization should block until the stream is closed. @@ -159,7 +159,7 @@ public void configHashSyncWithOpenStream() throws Exception { * Triggers ConfigHashSync heartbeat and waits for it to finish. */ private void triggerAndWaitSync() throws Exception { - mExecutor.heartbeat(); + mExecutor.heartbeat(Long.MAX_VALUE); } private void restartMasters() throws Exception { diff --git a/tests/src/test/java/alluxio/server/block/BlockMasterRegisterStreamIntegrationTest.java b/tests/src/test/java/alluxio/server/block/BlockMasterRegisterStreamIntegrationTest.java index dab5f3e302ab..6bd74f113f03 100644 --- a/tests/src/test/java/alluxio/server/block/BlockMasterRegisterStreamIntegrationTest.java +++ b/tests/src/test/java/alluxio/server/block/BlockMasterRegisterStreamIntegrationTest.java @@ -211,7 +211,7 @@ public void registerLostWorker() throws Exception { mClock.setTimeMs(newTimeMs); DefaultBlockMaster.LostWorkerDetectionHeartbeatExecutor lostWorkerDetector = ((DefaultBlockMaster) mBlockMaster).new LostWorkerDetectionHeartbeatExecutor(); - lostWorkerDetector.heartbeat(); + lostWorkerDetector.heartbeat(Long.MAX_VALUE); // Verify the worker has been forgotten assertEquals(0, mBlockMaster.getWorkerCount()); diff --git a/tests/src/test/java/alluxio/server/block/BlockWorkerRegisterStreamIntegrationTest.java b/tests/src/test/java/alluxio/server/block/BlockWorkerRegisterStreamIntegrationTest.java index 6964bdb2e146..d90d6d56741d 100644 --- a/tests/src/test/java/alluxio/server/block/BlockWorkerRegisterStreamIntegrationTest.java +++ b/tests/src/test/java/alluxio/server/block/BlockWorkerRegisterStreamIntegrationTest.java @@ -462,7 +462,7 @@ public void deleteDuringRegisterStream() throws Exception { f.get(); assertNull(error.get()); // Validation will happen on the heartbeat - sync.heartbeat(); + sync.heartbeat(Long.MAX_VALUE); } // TODO(jiacheng): an internal block movement happens during register stream