-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Backport 2.x] In-flight cancellation of SearchShardTask based on res…
…ource consumption (#5039) * [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x]Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <ketan9495@gmail.com>
- Loading branch information
Showing
49 changed files
with
2,970 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
57 changes: 57 additions & 0 deletions
57
server/src/main/java/org/opensearch/common/util/MovingAverage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.util; | ||
|
||
/** | ||
* MovingAverage is used to calculate the moving average of last 'n' observations. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class MovingAverage { | ||
private final int windowSize; | ||
private final long[] observations; | ||
|
||
private long count = 0; | ||
private long sum = 0; | ||
private double average = 0; | ||
|
||
public MovingAverage(int windowSize) { | ||
if (windowSize <= 0) { | ||
throw new IllegalArgumentException("window size must be greater than zero"); | ||
} | ||
|
||
this.windowSize = windowSize; | ||
this.observations = new long[windowSize]; | ||
} | ||
|
||
/** | ||
* Records a new observation and evicts the n-th last observation. | ||
*/ | ||
public synchronized double record(long value) { | ||
long delta = value - observations[(int) (count % observations.length)]; | ||
observations[(int) (count % observations.length)] = value; | ||
|
||
count++; | ||
sum += delta; | ||
average = (double) sum / Math.min(count, observations.length); | ||
return average; | ||
} | ||
|
||
public double getAverage() { | ||
return average; | ||
} | ||
|
||
public long getCount() { | ||
return count; | ||
} | ||
|
||
public boolean isReady() { | ||
return count >= windowSize; | ||
} | ||
} |
33 changes: 33 additions & 0 deletions
33
server/src/main/java/org/opensearch/common/util/Streak.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.util; | ||
|
||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** | ||
* Streak is a data structure that keeps track of the number of successive successful events. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class Streak { | ||
private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger(); | ||
|
||
public int record(boolean isSuccessful) { | ||
if (isSuccessful) { | ||
return successiveSuccessfulEvents.incrementAndGet(); | ||
} else { | ||
successiveSuccessfulEvents.set(0); | ||
return 0; | ||
} | ||
} | ||
|
||
public int length() { | ||
return successiveSuccessfulEvents.get(); | ||
} | ||
} |
124 changes: 124 additions & 0 deletions
124
server/src/main/java/org/opensearch/common/util/TokenBucket.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.util; | ||
|
||
import java.util.Objects; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.LongSupplier; | ||
|
||
/** | ||
* TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class TokenBucket { | ||
/** | ||
* Defines a monotonically increasing counter. | ||
* | ||
* Usage examples: | ||
* 1. clock = System::nanoTime can be used to perform rate-limiting per unit time | ||
* 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations | ||
*/ | ||
private final LongSupplier clock; | ||
|
||
/** | ||
* Defines the number of tokens added to the bucket per clock cycle. | ||
*/ | ||
private final double rate; | ||
|
||
/** | ||
* Defines the capacity and the maximum number of operations that can be performed per clock cycle before | ||
* the bucket runs out of tokens. | ||
*/ | ||
private final double burst; | ||
|
||
/** | ||
* Defines the current state of the token bucket. | ||
*/ | ||
private final AtomicReference<State> state; | ||
|
||
public TokenBucket(LongSupplier clock, double rate, double burst) { | ||
this(clock, rate, burst, burst); | ||
} | ||
|
||
public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) { | ||
if (rate <= 0.0) { | ||
throw new IllegalArgumentException("rate must be greater than zero"); | ||
} | ||
|
||
if (burst <= 0.0) { | ||
throw new IllegalArgumentException("burst must be greater than zero"); | ||
} | ||
|
||
this.clock = clock; | ||
this.rate = rate; | ||
this.burst = burst; | ||
this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong())); | ||
} | ||
|
||
/** | ||
* If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true. | ||
* Otherwise, returns false and leaves the bucket untouched. | ||
*/ | ||
public boolean request(double n) { | ||
if (n <= 0) { | ||
throw new IllegalArgumentException("requested tokens must be greater than zero"); | ||
} | ||
|
||
// Refill tokens | ||
State currentState, updatedState; | ||
do { | ||
currentState = state.get(); | ||
long now = clock.getAsLong(); | ||
double incr = (now - currentState.lastRefilledAt) * rate; | ||
updatedState = new State(Math.min(currentState.tokens + incr, burst), now); | ||
} while (state.compareAndSet(currentState, updatedState) == false); | ||
|
||
// Deduct tokens | ||
do { | ||
currentState = state.get(); | ||
if (currentState.tokens < n) { | ||
return false; | ||
} | ||
updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt); | ||
} while (state.compareAndSet(currentState, updatedState) == false); | ||
|
||
return true; | ||
} | ||
|
||
public boolean request() { | ||
return request(1.0); | ||
} | ||
|
||
/** | ||
* Represents an immutable token bucket state. | ||
*/ | ||
private static class State { | ||
final double tokens; | ||
final long lastRefilledAt; | ||
|
||
public State(double tokens, long lastRefilledAt) { | ||
this.tokens = tokens; | ||
this.lastRefilledAt = lastRefilledAt; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
State state = (State) o; | ||
return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(tokens, lastRefilledAt); | ||
} | ||
} | ||
} |
Oops, something went wrong.