Skip to content

Commit

Permalink
feat: health check (#50)
Browse files Browse the repository at this point in the history
* feat: health check

* feat: health check in RouteClient

* chore: default options values

* feat: async health check task

* fix: atomic refresh

* chore: add comment

* chore: lock this object

* chore: lock a clearly object
  • Loading branch information
fengjiachun authored Sep 26, 2024
1 parent c88b65f commit 47349ff
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 50 deletions.
7 changes: 6 additions & 1 deletion ingester-protocol/src/main/java/io/greptime/GreptimeDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/**
* The GreptimeDB client.
*/
public class GreptimeDB implements Write, WriteObject, Lifecycle<GreptimeOptions>, Display {
public class GreptimeDB implements Write, WriteObject, Lifecycle<GreptimeOptions>, Health, Display {

private static final Logger LOG = LoggerFactory.getLogger(GreptimeDB.class);

Expand Down Expand Up @@ -177,6 +177,11 @@ public StreamWriter<Table, WriteOk> streamWriter(int maxPointsPerSecond, Context
return this.writeClient.streamWriter(maxPointsPerSecond, attachCtx(ctx));
}

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
return this.routerClient.checkHealth();
}

@Override
public void display(Printer out) {
out.println("--- GreptimeDB Client ---")
Expand Down
29 changes: 29 additions & 0 deletions ingester-protocol/src/main/java/io/greptime/Health.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.greptime;

import io.greptime.common.Endpoint;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Health check. It just like to probe the database and connections.
* Users can use this status to perform fault tolerance and disaster recovery actions.
*/
public interface Health {
CompletableFuture<Map<Endpoint, Boolean>> checkHealth();
}
15 changes: 4 additions & 11 deletions ingester-protocol/src/main/java/io/greptime/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,10 @@ public interface Router<R, E> {
CompletableFuture<E> routeFor(R request);

/**
* Refresh the routing table from remote server.
* @return a future that will be completed when the refresh is done
*/
CompletableFuture<Boolean> refresh();

/**
* Refresh the routing table.
* We need to get all the endpoints, and this method will overwrite all
* current endpoints.
* Refresh the routing table. By health checker or service discovery.
*
* @param endpoints all new endpoints
* @param activities all activities endpoints
* @param inactivities all inactivities endpoints
*/
void onRefresh(List<E> endpoints);
void onRefresh(List<E> activities, List<E> inactivities);
}
109 changes: 90 additions & 19 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,33 @@
import io.greptime.rpc.Context;
import io.greptime.rpc.Observer;
import io.greptime.rpc.RpcClient;
import io.greptime.v1.Health.HealthCheckRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A route rpc client which cached the routing table information locally
* and will auto refresh.
*/
public class RouterClient implements Lifecycle<RouterOptions>, Display {
public class RouterClient implements Lifecycle<RouterOptions>, Health, Display {

private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class);

private static final SharedScheduledPool REFRESHER_POOL = Util.getSharedScheduledPool("route_cache_refresher", 1);

private final AtomicLong refreshSequencer = new AtomicLong(0);

private ScheduledExecutorService refresher;
private RouterOptions opts;
private RpcClient rpcClient;
Expand All @@ -57,19 +65,44 @@ public boolean init(RouterOptions opts) {
List<Endpoint> endpoints = Ensures.ensureNonNull(this.opts.getEndpoints(), "null `endpoints`");

this.router = new DefaultRouter();
this.router.onRefresh(endpoints);
this.router.onRefresh(endpoints, null);

long refreshPeriod = this.opts.getRefreshPeriodSeconds();
if (refreshPeriod > 0) {
this.refresher = REFRESHER_POOL.getObject();
this.refresher.scheduleWithFixedDelay(
() -> this.router.refresh().whenComplete((r, e) -> {
if (e != null) {
LOG.error("Router cache refresh failed.", e);
} else {
LOG.debug("Router cache refresh {}.", r ? "success" : "failed");
}
}),
() -> {
long thisSequence = this.refreshSequencer.incrementAndGet();
checkHealth().whenComplete((r, t) -> {
if (t != null) {
LOG.warn("Failed to check health", t);
return;
}

synchronized (this.refreshSequencer) {
// If the next task has started, we will ignore the current result.
//
// I don't want to worry about the overflow issue with long anymore,
// because assuming one increment per second, it will take 292 years
// to overflow. I think that's sufficient.
if (thisSequence < this.refreshSequencer.get()) {
LOG.warn("Skip outdated health check result, sequence: {}", thisSequence);
return;
}

List<Endpoint> activities = new ArrayList<>();
List<Endpoint> inactivities = new ArrayList<>();
for (Map.Entry<Endpoint, Boolean> entry : r.entrySet()) {
if (entry.getValue()) {
activities.add(entry.getKey());
} else {
inactivities.add(entry.getKey());
}
}
this.router.onRefresh(activities, inactivities);
}
});
},
Util.randomInitialDelay(180),
refreshPeriod,
TimeUnit.SECONDS);
Expand Down Expand Up @@ -204,6 +237,22 @@ public String toString() {
return "RouterClient{" + "refresher=" + refresher + ", opts=" + opts + ", rpcClient=" + rpcClient + '}';
}

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
Map<Endpoint, CompletableFuture<Boolean>> futures = this.opts.getEndpoints().stream()
.collect(Collectors.toMap(Function.identity(), endpoint -> {
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
return this.invoke(endpoint, req, Context.newDefault())
.thenApply(resp -> true)
.exceptionally(t -> false); // Handle failure and return false
}));

return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
.thenApply(
ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()
.join())));
}

/**
* Request to a `frontend` server, which needs to return all members(frontend server),
* or it can return only one domain address, it is also possible to return no address
Expand All @@ -219,25 +268,47 @@ public String toString() {
*/
private static class DefaultRouter implements Router<Void, Endpoint> {

private final AtomicReference<List<Endpoint>> endpointsRef = new AtomicReference<>();
private final AtomicReference<Endpoints> endpointsRef = new AtomicReference<>();

@Override
public CompletableFuture<Endpoint> routeFor(Void request) {
List<Endpoint> endpoints = this.endpointsRef.get();
Endpoints endpoints = this.endpointsRef.get();

if (endpoints == null) {
return Util.errorCf(new IllegalStateException("null `endpoints`"));
}

ThreadLocalRandom random = ThreadLocalRandom.current();
int i = random.nextInt(0, endpoints.size());
return Util.completedCf(endpoints.get(i));

if (!endpoints.activities.isEmpty()) {
int i = random.nextInt(0, endpoints.activities.size());
return Util.completedCf(endpoints.activities.get(i));
}

if (!endpoints.inactivities.isEmpty()) {
int i = random.nextInt(0, endpoints.inactivities.size());
Endpoint goodLuck = endpoints.inactivities.get(i);
LOG.warn("No active endpoint, return an inactive one: {}", goodLuck);
return Util.completedCf(goodLuck);
}

return Util.errorCf(new IllegalStateException("empty `endpoints`"));
}

@Override
public CompletableFuture<Boolean> refresh() {
// always return true
return Util.completedCf(true);
public void onRefresh(List<Endpoint> activities, List<Endpoint> inactivities) {
LOG.info("Router cache refreshed, activities: {}, inactivities: {}", activities, inactivities);
this.endpointsRef.set(new Endpoints(activities, inactivities));
}
}

@Override
public void onRefresh(List<Endpoint> endpoints) {
this.endpointsRef.set(endpoints);
static class Endpoints {
final List<Endpoint> activities;
final List<Endpoint> inactivities;

Endpoints(List<Endpoint> activities, List<Endpoint> inactivities) {
this.activities = activities == null ? new ArrayList<>() : activities;
this.inactivities = inactivities == null ? new ArrayList<>() : inactivities;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,46 @@
import io.greptime.rpc.MethodDescriptor;
import io.greptime.rpc.RpcFactoryProvider;
import io.greptime.v1.Database;
import io.greptime.v1.Health;

/**
* The RPC service register.
*/
public class RpcServiceRegister {

private static final String METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s";
private static final String DATABASE_METHOD_TEMPLATE = "greptime.v1.GreptimeDatabase/%s";
private static final String HEALTH_METHOD_TEMPLATE = "greptime.v1.HealthCheck/%s";

public static void registerAllService() {
// register protobuf serializer
// Handle
MethodDescriptor handleMethod = MethodDescriptor.of(
String.format(DATABASE_METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1);
RpcFactoryProvider.getRpcFactory()
.register(
MethodDescriptor.of(
String.format(METHOD_TEMPLATE, "Handle"), MethodDescriptor.MethodType.UNARY, 1),
handleMethod,
Database.GreptimeRequest.class,
Database.GreptimeRequest.getDefaultInstance(),
Database.GreptimeResponse.getDefaultInstance());

// HandleRequests
MethodDescriptor handleRequestsMethod = MethodDescriptor.of(
String.format(DATABASE_METHOD_TEMPLATE, "HandleRequests"),
MethodDescriptor.MethodType.CLIENT_STREAMING);
RpcFactoryProvider.getRpcFactory()
.register(
MethodDescriptor.of(
String.format(METHOD_TEMPLATE, "HandleRequests"),
MethodDescriptor.MethodType.CLIENT_STREAMING),
handleRequestsMethod,
Database.GreptimeRequest.class,
Database.GreptimeRequest.getDefaultInstance(),
Database.GreptimeResponse.getDefaultInstance());

// HealthCheck
MethodDescriptor healthCheckMethod = MethodDescriptor.of(
String.format(HEALTH_METHOD_TEMPLATE, "HealthCheck"), MethodDescriptor.MethodType.UNARY);
RpcFactoryProvider.getRpcFactory()
.register(
healthCheckMethod,
Health.HealthCheckRequest.class,
Health.HealthCheckRequest.getDefaultInstance(),
Health.HealthCheckResponse.getDefaultInstance());
}
}
14 changes: 14 additions & 0 deletions ingester-protocol/src/main/java/io/greptime/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ public static <U> CompletableFuture<U> completedCf(U value) {
return CompletableFuture.completedFuture(value);
}

/**
* Returns a new CompletableFuture that is already exceptionally with the given
* error.
*
* @param t the given exception
* @param <U> the type of the value
* @return the exceptionally {@link CompletableFuture}
*/
public static <U> CompletableFuture<U> errorCf(Throwable t) {
CompletableFuture<U> err = new CompletableFuture<>();
err.completeExceptionally(t);
return err;
}

public static <V> Observer<V> toObserver(CompletableFuture<V> future) {
return new Observer<V>() {

Expand Down
8 changes: 7 additions & 1 deletion ingester-protocol/src/main/java/io/greptime/WriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.greptime.v1.Common;
import io.greptime.v1.Database;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -56,7 +57,7 @@
/**
* Default Write API impl.
*/
public class WriteClient implements Write, Lifecycle<WriteOptions>, Display {
public class WriteClient implements Write, Health, Lifecycle<WriteOptions>, Display {

private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class);

Expand Down Expand Up @@ -254,6 +255,11 @@ public void onCompleted() {
};
}

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
return this.routerClient.checkHealth();
}

@Override
public void display(Printer out) {
out.println("--- WriteClient ---")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
* GreptimeDB client options.
*/
public class GreptimeOptions implements Copiable<GreptimeOptions> {
public static final int DEFAULT_WRITE_MAX_RETRIES = 1;
public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536;
public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536;
public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60;

private List<Endpoint> endpoints;
private RpcOptions rpcOptions;
private RouterOptions routerOptions;
Expand Down Expand Up @@ -145,14 +150,14 @@ public static final class Builder {
private RpcOptions rpcOptions = RpcOptions.newDefault();
// GreptimeDB secure connection options
private TlsOptions tlsOptions;
private int writeMaxRetries = 1;
private int writeMaxRetries = DEFAULT_WRITE_MAX_RETRIES;
// Write flow limit: maximum number of data points in-flight.
private int maxInFlightWritePoints = 10 * 65536;
private int maxInFlightWritePoints = DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS;
private LimitedPolicy writeLimitedPolicy = LimitedPolicy.defaultWriteLimitedPolicy();
private int defaultStreamMaxWritePointsPerSecond = 10 * 65536;
private int defaultStreamMaxWritePointsPerSecond = DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND;
// Refresh frequency of route tables. The background refreshes all route tables periodically.
// If the value is less than or equal to 0, the route tables will not be refreshed.
private long routeTableRefreshPeriodSeconds = -1;
private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS;
// Authentication information
private AuthInfo authInfo;
// The request router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,13 @@ public void testAllOptions() {

private Router<Void, Endpoint> createTestRouter() {
return new Router<Void, Endpoint>() {

@Override
public CompletableFuture<Endpoint> routeFor(Void request) {
return null;
}

@Override
public CompletableFuture<Boolean> refresh() {
return null;
}

@Override
public void onRefresh(List<Endpoint> endpoints) {}
public void onRefresh(List<Endpoint> activities, List<Endpoint> inactivities) {}
};
}
}

0 comments on commit 47349ff

Please sign in to comment.