Skip to content

Commit

Permalink
Merge pull request opensearch-project#8 from sgup432/CoordinatorStats
Browse files Browse the repository at this point in the history
Coordinator stats
  • Loading branch information
sgup432 authored Sep 18, 2023
2 parents 0d87cfb + 39f7596 commit 4d66914
Show file tree
Hide file tree
Showing 172 changed files with 4,478 additions and 1,555 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/copy-linked-issue-labels.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Copy labels from linked issues
on:
pull_request_target:
types: [opened, edited, review_requested, synchronize, reopened, ready_for_review]

jobs:
copy-issue-labels:
if: github.repository == 'opensearch-project/OpenSearch'
runs-on: ubuntu-latest
permissions:
issues: read
contents: read
pull-requests: write
steps:
- name: copy-issue-labels
uses: michalvankodev/copy-issue-labels@v1.3.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
labels-to-exclude: |
untriaged
triaged
2 changes: 1 addition & 1 deletion .github/workflows/precommit.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Gradle Precommit and Asssemble
name: Gradle Precommit and Assemble
on: [pull_request]

jobs:
Expand Down
29 changes: 0 additions & 29 deletions .github/workflows/stalled.yml

This file was deleted.

6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
- Bump `actions/checkout` from 2 to 4 ([#9968](https://github.com/opensearch-project/OpenSearch/pull/9968))
- Bump OpenTelemetry from 1.26.0 to 1.30.1 ([#9950](https://github.com/opensearch-project/OpenSearch/pull/9950))
- Bump `org.apache.commons:commons-compress` from 1.23.0 to 1.24.0 ([#9973, #9972](https://github.com/opensearch-project/OpenSearch/pull/9973, https://github.com/opensearch-project/OpenSearch/pull/9972))
- Bump `com.google.cloud:google-cloud-core-http` from 2.21.1 to 2.23.0 ([#9971](https://github.com/opensearch-project/OpenSearch/pull/9971))
- Bump `mockito` from 5.4.0 to 5.5.0 ([#10022](https://github.com/opensearch-project/OpenSearch/pull/10022))
- Bump `bytebuddy` from 1.14.3 to 1.14.7 ([#10022](https://github.com/opensearch-project/OpenSearch/pull/10022))

### Changed
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))
- Allow parameterization of tests with OpenSearchIntegTestCase.SuiteScopeTestCase annotation ([#9916](https://github.com/opensearch-project/OpenSearch/pull/9916))
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
- Add instrumentation in transport service. ([#10042](https://github.com/opensearch-project/OpenSearch/pull/10042))

### Deprecated

### Removed

### Fixed
- Fix ignore_missing parameter has no effect when using template snippet in rename ingest processor ([#9725](https://github.com/opensearch-project/OpenSearch/pull/9725))
- Fix broken backward compatibility from 2.7 for IndexSorted field indices ([#10045](https://github.com/opensearch-project/OpenSearch/pull/9725))

### Security

Expand Down
4 changes: 2 additions & 2 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ bouncycastle=1.75
randomizedrunner = 2.7.1
junit = 4.13.2
hamcrest = 2.1
mockito = 5.4.0
mockito = 5.5.0
objenesis = 3.2
bytebuddy = 1.14.3
bytebuddy = 1.14.7

# benchmark dependencies
jmh = 1.35
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
* @opensearch.internal
*/
public class Iterators {

/**
* Concat iterators
*
* @param iterators the iterators to concat
* @param <T> the type of iterator
* @return a new {@link ConcatenatedIterator}
* @throws NullPointerException if iterators is null
*/
public static <T> Iterator<T> concat(Iterator<? extends T>... iterators) {
if (iterators == null) {
throw new NullPointerException("iterators");
Expand Down Expand Up @@ -71,6 +80,11 @@ static class ConcatenatedIterator<T> implements Iterator<T> {
this.iterators = iterators;
}

/**
* Returns {@code true} if the iteration has more elements. (In other words, returns {@code true} if {@link #next} would return an
* element rather than throwing an exception.)
* @return {@code true} if the iteration has more elements
*/
@Override
public boolean hasNext() {
boolean hasNext = false;
Expand All @@ -81,6 +95,11 @@ public boolean hasNext() {
return hasNext;
}

/**
* Returns the next element in the iteration.
* @return the next element in the iteration
* @throws NoSuchElementException if the iteration has no more elements
*/
@Override
public T next() {
if (!hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
* @opensearch.experimental - class methods might change
*/
public class ZstdCompressor implements Compressor {
// An arbitrary header that we use to identify compressed streams
// It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as
// a XContent

/**
* An arbitrary header that we use to identify compressed streams
* It needs to be different from other compressors and to not be specific
* enough so that no stream starting with these bytes could be detected as
* a XContent
* */
private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' };

/**
Expand All @@ -44,10 +47,20 @@ public class ZstdCompressor implements Compressor {
@PublicApi(since = "2.10.0")
public static final String NAME = "ZSTD";

/**
* The compression level for {@link ZstdOutputStreamNoFinalizer}
*/
private static final int LEVEL = 3;

/** The buffer size for {@link BufferedInputStream} and {@link BufferedOutputStream}
*/
private static final int BUFFER_SIZE = 4096;

/**
* Compares the given bytes with the {@link ZstdCompressor#HEADER} of a compressed stream
* @param bytes the bytes to compare to ({@link ZstdCompressor#HEADER})
* @return true if the bytes are the {@link ZstdCompressor#HEADER}, false otherwise
*/
@Override
public boolean isCompressed(BytesReference bytes) {
if (bytes.length() < HEADER.length) {
Expand All @@ -61,11 +74,22 @@ public boolean isCompressed(BytesReference bytes) {
return true;
}

/**
* Returns the length of the {@link ZstdCompressor#HEADER}
* @return the {@link ZstdCompressor#HEADER} length
*/
@Override
public int headerLength() {
return HEADER.length;
}

/**
* Returns a new {@link ZstdInputStreamNoFinalizer} from the given compressed {@link InputStream}
* @param in the compressed {@link InputStream}
* @return a new {@link ZstdInputStreamNoFinalizer} from the given compressed {@link InputStream}
* @throws IOException if an I/O error occurs
* @throws IllegalArgumentException if the input stream is not compressed with ZSTD
*/
@Override
public InputStream threadLocalInputStream(InputStream in) throws IOException {
final byte[] header = in.readNBytes(HEADER.length);
Expand All @@ -75,17 +99,36 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException {
return new ZstdInputStreamNoFinalizer(new BufferedInputStream(in, BUFFER_SIZE), RecyclingBufferPool.INSTANCE);
}

/**
* Returns a new {@link ZstdOutputStreamNoFinalizer} from the given {@link OutputStream}
* @param out the {@link OutputStream}
* @return a new {@link ZstdOutputStreamNoFinalizer} from the given {@link OutputStream}
* @throws IOException if an I/O error occurs
*/
@Override
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
out.write(HEADER);
return new ZstdOutputStreamNoFinalizer(new BufferedOutputStream(out, BUFFER_SIZE), RecyclingBufferPool.INSTANCE, LEVEL);
}

/**
* Always throws an {@link UnsupportedOperationException} as ZSTD compression is supported only for snapshotting
* @param bytesReference a reference to the bytes to uncompress
* @return always throws an exception
* @throws UnsupportedOperationException if the method is called
* @throws IOException is never thrown
*/
@Override
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
throw new UnsupportedOperationException("ZSTD compression is supported only for snapshotting");
}

/**
* Always throws an {@link UnsupportedOperationException} as ZSTD compression is supported only for snapshotting
* @param bytesReference a reference to the bytes to compress
* @return always throws an exception
* @throws UnsupportedOperationException if the method is called
*/
@Override
public BytesReference compress(BytesReference bytesReference) throws IOException {
throw new UnsupportedOperationException("ZSTD compression is supported only for snapshotting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,23 @@ public interface CircuitBreaker {

/**
* The type of breaker
*
* can be {@link #MEMORY}, {@link #PARENT}, or {@link #NOOP}
* @opensearch.internal
*/
enum Type {
// A regular or ChildMemoryCircuitBreaker
/** A regular or ChildMemoryCircuitBreaker */
MEMORY,
// A special parent-type for the hierarchy breaker service
/** A special parent-type for the hierarchy breaker service */
PARENT,
// A breaker where every action is a noop, it never breaks
/** A breaker where every action is a noop, it never breaks */
NOOP;

/**
* Converts string (case-insensitive) to breaker {@link Type}
* @param value "noop", "parent", or "memory" (case-insensitive)
* @return the breaker {@link Type}
* @throws IllegalArgumentException if value is not "noop", "parent", or "memory"
*/
public static Type parseValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "noop":
Expand All @@ -98,13 +104,13 @@ public static Type parseValue(String value) {

/**
* The breaker durability
*
* can be {@link #TRANSIENT} or {@link #PERMANENT}
* @opensearch.internal
*/
enum Durability {
// The condition that tripped the circuit breaker fixes itself eventually.
/** The condition that tripped the circuit breaker fixes itself eventually. */
TRANSIENT,
// The condition that tripped the circuit breaker requires manual intervention.
/** The condition that tripped the circuit breaker requires manual intervention. */
PERMANENT
}

Expand All @@ -120,11 +126,14 @@ enum Durability {
* @param bytes number of bytes to add
* @param label string label describing the bytes being added
* @return the number of "used" bytes for the circuit breaker
* @throws CircuitBreakingException if the breaker tripped
*/
double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;

/**
* Adjust the circuit breaker without tripping
* @param bytes number of bytes to add
* @return the number of "used" bytes for the circuit breaker
*/
long addWithoutBreaking(long bytes);

Expand Down Expand Up @@ -154,7 +163,10 @@ enum Durability {
String getName();

/**
* @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
* Returns the {@link Durability} of this breaker
* @return whether a tripped circuit breaker will
* reset itself ({@link Durability#TRANSIENT})
* or requires manual intervention ({@link Durability#PERMANENT}).
*/
Durability getDurability();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@
*/
public class CircuitBreakingException extends OpenSearchException {

/** The number of bytes wanted */
private final long bytesWanted;
/** The circuit breaker limit */
private final long byteLimit;
/** The {@link CircuitBreaker.Durability} of the circuit breaker */
private final CircuitBreaker.Durability durability;

public CircuitBreakingException(StreamInput in) throws IOException {
Expand Down Expand Up @@ -88,6 +91,7 @@ public CircuitBreaker.Durability getDurability() {
return durability;
}

/** Always returns {@link RestStatus#TOO_MANY_REQUESTS} */
@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
Expand Down
Loading

0 comments on commit 4d66914

Please sign in to comment.