Fix WebHDFS adapter query to retry failed resources #332

Merged · 6 commits · Jun 19, 2020
WebHdfsNodeAdapter.java

@@ -151,8 +151,14 @@ public String getSystemName() {

  @Override
  public QueryResponse query(QueryRequest queryRequest) {
-    List<FileStatus> filesToReplicate = getFilesToReplicate(queryRequest.getModifiedAfter());
+    List<String> failedItemIds = queryRequest.getFailedItemIds();
+
+    if (!failedItemIds.isEmpty()) {
+      LOGGER.info(String.format("Found failed item IDs: %s", failedItemIds));
+    }
+
+    List<FileStatus> filesToReplicate =
+        getFilesToReplicate(failedItemIds, queryRequest.getModifiedAfter());

    filesToReplicate.sort(Comparator.comparing(FileStatus::getModificationTime));

@@ -255,7 +261,8 @@ Metadata createMetadata(FileStatus fileStatus) {
   * @param modificationTime modification time of the file
   * @return a {@code String} representing a version-4 UUID
   */
-  private String getVersion4Uuid(String fileUrl, Date modificationTime) {
+  @VisibleForTesting
+  String getVersion4Uuid(String fileUrl, Date modificationTime) {
    String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
    String version3Uuid = UUID.nameUUIDFromBytes(input.getBytes()).toString();
    StringBuilder version4Uuid = new StringBuilder(version3Uuid);
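Note: the tail end of getVersion4Uuid is collapsed in the hunk above. For readers following along, here is a self-contained sketch of the full derivation. The setCharAt(14, '4') step and the wrapper class are assumptions made for illustration, not code taken from this PR; what matters is that the same URL and modification time always hash to the same ID, so IDs recorded for failed items on one run can be matched on the next.

import java.util.Date;
import java.util.UUID;

class Version4UuidSketch {
  /** Derives a stable, version-4-shaped ID from a file URL and its modification time. */
  static String getVersion4Uuid(String fileUrl, Date modificationTime) {
    String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
    // nameUUIDFromBytes is deterministic but produces a name-based (version 3) UUID
    String version3Uuid = UUID.nameUUIDFromBytes(input.getBytes()).toString();
    StringBuilder version4Uuid = new StringBuilder(version3Uuid);
    // assumption: the collapsed lines rewrite the version digit (string index 14)
    // so the result reads as a version-4 UUID while staying deterministic
    version4Uuid.setCharAt(14, '4');
    return version4Uuid.toString();
  }

  public static void main(String[] args) {
    // identical inputs always yield the identical ID
    System.out.println(getVersion4Uuid("http://host:1234/some/path/file1.ext", new Date(100000L)));
  }
}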
@@ -270,11 +277,12 @@ private String getVersion4Uuid(String fileUrl, Date modificationTime) {
   * additional results if the file system contains more results than can be returned in a single
   * response.
   *
+   * @param failedItemIds the list of IDs that failed to be created
   * @param filterDate specifies a point in time such that only files more recent are returned
   * @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
   */
  @VisibleForTesting
-  List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
+  List<FileStatus> getFilesToReplicate(List<String> failedItemIds, @Nullable Date filterDate) {

    List<FileStatus> filesToReplicate = new ArrayList<>();
    AtomicInteger remainingEntries = new AtomicInteger();
@@ -315,7 +323,7 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
        pathSuffix.set(results.get(results.size() - 1).getPathSuffix());
      }

-      return getRelevantFiles(results, filterDate);
+      return getRelevantFiles(results, failedItemIds, filterDate);

    } else {
      throw new ReplicationException(
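The paging visible in this hunk follows WebHDFS's LISTSTATUS_BATCH operation. As a rough sketch of the loop's shape, the two interfaces below are hypothetical stand-ins invented for illustration, not types from this codebase: the adapter keeps requesting batches, passing the last pathSuffix of the previous batch as the startAfter continuation token, until the server reports no remaining entries.

import java.util.ArrayList;
import java.util.List;

class ListStatusBatchSketch {
  /** Hypothetical stand-in for one parsed LISTSTATUS_BATCH response. */
  interface DirectoryListing {
    List<String> pathSuffixes(); // pathSuffix of each FileStatus in this batch

    int remainingEntries(); // entries the server has not yet returned
  }

  /** Hypothetical stand-in for GET <url>?op=LISTSTATUS_BATCH&startAfter=<name>. */
  interface WebHdfsClient {
    DirectoryListing listStatusBatch(String startAfter);
  }

  static List<String> listAllPathSuffixes(WebHdfsClient client) {
    List<String> all = new ArrayList<>();
    String startAfter = null;
    DirectoryListing listing;
    do {
      listing = client.listStatusBatch(startAfter);
      List<String> batch = listing.pathSuffixes();
      all.addAll(batch);
      if (!batch.isEmpty()) {
        // continuation token: the last entry's pathSuffix, mirroring
        // pathSuffix.set(results.get(results.size() - 1).getPathSuffix()) above
        startAfter = batch.get(batch.size() - 1);
      }
    } while (listing.remainingEntries() > 0);
    return all;
  }
}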
@@ -337,23 +345,77 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
  }

  /**
-   * Returns the files meeting the criteria for replication by removing elements that: 1) are of
-   * type DIRECTORY or 2) have a modification time before or equal to the filter date, when the
-   * filter date is specified
+   * Takes a list of {@link FileStatus} and returns only the ones that are relevant to the current
+   * replication run. Relevancy is determined by meeting the following criteria:
   *
-   * @param files a {@code List} of all {@link FileStatus} objects returned by the GET request
-   * @param filterDate specifies a point in time such that only files more recent are included; this
-   *     value will be set to {@code null} during the first execution of replication
-   * @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
+   * <ol>
+   *   <li>It is of type "FILE". A valid {@link FileStatus} can be either a directory or a file.
+   *   <li>It has a modification date after the {@code filterDate} OR it has a UUID matching one in
+   *       the failed IDs list
+   * </ol>
+   *
+   * <p>For the case when a replication job is first run and the {@code filterDate} is null, all
+   * files will be included in the returned results.
+   *
+   * @param files - the list of {@link FileStatus} to be filtered down
+   * @param failedItemIds - the list of failed IDs from the previous replication run
+   * @param filterDate - the date to use in filtering so that all files modified after will be
+   *     included; this value will be set to {@code null} on the first replication run
+   * @return A list of {@link FileStatus} that have met the relevancy criteria.
   */
  @VisibleForTesting
-  List<FileStatus> getRelevantFiles(List<FileStatus> files, @Nullable Date filterDate) {
-    files.removeIf(
-        file ->
-            file.isDirectory()
-                || (filterDate != null && !file.getModificationTime().after(filterDate)));
+  List<FileStatus> getRelevantFiles(
+      List<FileStatus> files, List<String> failedItemIds, @Nullable Date filterDate) {
+    List<FileStatus> results = new ArrayList<>();
+
+    // this map contains the UUIDs and associated files that are modified before the filter date
+    Map<String, FileStatus> oldFiles = new HashMap<>();
+
+    for (FileStatus file : files) {
+      // skip over any directories since we only want files
+      if (file.isDirectory()) {
+        continue;
+      }
+
+      // files modified after the filter date should always be added to the returned results
+      if (filterDate == null || file.getModificationTime().after(filterDate)) {
+        results.add(file);
+      } else {
+        String id =
+            getVersion4Uuid(
+                getWebHdfsUrl().toString() + file.getPathSuffix(), file.getModificationTime());
+
+        oldFiles.put(id, file);
+      }
+    }
+
+    return addFailedItemsToRelevantFiles(results, oldFiles, failedItemIds);
+  }
+
+  /**
+   * Iterates through the list of failed IDs to look up the associated {@link FileStatus} in the
+   * map.
+   *
+   * @param relevantFiles - the list of {@link FileStatus} to be returned
+   * @param oldFiles - the list of {@link FileStatus} with modification date before the filter date
+   * @param failedItemIds - the list of failed item IDs
+   * @return A list of {@link FileStatus} representing the full set of files to be returned
+   */
+  private List<FileStatus> addFailedItemsToRelevantFiles(
+      List<FileStatus> relevantFiles,
+      Map<String, FileStatus> oldFiles,
+      List<String> failedItemIds) {
+    // the lookup should only occur when there are items in both the old files map and list of
+    // failed IDs
+    if (!oldFiles.isEmpty() && !failedItemIds.isEmpty()) {
+      for (String failedId : failedItemIds) {
+        if (oldFiles.containsKey(failedId)) {
+          relevantFiles.add(oldFiles.get(failedId));
+        }
+      }
+    }

-    return files;
+    return relevantFiles;
  }

  @Override
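Before moving on to the test diff, here is a compact restatement of the retry semantics the adapter changes introduce. Every name in this sketch (SimpleFile, idOf, relevantFiles) is a hypothetical stand-in written for illustration; only the filtering rule itself comes from the PR: files modified after the filter date always replicate, while older files replicate only when their deterministic ID appears in the previous run's failed-item list.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

class RetryFilterSketch {
  /** Hypothetical stand-in for the relevant parts of FileStatus. */
  record SimpleFile(String pathSuffix, long modificationTime) {}

  /** Deterministic ID, mirroring getVersion4Uuid's URL-plus-mtime input. */
  static String idOf(String baseUrl, SimpleFile f) {
    String input = String.format("%s%s_%d", baseUrl, f.pathSuffix(), f.modificationTime());
    return UUID.nameUUIDFromBytes(input.getBytes()).toString();
  }

  static List<SimpleFile> relevantFiles(
      String baseUrl, List<SimpleFile> files, Set<String> failedIds, Long filterTime) {
    List<SimpleFile> results = new ArrayList<>();
    for (SimpleFile f : files) {
      boolean modifiedAfterFilter = filterTime == null || f.modificationTime() > filterTime;
      // new or changed files always replicate; older files only if they failed last run
      if (modifiedAfterFilter || failedIds.contains(idOf(baseUrl, f))) {
        results.add(f);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    String base = "http://host:1234/some/path/";
    SimpleFile fresh = new SimpleFile("file1.ext", 300_000L);
    SimpleFile stale = new SimpleFile("file2.ext", 100_000L);
    SimpleFile failed = new SimpleFile("file3.ext", 100_000L);

    List<SimpleFile> out =
        relevantFiles(base, List.of(fresh, stale, failed), Set.of(idOf(base, failed)), 200_000L);

    // out holds fresh and failed; stale is dropped because it predates the
    // filter date and did not fail on the previous run
    System.out.println(out);
  }
}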
WebHdfsNodeAdapterTest.java

@@ -43,6 +43,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
@@ -52,6 +53,7 @@
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@@ -84,6 +86,8 @@ public class WebHdfsNodeAdapterTest {

  @Rule public ExpectedException thrown = ExpectedException.none();

+  private static final String WEBHDFS_URL = "http://host:1234/some/path/";
+
  WebHdfsNodeAdapter webHdfsNodeAdapter;

  CloseableHttpClient client;

@@ -92,7 +96,7 @@
  public void setup() throws MalformedURLException {

    client = mock(CloseableHttpClient.class);
-    webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL("http://host:1234/some/path/"), client);
+    webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL(WEBHDFS_URL), client);
  }
@@ -216,6 +220,110 @@ public void testQuery() throws IOException {
    assertThat(metadataAttributes.get("resource-size").getValue(), is("251"));
  }

+  @Test
+  public void testQueryWithFailedItemIds() throws IOException {
+    Calendar cal = Calendar.getInstance();
+    cal.clear();
+    cal.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    cal.set(Calendar.MONTH, 1);
+    cal.set(Calendar.DAY_OF_MONTH, 1);
+    cal.set(Calendar.YEAR, 2000);
+
+    // dates before the filter date
+    Date beforeFilterDate1 = cal.getTime();
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date beforeFilterDate2 = cal.getTime();
+
+    // the filter date
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date filterDate = cal.getTime();
+
+    // dates after the filter date
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date afterFilterDate1 = cal.getTime();
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date afterFilterDate2 = cal.getTime();
+
+    // file1 should be in the response list and file2 should NOT be
+    FileStatus file1 = getFileStatus(afterFilterDate1, "file1.ext", "FILE", 251);
+    FileStatus file2 = getFileStatus(beforeFilterDate1, "file2.ext", "FILE", 251);
+
+    // the failed files should be in the response list
+    FileStatus failedFile1 = getFileStatus(beforeFilterDate2, "failed-file1.ext", "FILE", 251);
+    FileStatus failedFile2 = getFileStatus(beforeFilterDate1, "failed-file2.ext", "FILE", 251);
+    FileStatus failedFile3 = getFileStatus(afterFilterDate2, "failed-file3.ext", "FILE", 251);
+
+    // when sorted by modification time the order should be: failedFile2, failedFile1, file1,
+    // failedFile3
+    List<FileStatus> files =
+        Stream.of(file1, file2, failedFile1, failedFile2, failedFile3).collect(Collectors.toList());
+
+    // the ids generated for the failed files will populate the failed item ids list
+    String failedIdFile1 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile1.getPathSuffix(), failedFile1.getModificationTime());
+    String failedIdFile2 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile2.getPathSuffix(), failedFile2.getModificationTime());
+    String failedIdFile3 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile3.getPathSuffix(), failedFile3.getModificationTime());
+
+    List<String> failedItemIds = new ArrayList<>();
+    failedItemIds.add(failedIdFile1);
+    failedItemIds.add(failedIdFile2);
+    failedItemIds.add(failedIdFile3);
+
+    QueryRequest queryRequest = mock(QueryRequest.class);
+    when(queryRequest.getModifiedAfter()).thenReturn(filterDate);
+    when(queryRequest.getFailedItemIds()).thenReturn(failedItemIds);
+
+    String idl = getIterativeDirectoryListingAsString(files, 0);
+    InputStream inputStream = new ByteArrayInputStream(idl.getBytes(UTF_8));
+
+    HttpResponse response = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+    when(response.getStatusLine()).thenReturn(statusLine);
+    when(statusLine.getStatusCode()).thenReturn(200);
+
+    HttpEntity httpEntity = mock(HttpEntity.class);
+    when(response.getEntity()).thenReturn(httpEntity);
+    when(httpEntity.getContent()).thenReturn(inputStream);
+
+    doAnswer(
+            invocationOnMock -> {
+              ResponseHandler<List<FileStatus>> responseHandler =
+                  (ResponseHandler<List<FileStatus>>) invocationOnMock.getArguments()[1];
+              return responseHandler.handleResponse(response);
+            })
+        .when(client)
+        .execute(any(HttpGet.class), any(ResponseHandler.class));
+
+    QueryResponse queryResponse = webHdfsNodeAdapter.query(queryRequest);
+
+    Iterable<Metadata> metadataIterable = queryResponse.getMetadata();
+    Metadata metadata1 = Iterables.get(metadataIterable, 0);
+    Metadata metadata2 = Iterables.get(metadataIterable, 1);
+    Metadata metadata3 = Iterables.get(metadataIterable, 2);
+    Metadata metadata4 = Iterables.get(metadataIterable, 3);
+    int metadataIterableSize =
+        (int) StreamSupport.stream(metadataIterable.spliterator(), false).count();
+
+    assertThat(metadataIterableSize, is(4));
+    assertThat(
+        metadata1.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file2.ext")));
+    assertThat(
+        metadata2.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file1.ext")));
+    assertThat(
+        metadata3.getResourceUri().toString(), is(String.format("%s%s", WEBHDFS_URL, "file1.ext")));
+    assertThat(
+        metadata4.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file3.ext")));
+  }

  @SuppressWarnings("unchecked")
  @Test
  public void testCreateMetadata() {
Expand Down Expand Up @@ -281,25 +389,72 @@ public void testCreateMetadataBadUri() {

@Test
public void testGetRelevantFilesNullFilterDate() {
Date date = new Date();

FileStatus file1 = new FileStatus();
file1.setType("FILE");
file1.setModificationTime(date);
FileStatus file2 = new FileStatus();
file2.setType("DIRECTORY");
file2.setModificationTime(date);
FileStatus file3 = new FileStatus();
file3.setType("FILE");
file3.setModificationTime(date);

List<FileStatus> files = Stream.of(file1, file2, file3).collect(Collectors.toList());

// when getting a list of relevant files with no specified filter date
List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, null);
List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, new ArrayList<>(), null);

// then all items of type FILE are returned
assertThat(result.get(0), is(file1));
assertThat(result.get(1), is(file3));
assertThat(result.size(), is(2));
}

+  @Test
+  public void testGetRelevantFilesFailedItemsIncluded() {
+    // for non-failed items, only the ones with modification times after the filter date should be
+    // included
+    Date filterDate = new Date(200000L);
+    Date beforeFilterDate = new Date(100000L);
+    Date afterFilterDate = new Date(300000L);
+
+    FileStatus file1 = new FileStatus();
+    file1.setType("FILE");
+    file1.setModificationTime(afterFilterDate);
+    file1.setPathSuffix("file1");
+    FileStatus file2 = new FileStatus();
+    file2.setType("FILE");
+    file2.setModificationTime(afterFilterDate);
+    file2.setPathSuffix("file2");
+
+    // this file should get filtered because it has a modification time before the filter date
+    FileStatus file3 = new FileStatus();
+    file3.setType("FILE");
+    file3.setModificationTime(beforeFilterDate);
+    file3.setPathSuffix("file3");
+
+    // using this file as the "failed item"
+    FileStatus file4 = new FileStatus();
+    file4.setType("FILE");
+    file4.setModificationTime(beforeFilterDate);
+    file4.setPathSuffix("file4");
+    String failedItemUrl = WEBHDFS_URL + file4.getPathSuffix();
+    String failedId =
+        webHdfsNodeAdapter.getVersion4Uuid(failedItemUrl, file4.getModificationTime());
+
+    List<FileStatus> files = Stream.of(file1, file2, file3, file4).collect(Collectors.toList());
+    List<String> failedItemIds = Collections.singletonList(failedId);
+
+    List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, failedItemIds, filterDate);
+
+    assertThat(result.size(), is(3));
+    assertThat(result.get(0), is(file1));
+    assertThat(result.get(1), is(file2));
+    assertThat(result.get(2), is(file4));
+  }

  @SuppressWarnings("unchecked")
  @Test
  public void testGetFilesToReplicate() throws IOException {

@@ -367,7 +522,8 @@ public void testGetFilesToReplicate() throws IOException {
        .execute(any(HttpGet.class), any(ResponseHandler.class));

    // when the adapter retrieves the files to replicate
-    List<FileStatus> filesToReplicate = webHdfsNodeAdapter.getFilesToReplicate(filter);
+    List<FileStatus> filesToReplicate =
+        webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), filter);

    // then there are three files that fit the criteria
    assertThat(filesToReplicate.size() == 3, is(true));
@@ -417,7 +573,7 @@ public void testGetFilesToReplicateBadStatusCode() throws IOException {
    thrown.expect(ReplicationException.class);
    thrown.expectMessage("List Status Batch request failed with status code: 400");

-    webHdfsNodeAdapter.getFilesToReplicate(new Date());
+    webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), new Date());
  }

  @SuppressWarnings({"Duplicates", "unchecked"})
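A closing note on the test strategy used throughout this file: rather than stubbing canned return values, the tests route client.execute(request, handler) through the real ResponseHandler against a mocked HttpResponse, so the adapter's actual response-parsing code runs under controlled status codes and entity content. Below is a distilled sketch of that idiom, assuming Mockito and Apache HttpClient 4.x; the class and method names are illustrative only.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;

import org.apache.http.HttpResponse;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;

class ResponseHandlerStubSketch {
  /**
   * Stubs client.execute so that any (HttpGet, ResponseHandler) call invokes the
   * real handler with the supplied mocked response, exercising the production
   * parsing path instead of bypassing it.
   */
  @SuppressWarnings("unchecked")
  static void stubExecute(CloseableHttpClient client, HttpResponse response) throws Exception {
    doAnswer(
            invocation -> {
              ResponseHandler<Object> handler =
                  (ResponseHandler<Object>) invocation.getArguments()[1];
              return handler.handleResponse(response);
            })
        .when(client)
        .execute(any(HttpGet.class), any(ResponseHandler.class));
  }
}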