Skip to content

Commit

Permalink
Implement using apiClient the triggerRepair, getJobDetails, scheduler…
Browse files Browse the repository at this point in the history
… as well as add a simple test to ensure the state is managed correctly
  • Loading branch information
burmanm committed Aug 22, 2023
1 parent b4a9ae6 commit 1ccb72a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@
<dependency>
<groupId>io.k8ssandra</groupId>
<artifactId>datastax-mgmtapi-client-openapi</artifactId>
<version>0.1.0-507f418</version>
<version>0.1.0-4d2a772</version>
</dependency>
<!--test scope -->

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 The Last Pickle Ltd
* Copyright 2023-2023 DataStax, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -47,43 +47,46 @@
import javax.management.openmbean.CompositeData;
import javax.validation.constraints.NotNull;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.datastax.mgmtapi.client.api.DefaultApi;
import com.datastax.mgmtapi.client.invoker.ApiClient;
import com.datastax.mgmtapi.client.invoker.ApiException;
import com.datastax.mgmtapi.client.model.Job;
import com.datastax.mgmtapi.client.model.RepairRequest;
import com.datastax.mgmtapi.client.model.StatusChange;
import com.google.common.collect.Maps;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpCassandraManagementProxy implements ICassandraManagementProxy {
private static final Logger LOG = LoggerFactory.getLogger(HttpCassandraManagementProxy.class);

public static final int DEFAULT_POLL_INTERVAL_IN_MILLISECONDS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(HttpCassandraManagementProxy.class);
String host;
MetricRegistry metricRegistry;
String rootPath;
InetSocketAddress endpoint;
DefaultApi apiClient;

private final ConcurrentMap<Integer, RepairStatusHandler> repairStatusHandlers = Maps.newConcurrentMap();
private final ConcurrentMap<String, JobStatusTracker> jobTracker = Maps.newConcurrentMap();
private final ConcurrentMap<Integer, ExecutorService> repairStatusExecutors = Maps.newConcurrentMap();
final ConcurrentMap<Integer, RepairStatusHandler> repairStatusHandlers = Maps.newConcurrentMap();
final ConcurrentMap<String, JobStatusTracker> jobTracker = Maps.newConcurrentMap();
final ConcurrentMap<Integer, ExecutorService> repairStatusExecutors = Maps.newConcurrentMap();


private ScheduledExecutorService statusTracker;

public HttpCassandraManagementProxy(MetricRegistry metricRegistry,
String rootPath,
InetSocketAddress endpoint,
ScheduledExecutorService executor
ScheduledExecutorService executor,
DefaultApi apiClient
) {
this.metricRegistry = metricRegistry;
this.rootPath = rootPath;
this.endpoint = endpoint;
this.apiClient = new DefaultApi(
new ApiClient().setBasePath("https://" + endpoint.getHostName() + ":" + endpoint.getPort() + rootPath));
this.statusTracker = new InstrumentedScheduledExecutorService(executor, metricRegistry);
this.apiClient = apiClient;
this.statusTracker = executor;

// TODO Perhaps the poll interval should be configurable through context.config ?
this.scheduleJobPoller(statusTracker, DEFAULT_POLL_INTERVAL_IN_MILLISECONDS);
Expand Down Expand Up @@ -187,11 +190,18 @@ public int triggerRepair(
int repairThreadCount)
throws ReaperException {

String jobId = "repair-1"; // TODO Make the actual implementation to call the mgmt-api to repair
String jobId;
try {
jobId = apiClient.repair1(new RepairRequest());
} catch (ApiException e) {
throw new ReaperException(e);
}

int repairNo = Integer.parseInt(jobId.substring(7));

repairStatusExecutors.putIfAbsent(repairNo, Executors.newSingleThreadExecutor());
repairStatusHandlers.putIfAbsent(repairNo, repairStatusHandler);
jobTracker.put(jobId, new JobStatusTracker());
return repairNo;
}

Expand All @@ -202,6 +212,8 @@ public void removeRepairStatusHandler(int repairNo) {
if (null != repairStatusExecutor) {
repairStatusExecutor.shutdown();
}
String jobId = String.format("repair-%d", repairNo);
jobTracker.remove(jobId);
}

@Override
Expand Down Expand Up @@ -273,16 +285,14 @@ public String getUntranslatedHost() {
return "";
}

private Job getJobStatus(String id) {
Job getJobStatus(String id) {
// Poll with HTTP client the job's status

try {
com.datastax.mgmtapi.client.model.Job jobStatus = apiClient.getJobStatus(id);
Job job = apiClient.getJobStatus(id);
return job;
} catch (ApiException e) {
throw new RuntimeException(e);
}

return new Job("repair", "repair-12345678"); // Stub, but this is the format that mgmt-api returns
}

private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInterval) {
Expand All @@ -291,26 +301,30 @@ private void scheduleJobPoller(ScheduledExecutorService scheduler, int pollInter
if (jobTracker.size() > 0) {
for (Map.Entry<String, JobStatusTracker> entry : jobTracker.entrySet()) {
Job job = getJobStatus(entry.getKey());
int availableNotificationsIndex = job.getStatusChanges().size() - 1;
int availableNotifications = job.getStatusChanges().size();
int currentNotificationCount = entry.getValue().latestNotificationCount.get();
if (currentNotificationCount < availableNotificationsIndex) {
if (currentNotificationCount < availableNotifications) {
// We need to process the new ones
for (int i = currentNotificationCount; i < availableNotificationsIndex; i++) {
Job.StatusChange statusChange = job.getStatusChanges().get(i);
for (int i = currentNotificationCount; i < availableNotifications; i++) {
StatusChange statusChange = job.getStatusChanges().get(i);
// remove "repair-" prefix
int repairNo = Integer.parseInt(job.getJobId().substring(7));
int repairNo = Integer.parseInt(job.getId().substring(7));
ProgressEventType progressType = ProgressEventType.valueOf(statusChange.toString());
repairStatusExecutors.get(repairNo).submit(() -> {
repairStatusHandlers
.get(repairNo)
.handle(repairNo, Optional.empty(), Optional.of(statusChange.getStatus()),
.handle(repairNo, Optional.empty(), Optional.of(progressType),
statusChange.getMessage(), this);
});

// Update the count as we process them
entry.getValue().latestNotificationCount.incrementAndGet();
}
}
}
}
},
10,
10000,
pollInterval,
TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import javax.ws.rs.core.Response;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.datastax.mgmtapi.client.api.DefaultApi;
import com.datastax.mgmtapi.client.invoker.ApiClient;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -122,11 +125,17 @@ private ICassandraManagementProxy connectImpl(Node node)
if (pidResponse.getStatus() != 200) {
throw new ReaperException("Could not get PID for node " + node.getHostname());
}
DefaultApi apiClient = new DefaultApi(
new ApiClient().setBasePath("https://" + node.getHostname() + ":" + managementPort + rootPath));

InstrumentedScheduledExecutorService statusTracker = new InstrumentedScheduledExecutorService(
jobStatusPollerExecutor, metricRegistry);
return new HttpCassandraManagementProxy(
metricRegistry,
rootPath,
new InetSocketAddress(node.getHostname(), managementPort),
jobStatusPollerExecutor
statusTracker,
apiClient
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2023-2023 DataStax, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.management.http;

import io.cassandrareaper.management.RepairStatusHandler;
import io.cassandrareaper.management.http.models.JobStatusTracker;

import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;

import com.codahale.metrics.MetricRegistry;
import com.datastax.mgmtapi.client.api.DefaultApi;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

public class HttpCassandraManagementProxyTests {

@Test
public void testRepairHandlers() throws Exception {
HttpManagementConnectionFactory connectionFactory = Mockito.mock(HttpManagementConnectionFactory.class);
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
Mockito.doReturn("repair-123456789").when(mockClient).repair1(any());
ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class);
Mockito.doReturn(ConcurrentUtils.constantFuture(null)).when(executorService).submit(any(Callable.class));

HttpCassandraManagementProxy httpCassandraManagementProxy = new HttpCassandraManagementProxy(
Mockito.mock(MetricRegistry.class), "",
Mockito.mock(InetSocketAddress.class), executorService, mockClient);
when(connectionFactory.connectAny(any())).thenReturn(httpCassandraManagementProxy);

RepairStatusHandler repairStatusHandler = Mockito.mock(RepairStatusHandler.class);

int repairNo = httpCassandraManagementProxy.triggerRepair(BigInteger.ZERO, BigInteger.ONE, "ks",
RepairParallelism.PARALLEL,
Collections.singleton("table"), true, Collections.emptyList(), repairStatusHandler, Collections.emptyList(), 1);

assertEquals(123456789, repairNo);
assertEquals(1, httpCassandraManagementProxy.jobTracker.size());
String jobId = String.format("repair-%d", repairNo);
assertTrue(httpCassandraManagementProxy.jobTracker.containsKey(jobId));
JobStatusTracker jobStatus = httpCassandraManagementProxy.jobTracker.get(jobId);
assertEquals(0, jobStatus.latestNotificationCount.get());
assertEquals(1, httpCassandraManagementProxy.repairStatusExecutors.size());
assertEquals(1, httpCassandraManagementProxy.repairStatusHandlers.size());
assertTrue(httpCassandraManagementProxy.repairStatusHandlers.containsKey(repairNo));

httpCassandraManagementProxy.removeRepairStatusHandler(repairNo);
assertEquals(0, httpCassandraManagementProxy.jobTracker.size());
assertEquals(0, httpCassandraManagementProxy.repairStatusExecutors.size());
assertEquals(0, httpCassandraManagementProxy.repairStatusHandlers.size());
}
}

0 comments on commit 1ccb72a

Please sign in to comment.