From cf95c759c5198d1e26aa615dbcf36e9e5464a234 Mon Sep 17 00:00:00 2001 From: littlezhou Date: Fri, 26 Jan 2018 16:11:54 +0800 Subject: [PATCH] Solve #1568, Create SSM Id file /system/ssm.id in HDFS --- .../java/org/smartdata/SmartConstants.java | 2 + .../org/smartdata/utils/SsmHostUtils.java | 37 +++++++++ .../hdfs/HdfsStatesUpdateService.java | 77 +++++++++++++++---- 3 files changed, 99 insertions(+), 17 deletions(-) create mode 100644 smart-common/src/main/java/org/smartdata/utils/SsmHostUtils.java diff --git a/smart-common/src/main/java/org/smartdata/SmartConstants.java b/smart-common/src/main/java/org/smartdata/SmartConstants.java index 96fc53ede9e..68a376f81c3 100644 --- a/smart-common/src/main/java/org/smartdata/SmartConstants.java +++ b/smart-common/src/main/java/org/smartdata/SmartConstants.java @@ -40,4 +40,6 @@ public class SmartConstants { "/tmp/SMART_CLIENT_DISABLED_ID_FILE"; public static final String NUMBER_OF_SMART_AGENT = "number_of_smart_agent_in_agents_file"; + + public static final String SMART_SERVER_ID_FILE = "/system/ssm.id"; } diff --git a/smart-common/src/main/java/org/smartdata/utils/SsmHostUtils.java b/smart-common/src/main/java/org/smartdata/utils/SsmHostUtils.java new file mode 100644 index 00000000000..965ce9b6476 --- /dev/null +++ b/smart-common/src/main/java/org/smartdata/utils/SsmHostUtils.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.utils; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class SsmHostUtils { + public static String getHostNameOrIp() { + String hi = System.getProperty("SSM_EXEC_HOST"); + if (hi != null) { + return hi; + } + + try { + hi = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + hi = "UNKNOWN_HOST"; + } + return hi; + } +} diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java index ebd3e6da9b3..f8d99f18093 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/HdfsStatesUpdateService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.smartdata.SmartConstants; import org.smartdata.SmartContext; import org.smartdata.conf.SmartConfKeys; import org.smartdata.hdfs.metric.fetcher.CachedListFetcher; @@ -36,9 +37,9 @@ import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.StatesUpdateService; import org.smartdata.model.rule.RulePluginManager; +import org.smartdata.utils.SsmHostUtils; import java.io.IOException; -import java.net.InetAddress; import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -48,7 +49,7 @@ * Polls metrics and events from NameNode */ public class HdfsStatesUpdateService extends StatesUpdateService { - private static final Path MOVER_ID_PATH = new Path("/system/mover.id"); + private static final String MOVER_ID_PATH = "/system/mover.id"; private volatile boolean inSafeMode; private DFSClient client; private ScheduledExecutorService executorService; @@ -56,6 +57,7 @@ public class HdfsStatesUpdateService extends StatesUpdateService { private CachedListFetcher cachedListFetcher; private DataNodeInfoFetcher dataNodeInfoFetcher; private FSDataOutputStream moverIdOutputStream; + private FSDataOutputStream ssmIdOutputStream; private StorageInfoSampler storageInfoSampler; public static final Logger LOG = @@ -86,7 +88,7 @@ public void init() throws IOException { final URI nnUri = HadoopUtil.getNameNodeUri(context.getConf()); LOG.debug("Final Namenode URL:" + nnUri.toString()); client = HadoopUtil.getDFSClient(nnUri, conf); - moverIdOutputStream = checkAndMarkRunning(nnUri, context.getConf()); + checkAndCreateIdFiles(nnUri, context.getConf()); this.executorService = Executors.newScheduledThreadPool(4); this.cachedListFetcher = new CachedListFetcher(client, metaStore); this.inotifyEventFetcher = new InotifyEventFetcher(client, @@ -130,12 +132,23 @@ public void stop() throws IOException { if (moverIdOutputStream != null) { try { moverIdOutputStream.close(); + moverIdOutputStream = null; } catch (IOException e) { LOG.debug("Close 'mover' ID output stream error", e); // ignore this } } + if (ssmIdOutputStream != null) { + try { + ssmIdOutputStream.close(); + ssmIdOutputStream = null; + } catch (IOException e) { + LOG.debug("Close SSM ID output stream error", e); + // ignore this + } + } + if (inotifyEventFetcher != null) { this.inotifyEventFetcher.stop(); } @@ -154,24 +167,54 @@ public void stop() throws IOException { LOG.info("Stopped."); } - private FSDataOutputStream checkAndMarkRunning(URI namenodeURI, Configuration conf) - throws IOException { + private void checkAndCreateIdFiles(URI namenodeURI, Configuration conf) throws IOException { + try { + moverIdOutputStream = checkAndMarkRunning(namenodeURI, conf, MOVER_ID_PATH); + LOG.info("Mover ID file " + MOVER_ID_PATH + " created successfully."); + } catch (IOException e) { + LOG.error("Unable to create " + MOVER_ID_PATH + " in HDFS. " + + "Please check the permission or if it is being written by another instance."); + throw e; + } + try { - DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(namenodeURI, conf); - if (fs.exists(MOVER_ID_PATH)) { - // try appending to it so that it will fail fast if another instance is - // running. - IOUtils.closeStream(fs.append(MOVER_ID_PATH)); - fs.delete(MOVER_ID_PATH, true); + ssmIdOutputStream = checkAndMarkRunning(namenodeURI, conf, + SmartConstants.SMART_SERVER_ID_FILE); + LOG.info("Smart server ID file " + SmartConstants.SMART_SERVER_ID_FILE + + " created successfully."); + } catch (IOException e) { + LOG.error("Unable to create SSM ID file: " + SmartConstants.SMART_SERVER_ID_FILE + + " in HDFS. Please check the permission or if it is being written by " + + "another instance."); + try { + moverIdOutputStream.close(); + moverIdOutputStream = null; + } catch (IOException ie) { + // ignore this one } - FSDataOutputStream fsout = fs.create(MOVER_ID_PATH, false); - fs.deleteOnExit(MOVER_ID_PATH); - fsout.writeBytes(InetAddress.getLocalHost().getHostName()); + throw e; + } + } + + private FSDataOutputStream checkAndMarkRunning(URI namenodeURI, Configuration conf, String filePath) + throws IOException { + Path path = new Path(filePath); + DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(namenodeURI, conf); + if (fs.exists(path)) { + // try appending to it so that it will fail fast if another instance is + // running. + IOUtils.closeStream(fs.append(path)); + fs.delete(path, true); + } + FSDataOutputStream fsout = fs.create(path, false); + try { + fs.deleteOnExit(path); + fsout.writeBytes(SsmHostUtils.getHostNameOrIp()); fsout.hflush(); - return fsout; - } catch (Exception e) { - LOG.error("Unable to lock 'mover', please stop 'mover' first."); + } catch (IOException e) { + fsout.close(); throw e; } + return fsout; } }