Skip to content

Commit

Permalink
Fixes to prevent segment fault errors arising due to unexpected SDK b…
Browse files Browse the repository at this point in the history
…ehaviour

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 committed Nov 28, 2023
1 parent 5bb6cae commit e2c5146
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class RemoteTransferContainer implements Closeable {
private final long expectedChecksum;
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

/**
Expand Down Expand Up @@ -226,27 +225,7 @@ private long getActualChecksum() {

@Override
public void close() throws IOException {
if (inputStreams.get() == null) {
log.warn("Input streams cannot be closed since they are not yet set for multi stream upload");
return;
}

boolean closeStreamException = false;
for (InputStream is : Objects.requireNonNull(inputStreams.get())) {
try {
if (is != null) {
is.close();
}
} catch (IOException ex) {
closeStreamException = true;
// Attempting to close all streams first before throwing exception.
log.error("Multipart stream failed to close ", ex);
}
}

if (closeStreamException) {
throw new IOException("Closure of some of the multi-part streams failed.");
}
// Nothing as each stream should be independently closed.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@

package org.opensearch.common.blobstore.transfer.stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.concurrent.RefCountedReleasable;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.common.util.concurrent.RunOnce;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput
*
* @opensearch.internal
*/
public class OffsetRangeIndexInputStream extends OffsetRangeInputStream {

private static final Logger logger = LogManager.getLogger(OffsetRangeIndexInputStream.class);
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;

private final OffsetRangeRefCount offsetRangeRefCount;

private final RunOnce closeOnce;

private final AtomicBoolean closed = new AtomicBoolean();

/**
* Construct a new OffsetRangeIndexInputStream object
*
Expand All @@ -35,16 +47,53 @@ public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long positi
indexInput.seek(position);
this.indexInput = indexInput;
this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size);
ClosingStreams closingStreams = new ClosingStreams(inputStreamIndexInput, indexInput);
offsetRangeRefCount = new OffsetRangeRefCount(closingStreams);
closeOnce = new RunOnce(offsetRangeRefCount::decRef);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStreamIndexInput.read(b, off, len);
// There are two levels of check to ensure that we don't read an already closed stream and
// to not close the stream if it is already being read.
// 1. First check is a coarse-grained check outside reference check which allows us to fail fast if read
// was invoked after the stream was closed.
// 2. In second check, a tryIncRef is invoked which tries to increment reference under lock and fails if ref
// is already closed. If reference is successfully obtained by the stream then stream will not be closed.
// Ref counting ensures that stream isn't closed in between reads.
// All these protection mechanisms are required in order to prevent invalid access to streams happening
// from the new S3 async SDK.
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read(b, off, len);
}
}

private OffsetRangeRefCount getStreamReference() {
boolean successIncrement = offsetRangeRefCount.tryIncRef();
if (successIncrement == false) {
throw alreadyClosed("OffsetRangeIndexInputStream is already unreferenced.");

Check warning on line 75 in server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java#L75

Added line #L75 was not covered by tests
}
return offsetRangeRefCount;
}

private void ensureOpen() {
if (closed.get() == true) {
logger.debug("Read on stream was attempted after during the close of overall file stream!");
throw alreadyClosed("Already closed: ");
}
}

AlreadyClosedException alreadyClosed(String msg) {
return new AlreadyClosedException(msg + this);
}

@Override
public int read() throws IOException {
return inputStreamIndexInput.read();
ensureOpen();
try (OffsetRangeRefCount ignored = getStreamReference()) {
return inputStreamIndexInput.read();

Check warning on line 95 in server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java#L93-L95

Added lines #L93 - L95 were not covered by tests
}
}

@Override
Expand All @@ -67,9 +116,43 @@ public long getFilePointer() throws IOException {
return indexInput.getFilePointer();
}

@Override
public String toString() {
return "OffsetRangeIndexInputStream{" + "indexInput=" + indexInput + '}';
}

private static class ClosingStreams {
private final InputStreamIndexInput inputStreamIndexInput;
private final IndexInput indexInput;

public ClosingStreams(InputStreamIndexInput inputStreamIndexInput, IndexInput indexInput) {
this.inputStreamIndexInput = inputStreamIndexInput;
this.indexInput = indexInput;
}
}

private static class OffsetRangeRefCount extends RefCountedReleasable<ClosingStreams> {
private static final Logger logger = LogManager.getLogger(OffsetRangeRefCount.class);

public OffsetRangeRefCount(ClosingStreams ref) {
super("OffsetRangeRefCount", ref, () -> {
try {
ref.inputStreamIndexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexStreamIndexInput", ex);

Check warning on line 142 in server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java#L141-L142

Added lines #L141 - L142 were not covered by tests
}
try {
ref.indexInput.close();
} catch (IOException ex) {
logger.error("Failed to close indexInput", ex);

Check warning on line 147 in server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java#L146-L147

Added lines #L146 - L147 were not covered by tests
}
});
}
}

@Override
public void close() throws IOException {
inputStreamIndexInput.close();
indexInput.close();
closed.set(true);
closeOnce.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
*/
public abstract class OffsetRangeInputStream extends InputStream {
public abstract long getFilePointer() throws IOException;

}
Loading

0 comments on commit e2c5146

Please sign in to comment.