Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mis): mis-server启动时,不完整同步一次封锁状态 #1374

Merged
merged 5 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ninety-zoos-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/grpc-api": patch
---

当 mis-server 正在进行一次封锁状态同步时,调用 server/AdminService.UpdateBlockStatus API 会抛出`AlreadyExists`错误
6 changes: 6 additions & 0 deletions .changeset/shaggy-maps-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/mis-server": patch
"@scow/mis-web": patch
---

mis-server 启动时,不完整运行一次封锁状态同步
3 changes: 3 additions & 0 deletions apps/mis-server/config/mis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ db:
password: mysqlrootpassword
dbName: scow_server_${JEST_WORKER_ID}

periodicSyncUserAccountBlockStatus:
enabled: false


2 changes: 0 additions & 2 deletions apps/mis-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,5 @@ export async function createServer() {
await server.register(misConfigServiceServer);
await server.register(exportServiceServer);

await server.ext.syncBlockStatus.sync();

return server;
}
58 changes: 34 additions & 24 deletions apps/mis-server/src/plugins/syncBlockStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,43 @@ export interface SyncBlockStatusPlugin {
stop: () => void;
schedule: string;
lastSyncTime: () => Date | null;
sync: () => Promise<SyncBlockStatusResponse>;
run: () => Promise<SyncBlockStatusResponse | undefined>;
}
}

export const syncBlockStatusPlugin = plugin(async (f) => {
const synchronizeCron = misConfig.periodicSyncUserAccountBlockStatus?.cron ?? "0 4 * * *";
let synchronizeStarted = !!misConfig.periodicSyncUserAccountBlockStatus?.enabled;
let synchronizeEnabled = !!misConfig.periodicSyncUserAccountBlockStatus?.enabled;
let synchronizeIsRunning = false;

const logger = f.logger.child({ plugin: "syncBlockStatus" });
logger.info("misConfig.periodicSyncStatus?.cron: %s", misConfig.periodicSyncUserAccountBlockStatus?.cron);

const trigger = () => {
if (synchronizeIsRunning) return;
const trigger = async () => {

const sublogger = logger.child({ time: new Date() });

if (synchronizeIsRunning) {
sublogger.info("Sync is already running.");
return Promise.resolve(undefined);
}

synchronizeIsRunning = true;
return synchronizeBlockStatus(f.ext.orm.em.fork(), logger, f.ext)
.finally(() => { synchronizeIsRunning = false; });
sublogger.info("Sync starts to run.");

try {
return await synchronizeBlockStatus(f.ext.orm.em.fork(), sublogger, f.ext);
} finally {
synchronizeIsRunning = false;
}
};

const task = cron.schedule(
synchronizeCron,
() => { void trigger(); },
{
timezone: "Asia/Shanghai",
scheduled: misConfig.periodicSyncUserAccountBlockStatus?.enabled,
scheduled: synchronizeEnabled,
},
);

Expand All @@ -60,27 +71,26 @@ export const syncBlockStatusPlugin = plugin(async (f) => {
});

f.addExtension("syncBlockStatus", ({
started: () => synchronizeStarted,
started: () => synchronizeEnabled,
start: () => {
if (synchronizeStarted) {
logger.info("Sync is requested to start but already started");
} else {
task.start();
synchronizeStarted = true;
logger.info("Sync started");
}
logger.info("Sync is started");
synchronizeEnabled = true;
task.start();
},
stop: () => {
if (!synchronizeStarted) {
logger.info("Sync is requested to stop but already stopped");
} else {
task.stop();
synchronizeStarted = false;
logger.info("Sync stopped");
}
logger.info("Sync is started");
synchronizeEnabled = false;
task.stop();
},
schedule: synchronizeCron,
lastSyncTime: () => lastSyncTime,
sync: trigger,
} as SyncBlockStatusPlugin["syncBlockStatus"]));
run: trigger,
} satisfies SyncBlockStatusPlugin["syncBlockStatus"]));

if (synchronizeEnabled) {
logger.info("Started a new synchronization");
void trigger();
} else {
logger.info("Account/Account block sychronization is disabled.");
}
});
10 changes: 8 additions & 2 deletions apps/mis-server/src/services/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
*/

