
[Searchable Snapshot] Implement cache restore mechanism and track phantom files #5980

Closed
aabukhalil opened this issue Jan 23, 2023 · 5 comments · Fixed by #6538
Labels: distributed framework, enhancement
Comments

@aabukhalil (Contributor)

Referring to #4964

Right now, the FileCache capacity is computed arbitrarily as 50% (hard-coded) of the total size of the first data path. Cached files are currently stored within the corresponding indices' directories. Also, when a node restarts, it does not add the existing cached files back into the cache but ignores them instead (phantom files). The next step is to decide which cache scope we should use and where to store the cached data, and to construct the cache accordingly. Node bootstrap should be changed to construct the named cache based on the List; this should also include the logic to walk through the cached files and either restore them to the cache or delete them.
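As a rough illustration of the restore-or-delete walk described above, here is a minimal, self-contained sketch. It is not the actual implementation: the class name, the directory layout, and the plain `Map` standing in for the real `FileCache` are all assumptions for illustration only.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;

public class CacheRestoreSketch {
    // Walk the cache root and record every regular file with its size.
    // A real implementation would then either admit each file into the
    // FileCache or delete it as a phantom file; here we only collect
    // the candidates into a map.
    static Map<Path, Long> scanCacheFolder(Path cacheRoot) throws IOException {
        Map<Path, Long> found = new HashMap<>();
        Files.walkFileTree(cacheRoot, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                if (attrs.isRegularFile()) {
                    found.put(file, attrs.size());
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny example tree in a temp directory, then scan it.
        Path root = Files.createTempDirectory("cache-sketch");
        Path shard = Files.createDirectories(
            root.resolve("node-0").resolve("0-index").resolve("0").resolve("RemoteLocalStore"));
        Files.write(shard.resolve("segment_1"), "test-data".getBytes());
        System.out.println("files found: " + scanCacheFolder(root).size());
    }
}
```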

@kotwanikunal (Member) commented Mar 6, 2023

Three alternative solutions were considered -

  1. Sequential/Single threaded loading
  2. Fork Join Pool based loading using recursive action
  3. Thread Pool based loading using recursive runnable

The current solution as a part of #6538 is based on Approach 1.

To compare the solutions, I ran the test code for all three approaches against the same folder structure: 30 indices, each with 25 shards, and 200 files per shard (roughly 150,000 files in the cache folder).
The test was run 10,000 times; the average time taken by each approach was -

  1. Sequential - 881888.0 nanos (0.000881888 seconds)
  2. ForkJoinPool - 5.754932971E8 nanos (0.5754932971 seconds)
  3. Executor Service / Thread Pool based - 632790.4 nanos (0.0006327904 seconds)

For the purposes of cache loading, approach 1 seems to be the best given the overhead/tradeoffs of the other approaches.

I will also attach the code for reference/future work.

@kotwanikunal (Member)

To create files -

    public static final String CACHE_FOLDER = "cache";
    public static final String LOCAL_STORE_LOCATION = "RemoteLocalStore";

    private static void createFiles(Path path, String nodeId, String indexName, String shardId, int numFiles) throws IOException {
        Path folderPath = path.resolve(CACHE_FOLDER)
            .resolve(nodeId)
            .resolve(indexName)
            .resolve(shardId)
            .resolve(LOCAL_STORE_LOCATION);

        Files.createDirectories(folderPath);

        for (int i = 0; i < numFiles; i++) {
            Path filePath = folderPath.resolve(String.valueOf(i));
            Files.write(filePath, "test-data".getBytes());
        }
    }

    public static void createData(Path path, int numIndices, int numShards, int numFiles) throws IOException {
        for (int i = 0; i < numIndices; i++) {
            for (int j = 0; j < numShards; j++) {
                createFiles(path, "node-0", i + "-index", String.valueOf(j), numFiles);
            }
        }
    }
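For reference, the helpers above produce a layout of the form cache/<nodeId>/<index>/<shard>/RemoteLocalStore/<file>. A self-contained sketch of the same idea (class name and the much smaller counts are mine, not from the benchmark, and it writes into a temp directory rather than a real data path):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CreateDataSketch {
    // Mirrors the createFiles/createData helpers with tiny counts and
    // returns how many files were written, e.g. 2 indices x 2 shards
    // x 3 files = 12 files under the temp directory.
    public static int createData(Path root, int numIndices, int numShards, int numFiles) throws IOException {
        int created = 0;
        for (int i = 0; i < numIndices; i++) {
            for (int j = 0; j < numShards; j++) {
                Path folder = root.resolve("cache")
                    .resolve("node-0")
                    .resolve(i + "-index")
                    .resolve(String.valueOf(j))
                    .resolve("RemoteLocalStore");
                Files.createDirectories(folder);
                for (int f = 0; f < numFiles; f++) {
                    Files.write(folder.resolve(String.valueOf(f)), "test-data".getBytes());
                    created++;
                }
            }
        }
        return created;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("cache-data-sketch");
        System.out.println("files created: " + createData(root, 2, 2, 3));
    }
}
```

The benchmark's 30 x 25 x 200 configuration would yield 150,000 files via the same loop structure.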

@kotwanikunal (Member)

Sequential solution - currently a part of #6538
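The actual sequential implementation lives in #6538 and is not reproduced here; the following is only a sketch of what a single-threaded analogue of the loaders below could look like, using a recursive DirectoryStream walk with a plain Map standing in for FileCache.sizeMap (class and method names are mine).

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class SequentialLoadSketch {
    static final String LOCAL_STORE_LOCATION = "RemoteLocalStore";

    // Single-threaded analogue of the FJP/thread-pool loaders: recurse
    // into subdirectories, and when a RemoteLocalStore folder is reached,
    // record every regular file and its length in the size map.
    static void load(Path fileCachePath, Map<Path, Long> sizeMap) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(fileCachePath)) {
            for (Path subPath : stream) {
                if (Files.isDirectory(subPath) && LOCAL_STORE_LOCATION.equals(subPath.getFileName().toString())) {
                    try (DirectoryStream<Path> files = Files.newDirectoryStream(subPath)) {
                        for (Path file : files) {
                            if (Files.isRegularFile(file)) {
                                sizeMap.put(file.toRealPath(), Files.size(file));
                            }
                        }
                    }
                } else if (Files.isDirectory(subPath)) {
                    load(subPath, sizeMap);  // recurse instead of forking a task
                }
            }
        }
    }
}
```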

Fork Join Pool Action -

    public static class LoadFJPFileCache extends RecursiveAction {
        private final Path fileCachePath;
        private final FileCache fileCache;

        public LoadFJPFileCache(Path fileCachePath, FileCache fileCache) {
            this.fileCachePath = fileCachePath;
            this.fileCache = fileCache;
        }

        @Override
        public void compute() {
            List<LoadFJPFileCache> subTasks = new ArrayList<>();
            try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(fileCachePath)) {
                for (Path subPath : directoryStream) {
                    if (Files.isDirectory(subPath) && FileCreator.LOCAL_STORE_LOCATION.equals(subPath.getFileName().toString())) {
                        File subPathFolder = subPath.toFile();
                        for (File subPathFile : subPathFolder.listFiles()) {
                            if (subPathFile.isFile()) {
                                fileCache.sizeMap.put(subPathFile.toPath().toRealPath(), subPathFile.length());
                            }
                        }
                    } else if (Files.isDirectory(subPath)) {
                        subTasks.add(new LoadFJPFileCache(subPath, fileCache));
                    }
                }
            } catch (IOException e) {
                System.out.println("Exception: " + e);
                return;
            }
            invokeAll(subTasks);
        }
    }

@kotwanikunal (Member)

ThreadPool based solution -

    public static class LoadThreadPoolFileCache implements Runnable {
        private final Path fileCachePath;
        private final FileCache fileCache;
        private final ExecutorService threadPoolExecutor;

        public LoadThreadPoolFileCache(Path fileCachePath, FileCache fileCache, ExecutorService threadPoolExecutor) {
            this.fileCachePath = fileCachePath;
            this.fileCache = fileCache;
            this.threadPoolExecutor = threadPoolExecutor;
        }

        public void run() {
            try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(fileCachePath)) {
                for (Path subPath : directoryStream) {
                    if (Files.isDirectory(subPath) && FileCreator.LOCAL_STORE_LOCATION.equals(subPath.getFileName().toString())) {
                        File subPathFolder = subPath.toFile();
                        for (File subPathFile : subPathFolder.listFiles()) {
                            if (subPathFile.isFile()) {
                                fileCache.sizeMap.put(subPathFile.toPath().toRealPath(), subPathFile.length());
                            }
                        }
                    } else if (Files.isDirectory(subPath)) {
                        threadPoolExecutor.execute(new LoadThreadPoolFileCache(subPath, fileCache, threadPoolExecutor));
                    }
                }
            } catch (IOException e) {
                System.out.println("Exception: " + e);
            }
        }
    }

@kotwanikunal (Member)

Also adding in the actual execution code -

    public static void restoreCache() throws InterruptedException {
        Path cachePath = Path.of("xyz");
        List<Long> sequentialDuration = new ArrayList<>();
        List<Long> fjpDuration = new ArrayList<>();
        List<Long> threadPoolDuration = new ArrayList<>();

        for (int i = 0; i < 10000; i++) {
            Instant preTest = Instant.now();
            FileCache sequentialFileCache = new FileCache();
            FileCache.restoreSequentialFileCache(cachePath, sequentialFileCache);
            Instant postTest = Instant.now();
            // toNanos() gives the total duration in nanos; Duration.get(ChronoUnit.NANOS)
            // would only return the nanos-of-second part and truncate durations >= 1s.
            sequentialDuration.add(Duration.between(preTest, postTest).toNanos());

            ForkJoinPool forkJoinPool = FileCache.createForkJoinPool(10);
            FileCache fjpFileCache = new FileCache();
            Instant preFJPTest = Instant.now();
            forkJoinPool.submit(new FileCache.LoadFJPFileCache(cachePath, fjpFileCache));
            forkJoinPool.shutdown();
            forkJoinPool.awaitTermination(10, TimeUnit.MINUTES);
            Instant postFJPTest = Instant.now();
            fjpDuration.add(Duration.between(preFJPTest, postFJPTest).toNanos());

            ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
            FileCache threadPoolFileCache = new FileCache();
            Instant preTPTest = Instant.now();
            threadPoolExecutor.submit(new FileCache.LoadThreadPoolFileCache(cachePath, threadPoolFileCache, threadPoolExecutor));
            // Caveat: shutdown() is called while LoadThreadPoolFileCache may still be
            // submitting subtasks via execute(); submissions after shutdown begins are
            // rejected, so deeper directories can be skipped and the measured time may
            // under-count this approach.
            threadPoolExecutor.shutdown();
            threadPoolExecutor.awaitTermination(10, TimeUnit.MINUTES);
            Instant postTPTest = Instant.now();
            threadPoolDuration.add(Duration.between(preTPTest, postTPTest).toNanos());
        }

        System.out.println("Average duration");
        System.out.println("Sequential: " + sequentialDuration.stream().collect(Collectors.averagingLong(x -> x)));
        System.out.println("FJP: " + fjpDuration.stream().collect(Collectors.averagingLong(x -> x)));
        System.out.println("ThreadPool: " + threadPoolDuration.stream().collect(Collectors.averagingLong(x -> x)));
    }

github-project-automation bot moved this from In Progress to Done in Searchable Snapshots, Mar 9, 2023