-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support cron timer to arrange the period heartbeat executor invoke time
Support cron timer to arrange the period heartbeat executor invoke time. Example ``` cron timer config: * 0-10,20-30,40-56 12-13 * * ? * The heartbeat executor invoke at every minute from 0 through 10, from 20 through 30, from 40 through 56, at past every hour from 12 through 13. ``` pr-link: #16900 change-id: cid-9277f30e2159e64863067d14cbcbee526707c5b6
- Loading branch information
1 parent
075e4e4
commit fd19fb6
Showing
50 changed files
with
506 additions
and
188 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
59 changes: 59 additions & 0 deletions
59
core/common/src/main/java/alluxio/heartbeat/CronExpressionIntervalSupplier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 | ||
* (the "License"). You may not use this work except in compliance with the License, which is | ||
* available at www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, | ||
* either express or implied, as more fully set forth in the License. | ||
* | ||
* See the NOTICE file distributed with this work for information regarding copyright ownership. | ||
*/ | ||
|
||
package alluxio.heartbeat; | ||
|
||
import org.apache.logging.log4j.core.util.CronExpression; | ||
|
||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.Date; | ||
|
||
/** | ||
* Calculate the next interval by given cron expression. | ||
*/ | ||
public class CronExpressionIntervalSupplier implements SleepIntervalSupplier { | ||
private final long mInterval; | ||
private final CronExpression mCron; | ||
|
||
/** | ||
* Constructs a new {@link CronExpressionIntervalSupplier}. | ||
* | ||
* @param cronExpression the cron expression | ||
* @param fixedInterval the fixed interval | ||
*/ | ||
public CronExpressionIntervalSupplier(CronExpression cronExpression, long fixedInterval) { | ||
mInterval = fixedInterval; | ||
mCron = cronExpression; | ||
} | ||
|
||
@Override | ||
public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) { | ||
long nextInterval = 0; | ||
long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs; | ||
if (executionTimeMs < mInterval) { | ||
nextInterval = mInterval - executionTimeMs; | ||
} | ||
Date now = Date.from(Instant.ofEpochMilli(nowTimeStampMillis + nextInterval)); | ||
if (mCron.isSatisfiedBy(now)) { | ||
return nextInterval; | ||
} | ||
return nextInterval + Duration.between( | ||
now.toInstant(), mCron.getNextValidTimeAfter(now).toInstant()).toMillis(); | ||
} | ||
|
||
@Override | ||
public long getRunLimit(long mPreviousTickedMs) { | ||
Date now = Date.from(Instant.ofEpochMilli(mPreviousTickedMs)); | ||
return Duration.between(now.toInstant(), | ||
mCron.getNextInvalidTimeAfter(now).toInstant()).toMillis(); | ||
} | ||
} |
63 changes: 63 additions & 0 deletions
63
core/common/src/main/java/alluxio/heartbeat/FixedIntervalSupplier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 | ||
* (the "License"). You may not use this work except in compliance with the License, which is | ||
* available at www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, | ||
* either express or implied, as more fully set forth in the License. | ||
* | ||
* See the NOTICE file distributed with this work for information regarding copyright ownership. | ||
*/ | ||
|
||
package alluxio.heartbeat; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.helpers.NOPLogger; | ||
|
||
/** | ||
* Fixed interval supplier. | ||
*/ | ||
public class FixedIntervalSupplier implements SleepIntervalSupplier { | ||
|
||
private final long mInterval; | ||
protected final Logger mLogger; | ||
|
||
/** | ||
* Constructs a new {@link FixedIntervalSupplier}. | ||
* | ||
* @param fixedInterval the fixed interval | ||
* @param logger the logger | ||
*/ | ||
public FixedIntervalSupplier(long fixedInterval, Logger logger) { | ||
mInterval = fixedInterval; | ||
mLogger = logger; | ||
} | ||
|
||
/** | ||
* Constructs a new {@link FixedIntervalSupplier}. | ||
* | ||
* @param fixedInterval the fixed interval | ||
*/ | ||
public FixedIntervalSupplier(long fixedInterval) { | ||
this(fixedInterval, NOPLogger.NOP_LOGGER); | ||
} | ||
|
||
@Override | ||
public long getNextInterval(long mPreviousTickedMs, long nowTimeStampMillis) { | ||
if (mPreviousTickedMs == -1) { | ||
return -1; | ||
} | ||
long executionTimeMs = nowTimeStampMillis - mPreviousTickedMs; | ||
if (executionTimeMs > mInterval) { | ||
mLogger.warn("{} last execution took {} ms. Longer than the interval {}", | ||
Thread.currentThread().getName(), executionTimeMs, mInterval); | ||
return 0; | ||
} | ||
return mInterval - executionTimeMs; | ||
} | ||
|
||
@Override | ||
public long getRunLimit(long mPreviousTickedMs) { | ||
return mInterval; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.