diff --git a/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/GrpcClient.java b/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/GrpcClient.java index db8331c..cd2f015 100644 --- a/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/GrpcClient.java +++ b/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/GrpcClient.java @@ -35,6 +35,7 @@ import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; +import org.apache.horaedb.rpc.interceptors.AuthenticationInterceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -394,6 +395,8 @@ public void addInterceptor(final ClientInterceptor interceptor) { // Interceptors run in the reverse order in which they are added private void initInterceptors() { // the last one + addInterceptor(new AuthenticationInterceptor(opts.getUser(), opts.getPassword())); + addInterceptor(new MetricInterceptor()); // the second diff --git a/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/interceptors/AuthenticationInterceptor.java b/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/interceptors/AuthenticationInterceptor.java new file mode 100644 index 0000000..bf4ac22 --- /dev/null +++ b/horaedb-grpc/src/main/java/org/apache/horaedb/rpc/interceptors/AuthenticationInterceptor.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + */ +package org.apache.horaedb.rpc.interceptors; + +import io.grpc.*; + +import java.util.Base64; + +public class AuthenticationInterceptor implements ClientInterceptor { + private final String token; + + public AuthenticationInterceptor(String user, String password) { + // Build token + this.token = Base64.getEncoder().encodeToString((user + ":" + password).getBytes()); + } + + @Override + public ClientCall interceptCall(final MethodDescriptor method, + final CallOptions callOpts, Channel next) { + + return new AuthenticationAttachingClientCall<>(next.newCall(method, callOpts), token); + } + + private static final class AuthenticationAttachingClientCall + extends ForwardingClientCall.SimpleForwardingClientCall { + + private final String token; + private static final String AUTHORIZATION_HEADER = "authorization"; + private static final String BASIC_PREFIX = "Basic "; + + private AuthenticationAttachingClientCall(ClientCall delegate, String token) { + super(delegate); + this.token = token; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(Metadata.Key.of(AUTHORIZATION_HEADER, Metadata.ASCII_STRING_MARSHALLER), BASIC_PREFIX + token); + super.start(responseListener, headers); + } + } + +} diff --git a/horaedb-protocol/src/main/java/org/apache/horaedb/options/HoraeDBOptions.java b/horaedb-protocol/src/main/java/org/apache/horaedb/options/HoraeDBOptions.java index 8bc525c..ae9113c 100644 --- a/horaedb-protocol/src/main/java/org/apache/horaedb/options/HoraeDBOptions.java +++ b/horaedb-protocol/src/main/java/org/apache/horaedb/options/HoraeDBOptions.java @@ -166,6 +166,9 @@ public static final class Builder { // The routeMode for sdk, only Proxy and Direct support now. private RouteMode routeMode; private String database; + private String user; + private String password; + // Asynchronous thread pool, which is used to handle various asynchronous tasks in the SDK. private Executor asyncWritePool; private Executor asyncReadPool; @@ -201,6 +204,13 @@ public Builder(Endpoint clusterAddress, RouteMode routeMode) { this.routeMode = routeMode; } + @SuppressWarnings("PMD") + public Builder authentication(final String user, final String password) { + this.user = user; + this.password = password; + return this; + } + /** * * @param database the database name @@ -366,6 +376,10 @@ public HoraeDBOptions build() { opts.asyncWritePool = asyncWritePool; opts.asyncReadPool = asyncReadPool; opts.rpcOptions = this.rpcOptions; + + opts.rpcOptions.setUser(this.user); + opts.rpcOptions.setPassword(this.password); + opts.routerOptions = new RouterOptions(); opts.routerOptions.setClusterAddress(this.clusterAddress); opts.routerOptions.setMaxCachedSize(this.routeTableMaxCachedSize); diff --git a/horaedb-rpc/src/main/java/org/apache/horaedb/rpc/RpcOptions.java b/horaedb-rpc/src/main/java/org/apache/horaedb/rpc/RpcOptions.java index 6014557..6dd4fbf 100644 --- a/horaedb-rpc/src/main/java/org/apache/horaedb/rpc/RpcOptions.java +++ b/horaedb-rpc/src/main/java/org/apache/horaedb/rpc/RpcOptions.java @@ -14,6 +14,16 @@ */ public class RpcOptions implements Copiable { + /** + * Username provided for authentication + */ + private String user; + + /** + * Password provided for authentication + */ + private String password; + /** * RPC request default timeout in milliseconds * Default: 10000(10s) @@ -96,6 +106,14 @@ public class RpcOptions implements Copiable { */ private long connectionMaxAgeMs = 0; + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + public int getDefaultRpcTimeout() { return defaultRpcTimeout; } @@ -180,6 +198,14 @@ public LimitKind getLimitKind() { return limitKind; } + public void setUser(String user) { + this.user = user; + } + + public void setPassword(String password) { + this.password = password; + } + public void setLimitKind(LimitKind limitKind) { this.limitKind = limitKind; } @@ -235,6 +261,8 @@ public void setLogOnLimitChange(boolean logOnLimitChange) { @Override public RpcOptions copy() { final RpcOptions opts = new RpcOptions(); + opts.user = this.user; + opts.password = this.password; opts.defaultRpcTimeout = this.defaultRpcTimeout; opts.rpcThreadPoolSize = this.rpcThreadPoolSize; opts.rpcThreadPoolQueueSize = this.rpcThreadPoolQueueSize; diff --git a/pom.xml b/pom.xml index 932d4a6..d52569a 100644 --- a/pom.xml +++ b/pom.xml @@ -400,6 +400,13 @@ **/sql/TokenMgrException.java + + + org.ow2.asm + asm + 9.2 + + com.mycila