Skip to content

Commit

Permalink
Add stubbed polling of job details from the mgmt-api. This will not w…
Browse files Browse the repository at this point in the history
…ork without the actual client implementation
  • Loading branch information
burmanm committed Aug 10, 2023
1 parent 32a0584 commit bdba052
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void run(ReaperApplicationConfiguration config, Environment environment)
Cryptograph cryptograph = context.config == null || context.config.getCryptograph() == null
? new NoopCrypotograph() : context.config.getCryptograph().create();

initializeManagement(context, cryptograph);
initializeManagement(context, environment, cryptograph);

context.repairManager = RepairManager.create(
context,
Expand Down Expand Up @@ -299,14 +299,17 @@ public void run(ReaperApplicationConfiguration config, Environment environment)
}


private void initializeManagement(AppContext context, Cryptograph cryptograph) {
private void initializeManagement(AppContext context, Environment environment, Cryptograph cryptograph) {
if (context.managementConnectionFactory == null) {
LOG.info("no management connection factory given in context, creating default");
if (context.config.getHttpManagement() == null || !context.config.getHttpManagement().getEnabled()) {
LOG.info("HTTP management connection config not set, or set disabled. Creating JMX connection factory instead");
context.managementConnectionFactory = new JmxManagementConnectionFactory(context, cryptograph);
} else {
context.managementConnectionFactory = new HttpManagementConnectionFactory(context);
ScheduledExecutorService jobStatusPollerExecutor = environment.lifecycle()
.scheduledExecutorService("JobStatusPoller")
.threads(2).build();
context.managementConnectionFactory = new HttpManagementConnectionFactory(context, jobStatusPollerExecutor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.cassandrareaper.core.Table;
import io.cassandrareaper.management.ICassandraManagementProxy;
import io.cassandrareaper.management.RepairStatusHandler;
import io.cassandrareaper.management.http.models.Job;
import io.cassandrareaper.management.http.models.JobStatusTracker;
import io.cassandrareaper.service.RingRange;

import java.io.IOException;
Expand All @@ -33,29 +35,50 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.validation.constraints.NotNull;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import org.apache.cassandra.repair.RepairParallelism;

public class HttpCassandraManagementProxy implements ICassandraManagementProxy {
public static final int DEFAULT_POLL_INTERVAL_IN_MILLISECONDS = 5000;
String host;
MetricRegistry metricRegistry;
String rootPath;
InetSocketAddress endpoint;

private final ConcurrentMap<Integer, RepairStatusHandler> repairStatusHandlers = Maps.newConcurrentMap();
private final ConcurrentMap<String, JobStatusTracker> jobTracker = Maps.newConcurrentMap();
private final ConcurrentMap<Integer, ExecutorService> repairStatusExecutors = Maps.newConcurrentMap();


private ScheduledExecutorService statusTracker;

public HttpCassandraManagementProxy(MetricRegistry metricRegistry,
String rootPath,
InetSocketAddress endpoint
InetSocketAddress endpoint,
ScheduledExecutorService executor
) {
this.metricRegistry = metricRegistry;
this.rootPath = rootPath;
this.endpoint = endpoint;
this.statusTracker = new InstrumentedScheduledExecutorService(executor, metricRegistry);

// TODO Perhaps the poll interval should be configurable through context.config ?
this.scheduleJobPoller(statusTracker, DEFAULT_POLL_INTERVAL_IN_MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -149,13 +172,22 @@ public int triggerRepair(
List<RingRange> associatedTokens,
int repairThreadCount)
throws ReaperException {
return 1; //TODO: implement me

String jobId = "repair-1"; // TODO Make the actual implementation to call the mgmt-api to repair
int repairNo = Integer.parseInt(jobId.substring(7));

repairStatusExecutors.putIfAbsent(repairNo, Executors.newSingleThreadExecutor());
repairStatusHandlers.putIfAbsent(repairNo, repairStatusHandler);
return repairNo;
}

@Override
public void removeRepairStatusHandler(int repairNo) {
// TODO: implement me.
repairStatusHandlers.remove(repairNo);
ExecutorService repairStatusExecutor = repairStatusExecutors.remove(repairNo);
if (null != repairStatusExecutor) {
repairStatusExecutor.shutdown();
}
}

@Override
Expand Down Expand Up @@ -228,5 +260,39 @@ public String getUntranslatedHost() {
return "";
}

private Job getJobStatus(String id) {
// Poll with HTTP client the job's status
return new Job("repair", "repair-12345678"); // Stub, but this is the format that mgmt-api returns
}

private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInterval) {
scheduler.scheduleWithFixedDelay(
() -> {
if (jobTracker.size() > 0) {
for (Map.Entry<String, JobStatusTracker> entry : jobTracker.entrySet()) {
Job job = getJobStatus(entry.getKey());
int availableNotificationsIndex = job.getStatusChanges().size() - 1;
int currentNotificationCount = entry.getValue().latestNotificationCount.get();
if (currentNotificationCount < availableNotificationsIndex) {
// We need to process the new ones
for (int i = currentNotificationCount; i < availableNotificationsIndex; i++) {
Job.StatusChange statusChange = job.getStatusChanges().get(i);
// remove "repair-" prefix
int repairNo = Integer.parseInt(job.getJobId().substring(7));
repairStatusExecutors.get(repairNo).submit(() -> {
repairStatusHandlers
.get(repairNo)
.handle(repairNo, Optional.empty(), Optional.of(statusChange.getStatus()),
statusChange.getMessage(), this);
});
}
}
}
}
},
10,
pollInterval,
TimeUnit.MILLISECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.ws.rs.core.Response;

import com.codahale.metrics.Gauge;
Expand All @@ -47,14 +48,17 @@ public class HttpManagementConnectionFactory implements IManagementConnectionFac
private final MetricRegistry metricRegistry;
private final HostConnectionCounters hostConnectionCounters;

private ScheduledExecutorService jobStatusPollerExecutor;

private final Set<String> accessibleDatacenters = Sets.newHashSet();

// Constructor for HttpManagementConnectionFactory
public HttpManagementConnectionFactory(AppContext context) {
public HttpManagementConnectionFactory(AppContext context, ScheduledExecutorService jobStatusPollerExecutor) {
this.metricRegistry
= context.metricRegistry == null ? new MetricRegistry() : context.metricRegistry;
hostConnectionCounters = new HostConnectionCounters(metricRegistry);
registerConnectionsGauge();
this.jobStatusPollerExecutor = jobStatusPollerExecutor;
}

public ICassandraManagementProxy connectAny(Collection<Node> nodes) throws ReaperException {
Expand Down Expand Up @@ -121,7 +125,8 @@ private ICassandraManagementProxy connectImpl(Node node)
return new HttpCassandraManagementProxy(
metricRegistry,
rootPath,
new InetSocketAddress(node.getHostname(), managementPort)
new InetSocketAddress(node.getHostname(), managementPort),
jobStatusPollerExecutor
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2023-2023 DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.management.http.models;

import java.util.ArrayList;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.progress.ProgressEventType;

public class Job {
public enum JobStatus {
ERROR,
COMPLETED,
WAITING;
}

private String jobId;
private String jobType;
private JobStatus status;
private long submitTime;
private long startTime;
private long finishedTime;
private Throwable error;

public class StatusChange {
ProgressEventType status;
long changeTime;

String message;

public StatusChange(ProgressEventType type, String message) {
changeTime = System.currentTimeMillis();
status = type;
this.message = message;
}

public ProgressEventType getStatus() {
return status;
}

public long getChangeTime() {
return changeTime;
}

public String getMessage() {
return message;
}
}

private List<StatusChange> statusChanges;

public Job(String jobType, String jobId) {
this.jobType = jobType;
this.jobId = jobId;
submitTime = System.currentTimeMillis();
status = JobStatus.WAITING;
statusChanges = new ArrayList<>();
}

@VisibleForTesting
// This method is only for testing purposes
public void setJobId(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

public String getJobType() {
return jobType;
}

public JobStatus getStatus() {
return status;
}

public void setStatus(JobStatus status) {
this.status = status;
}

public void setStatusChange(ProgressEventType type, String message) {
statusChanges.add(new StatusChange(type, message));
}

public List<StatusChange> getStatusChanges() {
return statusChanges;
}

public long getSubmitTime() {
return submitTime;
}

public long getFinishedTime() {
return finishedTime;
}

public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
}

public Throwable getError() {
return error;
}

public void setError(Throwable error) {
this.error = error;
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023-2023 DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.management.http.models;

import java.util.concurrent.atomic.AtomicInteger;

public class JobStatusTracker {
public AtomicInteger latestNotificationCount = new AtomicInteger(0);
}

0 comments on commit bdba052

Please sign in to comment.