Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1646, Concurrent consumer of namespace fetcher (#1652)
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou authored Mar 22, 2018
1 parent 21a6e23 commit ec484b3
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 67 deletions.
6 changes: 6 additions & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
<description>Batch size of Namespace fetcher</description>
</property>

<property>
<name>smart.namespace.fetcher.num.consumers</name>
<value>3</value>
<description>Number of consumers to consume data fetched by Namespace fetcher</description>
</property>

<property>
<name>smart.rule.executors</name>
<value>5</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class SmartConfKeys {
"smart.namespace.fetcher.ignore.unsuccessive.inotify.event";
public static final boolean SMART_NAMESPACE_FETCHER_IGNORE_UNSUCCESSIVE_INOTIFY_EVENT_DEFAULT =
false;
public static final String SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_KEY =
"smart.namespace.fetcher.num.consumers";
public static final int SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_DEFAULT = 3;

// Configure keys for Alluxio
public static final String SMART_ALLUXIO_MASTER_HOSTNAME_KEY = "smart.alluxio.master.hostname";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public InotifyEventFetcher(DFSClient client, MetaStore metaStore,
this.metaStore = metaStore;
this.scheduledExecutorService = service;
this.finishedCallback = callBack;
this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, service);
// use independent thread pool
this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, null);
this.conf = new SmartConf();
}

Expand All @@ -98,7 +99,7 @@ public InotifyEventFetcher(DFSClient client, MetaStore metaStore,
this.scheduledExecutorService = service;
this.finishedCallback = callBack;
this.conf = conf;
this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, service,conf);
this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, null, conf);
}

public void start() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -48,55 +47,43 @@ public class NamespaceFetcher {
private final ScheduledExecutorService scheduledExecutorService;
private final long fetchInterval;
private ScheduledFuture fetchTaskFuture;
private ScheduledFuture consumerFuture;
private FileStatusIngester consumer;
private ScheduledFuture[] consumerFutures;
private FileStatusIngester[] consumers;
private IngestionTask ingestionTask;
private MetaStore metaStore;
private SmartConf conf;

public static final Logger LOG =
LoggerFactory.getLogger(NamespaceFetcher.class);

public NamespaceFetcher(DFSClient client, MetaStore metaStore) {
this(client, metaStore, DEFAULT_INTERVAL);
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, ScheduledExecutorService service) {
this(client, metaStore, DEFAULT_INTERVAL, service);
this.conf = new SmartConf();
this(client, metaStore, DEFAULT_INTERVAL, service, new SmartConf());
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, ScheduledExecutorService service, SmartConf conf) {
this(client, metaStore, DEFAULT_INTERVAL, service);
this.conf = conf;
this(client, metaStore, DEFAULT_INTERVAL, service, conf);
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval) {
this(client, metaStore, fetchInterval, Executors.newSingleThreadScheduledExecutor());
this.conf = new SmartConf();
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval, SmartConf conf) {
this(client, metaStore, fetchInterval, Executors.newSingleThreadScheduledExecutor());
this.conf = conf;
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval,
ScheduledExecutorService service) {
this.ingestionTask = new HdfsFetchTask(client);
this.consumer = new FileStatusIngester(metaStore, ingestionTask);
this.fetchInterval = fetchInterval;
this.scheduledExecutorService = service;
this.metaStore = metaStore;
this.conf = new SmartConf();
this(client, metaStore, fetchInterval, null, new SmartConf());
}

public NamespaceFetcher(DFSClient client, MetaStore metaStore, long fetchInterval,
ScheduledExecutorService service, SmartConf conf) {
this.ingestionTask = new HdfsFetchTask(client, conf);
this.consumer = new FileStatusIngester(metaStore, ingestionTask);
int numConsumers = conf.getInt(SmartConfKeys.SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_NUM_CONSUMERS_DEFAULT);
numConsumers = numConsumers <= 0 ? 1 : numConsumers;
consumers = new FileStatusIngester[numConsumers];
for (int i = 0; i < numConsumers; i++) {
consumers[i] = new FileStatusIngester(metaStore, ingestionTask);
}
this.fetchInterval = fetchInterval;
this.scheduledExecutorService = service;
if (service != null) {
this.scheduledExecutorService = service;
} else {
scheduledExecutorService = Executors.newScheduledThreadPool(numConsumers + 1);
}
this.metaStore = metaStore;
this.conf = conf;
}
Expand All @@ -109,8 +96,12 @@ public void startFetch() throws IOException {
}
this.fetchTaskFuture = this.scheduledExecutorService.scheduleAtFixedRate(
ingestionTask, 0, fetchInterval, TimeUnit.MILLISECONDS);
this.consumerFuture = this.scheduledExecutorService.scheduleAtFixedRate(
consumer, 0, 100, TimeUnit.MILLISECONDS);

consumerFutures = new ScheduledFuture[consumers.length];
for (int i = 0; i < consumers.length; i++) {
consumerFutures[i] = this.scheduledExecutorService.scheduleAtFixedRate(
consumers[i], 0, 100, TimeUnit.MILLISECONDS);
}
LOG.info("Started.");
}

