Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDH Manager Ping Worker, Worker State Checked #248

Merged
merged 22 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d80d832
调整 JVM 参数适应高版本 JDK,移除内存优化参数,不分不适用,移除 DS copy
zhzhenqin Mar 14, 2023
ae2831f
调整ui的打包,使无需copy dist到api resource;增加versions-maven-plugin
zhzhenqin Mar 15, 2023
c830792
Merge branch 'datasophon:dev' into dev
zhzhenqin Mar 15, 2023
26dc60b
Merge branch 'datasophon:dev' into dev
zhzhenqin Mar 16, 2023
2af8b0c
Merge branch 'datasophon:dev' into dev
zhzhenqin Mar 30, 2023
24841ed
Merge branch 'datasophon:dev' into dev
zhzhenqin Apr 8, 2023
5313878
Merge branch 'datasophon:dev' into dev
zhzhenqin Apr 20, 2023
d09a090
支持优先从worker templates下加载配置模版,其次从component根目录下加载
zhzhenqin Apr 20, 2023
df5c2b3
remove spec comments
zhzhenqin Apr 20, 2023
b4b4af2
BugFix: 修复 workerjar 打包所有 resource 下资源导致jar太大,修复没有正常日志文件输出
zhzhenqin Apr 20, 2023
6e3bed8
Merge branch 'datasophon:dev' into dev
zhzhenqin Apr 25, 2023
67ed45a
Merge branch 'datasophon:dev' into dev
zhzhenqin Apr 26, 2023
e4b93b9
Manager Ping Worker, Worker State Check
zhzhenqin Apr 26, 2023
679b3f9
Merge branch 'datasophon:dev' into dev
zhzhenqin Apr 28, 2023
555db2e
when start service, read shell first line, use bash or sh exec scripts
zhzhenqin Apr 28, 2023
9156248
Merge branch 'datasophon:dev' into dev
zhzhenqin May 5, 2023
da0a3c3
Merge branch 'datasophon:dev' into dev
zhzhenqin May 8, 2023
ec3318b
upgrade ant depend to highest version
zhzhenqin May 8, 2023
a1b791f
Merge branch 'datasophon:dev' into dev
zhzhenqin May 9, 2023
e4d2d8d
Merge remote-tracking branch 'github/dev' into dev
zhzhenqin Jun 9, 2023
97b507b
merge origin code,check md5 use java api
zhzhenqin Jun 9, 2023
397dbce
Merge remote-tracking branch 'origin/dev' into dev
zhzhenqin Jun 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class DDHApplicationServer extends SpringBootServletInitializer {

public static void main(String[] args) {
SpringApplication.run(DDHApplicationServer.class, args);
// add shutdown hook, close and shutdown resources
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdown();
}));
}

@PostConstruct
Expand All @@ -48,4 +52,11 @@ public void run() throws UnknownHostException, NoSuchAlgorithmException {
CacheUtils.put(Constants.HOSTNAME, hostName);
ActorUtils.init();
}

/**
* Master 关闭时调用
*/
public static void shutdown() {
ActorUtils.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class),
getActorRefName(MasterNodeProcessingActor.class));

