Skip to content

Commit

Permalink
Use file length from metadata while downloading segments from remote …
Browse files Browse the repository at this point in the history
…store (opensearch-project#10399)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Oct 10, 2023
1 parent 74ffdb6 commit 8e5e54e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -193,10 +192,14 @@ public IndexOutput createOutput(String name, IOContext context) {
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return openInput(name, fileLength(name), context);
}

public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException {
InputStream inputStream = null;
try {
inputStream = blobContainer.readBlob(name);
return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength(name));
return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength);
} catch (Exception e) {
// Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak.
if (inputStream != null) {
Expand Down Expand Up @@ -230,9 +233,9 @@ public void close() throws IOException {
@Override
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if (metadata.containsKey(name)) {
return metadata.get(name).length();
List<BlobMetadata> metadata = blobContainer.listBlobsByPrefixInSortedOrder(name, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
if (metadata.size() == 1 && metadata.get(0).name().equals(name)) {
return metadata.get(0).length();
}
throw new NoSuchFileException(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,9 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
String remoteFilename = getExistingRemoteFilename(name);
long fileLength = fileLength(name);
if (remoteFilename != null) {
return remoteDataDirectory.openInput(remoteFilename, context);
return remoteDataDirectory.openInput(remoteFilename, fileLength, context);
} else {
throw new NoSuchFileException(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,11 +40,13 @@

import org.mockito.Mockito;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -204,13 +205,29 @@ public void testCreateOutput() {
public void testOpenInput() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);

BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);

when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT);
assertTrue(indexInput instanceof RemoteIndexInput);
assertEquals(100, indexInput.length());
verify(blobContainer).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC);
}

public void testOpenInputWithLength() throws IOException {
InputStream mockInputStream = mock(InputStream.class);
when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream);

BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);

when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

IndexInput indexInput = remoteDirectory.openInput("segment_1", 100, IOContext.DEFAULT);
assertTrue(indexInput instanceof RemoteIndexInput);
assertEquals(100, indexInput.length());
verify(blobContainer, times(0)).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC);
}

public void testOpenInputIOException() throws IOException {
Expand All @@ -228,9 +245,8 @@ public void testOpenInputNoSuchFileException() throws IOException {
}

public void testFileLength() throws IOException {
Map<String, BlobMetadata> fileInfo = new HashMap<>();
fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100));
when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo);
BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100);
when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata));

assertEquals(100, remoteDirectory.fileLength("segment_1"));
}
Expand All @@ -246,13 +262,7 @@ public void testListFilesByPrefixInLexicographicOrder() throws IOException {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1)));
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertEquals(List.of("metadata_1"), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand All @@ -262,13 +272,7 @@ public void testListFilesByPrefixInLexicographicOrderEmpty() throws IOException
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onResponse(List.of());
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertEquals(List.of(), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand All @@ -278,13 +282,7 @@ public void testListFilesByPrefixInLexicographicOrderException() {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
latchedActionListener.onFailure(new IOException("Error"));
return null;
}).when(blobContainer)
.listBlobsByPrefixInSortedOrder(
eq("metadata"),
eq(1),
eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC),
any(ActionListener.class)
);
}).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class));

assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
import static org.hamcrest.CoreMatchers.is;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -391,7 +392,7 @@ public void testOpenInput() throws IOException {
remoteSegmentStoreDirectory.init();

IndexInput indexInput = mock(IndexInput.class);
when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenReturn(indexInput);
when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenReturn(indexInput);

assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
}
Expand All @@ -404,7 +405,7 @@ public void testOpenInputException() throws IOException {
populateMetadata();
remoteSegmentStoreDirectory.init();

when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));
when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));

assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
}
Expand Down

0 comments on commit 8e5e54e

Please sign in to comment.