Skip to content

Commit

Permalink
Fix JobServiceMetricsCommandTest
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: TachyonNexus/enterprise#4063
orig-commit: TachyonNexus/enterprise@e39e948
orig-commit-author: Jiacheng Liu <jiacheliu3@gmail.com>

			pr-link: #17314
			change-id: cid-c3cae8f23c6b5695b6908217e815f09fe5edb625
  • Loading branch information
jiacheliu3 authored Apr 25, 2023
1 parent 092703f commit 8da5953
Show file tree
Hide file tree
Showing 23 changed files with 1,257 additions and 28 deletions.
2 changes: 2 additions & 0 deletions core/common/src/main/java/alluxio/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public final class Constants {
public static final long META_MASTER_CLIENT_SERVICE_VERSION = 2;
public static final long META_MASTER_MASTER_SERVICE_VERSION = 1;
public static final long META_MASTER_PROXY_SERVICE_VERSION = 1;
public static final long JOB_MASTER_MASTER_SERVICE_VERSION = 1;
public static final long METRICS_MASTER_CLIENT_SERVICE_VERSION = 2;
public static final long JOURNAL_MASTER_CLIENT_SERVICE_VERSION = 1;
public static final long RAFT_JOURNAL_SERVICE_VERSION = 1;
Expand All @@ -120,6 +121,7 @@ public final class Constants {
public static final String META_MASTER_CLIENT_SERVICE_NAME = "MetaMaster";
public static final String META_MASTER_PROXY_SERVICE_NAME = "MetaMasterProxy";
public static final String META_MASTER_MASTER_SERVICE_NAME = "MetaMasterMaster";
public static final String JOB_MASTER_MASTER_SERVICE_NAME = "JobMasterMaster";
public static final String METRICS_MASTER_CLIENT_SERVICE_NAME = "MetricsMasterClient";
public static final String BLOCK_WORKER_CLIENT_SERVICE_NAME = "BlockWorkerClient";
public static final String FILE_SYSTEM_WORKER_CLIENT_SERVICE_NAME = "FileSystemWorkerClient";
Expand Down
7 changes: 7 additions & 0 deletions core/common/src/main/java/alluxio/RuntimeConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package alluxio;

import alluxio.grpc.BuildVersion;

import javax.annotation.concurrent.ThreadSafe;

/**
Expand Down Expand Up @@ -40,6 +42,11 @@ public final class RuntimeConstants {
? ProjectConstants.REVISION.substring(0, 8) : ProjectConstants.REVISION;
public static final String VERSION_AND_REVISION_SHORT =
VERSION + "-" + REVISION_SHORT;
public static final BuildVersion UNKNOWN_VERSION_INFO = BuildVersion.newBuilder()
.setVersion("UNKNOWN").setRevision("UNKNOWN").build();
public static final BuildVersion CURRENT_VERSION_INFO = BuildVersion.newBuilder()
.setVersion(RuntimeConstants.VERSION)
.setRevision(RuntimeConstants.REVISION_SHORT).build();

/** The relative path to the Alluxio target jar. */
public static final String ALLUXIO_JAR = "target/alluxio-" + VERSION
Expand Down
27 changes: 27 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -7285,6 +7285,20 @@ public String toString() {
.setDefaultValue(100000)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
durationBuilder(Name.JOB_MASTER_MASTER_HEARTBEAT_INTERVAL)
.setDescription("The amount of time that a standby Alluxio Job Master should wait "
+ "in between heartbeats to the primary Job Master.")
.setDefaultValue("1sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_MASTER_TIMEOUT =
durationBuilder(Name.JOB_MASTER_MASTER_TIMEOUT)
.setDescription("The time period after which the primary Job Master will mark a standby "
+ "as lost without a subsequent heartbeat.")
.setDefaultValue("60sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_WORKER_HEARTBEAT_INTERVAL =
durationBuilder(Name.JOB_MASTER_WORKER_HEARTBEAT_INTERVAL)
.setDescription("The amount of time that the Alluxio job worker should wait in between "
Expand All @@ -7311,6 +7325,13 @@ public String toString() {
.setDefaultValue(format("${%s}", Name.MASTER_HOSTNAME))
.setScope(Scope.ALL)
.build();
public static final PropertyKey JOB_MASTER_LOST_MASTER_INTERVAL =
durationBuilder(Name.JOB_MASTER_LOST_MASTER_INTERVAL)
.setDescription("The time interval the job master waits between checks for "
+ "lost job masters.")
.setDefaultValue("10sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_LOST_WORKER_INTERVAL =
durationBuilder(Name.JOB_MASTER_LOST_WORKER_INTERVAL)
.setDescription("The time interval the job master waits between checks for lost workers.")
Expand Down Expand Up @@ -9073,13 +9094,19 @@ public static final class Name {
public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
"alluxio.job.master.finished.job.retention.time";
public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
"alluxio.job.master.master.heartbeat.interval";
public static final String JOB_MASTER_MASTER_TIMEOUT =
"alluxio.job.master.master.timeout";
public static final String JOB_MASTER_WORKER_HEARTBEAT_INTERVAL =
"alluxio.job.master.worker.heartbeat.interval";
public static final String JOB_MASTER_WORKER_TIMEOUT =
"alluxio.job.master.worker.timeout";

public static final String JOB_MASTER_BIND_HOST = "alluxio.job.master.bind.host";
public static final String JOB_MASTER_HOSTNAME = "alluxio.job.master.hostname";
public static final String JOB_MASTER_LOST_MASTER_INTERVAL =
"alluxio.job.master.lost.master.interval";
public static final String JOB_MASTER_LOST_WORKER_INTERVAL =
"alluxio.job.master.lost.worker.interval";
public static final String JOB_MASTER_RPC_PORT = "alluxio.job.master.rpc.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public void getServiceVersion(GetServiceVersionPRequest request,
case JOB_MASTER_WORKER_SERVICE:
serviceVersion = Constants.JOB_MASTER_WORKER_SERVICE_VERSION;
break;
case JOB_MASTER_MASTER_SERVICE:
serviceVersion = Constants.JOB_MASTER_MASTER_SERVICE_VERSION;
break;
case JOURNAL_MASTER_CLIENT_SERVICE:
serviceVersion = Constants.JOURNAL_MASTER_CLIENT_SERVICE_VERSION;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public final class HeartbeatContext {
// Names of different heartbeat executors.
public static final String FUSE_UPDATE_CHECK = "Fuse update check";
public static final String JOB_MASTER_LOST_WORKER_DETECTION = "Job Master Lost Worker Detection";
public static final String JOB_MASTER_LOST_MASTER_DETECTION = "Job Master Lost Master Detection";
public static final String JOB_MASTER_SYNC = "Job Master Sync";
public static final String JOB_WORKER_COMMAND_HANDLING =
"Job Worker Command Handling";
public static final String MASTER_THROTTLE = "Master Throttle";
Expand Down Expand Up @@ -126,6 +128,8 @@ private HeartbeatType(int value) {
sTimerClasses = new HashMap<>();
sTimerClasses.put(MASTER_THROTTLE, SLEEPING_TIMER_CLASS);
sTimerClasses.put(JOB_MASTER_LOST_WORKER_DETECTION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(JOB_MASTER_LOST_MASTER_DETECTION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(JOB_MASTER_SYNC, SLEEPING_TIMER_CLASS);
sTimerClasses.put(JOB_WORKER_COMMAND_HANDLING, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_ACTIVE_UFS_SYNC, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_BLOCK_INTEGRITY_CHECK, SLEEPING_TIMER_CLASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,16 @@ public void start() throws Exception {
mLeaderSelector.start(getRpcAddress());

while (!Thread.interrupted()) {
// Start the master components in standby mode
// Eg. for job master they are the JobMaster and JournalMaster
startMasterComponents(false);
LOG.info("Standby started");
// We are in standby mode. Nothing to do until we become the primary.
mLeaderSelector.waitForState(NodeState.PRIMARY);
LOG.info("Transitioning from standby to primary");
mJournalSystem.gainPrimacy();
stopMasterComponents();
LOG.info("Secondary stopped");
LOG.info("Standby stopped");
startMasterComponents(true);
mServices.forEach(SimpleService::promote);
LOG.info("Primary started");
Expand All @@ -96,8 +100,6 @@ public void start() throws Exception {
stopMasterComponents();
mJournalSystem.losePrimacy();
LOG.info("Primary stopped");
startMasterComponents(false);
LOG.info("Standby started");
}
}

Expand Down
81 changes: 81 additions & 0 deletions core/transport/src/main/proto/grpc/job_master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ message JobWorkerHealth {
optional int32 taskPoolSize = 5;
optional int32 numActiveTasks = 6;
optional int32 unfinishedTasks = 7;
optional grpc.BuildVersion version = 8;
}

message JobCommand {
Expand Down Expand Up @@ -197,6 +198,21 @@ message GetAllWorkerHealthPResponse {
repeated JobWorkerHealth workerHealths = 1;
}

// Status of a single job master. A list of these is carried by
// GetAllMasterStatusPResponse.
message JobMasterStatus {
// State of the job master as a string.
// NOTE(review): presumably the primary/standby role name - confirm against the producer.
optional string state = 1;
// Network address of the job master.
optional grpc.NetAddress masterAddress = 2;
// Start time of the job master.
// NOTE(review): unit is not stated here; presumably epoch milliseconds - confirm.
optional int64 startTime = 3;
// Build version information of the job master.
optional grpc.BuildVersion version = 4;
}

// Options for the GetAllMasterStatus RPC; currently has no fields.
message GetAllMasterStatusPOptions {}
// Request message of the GetAllMasterStatus RPC.
message GetAllMasterStatusPRequest {
optional GetAllMasterStatusPOptions options = 1;
}
// Response message of the GetAllMasterStatus RPC.
message GetAllMasterStatusPResponse {
// Statuses of the job masters, one JobMasterStatus per entry.
repeated JobMasterStatus jobMasterStatus = 1;
}

message SubmitOptions {}
message SubmitRequest {
optional bytes cmdConfig = 1;
Expand Down Expand Up @@ -275,6 +291,11 @@ service JobMasterClientService {
*/
rpc GetAllWorkerHealth(GetAllWorkerHealthPRequest) returns (GetAllWorkerHealthPResponse);

/**
* Lists the status of all job masters.
*/
rpc GetAllMasterStatus(GetAllMasterStatusPRequest) returns (GetAllMasterStatusPResponse);

/**
* Submit a CMD job, return a jobControlId.
*/
Expand Down Expand Up @@ -305,6 +326,7 @@ message RegisterJobWorkerPOptions {}
// Request message of the RegisterJobWorker RPC.
message RegisterJobWorkerPRequest {
// Network address of the job worker being registered.
optional grpc.WorkerNetAddress workerNetAddress = 1;
optional RegisterJobWorkerPOptions options = 2;
// NOTE(review): presumably the build version of the registering job worker - confirm.
optional grpc.BuildVersion version = 3;
}
message RegisterJobWorkerPResponse {
optional int64 id = 1;
Expand All @@ -325,3 +347,62 @@ service JobMasterWorkerService {
*/
rpc RegisterJobWorker(RegisterJobWorkerPRequest) returns (RegisterJobWorkerPResponse);
}

// Options for the GetMasterId RPC; currently has no fields.
message GetJobMasterIdPOptions {}
// Request message of JobMasterMasterService.GetMasterId.
message GetJobMasterIdPRequest {
// Address of the job master whose id is requested.
optional grpc.NetAddress masterAddress = 1;
optional GetJobMasterIdPOptions options = 2;
}
// Response message of JobMasterMasterService.GetMasterId.
message GetJobMasterIdPResponse {
// Id for the master address given in the request.
optional int64 masterId = 1;
}

// Command carried by JobMasterHeartbeatPResponse.
enum JobMasterMetaCommand {
// First value listed, so it is the value read when the field is unset.
MetaCommand_Unknown = 0;
// NOTE(review): presumably "no action required" - confirm against the heartbeat handler.
MetaCommand_Nothing = 1;
MetaCommand_Register = 2; // Ask the standby master to re-register.
}

// Options sent with RegisterJobMasterPRequest.
// NOTE(review): field numbering starts at 2; field number 1 is not used by this
// message. If the gap is intentional, consider declaring `reserved 1;` so the
// number cannot be reused by accident.
message RegisterJobMasterPOptions {
// Start time of the registering job master, in milliseconds (per the field name).
optional int64 startTimeMs = 2;
// Time the registering job master lost primacy, in milliseconds (per the field name).
optional int64 losePrimacyTimeMs = 3;
// Build version information of the registering job master.
optional grpc.BuildVersion version = 4;
}

// Request message of JobMasterMasterService.RegisterMaster.
message RegisterJobMasterPRequest {
// Id of the job master being registered.
// NOTE(review): presumably the id previously returned by GetMasterId - confirm.
optional int64 jobMasterId = 1;
optional RegisterJobMasterPOptions options = 2;
}
// Response message of JobMasterMasterService.RegisterMaster; carries no fields.
message RegisterJobMasterPResponse {}

// Options for the MasterHeartbeat RPC; currently has no fields.
message JobMasterHeartbeatPOptions {
}
// Request message of JobMasterMasterService.MasterHeartbeat.
message JobMasterHeartbeatPRequest {
// Id of the job master sending the heartbeat.
optional int64 masterId = 1;
optional JobMasterHeartbeatPOptions options = 2;
}
// Response message of JobMasterMasterService.MasterHeartbeat.
message JobMasterHeartbeatPResponse {
// Command for the heartbeating master; see JobMasterMetaCommand.
optional JobMasterMetaCommand command = 1;
}

/**
* This interface contains job master service endpoints for Alluxio standby job masters.
*/
service JobMasterMasterService {

/**
* Returns a master id for the given master address.
*/
rpc GetMasterId(GetJobMasterIdPRequest) returns (GetJobMasterIdPResponse);

/**
* Registers a job master, identified by its master id.
*/
rpc RegisterMaster(RegisterJobMasterPRequest) returns (RegisterJobMasterPResponse);

/**
* Heartbeat from a job master, identified by its master id. The response
* carries a JobMasterMetaCommand (for example, a request to re-register).
*/
rpc MasterHeartbeat(JobMasterHeartbeatPRequest) returns (JobMasterHeartbeatPResponse);
}

1 change: 1 addition & 0 deletions core/transport/src/main/proto/grpc/version.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum ServiceType {
METRICS_MASTER_CLIENT_SERVICE = 9;
JOB_MASTER_CLIENT_SERVICE = 10;
JOB_MASTER_WORKER_SERVICE = 11;
JOB_MASTER_MASTER_SERVICE = 19;
JOURNAL_MASTER_CLIENT_SERVICE = 13;
TABLE_MASTER_CLIENT_SERVICE = 14;
META_MASTER_BACKUP_MESSAGING_SERVICE = 15;
Expand Down
Loading

0 comments on commit 8da5953

Please sign in to comment.