Skip to content

Commit

Permalink
Add EclipseStore to memory storage for persistence (#1484)
Browse files Browse the repository at this point in the history
* Add a K/V store for the memory backend

* Fix Memory storage persistence so objects are reinstantiated correctly

* Configure EclipseStore to persist transient fields

* update Management API client version

* Fix segments by repair run for memory storage

* Remove Guava ImmutableMap as it doesn't serialize in EclipseStore

---------

Co-authored-by: Alexander Dejanovski <adejanovski@gmail.com>
  • Loading branch information
emerkle826 and adejanovski authored Apr 11, 2024
1 parent 4fad46a commit bb99e2d
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 122 deletions.
11 changes: 10 additions & 1 deletion src/server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
<docker.directory>src/main/docker</docker.directory>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>
<management.api.version>0.1.74</management.api.version>
<eclipsestore.version>1.3.1</eclipsestore.version>

<!-- Cucumber Scenario Outline Examples listing released versions to test db migration against -->
<!-- anything added here must first be added as a artifact in build/plugins/plugins@maven-dependency-plugin -->
Expand Down Expand Up @@ -274,8 +276,15 @@
<dependency>
<groupId>io.k8ssandra</groupId>
<artifactId>datastax-mgmtapi-client-openapi</artifactId>
<version>0.1.70</version>
<version>${management.api.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.store</groupId>
<artifactId>storage-embedded</artifactId>
<version>${eclipsestore.version}</version>
</dependency>


<!--test scope -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public final class ReaperApplicationConfiguration extends Configuration {
@Nullable
private CryptographFactory cryptograph;

@JsonProperty
@Nullable
private String persistenceStoragePath;

public HttpManagement getHttpManagement() {
return httpManagement;
}
Expand Down Expand Up @@ -508,6 +512,15 @@ public void setCryptograph(@Nullable CryptographFactory cryptograph) {
this.cryptograph = cryptograph;
}

public void setPersistenceStoragePath(@Nullable String persistenceStoragePath) {
this.persistenceStoragePath = persistenceStoragePath;
}

@Nullable
public String getPersistenceStoragePath() {
return persistenceStoragePath;
}

public enum DatacenterAvailability {
/* We require direct JMX access to all nodes across all datacenters */
ALL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.math.BigInteger;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

@JsonDeserialize(builder = RepairSegment.Builder.class)
Expand All @@ -43,7 +43,7 @@ public final class RepairSegment {
private final String coordinatorHost;
private final DateTime startTime;
private final DateTime endTime;
private final Map<String, String> replicas;
private final Map<String, String> replicas = new ConcurrentHashMap<>();
// hostID field is only ever populated for incremental repairs. For full repairs it is always null.
private final UUID hostID;

Expand All @@ -58,9 +58,9 @@ private RepairSegment(Builder builder, @Nullable UUID id) {
this.coordinatorHost = builder.coordinatorHost;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
if (builder.replicas != null) {
this.replicas.putAll(builder.replicas);
}
this.hostID = builder.hostID;
}

Expand Down
10 changes: 5 additions & 5 deletions src/server/src/main/java/io/cassandrareaper/core/Segment.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.common.collect.ImmutableMap;

@JsonDeserialize(builder = Segment.Builder.class)
public final class Segment {
Expand All @@ -38,17 +38,17 @@ public final class Segment {

RingRange baseRange;
List<RingRange> tokenRanges;
Map<String, String> replicas;
Map<String, String> replicas = new ConcurrentHashMap<>();

private Segment(Builder builder) {
this.tokenRanges = builder.tokenRanges;
this.baseRange = builder.tokenRanges.get(0);
if (builder.baseRange != null) {
this.baseRange = builder.baseRange;
}
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
if (builder.replicas != null) {
this.replicas.putAll(builder.replicas);
}
}

public RingRange getBaseRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.cassandra.repair.RepairParallelism;
import org.slf4j.Logger;
Expand Down Expand Up @@ -146,7 +145,7 @@ static List<Segment> filterSegmentsByNodes(

@VisibleForTesting
static Map<String, List<RingRange>> buildEndpointToRangeMap(Map<List<String>, List<String>> rangeToEndpoint) {
Map<String, List<RingRange>> endpointToRange = Maps.newHashMap();
Map<String, List<RingRange>> endpointToRange = new ConcurrentHashMap<>();

for (Entry<List<String>, List<String>> entry : rangeToEndpoint.entrySet()) {
RingRange range = new RingRange(entry.getKey().toArray(new String[entry.getKey().size()]));
Expand All @@ -163,7 +162,7 @@ static Map<String, List<RingRange>> buildEndpointToRangeMap(Map<List<String>, Li
@VisibleForTesting
static Map<List<String>, List<RingRange>> buildReplicasToRangeMap(
Map<List<String>, List<String>> rangeToEndpoint) {
Map<List<String>, List<RingRange>> replicasToRange = Maps.newHashMap();
Map<List<String>, List<RingRange>> replicasToRange = new ConcurrentHashMap<>();

for (Entry<List<String>, List<String>> entry : rangeToEndpoint.entrySet()) {
RingRange range = new RingRange(entry.getKey().toArray(new String[entry.getKey().size()]));
Expand Down Expand Up @@ -369,7 +368,7 @@ Map<String, String> getDCsByNodeForRepairSegment(
ICassandraManagementProxy jmxConnection = clusterFacade.connect(cluster);
// when hosts are coming up or going down, this method can throw an UndeclaredThrowableException
Collection<String> nodes = clusterFacade.tokenRangeToEndpoint(cluster, keyspace, segment);
Map<String, String> dcByNode = Maps.newHashMap();
Map<String, String> dcByNode = new ConcurrentHashMap<>();
nodes.forEach(node -> dcByNode.put(node, EndpointSnitchInfoProxy.create(jmxConnection).getDataCenter(node)));
if (repairUnit.getDatacenters().isEmpty()) {
return dcByNode;
Expand All @@ -393,7 +392,7 @@ Map<String, String> getDCsByNodeForRepairSegment(
@VisibleForTesting
Map<String, RingRange> getClusterNodes(Cluster targetCluster, RepairUnit repairUnit) throws ReaperException {
ConcurrentHashMap<String, RingRange> nodesWithRanges = new ConcurrentHashMap<>();
Map<List<String>, List<String>> rangeToEndpoint = Maps.newHashMap();
Map<List<String>, List<String>> rangeToEndpoint = new ConcurrentHashMap<>();

try {
rangeToEndpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public IStorageDao initializeStorageBackend()
LOG.info("Initializing the database and performing schema migrations");

if ("memory".equalsIgnoreCase(config.getStorageType())) {
storage = new MemoryStorageFacade();
Preconditions.checkArgument(config.getPersistenceStoragePath() != null,
"persistenceStoragePath is required for memory storage type");
storage = new MemoryStorageFacade(config.getPersistenceStoragePath());
} else if (Lists.newArrayList("cassandra", "astra").contains(config.getStorageType())) {
CassandraStorageFacade.CassandraMode mode = config.getStorageType().equals("cassandra")
? CassandraStorageFacade.CassandraMode.CASSANDRA
Expand Down
Loading

0 comments on commit bb99e2d

Please sign in to comment.