Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Add multiple producers for fetching namespace (#1668)
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE authored and littlezhou committed May 3, 2018
1 parent 618d748 commit dae96aa
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 109 deletions.
8 changes: 7 additions & 1 deletion conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@
</property>

<property>
<name>smart.namespace.fetcher.num.consumers</name>
<name>smart.namespace.fetcher.producers.num</name>
<value>3</value>
<description>Number of producer threads used by the Namespace fetcher to fetch namespace data</description>
</property>

<property>
<name>smart.namespace.fetcher.consumers.num</name>
<value>6</value>
<description>Number of consumers to consume data fetched by Namespace fetcher</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class AlluxioNamespaceFetcher {
public AlluxioNamespaceFetcher(FileSystem fs, MetaStore metaStore, long fetchInterval,
ScheduledExecutorService service) {
this.fetchTask = new AlluxioFetchTask(fs);
this.consumer = new FileStatusIngester(metaStore, fetchTask);
this.consumer = new FileStatusIngester(metaStore);
this.fetchInterval = fetchInterval;
this.scheduledExecutorService = service;
}
Expand Down Expand Up @@ -98,7 +98,7 @@ public void run() {
LOG.debug(String.format(
"%d sec, numDirectories = %d, numFiles = %d, batchsInqueue = %d",
(curr - startTime) / 1000,
numDirectoriesFetched, numFilesFetched, batches.size()));
numDirectoriesFetched.get(), numFilesFetched.get(), batches.size()));
lastUpdateTime = curr;
}
}
Expand All @@ -121,7 +121,7 @@ public void run() {
LOG.info(String.format(
"Finished fetch Namespace! %d secs used, numDirs = %d, numFiles = %d",
(curr - startTime) / 1000,
numDirectoriesFetched, numFilesFetched));
numDirectoriesFetched.get(), numFilesFetched.get()));
}
}
return;
Expand All @@ -133,13 +133,13 @@ public void run() {
List<URIStatus> children = fs.listStatus(new AlluxioURI(parent));
FileInfo fileInfo = convertToFileInfo(status);
this.addFileStatus(fileInfo);
numDirectoriesFetched++;
numDirectoriesFetched.incrementAndGet();
for (URIStatus child : children) {
if (child.isFolder()) {
this.deque.add(child.getPath());
} else {
this.addFileStatus(convertToFileInfo(child));
numFilesFetched++;
numFilesFetched.incrementAndGet();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ public class SmartConfKeys {
"smart.namespace.fetcher.ignore.unsuccessive.inotify.event";
public static final boolean SMART_NAMESPACE_FETCHER_IGNORE_UNSUCCESSIVE_INOTIFY_EVENT_DEFAULT =
false;
public static final String SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_KEY =
"smart.namespace.fetcher.num.consumers";
public static final int SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_DEFAULT = 3;
public static final String SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_KEY =
"smart.namespace.fetcher.producers.num";
public static final int SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_DEFAULT = 3;
public static final String SMART_NAMESPACE_FETCHER_CONSUMERS_NUM_KEY =
"smart.namespace.fetcher.consumers.num";
public static final int SMART_NAMESPACE_FETCHER_CONSUMERS_NUM_DEFAULT = 3;

// Configure keys for Alluxio
public static final String SMART_ALLUXIO_MASTER_HOSTNAME_KEY = "smart.alluxio.master.hostname";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.smartdata.metastore.ingestion.IngestionTask;
import org.smartdata.model.FileInfoBatch;
import org.smartdata.metastore.ingestion.FileStatusIngester;
import org.smartdata.protocol.message.StatusReport;


import java.io.IOException;
Expand All @@ -46,10 +47,10 @@ public class NamespaceFetcher {

private final ScheduledExecutorService scheduledExecutorService;
private final long fetchInterval;
private ScheduledFuture fetchTaskFuture;
private ScheduledFuture[] fetchTaskFutures;
private ScheduledFuture[] consumerFutures;
private FileStatusIngester[] consumers;
private IngestionTask ingestionTask;
private IngestionTask[] ingestionTasks;
private MetaStore metaStore;
private SmartConf conf;

Expand All @@ -70,19 +71,26 @@ public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterva

public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval,
ScheduledExecutorService service, SmartConf conf) {
this.ingestionTask = new HdfsFetchTask(client, conf);
int numConsumers = conf.getInt(SmartConfKeys.SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_DEFAULT);
int numProducers = conf.getInt(SmartConfKeys.SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_PRODUCERS_NUM_DEFAULT);
numProducers = numProducers <= 0 ? 1 : numProducers;
this.ingestionTasks = new IngestionTask[numProducers];
for (int i = 0; i < numProducers; i++) {
ingestionTasks[i] = new HdfsFetchTask(ingestionTasks, client, conf);
}

int numConsumers = conf.getInt(SmartConfKeys.SMART_NAMESPACE_FETCHER_CONSUMERS_NUM_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_CONSUMERS_NUM_DEFAULT);
numConsumers = numConsumers <= 0 ? 1 : numConsumers;
consumers = new FileStatusIngester[numConsumers];
for (int i = 0; i < numConsumers; i++) {
consumers[i] = new FileStatusIngester(metaStore, ingestionTask);
consumers[i] = new FileStatusIngester(metaStore);
}
this.fetchInterval = fetchInterval;
if (service != null) {
this.scheduledExecutorService = service;
} else {
scheduledExecutorService = Executors.newScheduledThreadPool(numConsumers + 1);
scheduledExecutorService = Executors.newScheduledThreadPool(numProducers + numConsumers);
}
this.metaStore = metaStore;
this.conf = conf;
Expand All @@ -94,10 +102,13 @@ public void startFetch() throws IOException {
} catch (MetaStoreException e) {
throw new IOException("Error while reset files", e);
}
this.fetchTaskFuture = this.scheduledExecutorService.scheduleAtFixedRate(
ingestionTask, 0, fetchInterval, TimeUnit.MILLISECONDS);
this.fetchTaskFutures = new ScheduledFuture[ingestionTasks.length];
for (int i = 0; i < ingestionTasks.length; i++) {
fetchTaskFutures[i] = this.scheduledExecutorService.scheduleAtFixedRate(
ingestionTasks[i], 0, fetchInterval, TimeUnit.MILLISECONDS);
}

consumerFutures = new ScheduledFuture[consumers.length];
this.consumerFutures = new ScheduledFuture[consumers.length];
for (int i = 0; i < consumers.length; i++) {
consumerFutures[i] = this.scheduledExecutorService.scheduleAtFixedRate(
consumers[i], 0, 100, TimeUnit.MILLISECONDS);
Expand All @@ -106,12 +117,16 @@ public void startFetch() throws IOException {
}

public boolean fetchFinished() {
return this.ingestionTask.finished();
return IngestionTask.finished();
}

public void stop() {
if (fetchTaskFuture != null) {
this.fetchTaskFuture.cancel(false);
if (fetchTaskFutures != null) {
for (ScheduledFuture f: fetchTaskFutures) {
if (f != null) {
f.cancel(false);
}
}
}
if (consumerFutures != null) {
for (ScheduledFuture f : consumerFutures) {
Expand All @@ -126,32 +141,52 @@ private static class HdfsFetchTask extends IngestionTask {
private final HdfsFileStatus[] EMPTY_STATUS = new HdfsFileStatus[0];
private final DFSClient client;
private final SmartConf conf;
private List<String> ignoreList = new ArrayList<>();
private byte[] startAfter = null;
private final byte[] empty = HdfsFileStatus.EMPTY_NAME;
private String parent = "";
private String pendingParent;
private IngestionTask[] ingestionTasks;
private static List<String> ignoreList;
private static int idCounter = 0;
private int id;

public HdfsFetchTask(DFSClient client, SmartConf conf) {
public HdfsFetchTask(IngestionTask[] ingestionTasks, DFSClient client, SmartConf conf) {
super();
id = idCounter++;
this.ingestionTasks = ingestionTasks;
this.client = client;
this.conf = conf;
String configString = conf.get(SmartConfKeys.SMART_IGNORE_DIRS_KEY);
defaultBatchSize = conf.getInt(SmartConfKeys
.SMART_NAMESPACE_FETCHER_BATCH_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_BATCH_DEFAULT);
if (configString != null) {
configString = configString.trim();
if (!configString.equals("")) {
// Only run the following code when the parent dir is not ignored.
ignoreList = Arrays.asList(configString.split(","));
for (int i = 0; i < ignoreList.size(); i++) {
if (!ignoreList.get(i).endsWith("/")) {
ignoreList.set(i, ignoreList.get(i).concat("/"));
if (ignoreList == null) {
ignoreList = new ArrayList<>();
String configString = conf.get(SmartConfKeys.SMART_IGNORE_DIRS_KEY);
if (configString != null) {
configString = configString.trim();
if (!configString.equals("")) {
// Only run the following code when the parent dir is not ignored.
ignoreList = Arrays.asList(configString.split(","));
for (int i = 0; i < ignoreList.size(); i++) {
if (!ignoreList.get(i).endsWith("/")) {
ignoreList.set(i, ignoreList.get(i).concat("/"));
}
}
}
}
}
}

// BFS finished
public boolean isDequeEmpty() {
for (IngestionTask ingestionTask: ingestionTasks) {
if (((HdfsFetchTask)ingestionTask).parent != null) {
return false;
}
}
return true;
}

@Override
public void run() {
if (LOG.isDebugEnabled()) {
Expand All @@ -160,7 +195,7 @@ public void run() {
LOG.debug(String.format(
"%d sec, numDirectories = %d, numFiles = %d, batchsInqueue = %d",
(curr - startTime) / 1000,
numDirectoriesFetched, numFilesFetched, batches.size()));
numDirectoriesFetched.get(), numFilesFetched.get(), batches.size()));
lastUpdateTime = curr;
}
}
Expand All @@ -169,8 +204,13 @@ public void run() {
return;
}

String parent = deque.pollFirst();
if (parent == null) { // BFS finished
if (this.pendingParent != null) {
this.parent = pendingParent;
this.pendingParent = null;
} else {
this.parent = deque.pollFirst();
}
if (parent == null) {
if (currentBatch.actualSize() > 0) {
try {
this.batches.put(currentBatch);
Expand All @@ -181,14 +221,14 @@ public void run() {
this.currentBatch = new FileInfoBatch(defaultBatchSize);
}

if (this.batches.isEmpty()) {
if (!this.isFinished) {
this.isFinished = true;
if (this.id == 0 && isDequeEmpty() && this.batches.isEmpty()) {
if (!IngestionTask.isFinished) {
IngestionTask.isFinished = true;
long curr = System.currentTimeMillis();
LOG.info(String.format(
"Finished fetch Namespace! %d secs used, numDirs = %d, numFiles = %d",
(curr - startTime) / 1000,
numDirectoriesFetched, numFilesFetched));
numDirectoriesFetched.get(), numFilesFetched.get()));
}
}
return;
Expand All @@ -213,7 +253,7 @@ public void run() {
FileInfo internal = convertToFileInfo(status, "");
internal.setPath(parent);
this.addFileStatus(internal);
numDirectoriesFetched++;
numDirectoriesFetched.incrementAndGet();
}

HdfsFileStatus[] children;
Expand All @@ -227,12 +267,12 @@ public void run() {
this.deque.add(child.getFullName(parent));
} else {
this.addFileStatus(convertToFileInfo(child, parent));
numFilesFetched++;
numFilesFetched.incrementAndGet();
}
}
} while (startAfter != null && batches.size() < maxPendingBatches);
if (startAfter != null) {
this.deque.addFirst(parent);
pendingParent = parent;
}
}
} catch (IOException | InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,24 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.smartdata.model.FileInfo;
import org.smartdata.conf.SmartConf;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;

import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TestNamespaceFetcher {

public class FileStatusArgMatcher extends ArgumentMatcher<FileInfo[]> {
private List<String> expected;

public FileStatusArgMatcher(List<String> path) {
this.expected = path;
}

@Override
public boolean matches(Object o) {
FileInfo[] array = (FileInfo[]) o;
List<String> paths = new ArrayList<>();
for (FileInfo statusInternal : array) {
paths.add(statusInternal.getPath());
}
Collections.sort(paths);
return paths.size() == expected.size() && paths.containsAll(expected);
}
}

@Test
public void testNamespaceFetcher() throws IOException, InterruptedException,
MissingEventsException, MetaStoreException {
Expand All @@ -75,14 +57,28 @@ public void testNamespaceFetcher() throws IOException, InterruptedException,
DFSClient client = dfs.getClient();

MetaStore adapter = Mockito.mock(MetaStore.class);
final List<String> pathesInDB = new ArrayList<>();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) {
try {
Object[] objects = invocationOnMock.getArguments();
for (FileInfo fileInfo : (FileInfo[]) objects[0]) {
pathesInDB.add(fileInfo.getPath());
}
} catch (Throwable t) {
t.printStackTrace();
}
return null;
}
}).when(adapter).insertFiles(any(FileInfo[].class));
NamespaceFetcher fetcher = new NamespaceFetcher(client, adapter, 100);
fetcher.startFetch();
List<String> expected = Arrays.asList("/", "/user", "/user/user1", "/user/user2", "/tmp");
while (!fetcher.fetchFinished()) {
Thread.sleep(1000);
Thread.sleep(100);
}

Mockito.verify(adapter).insertFiles(Matchers.argThat(new FileStatusArgMatcher(expected)));
Assert.assertTrue(pathesInDB.size() == expected.size() && pathesInDB.containsAll(expected));
fetcher.stop();
} finally {
cluster.shutdown();
Expand Down
Loading

0 comments on commit dae96aa

Please sign in to comment.