diff --git a/adapters/webhdfs-adapter/src/main/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapter.java b/adapters/webhdfs-adapter/src/main/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapter.java
index d51983934..636096ef0 100644
--- a/adapters/webhdfs-adapter/src/main/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapter.java
+++ b/adapters/webhdfs-adapter/src/main/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapter.java
@@ -151,8 +151,14 @@ public String getSystemName() {
 
   @Override
   public QueryResponse query(QueryRequest queryRequest) {
-    List<FileStatus> filesToReplicate = getFilesToReplicate(queryRequest.getModifiedAfter());
+    List<String> failedItemIds = queryRequest.getFailedItemIds();
+    if (!failedItemIds.isEmpty()) {
+      LOGGER.info(String.format("Found failed item IDs: %s", failedItemIds));
+    }
+
+    List<FileStatus> filesToReplicate =
+        getFilesToReplicate(failedItemIds, queryRequest.getModifiedAfter());
 
     filesToReplicate.sort(Comparator.comparing(FileStatus::getModificationTime));
 
@@ -255,7 +261,8 @@ Metadata createMetadata(FileStatus fileStatus) {
    * @param modificationTime modification time of the file
    * @return a {@code String} representing a version-4 UUID
    */
-  private String getVersion4Uuid(String fileUrl, Date modificationTime) {
+  @VisibleForTesting
+  String getVersion4Uuid(String fileUrl, Date modificationTime) {
     String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
     String version3Uuid = UUID.nameUUIDFromBytes(input.getBytes()).toString();
     StringBuilder version4Uuid = new StringBuilder(version3Uuid);
@@ -270,11 +277,12 @@ private String getVersion4Uuid(String fileUrl, Date modificationTime) {
    * additional results if the file system contains more results than can be returned in a single
    * response.
    *
+   * @param failedItemIds the IDs of items that failed to replicate on a previous run
    * @param filterDate specifies a point in time such that only files more recent are returned
    * @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
    */
   @VisibleForTesting
-  List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
+  List<FileStatus> getFilesToReplicate(List<String> failedItemIds, @Nullable Date filterDate) {
     List<FileStatus> filesToReplicate = new ArrayList<>();
     AtomicInteger remainingEntries = new AtomicInteger();
@@ -315,7 +323,7 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
           pathSuffix.set(results.get(results.size() - 1).getPathSuffix());
         }
 
-        return getRelevantFiles(results, filterDate);
+        return getRelevantFiles(results, failedItemIds, filterDate);
 
       } else {
         throw new ReplicationException(
@@ -337,23 +345,77 @@ List<FileStatus> getFilesToReplicate(@Nullable Date filterDate) {
   }
 
   /**
-   * Returns the files meeting the criteria for replication by removing elements that: 1) are of
-   * type DIRECTORY or 2) have a modification time before or equal to the filter date, when the
-   * filter date is specified
+   * Takes a list of {@link FileStatus} and returns only the ones that are relevant to the current
+   * replication run. Relevancy is determined by meeting the following criteria:
    *
-   * @param files a {@code List} of all {@link FileStatus} objects returned by the GET request
-   * @param filterDate specifies a point in time such that only files more recent are included; this
-   *     value will be set to {@code null} during the first execution of replication
-   * @return a resulting {@code List} of {@link FileStatus} objects meeting the criteria
+   * <p>1. It is of type "FILE". A valid {@link FileStatus} can be either a directory or a file.
+   *
+   * <p>2. It has a modification date after the {@code filterDate} OR it has a UUID matching one in
+   * the failed IDs list.
+   *
+   * <p>For the case when a replication job is first run and the {@code filterDate} is null, all
+   * files will be included in the returned results.
+   *
+   * @param files the list of {@link FileStatus} to be filtered down
+   * @param failedItemIds the list of failed IDs from the previous replication run
+   * @param filterDate the date to use in filtering so that all files modified after it will be
+   *     included; this value will be set to {@code null} on the first replication run
+   * @return a list of {@link FileStatus} that have met the relevancy criteria
    */
   @VisibleForTesting
-  List<FileStatus> getRelevantFiles(List<FileStatus> files, @Nullable Date filterDate) {
-    files.removeIf(
-        file ->
-            file.isDirectory()
-                || (filterDate != null && !file.getModificationTime().after(filterDate)));
+  List<FileStatus> getRelevantFiles(
+      List<FileStatus> files, List<String> failedItemIds, @Nullable Date filterDate) {
+    List<FileStatus> results = new ArrayList<>();
+
+    // this map contains the UUIDs and associated files that were modified before the filter date
+    Map<String, FileStatus> oldFiles = new HashMap<>();
+
+    for (FileStatus file : files) {
+      // skip over any directories since we only want files
+      if (file.isDirectory()) {
+        continue;
+      }
+
+      // files modified after the filter date should always be added to the returned results
+      if (filterDate == null || file.getModificationTime().after(filterDate)) {
+        results.add(file);
+      } else {
+        String id =
+            getVersion4Uuid(
+                getWebHdfsUrl().toString() + file.getPathSuffix(), file.getModificationTime());
+
+        oldFiles.put(id, file);
+      }
+    }
+
+    return addFailedItemsToRelevantFiles(results, oldFiles, failedItemIds);
+  }
+
+  /**
+   * Iterates through the list of failed IDs to look up the associated {@link FileStatus} in the
+   * map.
+   *
+   * @param relevantFiles the list of {@link FileStatus} to be returned
+   * @param oldFiles the map of UUIDs to {@link FileStatus} with modification dates before the
+   *     filter date
+   * @param failedItemIds the list of failed item IDs
+   * @return a list of {@link FileStatus} representing the full set of files to be returned
+   */
+  private List<FileStatus> addFailedItemsToRelevantFiles(
+      List<FileStatus> relevantFiles,
+      Map<String, FileStatus> oldFiles,
+      List<String> failedItemIds) {
+    // the lookup should only occur when there are items in both the old files map and the list of
+    // failed IDs
+    if (!oldFiles.isEmpty() && !failedItemIds.isEmpty()) {
+      for (String failedId : failedItemIds) {
+        if (oldFiles.containsKey(failedId)) {
+          relevantFiles.add(oldFiles.get(failedId));
+        }
+      }
+    }
 
-    return files;
+    return relevantFiles;
   }
 
   @Override
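Note: the failed-item retry above only works because getVersion4Uuid derives the ID deterministically from the file URL and the modification time, so an ID recorded when an item fails can be recomputed for the same unchanged file on a later run. Below is a minimal standalone sketch of that property, not part of the patch; it reproduces only the name-based UUID step visible in the hunk above and elides the version-field rewrite that the truncated remainder of getVersion4Uuid performs.

import java.util.Date;
import java.util.UUID;

// Sketch only, not part of the patch.
class FailedItemIdSketch {

  // Mirrors the name-based step of getVersion4Uuid: "<fileUrl>_<epoch millis>"
  // hashed into a version-3 UUID. The real method goes on to rewrite the
  // UUID's version field, which is not shown in the hunk above.
  static String nameBasedId(String fileUrl, Date modificationTime) {
    String input = String.format("%s_%d", fileUrl, modificationTime.getTime());
    return UUID.nameUUIDFromBytes(input.getBytes()).toString();
  }

  public static void main(String[] args) {
    Date unchanged = new Date(100000L);
    String url = "http://host:1234/some/path/file4";

    // run 1: the item fails and its ID is recorded
    String idWhenFailed = nameBasedId(url, unchanged);

    // run 2: the same URL and modification time produce the same ID, so the
    // file can be matched against the failed-ID list even though it predates
    // the filter date
    String idOnRetry = nameBasedId(url, unchanged);

    System.out.println(idWhenFailed.equals(idOnRetry)); // true
  }
}

A consequence worth noting in review: if a failed file is modified again before the retry run, its recomputed ID no longer matches the recorded one, but its new modification time should then land after the filter date, so it is picked up by the date branch of getRelevantFiles instead.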
diff --git a/adapters/webhdfs-adapter/src/test/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapterTest.java b/adapters/webhdfs-adapter/src/test/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapterTest.java
index fa6a396ed..e0fb7c344 100644
--- a/adapters/webhdfs-adapter/src/test/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapterTest.java
+++ b/adapters/webhdfs-adapter/src/test/java/com/connexta/replication/adapters/webhdfs/WebHdfsNodeAdapterTest.java
@@ -43,6 +43,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
@@ -52,6 +53,7 @@
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -84,6 +86,8 @@ public class WebHdfsNodeAdapterTest {
 
   @Rule public ExpectedException thrown = ExpectedException.none();
 
+  private static final String WEBHDFS_URL = "http://host:1234/some/path/";
+
   WebHdfsNodeAdapter webHdfsNodeAdapter;
   CloseableHttpClient client;
 
@@ -92,7 +96,7 @@ public class WebHdfsNodeAdapterTest {
   public void setup() throws MalformedURLException {
     client = mock(CloseableHttpClient.class);
 
-    webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL("http://host:1234/some/path/"), client);
+    webHdfsNodeAdapter = new WebHdfsNodeAdapter(new URL(WEBHDFS_URL), client);
   }
 
   @SuppressWarnings("unchecked")
@@ -216,6 +220,110 @@ public void testQuery() throws IOException {
     assertThat(metadataAttributes.get("resource-size").getValue(), is("251"));
   }
 
+  @Test
+  public void testQueryWithFailedItemIds() throws IOException {
+    Calendar cal = Calendar.getInstance();
+    cal.clear();
+    cal.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    cal.set(Calendar.MONTH, 1);
+    cal.set(Calendar.DAY_OF_MONTH, 1);
+    cal.set(Calendar.YEAR, 2000);
+
+    // dates before the filter date
+    Date beforeFilterDate1 = cal.getTime();
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date beforeFilterDate2 = cal.getTime();
+
+    // the filter date
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date filterDate = cal.getTime();
+
+    // dates after the filter date
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date afterFilterDate1 = cal.getTime();
+    cal.add(Calendar.DAY_OF_MONTH, 1);
+    Date afterFilterDate2 = cal.getTime();
+
+    // file1 should be in the response list and file2 should NOT be
+    FileStatus file1 = getFileStatus(afterFilterDate1, "file1.ext", "FILE", 251);
+    FileStatus file2 = getFileStatus(beforeFilterDate1, "file2.ext", "FILE", 251);
+
+    // the failed files should be in the response list
+    FileStatus failedFile1 = getFileStatus(beforeFilterDate2, "failed-file1.ext", "FILE", 251);
+    FileStatus failedFile2 = getFileStatus(beforeFilterDate1, "failed-file2.ext", "FILE", 251);
+    FileStatus failedFile3 = getFileStatus(afterFilterDate2, "failed-file3.ext", "FILE", 251);
+
+    // when sorted by modification time the order should be: failedFile2, failedFile1, file1,
+    // failedFile3
+    List<FileStatus> files =
+        Stream.of(file1, file2, failedFile1, failedFile2, failedFile3).collect(Collectors.toList());
+
+    // the IDs generated for the failed files will populate the failed item IDs list
+    String failedIdFile1 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile1.getPathSuffix(), failedFile1.getModificationTime());
+    String failedIdFile2 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile2.getPathSuffix(), failedFile2.getModificationTime());
+    String failedIdFile3 =
+        webHdfsNodeAdapter.getVersion4Uuid(
+            WEBHDFS_URL + failedFile3.getPathSuffix(), failedFile3.getModificationTime());
+
+    List<String> failedItemIds = new ArrayList<>();
+    failedItemIds.add(failedIdFile1);
+    failedItemIds.add(failedIdFile2);
+    failedItemIds.add(failedIdFile3);
+
+    QueryRequest queryRequest = mock(QueryRequest.class);
+    when(queryRequest.getModifiedAfter()).thenReturn(filterDate);
+    when(queryRequest.getFailedItemIds()).thenReturn(failedItemIds);
+
+    String idl = getIterativeDirectoryListingAsString(files, 0);
+    InputStream inputStream = new ByteArrayInputStream(idl.getBytes(UTF_8));
+
+    HttpResponse response = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+    when(response.getStatusLine()).thenReturn(statusLine);
+    when(statusLine.getStatusCode()).thenReturn(200);
+
+    HttpEntity httpEntity = mock(HttpEntity.class);
+    when(response.getEntity()).thenReturn(httpEntity);
+    when(httpEntity.getContent()).thenReturn(inputStream);
+
+    doAnswer(
+            invocationOnMock -> {
+              ResponseHandler<List<FileStatus>> responseHandler =
+                  (ResponseHandler<List<FileStatus>>) invocationOnMock.getArguments()[1];
+              return responseHandler.handleResponse(response);
+            })
+        .when(client)
+        .execute(any(HttpGet.class), any(ResponseHandler.class));
+
+    QueryResponse queryResponse = webHdfsNodeAdapter.query(queryRequest);
+
+    Iterable<Metadata> metadataIterable = queryResponse.getMetadata();
+    Metadata metadata1 = Iterables.get(metadataIterable, 0);
+    Metadata metadata2 = Iterables.get(metadataIterable, 1);
+    Metadata metadata3 = Iterables.get(metadataIterable, 2);
+    Metadata metadata4 = Iterables.get(metadataIterable, 3);
+    int metadataIterableSize =
+        (int) StreamSupport.stream(metadataIterable.spliterator(), false).count();
+
+    assertThat(metadataIterableSize, is(4));
+    assertThat(
+        metadata1.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file2.ext")));
+    assertThat(
+        metadata2.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file1.ext")));
+    assertThat(
+        metadata3.getResourceUri().toString(), is(String.format("%s%s", WEBHDFS_URL, "file1.ext")));
+    assertThat(
+        metadata4.getResourceUri().toString(),
+        is(String.format("%s%s", WEBHDFS_URL, "failed-file3.ext")));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testCreateMetadata() {
@@ -281,18 +389,22 @@ public void testCreateMetadataBadUri() {
 
   @Test
   public void testGetRelevantFilesNullFilterDate() {
+    Date date = new Date();
     FileStatus file1 = new FileStatus();
     file1.setType("FILE");
+    file1.setModificationTime(date);
     FileStatus file2 = new FileStatus();
     file2.setType("DIRECTORY");
+    file2.setModificationTime(date);
     FileStatus file3 = new FileStatus();
     file3.setType("FILE");
+    file3.setModificationTime(date);
 
     List<FileStatus> files = Stream.of(file1, file2, file3).collect(Collectors.toList());
 
     // when getting a list of relevant files with no specified filter date
-    List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, null);
+    List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, new ArrayList<>(), null);
 
     // then all items of type FILE are returned
     assertThat(result.get(0), is(file1));
@@ -300,6 +412,49 @@ public void testGetRelevantFilesNullFilterDate() {
     assertThat(result.size(), is(2));
   }
 
+  @Test
+  public void testGetRelevantFilesFailedItemsIncluded() {
+    // for non-failed items, only the ones with modification times after the filter date should be
+    // included
+    Date filterDate = new Date(200000L);
+    Date beforeFilterDate = new Date(100000L);
+    Date afterFilterDate = new Date(300000L);
+
+    FileStatus file1 = new FileStatus();
+    file1.setType("FILE");
+    file1.setModificationTime(afterFilterDate);
+    file1.setPathSuffix("file1");
+    FileStatus file2 = new FileStatus();
+    file2.setType("FILE");
+    file2.setModificationTime(afterFilterDate);
+    file2.setPathSuffix("file2");
+
+    // this file should get filtered out because it has a modification time before the filter date
+    FileStatus file3 = new FileStatus();
+    file3.setType("FILE");
+    file3.setModificationTime(beforeFilterDate);
+    file3.setPathSuffix("file3");
+
+    // using this file as the "failed item"
+    FileStatus file4 = new FileStatus();
+    file4.setType("FILE");
+    file4.setModificationTime(beforeFilterDate);
+    file4.setPathSuffix("file4");
+    String failedItemUrl = WEBHDFS_URL + file4.getPathSuffix();
+    String failedId =
+        webHdfsNodeAdapter.getVersion4Uuid(failedItemUrl, file4.getModificationTime());
+
+    List<FileStatus> files = Stream.of(file1, file2, file3, file4).collect(Collectors.toList());
+    List<String> failedItemIds = Collections.singletonList(failedId);
+
+    List<FileStatus> result = webHdfsNodeAdapter.getRelevantFiles(files, failedItemIds, filterDate);
+
+    assertThat(result.size(), is(3));
+    assertThat(result.get(0), is(file1));
+    assertThat(result.get(1), is(file2));
+    assertThat(result.get(2), is(file4));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testGetFilesToReplicate() throws IOException {
@@ -367,7 +522,8 @@ public void testGetFilesToReplicate() throws IOException {
         .execute(any(HttpGet.class), any(ResponseHandler.class));
 
     // when the adapter retrieves the files to replicate
-    List<FileStatus> filesToReplicate = webHdfsNodeAdapter.getFilesToReplicate(filter);
+    List<FileStatus> filesToReplicate =
+        webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), filter);
 
     // then there are three files that fit the criteria
     assertThat(filesToReplicate.size() == 3, is(true));
@@ -417,7 +573,7 @@ public void testGetFilesToReplicateBadStatusCode() throws IOException {
     thrown.expect(ReplicationException.class);
     thrown.expectMessage("List Status Batch request failed with status code: 400");
 
-    webHdfsNodeAdapter.getFilesToReplicate(new Date());
+    webHdfsNodeAdapter.getFilesToReplicate(new ArrayList<>(), new Date());
   }
 
   @SuppressWarnings({"Duplicates", "unchecked"})
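For reviewers checking the assertion order at the end of testQueryWithFailedItemIds: query(...) sorts the surviving files by modification time before building metadata, which yields failed-file2 (Feb 1), failed-file1 (Feb 2), file1 (Feb 4), failed-file3 (Feb 5); file2 is dropped because it predates the filter date (Feb 3) and its ID is not in the failed list. A small sketch of just that ordering step, using epoch-millis stand-ins for the test's dates (the class and variable names here are simplified; only the comparator mirrors the patch):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only, not part of the patch.
class ExpectedOrderSketch {
  public static void main(String[] args) {
    // epoch-millis stand-ins for the modification times used in the test
    Map<String, Long> modTimes = new LinkedHashMap<>();
    modTimes.put("file1.ext", 4L);        // afterFilterDate1  (Feb 4, 2000)
    modTimes.put("failed-file1.ext", 2L); // beforeFilterDate2 (Feb 2, 2000)
    modTimes.put("failed-file2.ext", 1L); // beforeFilterDate1 (Feb 1, 2000)
    modTimes.put("failed-file3.ext", 5L); // afterFilterDate2  (Feb 5, 2000)

    List<String> names = new ArrayList<>(modTimes.keySet());
    // same idea as the adapter's
    // filesToReplicate.sort(Comparator.comparing(FileStatus::getModificationTime))
    names.sort(Comparator.comparing(modTimes::get));

    // prints [failed-file2.ext, failed-file1.ext, file1.ext, failed-file3.ext]
    System.out.println(names);
  }
}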