Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: health check timeout #51

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ingester-example/src/main/java/io/greptime/TestConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public static GreptimeDB connectToDefaultDB() {
// periodically. By default, the route tables will not be refreshed.
.routeTableRefreshPeriodSeconds(-1)
// Optional, the default value is fine.
// Timeout for health check, if the health check is not completed within the specified time,
// the health check will fail.
// The default is 1000
.checkHealthTimeoutMs(1000)
// Optional, the default value is fine.
// Sets the request router, The internal default implementation works well.
// You don't need to set it unless you have special requirements.
.router(null)
Expand Down
26 changes: 14 additions & 12 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,20 @@ public String toString() {

@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
Map<Endpoint, CompletableFuture<Boolean>> futures = this.opts.getEndpoints().stream()
.collect(Collectors.toMap(Function.identity(), endpoint -> {
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
return this.invoke(endpoint, req, Context.newDefault())
.thenApply(resp -> true)
.exceptionally(t -> false); // Handle failure and return false
}));

return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
.thenApply(
ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()
.join())));
Map<Endpoint, CompletableFuture<Boolean>> futures =
this.opts.getEndpoints().stream().collect(Collectors.toMap(Function.identity(), this::doCheckHealth));

CompletableFuture<Void> all = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));

return all.thenApply(ok -> futures.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join())));
}

private CompletableFuture<Boolean> doCheckHealth(Endpoint endpoint) {
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
return invoke(endpoint, req, Context.newDefault(), this.opts.getCheckHealthTimeoutMs())
.thenApply(resp -> true)
.exceptionally(t -> false); // Handle failure and return false
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class GreptimeOptions implements Copiable<GreptimeOptions> {
public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536;
public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536;
public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60;
public static final long DEFAULT_CHECK_HEALTH_TIMEOUT_MS = 1000;

private List<Endpoint> endpoints;
private RpcOptions rpcOptions;
Expand Down Expand Up @@ -158,6 +159,8 @@ public static final class Builder {
// Refresh frequency of route tables. The background refreshes all route tables periodically.
// If the value is less than or equal to 0, the route tables will not be refreshed.
private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS;
// Timeout for health check
private long checkHealthTimeoutMs = DEFAULT_CHECK_HEALTH_TIMEOUT_MS;
// Authentication information
private AuthInfo authInfo;
// The request router
Expand Down Expand Up @@ -273,6 +276,19 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond
return this;
}

/**
* Timeout for health check. The default is 1000ms.
* If the health check is not completed within the specified time, the health
* check will fail.
*
* @param checkHealthTimeoutMs timeout for health check
* @return this builder
*/
public Builder checkHealthTimeoutMs(long checkHealthTimeoutMs) {
this.checkHealthTimeoutMs = checkHealthTimeoutMs;
return this;
}

/**
* Sets authentication information. If the DB is not required to authenticate,
* we can ignore this.
Expand Down Expand Up @@ -321,6 +337,7 @@ private RouterOptions routerOptions() {
routerOpts.setEndpoints(this.endpoints);
routerOpts.setRouter(this.router);
routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds);
routerOpts.setCheckHealthTimeoutMs(this.checkHealthTimeoutMs);
return routerOpts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class RouterOptions implements Copiable<RouterOptions> {
// all route tables periodically. If the value is less than or
// equal to 0, the route tables will not be refreshed.
private long refreshPeriodSeconds = -1;
// Timeout for health check
private long checkHealthTimeoutMs;
private Router<Void, Endpoint> router;

public RpcClient getRpcClient() {
Expand All @@ -60,6 +62,14 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) {
this.refreshPeriodSeconds = refreshPeriodSeconds;
}

public long getCheckHealthTimeoutMs() {
return checkHealthTimeoutMs;
}

public void setCheckHealthTimeoutMs(long checkHealthTimeoutMs) {
this.checkHealthTimeoutMs = checkHealthTimeoutMs;
}

public Router<Void, Endpoint> getRouter() {
return router;
}
Expand All @@ -74,15 +84,18 @@ public RouterOptions copy() {
opts.rpcClient = rpcClient;
opts.endpoints = this.endpoints;
opts.refreshPeriodSeconds = this.refreshPeriodSeconds;
opts.checkHealthTimeoutMs = this.checkHealthTimeoutMs;
opts.router = this.router;
return opts;
}

@Override
public String toString() {
return "RouterOptions{" + "endpoints="
return "RouterOptions{" + "rpcClient="
+ rpcClient + ", endpoints="
+ endpoints + ", refreshPeriodSeconds="
+ refreshPeriodSeconds + ", router="
+ refreshPeriodSeconds + ", checkHealthTimeoutMs="
+ checkHealthTimeoutMs + ", router="
+ router + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testAllOptions() {
LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy();
int defaultStreamMaxWritePointsPerSecond = 100000;
long routeTableRefreshPeriodSeconds = 99;
long checkHealthTimeoutMs = 1000;
AuthInfo authInfo = new AuthInfo("user", "password");
Router<Void, Endpoint> router = createTestRouter();
TlsOptions tlsOptions = new TlsOptions();
Expand All @@ -57,6 +58,7 @@ public void testAllOptions() {
.writeLimitedPolicy(limitedPolicy)
.defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond)
.routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds)
.checkHealthTimeoutMs(checkHealthTimeoutMs)
.authInfo(authInfo)
.router(router)
.build();
Expand All @@ -74,6 +76,7 @@ public void testAllOptions() {
routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray());
Assert.assertEquals(router, routerOptions.getRouter());
Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds());
Assert.assertEquals(checkHealthTimeoutMs, routerOptions.getCheckHealthTimeoutMs());

WriteOptions writeOptions = opts.getWriteOptions();
Assert.assertNotNull(writeOptions);
Expand Down