Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Unit Tests for TOS #18646

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions underfs/tos/src/test/java/alluxio/underfs/tos/TOSInputStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.tos;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.retry.CountingRetry;

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.model.object.GetObjectV2Input;
import com.volcengine.tos.model.object.GetObjectV2Output;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.util.Arrays;

/**
* Unit tests for the {@link TOSInputStream}.
*/
public class TOSInputStreamTest {

private static final String BUCKET_NAME = "testBucket";
private static final String OBJECT_KEY = "testObjectKey";
private static AlluxioConfiguration sConf = Configuration.global();

private TOSInputStream mTosInputStream;
private TOSV2 mTosClient;
private InputStream[] mInputStreamSpy;
private GetObjectV2Output[] mTosObject;

/**
* The exception expected to be thrown.
*/
@Rule
public final ExpectedException mExceptionRule = ExpectedException.none();

@Before
public void setUp() throws IOException {
mTosClient = mock(TOSV2.class);

byte[] input = new byte[] {1, 2, 3};
mTosObject = new GetObjectV2Output[input.length];
mInputStreamSpy = new InputStream[input.length];
for (int i = 0; i < input.length; ++i) {
final long pos = (long) i;
mTosObject[i] = mock(GetObjectV2Output.class);
when(mTosClient.getObject(argThat(argument -> {
if (argument instanceof GetObjectV2Input) {
String range = ((GetObjectV2Input) argument).getOptions().getRange();
return range.equals(MessageFormat.format("bytes={0}-", pos));
}
return false;
}))).thenReturn(mTosObject[i]);
byte[] mockInput = Arrays.copyOfRange(input, i, input.length);
mInputStreamSpy[i] = spy(new ByteArrayInputStream(mockInput));
when(mTosObject[i].getContent()).thenReturn(mInputStreamSpy[i]);
}
mTosInputStream = new TOSInputStream(BUCKET_NAME, OBJECT_KEY, mTosClient, new CountingRetry(1),
sConf.getBytes(PropertyKey.UNDERFS_OBJECT_STORE_MULTI_RANGE_CHUNK_SIZE));
}

@Test
public void close() throws IOException {
mTosInputStream.close();

mExceptionRule.expect(IOException.class);
mExceptionRule.expectMessage(is("Stream closed"));
mTosInputStream.read();
}

@Test
public void readInt() throws IOException {
assertEquals(1, mTosInputStream.read());
assertEquals(2, mTosInputStream.read());
assertEquals(3, mTosInputStream.read());
}

@Test
public void readByteArray() throws IOException {
byte[] bytes = new byte[3];
int readCount = mTosInputStream.read(bytes, 0, 3);
assertEquals(3, readCount);
assertArrayEquals(new byte[] {1, 2, 3}, bytes);
}

@Test
public void skip() throws IOException {
assertEquals(1, mTosInputStream.read());
mTosInputStream.skip(1);
assertEquals(3, mTosInputStream.read());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.underfs.tos;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.volcengine.tos.TOSV2;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
import com.volcengine.tos.model.object.CompleteMultipartUploadV2Output;
import com.volcengine.tos.model.object.CreateMultipartUploadInput;
import com.volcengine.tos.model.object.CreateMultipartUploadOutput;
import com.volcengine.tos.model.object.PutObjectInput;
import com.volcengine.tos.model.object.PutObjectOutput;
import com.volcengine.tos.model.object.UploadPartV2Input;
import com.volcengine.tos.model.object.UploadPartV2Output;
import com.volcengine.tos.model.object.UploadedPartV2;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.concurrent.Callable;

/**
* Unit tests for the {@link TOSLowLevelOutputStream}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(TOSLowLevelOutputStream.class)
public class TOSLowLevelOutputStreamTest {
private static final String BUCKET_NAME = "testBucket";
private static final String PARTITION_SIZE = "8MB";
private static final String KEY = "testKey";
private static final String UPLOAD_ID = "testUploadId";
private InstancedConfiguration mConf = Configuration.copyGlobal();

private TOSV2 mMockTosClient;
private ListeningExecutorService mMockExecutor;
private ListenableFuture<UploadedPartV2> mMockTag;
private TOSLowLevelOutputStream mStream;

@Before
public void before() throws Exception {
mockTOSClientAndExecutor();
mConf.set(PropertyKey.UNDERFS_TOS_STREAMING_UPLOAD_PARTITION_SIZE, PARTITION_SIZE);
mConf.set(PropertyKey.UNDERFS_TOS_STREAMING_UPLOAD_ENABLED, "true");
mStream = new TOSLowLevelOutputStream(BUCKET_NAME, KEY, mMockTosClient, mMockExecutor, mConf);
}

@Test
public void writeByte() throws Exception {
mStream.write(1);

mStream.close();
Mockito.verify(mMockExecutor, never()).submit(any(Callable.class));
Mockito.verify(mMockTosClient).putObject(any(PutObjectInput.class));
Mockito.verify(mMockTosClient, never())
.createMultipartUpload(any(CreateMultipartUploadInput.class));
Mockito.verify(mMockTosClient, never()).completeMultipartUpload(any(
CompleteMultipartUploadV2Input.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("putTag", mStream.getContentHash().get());
}

@Test
public void writeByteArrayForSmallFile() throws Exception {
int partSize = (int) (8 * 1024 * 1024); // 8MB
byte[] b = new byte[partSize];

mStream.write(b, 0, b.length);

mStream.close();
Mockito.verify(mMockExecutor, never()).submit(any(Callable.class));
Mockito.verify(mMockTosClient).putObject(any(PutObjectInput.class));
Mockito.verify(mMockTosClient, never())
.createMultipartUpload(any(CreateMultipartUploadInput.class));
Mockito.verify(mMockTosClient, never())
.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("putTag", mStream.getContentHash().get());
}

@Test
public void writeByteArrayForLargeFile() throws Exception {
int partSize = (int) (8 * 1024 * 1024); // 8MB
byte[] b = new byte[partSize + 1];

mStream.write(b, 0, b.length);

mStream.close();
Mockito.verify(mMockTosClient).createMultipartUpload(any(CreateMultipartUploadInput.class));
Mockito.verify(mMockExecutor, times(2)).submit(any(Callable.class));
Mockito.verify(mMockTosClient)
.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("multiTag", mStream.getContentHash().get());
}

@Test
public void createEmptyFile() throws Exception {
mStream.close();
Mockito.verify(mMockExecutor, never()).submit(any(Callable.class));
Mockito.verify(mMockTosClient, never())
.createMultipartUpload(any(CreateMultipartUploadInput.class));
Mockito.verify(mMockTosClient, never())
.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class));
Mockito.verify(mMockTosClient).putObject(any(PutObjectInput.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("emptyTag", mStream.getContentHash().get());
}

@Test
public void flush() throws Exception {
int partSize = (int) (8 * 1024 * 1024); // 8MB
byte[] b = new byte[2 * partSize - 1];

mStream.write(b, 0, b.length);

mStream.flush();
Mockito.verify(mMockExecutor, times(2)).submit(any(Callable.class));
Mockito.verify(mMockTag, times(2)).get();

mStream.close();
Mockito.verify(mMockTosClient)
.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("multiTag", mStream.getContentHash().get());
}

@Test
public void close() throws Exception {
mStream.close();
Mockito.verify(mMockTosClient, never())
.createMultipartUpload(any(CreateMultipartUploadInput.class));
Mockito.verify(mMockTosClient, never())
.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class));
assertTrue(mStream.getContentHash().isPresent());
assertEquals("emptyTag", mStream.getContentHash().get());
}

private void mockTOSClientAndExecutor() throws Exception {
mMockTosClient = PowerMockito.mock(TOSV2.class);

CreateMultipartUploadOutput createOutput = new CreateMultipartUploadOutput();
createOutput.setUploadID(UPLOAD_ID);
when(mMockTosClient.createMultipartUpload(any(CreateMultipartUploadInput.class)))
.thenReturn(createOutput);

UploadPartV2Output uploadPartOutput = new UploadPartV2Output();
uploadPartOutput.setEtag("partTag");
when(mMockTosClient.uploadPart(any(UploadPartV2Input.class))).thenReturn(uploadPartOutput);

// Use Answer to dynamically return PutObjectOutput based on the input
when(mMockTosClient.putObject(any(PutObjectInput.class)))
.thenAnswer(new Answer<PutObjectOutput>() {
@Override
public PutObjectOutput answer(InvocationOnMock invocation) throws Throwable {
PutObjectInput input = invocation.getArgument(0);
PutObjectOutput output = new PutObjectOutput();
// Determine the Etag value based on the input condition
if (input.getContentLength() == 0) {
output.setEtag("emptyTag");
} else {
output.setEtag("putTag");
}
return output;
}
});

CompleteMultipartUploadV2Output completeOutput = new CompleteMultipartUploadV2Output();
completeOutput.setEtag("multiTag");
when(mMockTosClient.completeMultipartUpload(any(CompleteMultipartUploadV2Input.class)))
.thenReturn(completeOutput);

mMockTag = (ListenableFuture<UploadedPartV2>) PowerMockito.mock(ListenableFuture.class);
when(mMockTag.get()).thenReturn(new UploadedPartV2().setPartNumber(1).setEtag("partTag"));
mMockExecutor = Mockito.mock(ListeningExecutorService.class);
when(mMockExecutor.submit(any(Callable.class))).thenReturn(mMockTag);
}
}
Loading
Loading