Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(portal-server/mis-server): 启动时跳过已停用集群的ssh检查 #1347

Merged
merged 13 commits into from
Jul 17, 2024
6 changes: 6 additions & 0 deletions .changeset/twelve-olives-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/portal-server": patch
"@scow/mis-server": patch
---

在门户和管理系统启动时只检查启用中集群登录节点的 ssh 连接,在管理系统启用集群操作中检查登录节点的 ssh 连接
15 changes: 11 additions & 4 deletions apps/mis-server/src/bl/PriceMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import { Logger } from "@ddadaal/tsgrpc-server";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { Partition } from "@scow/scheduler-adapter-protos/build/protos/config";
import { calculateJobPrice } from "src/bl/jobPrice";
import { configClusters } from "src/config/clusters";
import { misConfig } from "src/config/mis";
import { JobPriceInfo } from "src/entities/JobInfo";
import { AmountStrategy, JobPriceItem } from "src/entities/JobPriceItem";
import { ClusterPlugin } from "src/plugins/clusters";

import { getActivatedClusters } from "./clustersUtils";

export interface JobInfo {
// cluster job id
jobId: number;
Expand Down Expand Up @@ -91,12 +92,18 @@ export async function createPriceMap(
// partitions info for all clusters
const partitionsForClusters: Record<string, Partition[]> = {};

// call for all config clusters
// call for all activated clusters
const activatedClusters = await getActivatedClusters(em, logger).catch((e) => {
logger.info("!!![important] No available activated clusters.This will skip creating price map in cluster!!!");
logger.info(e);
return {};
});
const reply = await clusterPlugin.callOnAll(
configClusters,
activatedClusters,
logger,
async (client) => await asyncClientCall(client.config, "getClusterConfig", {}),
);

reply.forEach((x) => {
partitionsForClusters[x.cluster] = x.result.partitions;
});
Expand All @@ -109,7 +116,7 @@ export async function createPriceMap(

const missingPaths = [] as string[];

for (const cluster in configClusters) {
for (const cluster in activatedClusters) {
for (const partition of partitionsForClusters[cluster]) {
const path = [cluster, partition.name];
const { qos } = partition;
Expand Down
2 changes: 1 addition & 1 deletion apps/mis-server/src/bl/clustersUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export async function getClustersRuntimeInfo(
});

const clusterDatabaseList = clustersFromDb.map((x) => {
return `(Cluster ID: ${x.clusterId}) : ${x.activationStatus}`;
return `Cluster ID: ${x.clusterId}, Current Status: ${x.activationStatus}`;
}).join("; ");

logger.info("Current clusters list: %s", clusterDatabaseList);
Expand Down
21 changes: 15 additions & 6 deletions apps/mis-server/src/plugins/clusters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { ClusterConfigSchema, getLoginNode } from "@scow/config/build/cluster";
import { getSchedulerAdapterClient, SchedulerAdapterClient } from "@scow/lib-scheduler-adapter";
import { scowErrorMetadata } from "@scow/lib-server/build/error";
import { testRootUserSshLogin } from "@scow/lib-ssh";
import { updateCluster } from "src/bl/clustersUtils";
import { getActivatedClusters, updateCluster } from "src/bl/clustersUtils";
import { configClusters } from "src/config/clusters";
import { rootKeyPair } from "src/config/env";

Expand Down Expand Up @@ -52,8 +52,20 @@ export const ADAPTER_CALL_ON_ONE_ERROR = "ADAPTER_CALL_ON_ONE_ERROR";

export const clustersPlugin = plugin(async (f) => {

// initial clusters database
const configClusterIds = Object.keys(configClusters);
await updateCluster(f.ext.orm.em.fork(), configClusterIds, f.logger);

if (process.env.NODE_ENV === "production") {
await Promise.all(Object.values(configClusters).map(async ({ displayName, loginNodes }) => {

// only check activated clusters' root user login when system is starting
const activatedClusters = await getActivatedClusters(f.ext.orm.em.fork(), f.logger).catch((e) => {
f.logger.info("!!![important] No available activated clusters.This will skip root ssh login check in cluster!!!");
f.logger.info(e);
return {};
});

await Promise.all(Object.values(activatedClusters).map(async ({ displayName, loginNodes }) => {
const loginNode = getLoginNode(loginNodes[0]);
const address = loginNode.address;
const node = loginNode.name;
Expand All @@ -66,11 +78,8 @@ export const clustersPlugin = plugin(async (f) => {
f.logger.info("Root can login to %s by login node %s", displayName, node);
}
}));
}

// initial clusters database
const configClusterIds = Object.keys(configClusters);
await updateCluster(f.ext.orm.em.fork(), configClusterIds, f.logger);
}

// adapterClient of all config clusters
const adapterClientForClusters = Object.entries(configClusters).reduce((prev, [cluster, c]) => {
Expand Down
4 changes: 3 additions & 1 deletion apps/mis-server/src/services/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { UniqueConstraintViolationException } from "@mikro-orm/core";
import { createUser } from "@scow/lib-auth";
import { InitServiceServer, InitServiceService } from "@scow/protos/build/server/init";
import { authUrl } from "src/config";
import { configClusters } from "src/config/clusters";
import { SystemState } from "src/entities/SystemState";
import { PlatformRole, TenantRole, User } from "src/entities/User";
import { DEFAULT_TENANT_NAME } from "src/utils/constants";
Expand Down Expand Up @@ -72,7 +73,8 @@ export const initServiceServer = plugin((server) => {
server.logger)
.then(async () => {
// 插入公钥失败也认为是创建用户成功
await insertKeyToNewUser(userId, password, server.logger)
// 在所有集群下执行
await insertKeyToNewUser(userId, password, server.logger, configClusters)
.catch(() => null);
return true;
})
Expand Down
107 changes: 68 additions & 39 deletions apps/mis-server/src/services/misConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { plugin } from "@ddadaal/tsgrpc-server";
import { ServiceError, status } from "@grpc/grpc-js";
import { getLoginNode } from "@scow/config/build/cluster";
import { testRootUserSshLogin } from "@scow/lib-ssh";
import { ClusterRuntimeInfo_LastActivationOperation,
ConfigServiceServer, ConfigServiceService } from "@scow/protos/build/server/config";
import { getActivatedClusters, getClustersRuntimeInfo } from "src/bl/clustersUtils";
import { configClusters } from "src/config/clusters";
import { rootKeyPair } from "src/config/env";
import { Cluster, ClusterActivationStatus } from "src/entities/Cluster";

export const misConfigServiceServer = plugin((server) => {
Expand Down Expand Up @@ -75,52 +79,77 @@ export const misConfigServiceServer = plugin((server) => {
activateCluster: async ({ request, em, logger }) => {
const { clusterId, operatorId } = request;

const cluster = await em.findOne(Cluster, { clusterId });

if (!cluster) {
throw {
code: status.NOT_FOUND, message: `Cluster( Cluster ID: ${clusterId}) is not found`,
} as ServiceError;
}
return await em.transactional(async (em) => {
const cluster = await em.findOne(Cluster, { clusterId });

// check current scheduler adapter connection state
// do not need check cluster's activation
await server.ext.clusters.callOnOne(
clusterId,
logger,
async (client) => await asyncClientCall(client.config, "getClusterConfig", {}),
).catch((e) => {
logger.info("Cluster Connection Error ( Cluster ID : %s , Details: %s ) .", cluster, e);
throw {
code: status.FAILED_PRECONDITION,
message: `Activate cluster failed, Cluster( Cluster ID: ${clusterId}) is currently unreachable.`,
} as ServiceError;
});
if (!cluster) {
throw {
code: status.NOT_FOUND, message: `Cluster( Cluster ID: ${clusterId}) is not found`,
} as ServiceError;
}

// when the cluster has already been activated
if (cluster.activationStatus === ClusterActivationStatus.ACTIVATED) {
logger.info("Cluster (Cluster ID: %s) has already been activated",
// check current scheduler adapter connection state
// do not need check cluster's activation
await server.ext.clusters.callOnOne(
clusterId,
logger,
async (client) => await asyncClientCall(client.config, "getClusterConfig", {}),
).catch((e) => {
logger.info("Cluster Connection Error ( Cluster ID : %s , Details: %s ) .", cluster, e);
throw {
code: status.FAILED_PRECONDITION,
message: `Activate cluster failed, Cluster( Cluster ID: ${clusterId}) is currently unreachable.`,
} as ServiceError;
});

// when the cluster has already been activated
if (cluster.activationStatus === ClusterActivationStatus.ACTIVATED) {
logger.info("Cluster (Cluster ID: %s) has already been activated",
clusterId,
);
return [{ executed: false }];
}

// check root user ssh login in the target cluster
const targetClusterLoginNodes = configClusters[clusterId].loginNodes;

const loginNode = getLoginNode(targetClusterLoginNodes[0]);
const address = loginNode.address;
const node = loginNode.name;
logger.info("Checking if root can login to cluster (clusterId: %s) by login node %s",
clusterId, node);
const error = await testRootUserSshLogin(address, rootKeyPair, logger);

if (error) {
logger.info("Root cannot login to cluster (clusterId: %s) by login node %s. err: %o",
clusterId, node, error);
throw {
code: status.FAILED_PRECONDITION,
message: `Activate cluster failed, root login check failed in Cluster( Cluster ID: ${clusterId}) .`,
} as ServiceError;
} else {
logger.info("Root can login to cluster (clusterId: %s) by login node %s", clusterId, node);
}

cluster.activationStatus = ClusterActivationStatus.ACTIVATED;

// save operator userId in lastActivationOperation
const lastActivationOperationMap: ClusterRuntimeInfo_LastActivationOperation = {};

lastActivationOperationMap.operatorId = operatorId;
cluster.lastActivationOperation = lastActivationOperationMap;

await em.persistAndFlush(cluster);

logger.info("Cluster (Cluster ID: %s) is successfully activated by user (User Id: %s)",
clusterId,
operatorId,
);
return [{ executed: false }];
}

cluster.activationStatus = ClusterActivationStatus.ACTIVATED;

// save operator userId in lastActivationOperation
const lastActivationOperationMap: ClusterRuntimeInfo_LastActivationOperation = {};

lastActivationOperationMap.operatorId = operatorId;
cluster.lastActivationOperation = lastActivationOperationMap;

await em.persistAndFlush(cluster);

logger.info("Cluster (Cluster ID: %s) is successfully activated by user (User Id: %s)",
clusterId,
operatorId,
);
return [{ executed: true }];

return [{ executed: true }];
});

},

Expand Down
5 changes: 4 additions & 1 deletion apps/mis-server/src/services/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { TenantServiceServer, TenantServiceService } from "@scow/protos/build/se
import { blockAccount, unblockAccount } from "src/bl/block";
import { getActivatedClusters } from "src/bl/clustersUtils";
import { authUrl } from "src/config";
import { configClusters } from "src/config/clusters";
import { Account } from "src/entities/Account";
import { Tenant } from "src/entities/Tenant";
import { TenantRole, User } from "src/entities/User";
Expand Down Expand Up @@ -147,7 +148,9 @@ export const tenantServiceServer = plugin((server) => {
{ identityId: user.userId, id: user.id, mail: user.email, name: user.name, password: userPassword },
logger)
.then(async () => {
await insertKeyToNewUser(userId, userPassword, logger)
// 插入公钥失败也认为是创建用户成功
// 在所有集群下执行
await insertKeyToNewUser(userId, userPassword, logger, configClusters)
.catch(() => { });
return true;
})
Expand Down
5 changes: 4 additions & 1 deletion apps/mis-server/src/services/user.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
import { blockUserInAccount, unblockUserInAccount } from "src/bl/block";
import { getActivatedClusters } from "src/bl/clustersUtils";
import { authUrl } from "src/config";
import { configClusters } from "src/config/clusters";
import { Account } from "src/entities/Account";
import { Tenant } from "src/entities/Tenant";
import { PlatformRole, TenantRole, User } from "src/entities/User";
Expand Down Expand Up @@ -440,7 +441,9 @@ export const userServiceServer = plugin((server) => {
server.logger)
.then(async () => {
// insert public key
await insertKeyToNewUser(identityId, password, server.logger)
// 插入公钥失败也认为是创建用户成功
// 在所有集群下执行
await insertKeyToNewUser(identityId, password, server.logger, configClusters)
.catch(() => {});
return true;
})
Expand Down
6 changes: 5 additions & 1 deletion apps/mis-server/src/tasks/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ export async function fetchJobs(
const persistJobAndCharge = async (jobs: ({ cluster: string } & ClusterJobInfo)[]) => {
const result = await em.transactional(async (em) => {

const currentActivatedClusters = await getActivatedClusters(em, logger);
const currentActivatedClusters = await getActivatedClusters(em, logger).catch((e) => {
logger.info("!!![important] No available activated clusters.This will skip fetching Jobs in cluster!!!");
logger.info(e);
return {};
}); ;

// Calculate prices for new info and persist
const pricedJobs: JobInfo[] = [];
Expand Down
5 changes: 3 additions & 2 deletions apps/mis-server/src/utils/createUser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { ServiceError } from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { UniqueConstraintViolationException } from "@mikro-orm/core";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { getLoginNode } from "@scow/config/build/cluster";
import { ClusterConfigSchema, getLoginNode } from "@scow/config/build/cluster";
import { insertKeyAsUser } from "@scow/lib-ssh";
import { configClusters } from "src/config/clusters";
import { rootKeyPair } from "src/config/env";
Expand Down Expand Up @@ -66,11 +66,12 @@ export async function insertKeyToNewUser(
userId: string,
password: string,
logger: Logger,
currentClusters: Record<string, ClusterConfigSchema>,
) {
// Making an ssh Request to the login node as the user created.
if (process.env.NODE_ENV === "production") {

await Promise.all(Object.values(configClusters).map(async ({ displayName, loginNodes }) => {
await Promise.all(Object.values(currentClusters).map(async ({ displayName, loginNodes }) => {
const node = getLoginNode(loginNodes[0]);
logger.info("Checking if user can login to %s by login node %s", displayName, node.name);

Expand Down
2 changes: 1 addition & 1 deletion apps/mis-web/src/utils/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const route: typeof typeboxRoute = (schema, handler) => {

const SCOW_ERROR = e.metadata.get("IS_SCOW_ERROR");
if (!SCOW_ERROR) { throw e; }
const code = e.metadata.get("SCOW_ERROR_CODE")[0].toString();
const code = e.metadata.get("SCOW_ERROR_CODE")?.[0]?.toString();
const details = e.details;

// 如果包含集群详细错误信息
Expand Down
18 changes: 12 additions & 6 deletions apps/portal-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import { Server } from "@ddadaal/tsgrpc-server";
import { omitConfigSpec } from "@scow/lib-config";
import { libGetCurrentActivatedClusters } from "@scow/lib-server";
import { readVersionFile } from "@scow/utils/build/version";
import { configClusters } from "src/config/clusters";
import { config } from "src/config/env";
Expand All @@ -28,6 +29,8 @@ import { setupProxyGateway } from "src/utils/proxy";
import { initShellFile } from "src/utils/shell";
import { checkClustersRootUserLogin } from "src/utils/ssh";

import { commonConfig } from "./config/common";

export async function createServer() {

const server = new Server({
Expand All @@ -51,17 +54,20 @@ export async function createServer() {
await server.register(dashboardServiceServer);
await server.register(fileServiceServer);
await server.register(desktopServiceServer);


if (process.env.NODE_ENV === "production") {
await checkClustersRootUserLogin(server.logger);
await Promise.all(Object.entries(configClusters).map(async ([id]) => {
const activatedClusters = await libGetCurrentActivatedClusters(
server.logger,
configClusters,
config.MIS_SERVER_URL,
commonConfig.scowApi?.auth?.token);

await checkClustersRootUserLogin(server.logger, activatedClusters);
await Promise.all(Object.entries(activatedClusters).map(async ([id]) => {
await initShellFile(id, server.logger);
}));
await setupProxyGateway(server.logger);
await setupProxyGateway(server.logger, activatedClusters);
}



return server;
}
Loading
Loading