Skip to content

Commit

Permalink
Solve #1568, Create SSM Id file /system/ssm.id in HDFS (#1569)
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou committed Jan 30, 2018
1 parent 888b5aa commit 6392755
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 17 deletions.
2 changes: 2 additions & 0 deletions smart-common/src/main/java/org/smartdata/SmartConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ public class SmartConstants {
"/tmp/SMART_CLIENT_DISABLED_ID_FILE";

public static final String NUMBER_OF_SMART_AGENT = "number_of_smart_agent_in_agents_file";

public static final String SMART_SERVER_ID_FILE = "/system/ssm.id";
}
37 changes: 37 additions & 0 deletions smart-common/src/main/java/org/smartdata/utils/SsmHostUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.utils;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class SsmHostUtils {
public static String getHostNameOrIp() {
String hi = System.getProperty("SSM_EXEC_HOST");
if (hi != null) {
return hi;
}

try {
hi = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hi = "UNKNOWN_HOST";
}
return hi;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartConstants;
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.hdfs.metric.fetcher.CachedListFetcher;
Expand All @@ -36,9 +37,9 @@
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.StatesUpdateService;
import org.smartdata.model.rule.RulePluginManager;
import org.smartdata.utils.SsmHostUtils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
Expand All @@ -48,14 +49,15 @@
* Polls metrics and events from NameNode
*/
public class HdfsStatesUpdateService extends StatesUpdateService {
private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
private static final String MOVER_ID_PATH = "/system/mover.id";
private volatile boolean inSafeMode;
private DFSClient client;
private ScheduledExecutorService executorService;
private InotifyEventFetcher inotifyEventFetcher;
private CachedListFetcher cachedListFetcher;
private DataNodeInfoFetcher dataNodeInfoFetcher;
private FSDataOutputStream moverIdOutputStream;
private FSDataOutputStream ssmIdOutputStream;
private StorageInfoSampler storageInfoSampler;

public static final Logger LOG =
Expand Down Expand Up @@ -86,7 +88,7 @@ public void init() throws IOException {
final URI nnUri = HadoopUtil.getNameNodeUri(context.getConf());
LOG.debug("Final Namenode URL:" + nnUri.toString());
client = HadoopUtil.getDFSClient(nnUri, conf);
moverIdOutputStream = checkAndMarkRunning(nnUri, context.getConf());
checkAndCreateIdFiles(nnUri, context.getConf());
this.executorService = Executors.newScheduledThreadPool(4);
this.cachedListFetcher = new CachedListFetcher(client, metaStore);
this.inotifyEventFetcher = new InotifyEventFetcher(client,
Expand Down Expand Up @@ -130,12 +132,23 @@ public void stop() throws IOException {
if (moverIdOutputStream != null) {
try {
moverIdOutputStream.close();
moverIdOutputStream = null;
} catch (IOException e) {
LOG.debug("Close 'mover' ID output stream error", e);
// ignore this
}
}

if (ssmIdOutputStream != null) {
try {
ssmIdOutputStream.close();
ssmIdOutputStream = null;
} catch (IOException e) {
LOG.debug("Close SSM ID output stream error", e);
// ignore this
}
}

if (inotifyEventFetcher != null) {
this.inotifyEventFetcher.stop();
}
Expand All @@ -154,24 +167,54 @@ public void stop() throws IOException {
LOG.info("Stopped.");
}

private FSDataOutputStream checkAndMarkRunning(URI namenodeURI, Configuration conf)
throws IOException {
private void checkAndCreateIdFiles(URI namenodeURI, Configuration conf) throws IOException {
try {
moverIdOutputStream = checkAndMarkRunning(namenodeURI, conf, MOVER_ID_PATH);
LOG.info("Mover ID file " + MOVER_ID_PATH + " created successfully.");
} catch (IOException e) {
LOG.error("Unable to create " + MOVER_ID_PATH + " in HDFS. "
+ "Please check the permission or if it is being written by another instance.");
throw e;
}

try {
DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(namenodeURI, conf);
if (fs.exists(MOVER_ID_PATH)) {
// try appending to it so that it will fail fast if another instance is
// running.
IOUtils.closeStream(fs.append(MOVER_ID_PATH));
fs.delete(MOVER_ID_PATH, true);
ssmIdOutputStream = checkAndMarkRunning(namenodeURI, conf,
SmartConstants.SMART_SERVER_ID_FILE);
LOG.info("Smart server ID file " + SmartConstants.SMART_SERVER_ID_FILE
+ " created successfully.");
} catch (IOException e) {
LOG.error("Unable to create SSM ID file: " + SmartConstants.SMART_SERVER_ID_FILE
+ " in HDFS. Please check the permission or if it is being written by "
+ "another instance.");
try {
moverIdOutputStream.close();
moverIdOutputStream = null;
} catch (IOException ie) {
// ignore this one
}
FSDataOutputStream fsout = fs.create(MOVER_ID_PATH, false);
fs.deleteOnExit(MOVER_ID_PATH);
fsout.writeBytes(InetAddress.getLocalHost().getHostName());
throw e;
}
}

private FSDataOutputStream checkAndMarkRunning(URI namenodeURI, Configuration conf, String filePath)
throws IOException {
Path path = new Path(filePath);
DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(namenodeURI, conf);
if (fs.exists(path)) {
// try appending to it so that it will fail fast if another instance is
// running.
IOUtils.closeStream(fs.append(path));
fs.delete(path, true);
}
FSDataOutputStream fsout = fs.create(path, false);
try {
fs.deleteOnExit(path);
fsout.writeBytes(SsmHostUtils.getHostNameOrIp());
fsout.hflush();
return fsout;
} catch (Exception e) {
LOG.error("Unable to lock 'mover', please stop 'mover' first.");
} catch (IOException e) {
fsout.close();
throw e;
}
return fsout;
}
}

0 comments on commit 6392755

Please sign in to comment.