Skip to content

Commit

Permalink
添加DorisFEObserver角色并自动添加 (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
a19920714liou authored Aug 30, 2023
1 parent 25627a5 commit 1ebdf00
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,35 @@
"url": "http://${host}:18030"
}
},
{
"name": "DorisFEObserver",
"label": "DorisFEObserver",
"roleType": "worker",
"logFile": "fe/log/fe.log",
"jmxPort": 18031,
"startRunner": {
"timeout": "60",
"program": "fe/bin/start_fe.sh",
"args": [
"--daemon"
]
},
"stopRunner": {
"timeout": "600",
"program": "fe/bin/stop_fe.sh",
"args": [
"--daemon"
]
},
"statusRunner": {
"timeout": "60",
"program": "fe/bin/status_fe.sh",
"args": [
"status",
"fe"
]
}
},
{
"name": "DorisBE",
"label": "DorisBE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,37 @@
*/
public enum OlapOpsType {

ADD_BE,
ADD_FE
ADD_BE(1, "backend"),
ADD_FE_FOLLOWER(1, "follower"),
ADD_FE_OBSERVER(1, "observer");

private int value;

private String desc;

OlapOpsType(int value, String desc) {
this.value = value;
this.desc = desc;
}

public int getValue() {
return value;
}

public void setValue(int value) {
this.value = value;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return this.desc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public static ExecResult addFollower(String feMaster, String hostname) {
return execResult;
}

public static ExecResult addObserver(String feMaster, String hostname) {
ExecResult execResult = new ExecResult();
String sql = "ALTER SYSTEM add OBSERVER \"" + hostname + ":9010\";";
logger.info("Add fe to cluster , the sql is {}", sql);
try {
executeSql(feMaster, hostname, sql);
execResult.setExecResult(true);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return execResult;
}

public static ExecResult addBackend(String feMaster, String hostname) {
ExecResult execResult = new ExecResult();
String sql = "ALTER SYSTEM add BACKEND \"" + hostname + ":9050\";";
Expand Down Expand Up @@ -88,6 +103,19 @@ public static ExecResult addFollowerBySqlClient(String feMaster,
return ShellUtils.exceShell(sqlCommand);
}

public static ExecResult addObserverBySqlClient(String feMaster,
String hostname) {
String sqlCommand =
"mysql -h"
+ feMaster
+ " -uroot -P9030 -e"
+ " 'ALTER SYSTEM add OBSERVER \""
+ hostname
+ ":9010\"';";
// logger.info("sqlCommand is {}", sqlCommand);
return ShellUtils.exceShell(sqlCommand);
}

public static ExecResult addBackendBySqlClient(String feMaster,
String hostname) {
String sqlCommand =
Expand All @@ -106,7 +134,7 @@ private static Connection getConnection(String feMaster) throws ClassNotFoundExc
String password = "";
String url = "jdbc:mysql://" + feMaster + ":9030";
// 加载驱动
Class.forName("com.mysql.jdbc.Driver");
Class.forName("com.mysql.cj.jdbc.Driver");
return DriverManager.getConnection(url, username, password);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.datasophon.api.master;

import akka.actor.UntypedActor;
import com.datasophon.common.command.OlapOpsType;
import cn.hutool.json.JSONUtil;
import com.datasophon.common.command.OlapSqlExecCommand;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.common.utils.OlapUtils;
Expand All @@ -16,29 +16,47 @@ public class MasterNodeProcessingActor extends UntypedActor {

@Override
public void onReceive(Object message) throws Throwable {
logger.info("MasterNodeProcessingActor receive message: " + JSONUtil.toJsonStr(message) );
if (message instanceof OlapSqlExecCommand) {
OlapSqlExecCommand command = (OlapSqlExecCommand) message;
ExecResult execResult = OlapOpsType.ADD_BE.equals(command.getOpsType())
? OlapUtils.addBackend(command.getFeMaster(), command.getHostName())
: OlapUtils.addFollower(command.getFeMaster(), command.getHostName());
String tip = OlapOpsType.ADD_BE.equals(command.getOpsType()) ? "backend" : "follower";
ExecResult execResult = new ExecResult();
String tip = command.getOpsType().getDesc();
switch (command.getOpsType()) {
case ADD_BE:
execResult = OlapUtils.addBackend(command.getFeMaster(), command.getHostName());
break;
case ADD_FE_FOLLOWER:
execResult = OlapUtils.addFollower(command.getFeMaster(), command.getHostName());
break;
case ADD_FE_OBSERVER:
execResult = OlapUtils.addObserver(command.getFeMaster(), command.getHostName());
break;
}
if (execResult.getExecResult()) {
logger.info(command.getHostName() + " " + tip + " be added success");
logger.info(command.getHostName() + " " + tip + " added success");
} else {
logger.info(command.getHostName() + " " + tip + " be added failed");
logger.info(command.getHostName() + " " + tip + " added failed");
}
int tryTimes = 0;
while (!execResult.getExecResult() && tryTimes < 3) {
try {
TimeUnit.SECONDS.sleep(10L);
execResult = OlapOpsType.ADD_BE.equals(command.getOpsType())
? OlapUtils.addBackendBySqlClient(command.getFeMaster(), command.getHostName())
: OlapUtils.addFollowerBySqlClient(command.getFeMaster(), command.getHostName());
switch (command.getOpsType()) {
case ADD_BE:
execResult = OlapUtils.addBackendBySqlClient(command.getFeMaster(), command.getHostName());
break;
case ADD_FE_FOLLOWER:
execResult = OlapUtils.addFollowerBySqlClient(command.getFeMaster(), command.getHostName());
break;
case ADD_FE_OBSERVER:
execResult = OlapUtils.addObserverBySqlClient(command.getFeMaster(), command.getHostName());
break;
}
if (execResult.getExecResult()) {
logger.info(command.getHostName() + " " + tip + " be added success");
logger.info(command.getHostName() + " " + tip + " added success");
break;
} else {
logger.info(command.getHostName() + " " + tip + " be added failed");
logger.info(command.getHostName() + " " + tip + " added failed");
}
tryTimes++;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ public void onReceive(Object msg) throws Throwable {
+ Constants.UNDERLINE
+ roleInstanceEntity.getServiceRoleName();
logger.info("jmxKey is {}", jmxKey);
if ("SRFE".equals(roleInstanceEntity.getServiceRoleName()) || "DorisFE".equals(roleInstanceEntity.getServiceRoleName())) {
if ("SRFE".equals(roleInstanceEntity.getServiceRoleName())
|| "DorisFE".equals(roleInstanceEntity.getServiceRoleName())
|| "DorisFEObserver".equals(roleInstanceEntity.getServiceRoleName())) {
logger.info(ServiceRoleJmxMap.get(jmxKey));
feList.add(
roleInstanceEntity.getHostname() + ":" + ServiceRoleJmxMap.get(jmxKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void onReceive(Object msg) throws Throwable {
"SRFE",
"SRBE",
"DorisFE",
"DorisFEObserver",
"DorisBE",
"NameNode",
"ResourceManager"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.datasophon.api.strategy;

import cn.hutool.core.util.ObjUtil;
import com.datasophon.api.load.GlobalVariables;
import com.datasophon.api.utils.ProcessUtils;
import com.datasophon.common.model.ProcInfo;
Expand All @@ -39,8 +40,12 @@ public class FEHandlerStartegy implements ServiceRoleStrategy {
@Override
public void handler(Integer clusterId, List<String> hosts) {
Map<String, String> globalVariables = GlobalVariables.get(clusterId);
if (hosts.size() >= 1) {
ProcessUtils.generateClusterVariable(globalVariables, clusterId, "${feMaster}", hosts.get(0));
// if feMaster is null, set the first host as feMaster
//Prevent FE Observer nodes from starting and FE Master nodes from changing
if (!globalVariables.containsKey("${feMaster}") || ObjUtil.isNull(globalVariables.get("${feMaster}"))) {
if (!hosts.isEmpty()) {
ProcessUtils.generateClusterVariable(globalVariables, clusterId, "${feMaster}", hosts.get(0));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.api.strategy;

import cn.hutool.core.util.ObjUtil;
import com.datasophon.api.load.GlobalVariables;
import com.datasophon.api.utils.ProcessUtils;
import com.datasophon.common.model.ProcInfo;
import com.datasophon.common.model.ServiceConfig;
import com.datasophon.common.model.ServiceRoleInfo;
import com.datasophon.common.utils.OlapUtils;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;
import com.datasophon.dao.enums.AlertLevel;
import com.datasophon.dao.enums.ServiceRoleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class FEObserverHandlerStartegy implements ServiceRoleStrategy {

private static final Logger logger = LoggerFactory.getLogger(FEObserverHandlerStartegy.class);

@Override
public void handler(Integer clusterId, List<String> hosts) {

}

@Override
public void handlerConfig(Integer clusterId, List<ServiceConfig> list) {

}

@Override
public void getConfig(Integer clusterId, List<ServiceConfig> list) {

}

@Override
public void handlerServiceRoleInfo(ServiceRoleInfo serviceRoleInfo, String hostname) {
Map<String, String> globalVariables = GlobalVariables.get(serviceRoleInfo.getClusterId());
String feMaster = globalVariables.get("${feMaster}");
if (hostname.equals(feMaster)) {
logger.info("fe master is {}", feMaster);
serviceRoleInfo.setSortNum(1);
} else {
logger.info("set fe follower master");
serviceRoleInfo.setMasterHost(feMaster);
serviceRoleInfo.setSlave(true);
serviceRoleInfo.setSortNum(2);
}

}

@Override
public void handlerServiceRoleCheck(ClusterServiceRoleInstanceEntity roleInstanceEntity,
Map<String, ClusterServiceRoleInstanceEntity> map) {
Map<String, String> globalVariables = GlobalVariables.get(roleInstanceEntity.getClusterId());
String feMaster = globalVariables.get("${feMaster}");
if (roleInstanceEntity.getHostname().equals(feMaster)
&& roleInstanceEntity.getServiceRoleState() == ServiceRoleState.RUNNING) {
try {
List<ProcInfo> frontends = OlapUtils.showFrontends(feMaster);
resolveProcInfoAlert(roleInstanceEntity.getServiceRoleName(), frontends, map);
} catch (Exception e) {

}


}
}
private void resolveProcInfoAlert(String serviceRoleName, List<ProcInfo> frontends,
Map<String, ClusterServiceRoleInstanceEntity> map) {
for (ProcInfo frontend : frontends) {
ClusterServiceRoleInstanceEntity roleInstanceEntity = map.get(frontend.getHostName() + serviceRoleName);
if (!frontend.getAlive()) {
String alertTargetName = serviceRoleName + " Not Add To Cluster";
logger.info("{} at host {} is not add to cluster", serviceRoleName, frontend.getHostName());
String alertAdvice = "The errmsg is " + frontend.getErrMsg();
ProcessUtils.saveAlert(roleInstanceEntity, alertTargetName, AlertLevel.WARN, alertAdvice);
} else {
ProcessUtils.recoverAlert(roleInstanceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ServiceRoleStrategyContext {
map.put("ZKFC", new ZKFCHandlerStrategy());
map.put("SRFE", new FEHandlerStartegy());
map.put("DorisFE", new FEHandlerStartegy());
map.put("DorisFEObserver", new FEObserverHandlerStartegy());
map.put("SRBE", new BEHandlerStartegy());
map.put("DorisBE", new BEHandlerStartegy());
map.put("Krb5Kdc", new Krb5KdcHandlerStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.datasophon.worker.strategy;

import akka.actor.ActorRef;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.datasophon.common.Constants;
import com.datasophon.common.cache.CacheUtils;
import com.datasophon.common.command.OlapOpsType;
Expand All @@ -41,6 +43,7 @@ public FEHandlerStrategy(String serviceName,String serviceRoleName) {
@Override
public ExecResult handler(ServiceRoleOperateCommand command) {
ExecResult startResult = new ExecResult();
logger.info("FEHandlerStrategy start fe"+ JSONUtil.toJsonStr(command));
ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName());
if (command.getCommandType() == CommandType.INSTALL_SERVICE) {
if (command.isSlave()) {
Expand All @@ -62,7 +65,7 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(CacheUtils.getString(Constants.HOSTNAME));
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE);
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_FOLLOWER);
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 1ebdf00

Please sign in to comment.