diff --git a/datasophon-api/src/main/java/com/datasophon/api/DDHApplicationServer.java b/datasophon-api/src/main/java/com/datasophon/api/DDHApplicationServer.java index 3e6b352d..d437b186 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/DDHApplicationServer.java +++ b/datasophon-api/src/main/java/com/datasophon/api/DDHApplicationServer.java @@ -40,6 +40,10 @@ public class DDHApplicationServer extends SpringBootServletInitializer { public static void main(String[] args) { SpringApplication.run(DDHApplicationServer.class, args); + // add shutdown hook, close and shutdown resources + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + shutdown(); + })); } @PostConstruct @@ -48,4 +52,11 @@ public void run() throws UnknownHostException, NoSuchAlgorithmException { CacheUtils.put(Constants.HOSTNAME, hostName); ActorUtils.init(); } + + /** + * Master 关闭时调用 + */ + public static void shutdown() { + ActorUtils.shutdown(); + } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java index 10e744fa..9e8075fd 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java @@ -70,9 +70,10 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class), getActorRefName(MasterNodeProcessingActor.class)); + // 节点检测 5m 检测一次 actorSystem.scheduler().schedule( - FiniteDuration.apply(60L, TimeUnit.SECONDS), - FiniteDuration.apply(5L, TimeUnit.MINUTES), + FiniteDuration.apply(30L, TimeUnit.SECONDS), + FiniteDuration.apply(300L, TimeUnit.SECONDS), hostCheckActor, new HostCheckCommand(), actorSystem.dispatcher(), @@ -136,6 +137,18 @@ public static ActorRef getRemoteActor(String hostname, String actorName) { return actorRef; } + /** + * shutdown + */ + public static void shutdown() { + if(actorSystem != null) { + try { + actorSystem.shutdown(); + } catch (Exception ignore){} + actorSystem = null; + } + } + /** * Get ActorRef name from Class name. */ diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java index 49bc2eb3..34970fb9 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java @@ -17,27 +17,41 @@ package com.datasophon.api.master; +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.pattern.Patterns; +import akka.util.Timeout; import com.datasophon.api.service.ClusterHostService; import com.datasophon.api.service.ClusterInfoService; import com.datasophon.api.service.ClusterServiceRoleInstanceService; import com.datasophon.api.utils.SpringTool; import com.datasophon.common.command.HostCheckCommand; +import com.datasophon.common.command.PingCommand; +import com.datasophon.common.model.HostInfo; +import com.datasophon.common.utils.ExecResult; import com.datasophon.common.utils.PromInfoUtils; import com.datasophon.common.utils.Result; import com.datasophon.dao.entity.ClusterHostEntity; import com.datasophon.dao.entity.ClusterInfoEntity; import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; - +import com.datasophon.dao.enums.MANAGED; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import akka.actor.UntypedActor; +/** + * 节点状态监测 + */ public class HostCheckActor extends UntypedActor { private static final Logger logger = LoggerFactory.getLogger(HostCheckActor.class); @@ -52,16 +66,51 @@ public void onReceive(Object msg) throws Throwable { SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class); ClusterInfoService clusterInfoService = SpringTool.getApplicationContext().getBean(ClusterInfoService.class); + + // Host or cluster + final HostCheckCommand hostCheckCommand = (HostCheckCommand)msg; + final HostInfo hostInfo = hostCheckCommand.getHostInfo(); + + // 获取当前安装并且正在运行的集群 Result result = clusterInfoService.runningClusterList(); List clusterList = (List) result.getData(); for (ClusterInfoEntity clusterInfoEntity : clusterList) { + // 获取集群上安装的 Prometheus 服务, 从 Prometheus 获取CPU、磁盘使用量等 ClusterServiceRoleInstanceEntity prometheusInstance = roleInstanceService.getOneServiceRole("Prometheus", "", clusterInfoEntity.getId()); if (Objects.nonNull(prometheusInstance)) { + // 集群正常安装了 Prometheus List list = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); String promUrl = "http://" + prometheusInstance.getHostname() + ":9090/api/v1/query"; for (ClusterHostEntity clusterHostEntity : list) { + if(hostInfo != null && !StringUtils.equals(clusterHostEntity.getHostname(), hostInfo.getHostname())) { + // 指定了节点,直接只处理这一个节点的 + continue; + } + try { + // rpc 检测 + final ActorRef pingActor = ActorUtils.getRemoteActor(clusterHostEntity.getHostname(), "pingActor"); + PingCommand pingCommand = new PingCommand(); + pingCommand.setMessage("ping"); + Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); + Future execFuture = Patterns.ask(pingActor, pingCommand, timeout); + ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration()); + if (execResult.getExecResult()) { + logger.info("ping host: {} success", clusterHostEntity.getHostname()); + } else { + logger.warn("ping host: {} fail, reason: {}", clusterHostEntity.getHostname(), execResult.getExecOut()); + throw new IllegalStateException("ping host: " + clusterHostEntity.getHostname() + " failed."); + } + clusterHostEntity.setHostState(1); + clusterHostEntity.setManaged(MANAGED.YES); + } catch (Exception e) { + logger.warn("host: " + clusterHostEntity.getHostname() + " rpc error, cause: " + e.getMessage()); + clusterHostEntity.setHostState(3); + clusterHostEntity.setManaged(MANAGED.NO); + // ping 失败,则修改节点状态为 false + continue; + } try { String hostname = clusterHostEntity.getHostname(); // 查询内存总量 @@ -106,12 +155,51 @@ public void onReceive(Object msg) throws Throwable { clusterHostEntity.setAverageLoad(cpuLoad); } } catch (Exception e) { - logger.info(e.getMessage()); + logger.warn("check cluster state error, cause: {}", e.getMessage()); } } if (list.size() > 0) { clusterHostService.updateBatchById(list); } + } else { + // 没有 Prometheus?直接获取节点,通过 rpc 检测是否启动 + List hosts = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); + List checkedHosts = new ArrayList<>(hosts.size()); + for (ClusterHostEntity host : hosts) { + if(hostInfo != null && !StringUtils.equals(host.getHostname(), hostInfo.getHostname())) { + // 指定了节点,直接只处理这一个节点的 + continue; + } + // copy 一个新的,只更新状态 + ClusterHostEntity checkedHost = new ClusterHostEntity(); + checkedHost.setId(host.getId()); + checkedHost.setCheckTime(new Date()); + try { + // rpc 检测 + final ActorRef pingActor = ActorUtils.getRemoteActor(host.getHostname(), "pingActor"); + PingCommand pingCommand = new PingCommand(); + pingCommand.setMessage("ping"); + Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); + Future execFuture = Patterns.ask(pingActor, pingCommand, timeout); + ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration()); + if (execResult.getExecResult()) { + logger.info("ping host: {} success", host.getHostname()); + } else { + logger.warn("ping host: {} fail, reason: {}", host.getHostname(), execResult.getExecOut()); + throw new IllegalStateException("ping host: " + host.getHostname() + " failed."); + } + checkedHost.setHostState(1); + checkedHost.setManaged(MANAGED.YES); + } catch (Exception e) { + logger.warn("host: " + host.getHostname() + " rpc error, cause: " + e.getMessage()); + checkedHost.setManaged(MANAGED.NO); + checkedHost.setHostState(3); + } + checkedHosts.add(checkedHost); + } + if (checkedHosts.size() > 0) { + clusterHostService.updateBatchById(checkedHosts); + } } } } else { diff --git a/datasophon-common/pom.xml b/datasophon-common/pom.xml index 2466d9c8..36d9b22b 100644 --- a/datasophon-common/pom.xml +++ b/datasophon-common/pom.xml @@ -104,6 +104,12 @@ mysql mysql-connector-java + + + org.apache.ant + ant + 1.10.13 + diff --git a/datasophon-common/src/main/java/com/datasophon/common/command/PingCommand.java b/datasophon-common/src/main/java/com/datasophon/common/command/PingCommand.java new file mode 100644 index 00000000..c6c36826 --- /dev/null +++ b/datasophon-common/src/main/java/com/datasophon/common/command/PingCommand.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.common.command; + +import lombok.Data; + +import java.io.Serializable; + +/** + * + * + * @author zhenqin + */ +@Data +public class PingCommand implements Serializable { + + private String message; + +} diff --git a/datasophon-common/src/main/java/com/datasophon/common/utils/FileUtils.java b/datasophon-common/src/main/java/com/datasophon/common/utils/FileUtils.java new file mode 100644 index 00000000..3f792302 --- /dev/null +++ b/datasophon-common/src/main/java/com/datasophon/common/utils/FileUtils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.common.utils; + +import com.google.common.io.CharStreams; +import com.google.common.io.LineProcessor; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang.StringUtils; +import org.apache.tools.tar.TarEntry; +import org.apache.tools.tar.TarInputStream; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.security.MessageDigest; +import java.util.zip.GZIPInputStream; +/** + * + * 基本文件的特殊操作,文件MD5,从 targz 压缩包不解压读取一个文本文件,读取一个文件的第一行 等 + * + *
+ *
+ * Created by zhenqin.
+ * User: zhenqin
+ * Date: 2023/4/21
+ * Time: 下午9:58
+ *
+ * 
+ * + * @author zhenqin + */ +public class FileUtils { + + + /** + * 获取一个文件的md5值(可处理大文件) + * @return md5 value + */ + public static String md5(File file) { + try (FileInputStream fileInputStream = new FileInputStream(file);) { + MessageDigest MD5 = MessageDigest.getInstance("MD5"); + + byte[] buffer = new byte[8192]; + int length; + while ((length = fileInputStream.read(buffer)) != -1) { + MD5.update(buffer, 0, length); + } + return new String(Hex.encodeHex(MD5.digest())); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + + /** + * 从 tar.gz 的压缩包内读取一个 文本文件 + * @param targz + * @param name + * @return + * @throws IOException + */ + public static String readTargzTextFile(File targz, String name, Charset charset) throws IOException { + String content = null; + TarEntry tarEntry = null; + try (TarInputStream tarInputStream = new TarInputStream(new GZIPInputStream(new FileInputStream(targz))); + BufferedReader reader = new BufferedReader(new InputStreamReader(tarInputStream, charset));){ + boolean hasNext = reader.readLine() != null; + if(hasNext) { + return null; + } + while ((tarEntry = tarInputStream.getNextEntry()) != null ) { + String entryName = tarEntry.getName(); + if (tarEntry.isDirectory()) { + // 如果是文件夹,创建文件夹并加速循环 + continue; + } + if(entryName.endsWith(name)) { + // 找到第一个文件就结束 + content = CharStreams.toString(reader); + break; + } + } + } + return content; + } + + + /** + * 读取文件第一行,第一行的非空行 + * @param file + * @return + * @throws Exception + */ + public static String readFirstLine(File file) throws Exception { + final String firstLine = CharStreams.readLines(new FileReader(file), new LineProcessor() { + + String firstLine = null; + + @Override + public boolean processLine(String line) throws IOException { + this.firstLine = line; + // 第一行非空则返回 + return StringUtils.trimToNull(line) == null; + } + + @Override + public String getResult() { + return firstLine; + } + }); + return firstLine; + } +} diff --git a/datasophon-common/src/main/java/com/datasophon/common/utils/ShellUtils.java b/datasophon-common/src/main/java/com/datasophon/common/utils/ShellUtils.java index 3946c53d..7bc0bcf3 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/utils/ShellUtils.java +++ b/datasophon-common/src/main/java/com/datasophon/common/utils/ShellUtils.java @@ -112,30 +112,6 @@ public static String getCpuArchitecture() { return null; } - // 获取cpu架构 arm或x86 - public static String getPackageMd5(String md5Cmd) { - try { - Process ps = Runtime.getRuntime().exec(new String[]{"sh", "-c", md5Cmd}); - StringBuffer stringBuffer = new StringBuffer(); - int exitValue = ps.waitFor(); - if (0 == exitValue) { - // 只能接收脚本echo打印的数据,并且是echo打印的最后一次数据 - BufferedInputStream in = new BufferedInputStream(ps.getInputStream()); - BufferedReader br = new BufferedReader(new InputStreamReader(in)); - String line; - while ((line = br.readLine()) != null) { - logger.info("脚本返回的数据如下: " + line); - stringBuffer.append(line); - } - in.close(); - br.close(); - return stringBuffer.toString(); - } - } catch (Exception e) { - e.printStackTrace(); - } - return null; - } public static ExecResult execWithStatus(String workPath, List command, long timeout) { Process process = null; diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/actor/PingActor.java b/datasophon-worker/src/main/java/com/datasophon/worker/actor/PingActor.java new file mode 100644 index 00000000..480b0252 --- /dev/null +++ b/datasophon-worker/src/main/java/com/datasophon/worker/actor/PingActor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.worker.actor; + +import akka.actor.UntypedActor; +import com.datasophon.common.command.PingCommand; +import com.datasophon.common.utils.ExecResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 发送 ping,返回 pong + * + * @author zhenqin + */ +public class PingActor extends UntypedActor { + + private static final Logger logger = LoggerFactory.getLogger(PingActor.class); + + @Override + public void onReceive(Object msg) throws Throwable { + if (msg instanceof PingCommand) { + PingCommand command = (PingCommand) msg; + ExecResult execResult = new ExecResult(); + execResult.setExecResult(true); + execResult.setExecOut("pong"); + getSender().tell(execResult, getSelf()); + } else { + unhandled(msg); + } + } +} diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/actor/WorkerActor.java b/datasophon-worker/src/main/java/com/datasophon/worker/actor/WorkerActor.java index a124d32b..201555fb 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/actor/WorkerActor.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/actor/WorkerActor.java @@ -68,6 +68,9 @@ public void preStart() throws IOException { getContext().actorOf(Props.create(NMStateActor.class), getActorRefName(NMStateActor.class)); ActorRef rMStateActor = getContext().actorOf(Props.create(RMStateActor.class), getActorRefName(RMStateActor.class)); + ActorRef pingActor = getContext().actorOf(Props.create(PingActor.class), getActorRefName(PingActor.class)); + + // 添加监听服务 getContext().watch(installServiceActor); getContext().watch(configureServiceActor); getContext().watch(startServiceActor); @@ -82,6 +85,7 @@ public void preStart() throws IOException { getContext().watch(kerberosActor); getContext().watch(rMStateActor); getContext().watch(nMStateActor); + getContext().watch(pingActor); } /** Get ActorRef name from Class name. */ @@ -100,11 +104,4 @@ public void onReceive(Object message) throws Throwable { unhandled(message); } } - - public static void main(String[] args) { - String str = - "{coreNum: 8, totalMem: 31.4189, totalDisk: 991.51,usedDisk: 9.59, diskAvail: 981.92,usedMem:5.91602,memUsedPersent:18.8295,diskUsedPersent:1.0,averageLoad:0.06}"; - StartWorkerMessage message = JSONObject.parseObject(str, StartWorkerMessage.class); - } - } diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/handler/InstallServiceHandler.java b/datasophon-worker/src/main/java/com/datasophon/worker/handler/InstallServiceHandler.java index 998b1ca7..8bb3b0bd 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/handler/InstallServiceHandler.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/handler/InstallServiceHandler.java @@ -25,11 +25,16 @@ import com.datasophon.common.model.RunAs; import com.datasophon.common.utils.CompressUtils; import com.datasophon.common.utils.ExecResult; +import com.datasophon.common.utils.FileUtils; import com.datasophon.common.utils.PropertyUtils; import com.datasophon.common.utils.ShellUtils; import com.datasophon.worker.utils.TaskConstants; import lombok.Data; import org.apache.commons.lang.StringUtils; + +import java.io.File; +import java.util.Objects; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,8 +85,7 @@ private Boolean isNeedDownloadPkg(String packagePath, String packageMd5) { logger.info("Remote package md5 is {}", packageMd5); if (FileUtil.exist(packagePath)) { // check md5 - String md5cmd = "sh " + Constants.WORKER_SCRIPT_PATH + "md5.sh " + packagePath; - String md5 = ShellUtils.getPackageMd5(md5cmd); + String md5 = FileUtils.md5(new File(packagePath)); logger.info("Local md5 is {}", md5); diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ServiceHandler.java b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ServiceHandler.java index 9cffc1e1..f9b61e64 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ServiceHandler.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ServiceHandler.java @@ -21,6 +21,7 @@ import com.datasophon.common.model.RunAs; import com.datasophon.common.model.ServiceRoleRunner; import com.datasophon.common.utils.ExecResult; +import com.datasophon.common.utils.FileUtils; import com.datasophon.common.utils.PropertyUtils; import com.datasophon.common.utils.ShellUtils; import com.datasophon.worker.utils.TaskConstants; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -146,7 +148,25 @@ public ExecResult execRunner(ServiceRoleRunner runner, String decompressPackageN || runner.getProgram().contains(Constants.JOB_MANAGER)) { logger.info("do not use sh"); } else { - command.add("sh"); + File shellFile = new File(Constants.INSTALL_PATH + Constants.SLASH + decompressPackageName + Constants.SLASH + shell); + if(shellFile.exists()) { + try { + // 读取第一行,检查采用的 shell 是哪个,bash、sh ? + final String firstLine = StringUtils.trimToEmpty(FileUtils.readFirstLine(shellFile)); + if(firstLine.contains("bash")) { + command.add("bash"); + } else if(firstLine.contains("sh")) { + command.add("sh"); + } else { + command.add("sh"); + } + } catch (Exception e) { + logger.warn("read shell script file: " + shell + " error, reason: " + e.getMessage()); + command.add("sh"); + } + } else { + command.add("sh"); + } } command.add(shell); command.addAll(args);