Skip to content

Commit

Permalink
Implement HTTP based getTokens, getEndpointToHostId, getLocalEndpoint (
Browse files Browse the repository at this point in the history
…#1376)

* Implement HttpCassandraManagementProxy.getEndpointToHostId(), HttpCassandraManagementProxy.getLocalEndpoint(), HttpCassandraManagementProxy.getTokens()
  • Loading branch information
Miles-Garnsey committed Oct 3, 2023
1 parent 97eea41 commit fc171a4
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -108,7 +109,23 @@ public String getHost() {

@Override
public List<BigInteger> getTokens() {
return null; // TODO: implement me.
EndpointStates endpointStates;
try {
endpointStates = apiClient.getEndpointStates();
return Arrays.stream(endpointStates.getEntity().stream().filter(i ->
i.getOrDefault("IS_LOCAL", "false")
.equals("true")
)
.findFirst()
.orElseThrow(() -> new RuntimeException("Failed to find local endpoint"))
.get("TOKENS")
.split(","))
.map(strToken -> new BigInteger(strToken)).collect(Collectors.toList());

} catch (ApiException e) {
LOG.error("Failed to retrieve endpoint states", e);
return Collections.emptyList();
}
}

@Override
Expand All @@ -119,13 +136,32 @@ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) th
@NotNull
@Override
public String getLocalEndpoint() throws ReaperException {
return null; // TODO: implement me.
// TODO: validate that this works in all situations. I suspect that if any address translation is
// happening we'll see failures here, but address translation is not in scope in this phase.
// The logic is that host is either a DNS address, or an IP address. If it's a DNS address, we do a
// reverse lookup to get the IP.
try {
return InetAddress.getByName(host).toString().split("/")[1];
} catch (UnknownHostException e) {
throw new ReaperException(e);
}
}

@NotNull
@Override
public Map<String, String> getEndpointToHostId() {
return null; // TODO: implement me.
try {
return apiClient.getEndpointStates().getEntity().stream()
.collect(
Collectors.toMap(
i -> i.get("ENDPOINT_IP"),
i -> i.get("HOST_ID")
)
);
} catch (ApiException ae) {
LOG.error("Failed to retrieve endpoint states - does the HTTP proxy have connectivity?", ae);
return Collections.emptyMap();
}
}

@Override
Expand Down Expand Up @@ -257,7 +293,7 @@ public int triggerRepair(
(new com.datastax.mgmtapi.client.model.RingRange())
.start(i.getStart().longValue())
.end(i.getEnd().longValue())
).collect(Collectors.toList())
).collect(Collectors.toList())
)
);
jobId = resp.getRepairId();
Expand Down Expand Up @@ -439,6 +475,7 @@ public String getDatacenter(String var1) throws UnknownHostException {
// From StreamManagerMBean
@Override
public Set<CompositeData> getCurrentStreams() {
// TODO: implement me
return new HashSet<CompositeData>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.cassandrareaper.ReaperException;
import io.cassandrareaper.core.Snapshot;
import io.cassandrareaper.core.Table;
import io.cassandrareaper.management.ICassandraManagementProxy;
import io.cassandrareaper.management.RepairStatusHandler;
import io.cassandrareaper.management.http.models.JobStatusTracker;

import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,6 +40,7 @@
import com.datastax.mgmtapi.client.invoker.ApiException;
import com.datastax.mgmtapi.client.model.CompactRequest;
import com.datastax.mgmtapi.client.model.Compaction;
import com.datastax.mgmtapi.client.model.EndpointStates;
import com.datastax.mgmtapi.client.model.Job;
import com.datastax.mgmtapi.client.model.RepairRequest;
import com.datastax.mgmtapi.client.model.RepairRequestResponse;
Expand Down Expand Up @@ -403,5 +406,74 @@ private static HttpCassandraManagementProxy mockProxy(DefaultApi mockClient) {
null, "/", InetSocketAddress.createUnresolved("localhost", 8080), executorService, mockClient);
}

@Test
public void testGetTokens() throws Exception {
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
List<Map<String, String>> mockEntity = new ArrayList<>();
mockEntity.add(ImmutableMap.of(
"TOKENS", "1,2,3,4",
"IS_LOCAL", "true"
)
);
mockEntity.add(ImmutableMap.of(
"TOKENS", "5,6,7,8",
"IS_LOCAL", "false"
)
);
EndpointStates mockEndpointStates = new EndpointStates().entity(mockEntity);
when(mockClient.getEndpointStates()).thenReturn(mockEndpointStates);
mockProxy(mockClient);
assertThat(mockProxy(mockClient).getTokens()).containsOnly(
new BigInteger("1"),
new BigInteger("2"),
new BigInteger("3"),
new BigInteger("4"));
}

@Test
public void getEndpointToHostId() throws Exception {
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
List<Map<String, String>> mockEntity = new ArrayList<>();
mockEntity.add(ImmutableMap.of(
"ENDPOINT_IP", "127.0.0.1",
"HOST_ID", "fakehostID1"
)
);
mockEntity.add(ImmutableMap.of(
"ENDPOINT_IP", "127.0.0.2",
"HOST_ID", "fakehostID2"
)
);
EndpointStates mockEndpointStates = new EndpointStates().entity(mockEntity);
when(mockClient.getEndpointStates()).thenReturn(mockEndpointStates);
mockProxy(mockClient);
assertThat(mockProxy(mockClient).getEndpointToHostId()).containsAllEntriesOf(
ImmutableMap.of(
"127.0.0.1", "fakehostID1",
"127.0.0.2", "fakehostID2"
)
);
}

@Test
public void testGetLocalEndpoint() throws Exception {
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
when(executorService.submit(any(Callable.class))).thenAnswer(i -> {
Callable<Object> callable = i.getArgument(0);
callable.call();
return ConcurrentUtils.constantFuture(null);
});

ICassandraManagementProxy mockProxyIp = new HttpCassandraManagementProxy(
null, "/", InetSocketAddress.createUnresolved("192.168.1.1", 8080), executorService, mockClient);
assertThat(mockProxyIp.getLocalEndpoint()).isEqualTo("192.168.1.1");

ICassandraManagementProxy mockProxyDns = new HttpCassandraManagementProxy(
null, "/", InetSocketAddress.createUnresolved("localhost", 8080), executorService, mockClient);
assertThat(mockProxyDns.getLocalEndpoint()).isEqualTo("127.0.0.1");


}

}

0 comments on commit fc171a4

Please sign in to comment.