Expand All @@ -122,16 +113,20 @@ public void stop() {
if (fetchTaskFuture != null) {
this.fetchTaskFuture.cancel(false);
}
if (consumerFuture != null) {
this.consumerFuture.cancel(false);
if (consumerFutures != null) {
for (ScheduledFuture f : consumerFutures) {
if (f != null) {
f.cancel(false);
}
}
}
}

private static class HdfsFetchTask extends IngestionTask {
private final HdfsFileStatus[] EMPTY_STATUS = new HdfsFileStatus[0];
private final DFSClient client;
private final SmartConf conf;
private List<String> ignoreList;
private List<String> ignoreList = new ArrayList<>();
private byte[] startAfter = null;
private final byte[] empty = HdfsFileStatus.EMPTY_NAME;

Expand All @@ -143,35 +138,20 @@ public HdfsFetchTask(DFSClient client, SmartConf conf) {
defaultBatchSize = conf.getInt(SmartConfKeys
.SMART_NAMESPACE_FETCHER_BATCH_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_BATCH_DEFAULT);
if (configString == null){
configString = "";
}

//only when parent dir is not ignored we run the follow code
ignoreList = Arrays.asList(configString.split(","));
for (int i = 0; i < ignoreList.size(); i++) {
if (!ignoreList.get(i).endsWith("/")) {
ignoreList.set(i, ignoreList.get(i).concat("/"));
if (configString != null) {
configString = configString.trim();
if (!configString.equals("")) {
//only when parent dir is not ignored we run the follow code
ignoreList = Arrays.asList(configString.split(","));
for (int i = 0; i < ignoreList.size(); i++) {
if (!ignoreList.get(i).endsWith("/")) {
ignoreList.set(i, ignoreList.get(i).concat("/"));
}
}
}
}
}

public HdfsFetchTask(DFSClient client) {
super();
this.client = client;
this.conf = new SmartConf();
String configString = conf.get(SmartConfKeys.SMART_IGNORE_DIRS_KEY);
defaultBatchSize = conf.getInt(SmartConfKeys
.SMART_NAMESPACE_FETCHER_BATCH_KEY,
SmartConfKeys.SMART_NAMESPACE_FETCHER_BATCH_DEFAULT);
if (configString == null){
configString = "";
}

//only when parent dir is not ignored we run the follow code
ignoreList = Arrays.asList(configString.split(","));
}

@Override
public void run() {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -220,7 +200,6 @@ public void run() {
tmpParent = tmpParent.concat("/");
}
for (int i = 0; i < ignoreList.size(); i++) {

if (ignoreList.get(i).equals(tmpParent)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public class FileStatusIngester implements Runnable {
private final IngestionTask ingestionTask;
private long startTime = System.currentTimeMillis();
private long lastUpdateTime = startTime;
private static int idCounter = 0;
private int id;

public FileStatusIngester(MetaStore dbAdapter, IngestionTask ingestionTask) {
this.dbAdapter = dbAdapter;
this.ingestionTask = ingestionTask;
id = idCounter++;
}

@Override
Expand All @@ -54,15 +57,16 @@ public void run() {
}

if (LOG.isDebugEnabled()) {
LOG.debug(batch.actualSize() + " files insert into table 'files'.");
LOG.debug("Consumer " + id + " " + batch.actualSize()
+ " files insert into table 'files'.");
}
}
} catch (MetaStoreException e) {
// TODO: handle this issue
LOG.error("Consumer error");
LOG.error("Consumer {} error", id);
}

if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled() && id == 0) {
long curr = System.currentTimeMillis();
if (curr - lastUpdateTime >= 2000) {
long total = IngestionTask.numDirectoriesFetched + IngestionTask.numFilesFetched;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class IngestionTask implements Runnable {
public static long numPersisted = 0L;

protected int defaultBatchSize = 20;
protected int maxPendingBatches = 55;
protected int maxPendingBatches = 80;

protected static final String ROOT = "/";
// Deque for Breadth-First-Search
Expand Down

0 comments on commit ec484b3

Please sign in to comment.