
Fix WebHDFS adapter query to retry failed resources #332

Merged · 6 commits · Jun 19, 2020
Changes from 3 commits
WebHdfsNodeAdapter.java
@@ -151,8 +151,14 @@ public String getSystemName() {

@Override
public QueryResponse query(QueryRequest queryRequest) {
List<String> failedItemIds = queryRequest.getFailedItemIds();

List<FileStatus> filesToReplicate = getFilesToReplicate(queryRequest.getModifiedAfter());
if (!failedItemIds.isEmpty()) {
LOGGER.info(String.format("Found failed item IDs: %s", failedItemIds));
}

List<FileStatus> filesToReplicate =
getFilesToReplicate(failedItemIds, queryRequest.getModifiedAfter());

filesToReplicate.sort(Comparator.comparing(FileStatus::getModificationTime));

@@ -255,7 +261,8 @@ Metadata createMetadata(FileStatus fileStatus) {
* @param modificationTime modification time of the file
* @return a {@code String} representing a version-4 UUID
*/
private String getVersion4Uuid(String fileUrl, Date modificationTime) {
@VisibleForTesting
String getVersion4Uuid(String fileUrl, Date modificationTime) {
String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
String version3Uuid = UUID.nameUUIDFromBytes(input.getBytes()).toString();
StringBuilder version4Uuid = new StringBuilder(version3Uuid);
@@ -270,11 +277,12 @@ private String getVersion4Uuid(String fileUrl, Date modificationTime) {
* additional results if the file system contains more results than can be returned in a single
* response.
*
* @param failedItemIds the IDs of items that previously failed to be created
* @param filterDate specifies a point in time such that only files more recent are returned
* @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
*/
@VisibleForTesting
List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
List<FileStatus> getFilesToReplicate(List<String> failedItemIds, @Nullable Date filterDate) {

List<FileStatus> filesToReplicate = new ArrayList<>();
AtomicInteger remainingEntries = new AtomicInteger();
@@ -315,7 +323,7 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
pathSuffix.set(results.get(results.size() - 1).getPathSuffix());
}

return getRelevantFiles(results, filterDate);
return getRelevantFiles(results, failedItemIds, filterDate);

} else {
throw new ReplicationException(
@@ -338,22 +346,53 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {

/**
* Returns the files meeting the criteria for replication by removing elements that: 1) are of
* type DIRECTORY or 2) have a modification time before or equal to the filter date, when the
* filter date is specified
* type DIRECTORY or 2) are not one of the failed IDs and have a modification time before or equal
* to the filter date, when the filter date is specified
*
* @param files a {@code List} of all {@link FileStatus} objects returned by the GET request
* @param failedItemIds a {@code List} of IDs for items that previously failed to be created
* @param filterDate specifies a point in time such that only files more recent are included; this
* value will be set to {@code null} during the first execution of replication
* @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
*/
@VisibleForTesting
List<FileStatus> getRelevantFiles(List<FileStatus> files, @Nullable Date filterDate) {
files.removeIf(
file ->
file.isDirectory()
|| (filterDate != null && !file.getModificationTime().after(filterDate)));
List<FileStatus> getRelevantFiles(
List<FileStatus> files, List<String> failedItemIds, @Nullable Date filterDate) {
List<FileStatus> results = new ArrayList<>();
Map<String, FileStatus> oldFiles = new HashMap<>();

for (FileStatus file : files) {
// skip over any directories
if (file.isDirectory()) {
continue;
}

// files modified after the filter date should always be added to the returned results
if (filterDate == null || file.getModificationTime().after(filterDate)) {
results.add(file);
} else {
String fileUrl = getWebHdfsUrl().toString() + file.getPathSuffix();
Date modificationTime = file.getModificationTime();
String id = getVersion4Uuid(fileUrl, modificationTime);

oldFiles.put(id, file);
}
}

return addFailedItemsToRelevantFiles(results, oldFiles, failedItemIds);
}

private List<FileStatus> addFailedItemsToRelevantFiles(
List<FileStatus> relevantFiles, Map<String, FileStatus> oldFiles, List<String> failedItemIds) {
// only do the lookup if there are failed IDs and previously filtered files to match them against
if (!oldFiles.isEmpty() && !failedItemIds.isEmpty()) {
for (String failedId : failedItemIds) {
if (oldFiles.containsKey(failedId)) {
relevantFiles.add(oldFiles.get(failedId));
}
}
}

return files;
return relevantFiles;
}

@Override
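
Aside from the diff itself, the retry matching above only works because the adapter derives the same ID for a given file URL and modification time on every query. Below is a minimal, self-contained sketch of that derivation for illustration; the FailedItemIdSketch class and deterministicId method are hypothetical names, and the final step of rewriting the version character to '4' is an assumption, since that portion of getVersion4Uuid is collapsed in this diff.

import java.util.Date;
import java.util.UUID;

public class FailedItemIdSketch {

  // Derives a deterministic, UUID-shaped ID from a file URL and its modification time,
  // mirroring the inputs used by getVersion4Uuid above.
  static String deterministicId(String fileUrl, Date modificationTime) {
    String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
    // name-based (version-3) UUID: identical input always yields an identical UUID
    String version3Uuid = UUID.nameUUIDFromBytes(input.getBytes()).toString();
    StringBuilder version4Uuid = new StringBuilder(version3Uuid);
    // assumption: the version character (index 14 of the canonical form) is rewritten to '4'
    version4Uuid.setCharAt(14, '4');
    return version4Uuid.toString();
  }

  public static void main(String[] args) {
    Date modified = new Date(100000L);
    String first = deterministicId("http://host:1234/some/path/file1.ext", modified);
    String second = deterministicId("http://host:1234/some/path/file1.ext", modified);
    // prints true: equal IDs are what let getRelevantFiles match failedItemIds against
    // files that would otherwise be filtered out by the modification-time check
    System.out.println(first.equals(second));
  }
}
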
WebHdfsNodeAdapterTest.java
@@ -43,6 +43,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
@@ -52,6 +53,7 @@
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@@ -84,6 +86,8 @@ public class WebHdfsNodeAdapterTest {

@Rule public ExpectedException thrown = ExpectedException.none();

private static final String WEBHDFS_URL = "http://host:1234/some/path/";

WebHdfsNodeAdapter webHdfsNodeAdapter;

CloseableHttpClient client;
@@ -92,7 +96,7 @@ public class WebHdfsNodeAdapterTest {
public void setup() throws MalformedURLException {

client = mock(CloseableHttpClient.class);
webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL("http://host:1234/some/path/"), client);
webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL(WEBHDFS_URL), client);
}

@SuppressWarnings("unchecked")
@@ -216,6 +220,105 @@ public void testQuery() throws IOException {
assertThat(metadataAttributes.get("resource-size").getValue(), is("251"));
}

@Test
public void testQueryWithFailedItemIds() throws IOException {
Calendar cal = Calendar.getInstance();
cal.clear();
cal.setTimeZone(TimeZone.getTimeZone("UTC"));

cal.set(Calendar.MONTH, 1);
cal.set(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.YEAR, 2000);

// dates before the filter date
Date beforeFilterDate1 = cal.getTime();
cal.add(Calendar.DAY_OF_MONTH, 1);
Date beforeFilterDate2 = cal.getTime();

// the filter date
cal.add(Calendar.DAY_OF_MONTH, 1);
Date filterDate = cal.getTime();

// dates after the filter date
cal.add(Calendar.DAY_OF_MONTH, 1);
Date afterFilterDate1 = cal.getTime();
cal.add(Calendar.DAY_OF_MONTH, 1);
Date afterFilterDate2 = cal.getTime();

// file1 should be in the response list and file2 should NOT be
FileStatus file1 = getFileStatus(afterFilterDate1, "file1.ext", "FILE", 251);
FileStatus file2 = getFileStatus(beforeFilterDate1, "file2.ext", "FILE", 251);

// the failed files should be in the response list
FileStatus failedFile1 = getFileStatus(beforeFilterDate2, "failed-file1.ext", "FILE", 251);
FileStatus failedFile2 = getFileStatus(beforeFilterDate1, "failed-file2.ext", "FILE", 251);
FileStatus failedFile3 = getFileStatus(afterFilterDate2, "failed-file3.ext", "FILE", 251);

// when sorted by modification time the order should be: failedFile2, failedFile1, file1,
// failedFile3
List<FileStatus> files =
Stream.of(file1, file2, failedFile1, failedFile2, failedFile3).collect(Collectors.toList());

// the ids generated for the failed files will populate the failed item ids list
String failedIdFile1 =
webHdfsNodeAdapter.getVersion4Uuid(
WEBHDFS_URL + failedFile1.getPathSuffix(), failedFile1.getModificationTime());
String failedIdFile2 =
webHdfsNodeAdapter.getVersion4Uuid(
WEBHDFS_URL + failedFile2.getPathSuffix(), failedFile2.getModificationTime());
String failedIdFile3 =
webHdfsNodeAdapter.getVersion4Uuid(
WEBHDFS_URL + failedFile3.getPathSuffix(), failedFile3.getModificationTime());

List<String> failedItemIds = new ArrayList<>();
failedItemIds.add(failedIdFile1);
failedItemIds.add(failedIdFile2);
failedItemIds.add(failedIdFile3);

QueryRequest queryRequest = mock(QueryRequest.class);
when(queryRequest.getModifiedAfter()).thenReturn(filterDate);
when(queryRequest.getFailedItemIds()).thenReturn(failedItemIds);

String idl = getIterativeDirectoryListingAsString(files, 0);
InputStream inputStream = new ByteArrayInputStream(idl.getBytes(UTF_8));

HttpResponse response = mock(HttpResponse.class);
StatusLine statusLine = mock(StatusLine.class);
when(response.getStatusLine()).thenReturn(statusLine);
when(statusLine.getStatusCode()).thenReturn(200);

HttpEntity httpEntity = mock(HttpEntity.class);
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(inputStream);

doAnswer(
invocationOnMock -> {
ResponseHandler<List<FileStatus>> responseHandler =
(ResponseHandler<List<FileStatus>>) invocationOnMock.getArguments()[1];
return responseHandler.handleResponse(response);
})
.when(client)
.execute(any(HttpGet.class), any(ResponseHandler.class));

QueryResponse queryResponse = webHdfsNodeAdapter.query(queryRequest);

Iterable<Metadata> metadataIterable = queryResponse.getMetadata();
Metadata failedFile2Metadata = Iterables.get(metadataIterable, 0);
Metadata failedFile1Metadata = Iterables.get(metadataIterable, 1);
Metadata file1Metadata = Iterables.get(metadataIterable, 2);
Metadata failedFile3Metadata = Iterables.get(metadataIterable, 3);
int metadataIterableSize =
(int) StreamSupport.stream(metadataIterable.spliterator(), false).count();

assertThat(metadataIterableSize, is(4));
assertThat(
failedFile2Metadata.getResourceUri().toString(),
is(String.format("%s%s", WEBHDFS_URL, "failed-file2.ext")));
assertThat(
failedFile1Metadata.getResourceUri().toString(),
is(String.format("%s%s", WEBHDFS_URL, "failed-file1.ext")));
assertThat(
file1Metadata.getResourceUri().toString(),
is(String.format("%s%s", WEBHDFS_URL, "file1.ext")));
assertThat(
failedFile3Metadata.getResourceUri().toString(),
is(String.format("%s%s", WEBHDFS_URL, "failed-file3.ext")));
}

@SuppressWarnings("unchecked")
@Test
public void testCreateMetadata() {
@@ -281,25 +384,72 @@ public void testCreateMetadataBadUri() {

@Test
public void testGetRelevantFilesNullFilterDate() {
Date date = new Date();

FileStatus file1 = new FileStatus();
file1.setType("FILE");
file1.setModificationTime(date);
FileStatus file2 = new FileStatus();
file2.setType("DIRECTORY");
file2.setModificationTime(date);
FileStatus file3 = new FileStatus();
file3.setType("FILE");
file3.setModificationTime(date);

List<FileStatus> files = Stream.of(file1, file2, file3).collect(Collectors.toList());

// when getting a list of relevant files with no specified filter date
List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, null);
List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, new ArrayList<>(), null);

// then all items of type FILE are returned
assertThat(result.get(0), is(file1));
assertThat(result.get(1), is(file3));
assertThat(result.size(), is(2));
}

@Test
public void testGetRelevantFilesFailedItemsIncluded() {
// for non-failed items, only the ones with modification times after the filter date should be
// included
Date filterDate = new Date(200000L);
Date beforeFilterDate = new Date(100000L);
Date afterFilterDate = new Date(300000L);

FileStatus file1 = new FileStatus();
file1.setType("FILE");
file1.setModificationTime(afterFilterDate);
file1.setPathSuffix("file1");
FileStatus file2 = new FileStatus();
file2.setType("FILE");
file2.setModificationTime(afterFilterDate);
file2.setPathSuffix("file2");

// this file should get filtered because it has a modification time before the filter date
FileStatus file3 = new FileStatus();
file3.setType("FILE");
file3.setModificationTime(beforeFilterDate);
file3.setPathSuffix("file3");

// using this file as the "failed item"
FileStatus file4 = new FileStatus();
file4.setType("FILE");
file4.setModificationTime(beforeFilterDate);
file4.setPathSuffix("file4");
String failedItemUrl = WEBHDFS_URL + file4.getPathSuffix();
String failedId =
webHdfsNodeAdapter.getVersion4Uuid(failedItemUrl, file4.getModificationTime());

List<FileStatus> files = Stream.of(file1, file2, file3, file4).collect(Collectors.toList());
List<String> failedItemIds = Collections.singletonList(failedId);

List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, failedItemIds, filterDate);

assertThat(result.size(), is(3));
assertThat(result.get(0), is(file1));
assertThat(result.get(1), is(file2));
assertThat(result.get(2), is(file4));
}

@SuppressWarnings("unchecked")
@Test
public void testGetFilesToReplicate() throws IOException {
@@ -367,7 +517,8 @@ public void testGetFilesToReplicate() throws IOException {
.execute(any(HttpGet.class), any(ResponseHandler.class));

// when the adapter retrieves the files to replicate
List<FileStatus> filesToReplicate = webHdfsNodeAdapter.getFilesToReplicate(filter);
List<FileStatus> filesToReplicate =
webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), filter);

// then there are three files that fit the criteria
assertThat(filesToReplicate.size() == 3, is(true));
@@ -417,7 +568,7 @@ public void testGetFilesToReplicateBadStatusCode() throws IOException {
thrown.expect(ReplicationException.class);
thrown.expectMessage("List Status Batch request failed with status code: 400");

webHdfsNodeAdapter.getFilesToReplicate(new Date());
webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), new Date());
}

@SuppressWarnings({"Duplicates", "unchecked"})