Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Make name space fetcher run asynchronously #947

Merged
merged 4 commits into from
Aug 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions smart-common/src/main/java/org/smartdata/AbstractService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public SmartContext getContext() {
public void setContext(SmartContext context) {
this.context = context;
}

public boolean inSafeMode() {
return false;
}
}
13 changes: 13 additions & 0 deletions smart-engine/src/main/java/org/smartdata/server/SmartEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ public void init() throws IOException {
}
}

@Override
public boolean inSafeMode() {
if (services.isEmpty()) { //Not initiated
return true;
}
for (AbstractService service : services) {
if (service.inSafeMode()) {
return true;
}
}
return false;
}

@Override
public void start() throws IOException {
for (AbstractService s : services) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public void init() throws IOException {
LOG.info("Initialized.");
}

@Override
public boolean inSafeMode() {
if (statesUpdaterService == null) {
return true;
}
return statesUpdaterService.inSafeMode();
}

/**
* Start daemon threads in StatesManager for function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -46,6 +47,7 @@
*/
public class HdfsStatesUpdateService extends StatesUpdateService {
private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
private volatile boolean inSafeMode;
private DFSClient client;
private ScheduledExecutorService executorService;
private InotifyEventFetcher inotifyEventFetcher;
Expand All @@ -58,6 +60,7 @@ public class HdfsStatesUpdateService extends StatesUpdateService {

public HdfsStatesUpdateService(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.inSafeMode = true;
}

/**
Expand All @@ -78,12 +81,25 @@ public void init() throws IOException {
this.executorService = Executors.newScheduledThreadPool(4);
this.cachedListFetcher = new CachedListFetcher(client, metaStore);
this.inotifyEventFetcher = new InotifyEventFetcher(client,
metaStore, executorService);
metaStore, executorService, new FetchFinishedCallBack());
this.dataNodeInfoFetcher = new DataNodeInfoFetcher(
client, metaStore, executorService, context.getConf());
LOG.info("Initialized.");
}

private class FetchFinishedCallBack implements Callable<Object> {
@Override
public Object call() throws Exception {
inSafeMode = false;
return null;
}
}

@Override
public boolean inSafeMode() {
return inSafeMode;
}

/**
* Start daemon threads in StatesManager for function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.smartdata.hdfs.metric.fetcher;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.squareup.tape.QueueFile;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
Expand All @@ -27,9 +32,11 @@
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +46,7 @@ public class InotifyEventFetcher {
private final NamespaceFetcher nameSpaceFetcher;
private final ScheduledExecutorService scheduledExecutorService;
private final InotifyEventApplier applier;
private Callable fetchFinishedCallback;
private ScheduledFuture inotifyFetchFuture;
private ScheduledFuture fetchAndApplyFuture;
private EventApplyTask eventApplyTask;
Expand All @@ -48,50 +56,58 @@ public class InotifyEventFetcher {
LoggerFactory.getLogger(InotifyEventFetcher.class);

public InotifyEventFetcher(DFSClient client, MetaStore metaStore,
ScheduledExecutorService service) {
this(client, metaStore, service, new InotifyEventApplier(metaStore, client));
ScheduledExecutorService service, Callable callBack) {
this(client, metaStore, service, new InotifyEventApplier(metaStore, client), callBack);
}

public InotifyEventFetcher(DFSClient client, MetaStore metaStore,
ScheduledExecutorService service, InotifyEventApplier applier) {
ScheduledExecutorService service, InotifyEventApplier applier, Callable callBack) {
this.client = client;
this.applier = applier;
this.scheduledExecutorService = service;
this.fetchFinishedCallback = callBack;
this.nameSpaceFetcher = new NamespaceFetcher(client, metaStore, service);
}

public void start() throws IOException {
this.inotifyFile = new File("/tmp/inotify" + new Random().nextLong());
this.queueFile = new QueueFile(inotifyFile);
long startId = this.client.getNamenode().getCurrentEditLogTxid();
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(scheduledExecutorService);
inotifyFile = new File("/tmp/inotify" + new Random().nextLong());
queueFile = new QueueFile(inotifyFile);
long startId = client.getNamenode().getCurrentEditLogTxid();
LOG.info("Start fetching namespace with current edit log txid = " + startId);
this.nameSpaceFetcher.startFetch();
this.inotifyFetchFuture = scheduledExecutorService.scheduleAtFixedRate(
nameSpaceFetcher.startFetch();
inotifyFetchFuture = scheduledExecutorService.scheduleAtFixedRate(
new InotifyFetchTask(queueFile, client, startId), 0, 100, TimeUnit.MILLISECONDS);
this.eventApplyTask = new EventApplyTask(nameSpaceFetcher, applier, queueFile, startId);

eventApplyTask = new EventApplyTask(nameSpaceFetcher, applier, queueFile, startId);
ListenableFuture<?> future = listeningExecutorService.submit(eventApplyTask);
Futures.addCallback(future, new NameSpaceFetcherCallBack(), scheduledExecutorService);
LOG.info("Start apply iNotify events.");
eventApplyTask.start();
}

private class NameSpaceFetcherCallBack implements FutureCallback<Object> {

try {
this.waitNameSpaceFetcherFinished();
} catch (InterruptedException e) {
throw new IOException(e);
@Override
public void onSuccess(@Nullable Object o) {
long lastId = eventApplyTask.getLastId();
inotifyFetchFuture.cancel(false);
nameSpaceFetcher.stop();
try {
queueFile.close();
InotifyFetchAndApplyTask fetchAndApplyTask =
new InotifyFetchAndApplyTask(client, applier, lastId);
fetchAndApplyFuture = scheduledExecutorService.scheduleAtFixedRate(
fetchAndApplyTask, 0, 100, TimeUnit.MILLISECONDS);
LOG.info("Name space fetch finished.");
fetchFinishedCallback.call();
} catch (Exception e) {
e.printStackTrace();
}
}
LOG.info("Name space fetch finished.");
}

private void waitNameSpaceFetcherFinished() throws InterruptedException, IOException {
eventApplyTask.join();

long lastId = eventApplyTask.getLastId();
this.inotifyFetchFuture.cancel(false);
this.nameSpaceFetcher.stop();
this.queueFile.close();
InotifyFetchAndApplyTask fetchAndApplyTask =
new InotifyFetchAndApplyTask(client, applier, lastId);
this.fetchAndApplyFuture = scheduledExecutorService.scheduleAtFixedRate(
fetchAndApplyTask, 0, 100, TimeUnit.MILLISECONDS);
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}
}

public void stop() {
Expand Down Expand Up @@ -129,7 +145,7 @@ public void run() {
}
}

private static class EventApplyTask extends Thread {
private static class EventApplyTask implements Runnable {
private final NamespaceFetcher namespaceFetcher;
private final InotifyEventApplier applier;
private final QueueFile queueFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public abstract class TestInotifyFetcher {
Expand Down Expand Up @@ -89,7 +90,12 @@ public void testFetcher() throws Exception {
MetaStore metaStore = Mockito.mock(MetaStore.class);
EventApplierForTest applierForTest = new EventApplierForTest(metaStore, client);
final InotifyEventFetcher fetcher = new InotifyEventFetcher(client, metaStore,
Executors.newScheduledThreadPool(2), applierForTest);
Executors.newScheduledThreadPool(2), applierForTest, new Callable() {
@Override
public Object call() throws Exception {
return null; // Do nothing
}
});

Thread thread = new Thread() {
public void run() {
Expand Down
20 changes: 12 additions & 8 deletions smart-server/src/main/java/org/smartdata/server/SmartServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SmartServer {
private SmartConf conf;
private SmartEngine engine;
private ServerContext context;
private SmartServiceState serviceState = SmartServiceState.SAFEMODE;
private boolean enabled;

private SmartRpcServer rpcServer;
private SmartZeppelinServer zeppelinServer;
Expand All @@ -68,6 +68,7 @@ public class SmartServer {
public SmartServer(SmartConf conf) {
this.conf = conf;
this.confMgr = new ConfManager(conf);
this.enabled = false;
}

public void initWith(StartupOption startupOption) throws Exception {
Expand Down Expand Up @@ -217,9 +218,6 @@ private void run() throws Exception {

if (enabled) {
startEngines();
serviceState = SmartServiceState.ACTIVE;
} else {
serviceState = SmartServiceState.DISABLED;
}

rpcServer.start();
Expand All @@ -230,27 +228,33 @@ private void run() throws Exception {
}

private void startEngines() throws Exception {
enabled = true;
engine.init();
engine.start();
}

public void enable() throws IOException {
if (serviceState == SmartServiceState.DISABLED) {
if (getSSMServiceState() == SmartServiceState.DISABLED) {
try {
startEngines();
serviceState = SmartServiceState.ACTIVE;
} catch (Exception e) {
throw new IOException(e);
}
}
}

public SmartServiceState getSSMServiceState() {
return serviceState;
if (!enabled) {
return SmartServiceState.DISABLED;
} else if (!engine.inSafeMode()) {
return SmartServiceState.ACTIVE;
} else {
return SmartServiceState.SAFEMODE;
}
}

public boolean isActive() {
return serviceState == SmartServiceState.ACTIVE;
return getSSMServiceState() == SmartServiceState.ACTIVE;
}

private void stop() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public void setUp() throws Exception {
uriList.get(0).toString());
ssm.enable();

Thread.sleep(2000);

Assert.assertTrue(ssm.getSSMServiceState() == SmartServiceState.ACTIVE);

} finally {
Expand Down