import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { ServiceError } from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { libCheckActivatedClusters } from "@scow/lib-server/build/misCommon/clustersActivation";
import {
Expand Down Expand Up @@ -230,7 +230,13 @@ export const adminServiceServer = plugin((server) => {
// check whether there is activated cluster in SCOW
// cause syncBlockStatus in plugin will skip the check
await getActivatedClusters(em, logger);
const reply = await server.ext.syncBlockStatus.sync();
const reply = await server.ext.syncBlockStatus.run();
if (!reply) {
throw new ServiceError({
code: Status.ALREADY_EXISTS,
message: "Sync is already running. Please wait for its completion before starting a new one.",
});
}
return [reply];
},

Expand Down
3 changes: 2 additions & 1 deletion apps/mis-server/tests/admin/updateBlockStatus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ afterEach(async () => {
await server.close();
});

it("test whether the block update time exists at startup", async () => {
// Test server will not sync block status at startup
it.skip("test whether the block update time exists at startup", async () => {
const em = server.ext.orm.em.fork();
const updateTime = await em.findOne(SystemState, { key: SystemState.KEYS.UPDATE_SLURM_BLOCK_STATUS });
expect(updateTime).not.toBeNull();
Expand Down
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ export default {
alertInfo: "SCOW will regularly synchronize the blocking status of accounts and users to the scheduler. "
+ "You can click Sync Now to perform a manual synchronization.",
periodicSyncUserAccountBlockStatusInfo: "Periodically Synchronize Scheduler Account And User Blocked Status",
syncAlreadyStarted:
"Synchronization is already started. Please wait for its completion before starting a new run.",
turnedOn: "Turned On",
paused: "Paused",
stopSync: "Stop Synchronization",
Expand Down
2 changes: 1 addition & 1 deletion apps/mis-web/src/i18n/zh_cn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ export default {
slurmBlockStatus: {
syncUserAccountBlockingStatus: "用户账户封锁状态同步",
alertInfo: "SCOW会定期向调度器同步SCOW数据库中账户和用户的封锁状态,您可以点击立刻同步执行一次手动同步",

syncAlreadyStarted: "正在进行一次同步。请等待本次同步执行完成后,再重新同步。",
periodicSyncUserAccountBlockStatusInfo:"周期性同步调度器账户和用户的封锁状态",
turnedOn: "已开启",
paused: "已暂停",
Expand Down
3 changes: 3 additions & 0 deletions apps/mis-web/src/pages/admin/systemDebug/slurmBlockStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export const SlurmBlockStatusPage: NextPage = requireAuth((u) => u.platformRoles
onClick={() => {
setFetching(true);
api.syncBlockStatus({})
.httpError(409, () => {
message.error(t(p("syncAlreadyStarted")));
})
.then(({ blockedFailedAccounts, unblockedFailedAccounts, blockedFailedUserAccounts }) => {
if (!(blockedFailedAccounts.length || unblockedFailedAccounts.length
|| blockedFailedUserAccounts.length)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import { typeboxRouteSchema } from "@ddadaal/next-typed-api-routes-runtime";
import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { status } from "@grpc/grpc-js";
import { AdminServiceClient } from "@scow/protos/build/server/admin";
import { Type } from "@sinclair/typebox";
import { authenticate } from "src/auth/server";
import { PlatformRole } from "src/models/User";
import { getClient } from "src/utils/client";
import { route } from "src/utils/route";
import { handlegRPCError } from "src/utils/server";

export const SyncBlockStatusSchema = typeboxRouteSchema({
method: "PUT",
Expand All @@ -31,6 +33,7 @@ export const SyncBlockStatusSchema = typeboxRouteSchema({
})),
unblockedFailedAccounts: Type.Array(Type.String()),
}),
409: Type.Null(),
},
});
const auth = authenticate((info) => info.platformRoles.includes(PlatformRole.PLATFORM_ADMIN));
Expand All @@ -43,6 +46,10 @@ export default route(SyncBlockStatusSchema,

const client = getClient(AdminServiceClient);

return await asyncClientCall(client, "syncBlockStatus", {}).then((x) => ({ 200: x }));
return await asyncClientCall(client, "syncBlockStatus", {})
.then((x) => ({ 200: x }))
.catch(handlegRPCError({
[status.ALREADY_EXISTS]: () => ({ 409: null }),
}));

});
5 changes: 5 additions & 0 deletions protos/server/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ service AdminService {
rpc GetFetchInfo(GetFetchInfoRequest) returns (GetFetchInfoResponse);
rpc SetFetchState(SetFetchStateRequest) returns (SetFetchStateResponse);
rpc FetchJobs(FetchJobsRequest) returns (FetchJobsResponse);

/*
* Synchronize block status of account and account user to the backing scheduler
* If the synchronization is already running when the API is called, it throws ALREADY_EXISTS
*/
rpc UpdateBlockStatus(UpdateBlockStatusRequest)
returns (UpdateBlockStatusResponse);
rpc GetAdminInfo(GetAdminInfoRequest) returns (GetAdminInfoResponse) {
Expand Down
Loading