// 节点检测 5m 检测一次
actorSystem.scheduler().schedule(
FiniteDuration.apply(60L, TimeUnit.SECONDS),
FiniteDuration.apply(5L, TimeUnit.MINUTES),
FiniteDuration.apply(30L, TimeUnit.SECONDS),
FiniteDuration.apply(300L, TimeUnit.SECONDS),
hostCheckActor,
new HostCheckCommand(),
actorSystem.dispatcher(),
Expand Down Expand Up @@ -136,6 +137,18 @@ public static ActorRef getRemoteActor(String hostname, String actorName) {
return actorRef;
}

/**
* shutdown
*/
public static void shutdown() {
if(actorSystem != null) {
try {
actorSystem.shutdown();
} catch (Exception ignore){}
actorSystem = null;
}
}

/**
* Get ActorRef name from Class name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,41 @@

package com.datasophon.api.master;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.datasophon.api.service.ClusterHostService;
import com.datasophon.api.service.ClusterInfoService;
import com.datasophon.api.service.ClusterServiceRoleInstanceService;
import com.datasophon.api.utils.SpringTool;
import com.datasophon.common.command.HostCheckCommand;
import com.datasophon.common.command.PingCommand;
import com.datasophon.common.model.HostInfo;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.common.utils.PromInfoUtils;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.ClusterHostEntity;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;

import com.datasophon.dao.enums.MANAGED;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.UntypedActor;

/**
* 节点状态监测
*/
public class HostCheckActor extends UntypedActor {

private static final Logger logger = LoggerFactory.getLogger(HostCheckActor.class);
Expand All @@ -52,16 +66,51 @@ public void onReceive(Object msg) throws Throwable {
SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class);
ClusterInfoService clusterInfoService =
SpringTool.getApplicationContext().getBean(ClusterInfoService.class);

// Host or cluster
final HostCheckCommand hostCheckCommand = (HostCheckCommand)msg;
final HostInfo hostInfo = hostCheckCommand.getHostInfo();

// 获取当前安装并且正在运行的集群
Result result = clusterInfoService.runningClusterList();
List<ClusterInfoEntity> clusterList = (List<ClusterInfoEntity>) result.getData();

for (ClusterInfoEntity clusterInfoEntity : clusterList) {
// 获取集群上安装的 Prometheus 服务, 从 Prometheus 获取CPU、磁盘使用量等
ClusterServiceRoleInstanceEntity prometheusInstance =
roleInstanceService.getOneServiceRole("Prometheus", "", clusterInfoEntity.getId());
if (Objects.nonNull(prometheusInstance)) {
// 集群正常安装了 Prometheus
List<ClusterHostEntity> list = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId());
String promUrl = "http://" + prometheusInstance.getHostname() + ":9090/api/v1/query";
for (ClusterHostEntity clusterHostEntity : list) {
if(hostInfo != null && !StringUtils.equals(clusterHostEntity.getHostname(), hostInfo.getHostname())) {
// 指定了节点,直接只处理这一个节点的
continue;
}
try {
// rpc 检测
final ActorRef pingActor = ActorUtils.getRemoteActor(clusterHostEntity.getHostname(), "pingActor");
PingCommand pingCommand = new PingCommand();
pingCommand.setMessage("ping");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
Future<Object> execFuture = Patterns.ask(pingActor, pingCommand, timeout);
ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration());
if (execResult.getExecResult()) {
logger.info("ping host: {} success", clusterHostEntity.getHostname());
} else {
logger.warn("ping host: {} fail, reason: {}", clusterHostEntity.getHostname(), execResult.getExecOut());
throw new IllegalStateException("ping host: " + clusterHostEntity.getHostname() + " failed.");
}
clusterHostEntity.setHostState(1);
clusterHostEntity.setManaged(MANAGED.YES);
} catch (Exception e) {
logger.warn("host: " + clusterHostEntity.getHostname() + " rpc error, cause: " + e.getMessage());
clusterHostEntity.setHostState(3);
clusterHostEntity.setManaged(MANAGED.NO);
// ping 失败,则修改节点状态为 false
continue;
}
try {
String hostname = clusterHostEntity.getHostname();
// 查询内存总量
Expand Down Expand Up @@ -106,12 +155,51 @@ public void onReceive(Object msg) throws Throwable {
clusterHostEntity.setAverageLoad(cpuLoad);
}
} catch (Exception e) {
logger.info(e.getMessage());
logger.warn("check cluster state error, cause: {}", e.getMessage());
}
}
if (list.size() > 0) {
clusterHostService.updateBatchById(list);
}
} else {
// 没有 Prometheus?直接获取节点,通过 rpc 检测是否启动
List<ClusterHostEntity> hosts = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId());
List<ClusterHostEntity> checkedHosts = new ArrayList<>(hosts.size());
for (ClusterHostEntity host : hosts) {
if(hostInfo != null && !StringUtils.equals(host.getHostname(), hostInfo.getHostname())) {
// 指定了节点,直接只处理这一个节点的
continue;
}
// copy 一个新的,只更新状态
ClusterHostEntity checkedHost = new ClusterHostEntity();
checkedHost.setId(host.getId());
checkedHost.setCheckTime(new Date());
try {
// rpc 检测
final ActorRef pingActor = ActorUtils.getRemoteActor(host.getHostname(), "pingActor");
PingCommand pingCommand = new PingCommand();
pingCommand.setMessage("ping");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
Future<Object> execFuture = Patterns.ask(pingActor, pingCommand, timeout);
ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration());
if (execResult.getExecResult()) {
logger.info("ping host: {} success", host.getHostname());
} else {
logger.warn("ping host: {} fail, reason: {}", host.getHostname(), execResult.getExecOut());
throw new IllegalStateException("ping host: " + host.getHostname() + " failed.");
}
checkedHost.setHostState(1);
checkedHost.setManaged(MANAGED.YES);
} catch (Exception e) {
logger.warn("host: " + host.getHostname() + " rpc error, cause: " + e.getMessage());
checkedHost.setManaged(MANAGED.NO);
checkedHost.setHostState(3);
}
checkedHosts.add(checkedHost);
}
if (checkedHosts.size() > 0) {
clusterHostService.updateBatchById(checkedHosts);
}
}
}
} else {
Expand Down
6 changes: 6 additions & 0 deletions datasophon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
<version>1.10.13</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.common.command;

import lombok.Data;

import java.io.Serializable;

/**
*
*
* @author zhenqin
*/
@Data
public class PingCommand implements Serializable {

private String message;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.common.utils;

import com.google.common.io.CharStreams;
import com.google.common.io.LineProcessor;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarInputStream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.util.zip.GZIPInputStream;
/**
*
* 基本文件的特殊操作,文件MD5,从 targz 压缩包不解压读取一个文本文件,读取一个文件的第一行 等
*
* <pre>
*
* Created by zhenqin.
* User: zhenqin
* Date: 2023/4/21
* Time: 下午9:58
*
* </pre>
*
* @author zhenqin
*/
public class FileUtils {


/**
* 获取一个文件的md5值(可处理大文件)
* @return md5 value
*/
public static String md5(File file) {
try (FileInputStream fileInputStream = new FileInputStream(file);) {
MessageDigest MD5 = MessageDigest.getInstance("MD5");

byte[] buffer = new byte[8192];
int length;
while ((length = fileInputStream.read(buffer)) != -1) {
MD5.update(buffer, 0, length);
}
return new String(Hex.encodeHex(MD5.digest()));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}


/**
* 从 tar.gz 的压缩包内读取一个 文本文件
* @param targz
* @param name
* @return
* @throws IOException
*/
public static String readTargzTextFile(File targz, String name, Charset charset) throws IOException {
String content = null;
TarEntry tarEntry = null;
try (TarInputStream tarInputStream = new TarInputStream(new GZIPInputStream(new FileInputStream(targz)));
BufferedReader reader = new BufferedReader(new InputStreamReader(tarInputStream, charset));){
boolean hasNext = reader.readLine() != null;
if(hasNext) {
return null;
}
while ((tarEntry = tarInputStream.getNextEntry()) != null ) {
String entryName = tarEntry.getName();
if (tarEntry.isDirectory()) {
// 如果是文件夹,创建文件夹并加速循环
continue;
}
if(entryName.endsWith(name)) {
// 找到第一个文件就结束
content = CharStreams.toString(reader);
break;
}
}
}
return content;
}


/**
* 读取文件第一行,第一行的非空行
* @param file
* @return
* @throws Exception
*/
public static String readFirstLine(File file) throws Exception {
final String firstLine = CharStreams.readLines(new FileReader(file), new LineProcessor<String>() {

String firstLine = null;

@Override
public boolean processLine(String line) throws IOException {
this.firstLine = line;
// 第一行非空则返回
return StringUtils.trimToNull(line) == null;
}

@Override
public String getResult() {
return firstLine;
}
});
return firstLine;
}
}
Loading