From 2720a8396a8a269c4f14d31fe81aef831cf747fd Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Tue, 21 May 2024 17:11:35 -0700 Subject: [PATCH] [common][server] Bug fixes and clean of ACL handlers for GRPC (#982) - Consolidate boiler plate code in ACL handlers for access validations - Fix code path that throws exception in GRPC and close the calls with the appropriate error messages - Fix the flow that invokes onMessage twice in GRPC call intercept to prevent exceptions - Rename GrpcSSLUtils to GrpcUtils to accommodate general utility methods - Add reflection service to GRPC server to interact with Grpcurl - Reuse the header for skipping ACLs from router since the store ACL handling is performed at the router for non FC clients - [code readability] Move public methods together followed by overridable and protected methods followed by package private and private methods Co-authored-by: Bharath Kumarasubramanian --- build.gradle | 2 + .../transport/GrpcTransportClient.java | 6 +- .../venice/acl/handler/StoreAclHandler.java | 372 ++++++++---------- .../{GrpcSslUtils.java => GrpcUtils.java} | 91 +++-- .../venice/listener/ServerHandlerUtils.java | 13 + .../venice/grpc/GrpcSslUtilsTest.java | 37 -- .../linkedin/venice/grpc/GrpcUtilsTest.java | 74 ++++ .../venice/grpc/VeniceGrpcServer.java | 6 +- .../venice/listener/ServerAclHandler.java | 35 +- .../listener/ServerStoreAclHandler.java | 52 +-- .../listener/ServerStoreAclHandlerTest.java | 12 +- 11 files changed, 344 insertions(+), 356 deletions(-) rename internal/venice-common/src/main/java/com/linkedin/venice/grpc/{GrpcSslUtils.java => GrpcUtils.java} (68%) delete mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcSslUtilsTest.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcUtilsTest.java diff --git a/build.gradle b/build.gradle index ccfff660c5..35bb1629ce 100644 --- a/build.gradle +++ b/build.gradle @@ -82,6 +82,7 @@ ext.libraries = [ fastUtil: 'it.unimi.dsi:fastutil:8.3.0', grpcNettyShaded: "io.grpc:grpc-netty-shaded:${grpcVersion}", grpcProtobuf: "io.grpc:grpc-protobuf:${grpcVersion}", + grpcServices: "io.grpc:grpc-services:${grpcVersion}", grpcStub: "io.grpc:grpc-stub:${grpcVersion}", hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}", helix: 'org.apache.helix:helix-core:1.1.0', @@ -266,6 +267,7 @@ subprojects { avroCompiler 'org.slf4j:slf4j-simple:1.7.32' implementation libraries.grpcNettyShaded implementation libraries.grpcProtobuf + implementation libraries.grpcServices implementation libraries.grpcStub compileOnly libraries.tomcatAnnotations } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java index bd0b6c4c54..ef3124dc5b 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java @@ -11,7 +11,7 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.fastclient.GrpcClientConfig; import com.linkedin.venice.grpc.GrpcErrorCodes; -import com.linkedin.venice.grpc.GrpcSslUtils; +import com.linkedin.venice.grpc.GrpcUtils; import com.linkedin.venice.protocols.VeniceClientRequest; import com.linkedin.venice.protocols.VeniceReadServiceGrpc; import com.linkedin.venice.protocols.VeniceServerResponse; @@ -64,8 +64,8 @@ public GrpcTransportClient(GrpcClientConfig grpcClientConfig) { private void initChannelCredentials() { try { TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder() - .keyManager(GrpcSslUtils.getKeyManagers(sslFactory)) - .trustManager(GrpcSslUtils.getTrustManagers(sslFactory)); + .keyManager(GrpcUtils.getKeyManagers(sslFactory)) + .trustManager(GrpcUtils.getTrustManagers(sslFactory)); channelCredentials = tlsBuilder.build(); } catch (Exception e) { throw new VeniceClientException( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java b/internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java index 23b461dfda..354a440a5c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/acl/handler/StoreAclHandler.java @@ -1,5 +1,9 @@ package com.linkedin.venice.acl.handler; +import static com.linkedin.venice.grpc.GrpcUtils.*; +import static com.linkedin.venice.listener.ServerHandlerUtils.*; + +import com.google.common.annotations.VisibleForTesting; import com.linkedin.venice.acl.AclCreationDeletionListener; import com.linkedin.venice.acl.AclException; import com.linkedin.venice.acl.DynamicAccessController; @@ -11,7 +15,6 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.protocols.VeniceClientRequest; import com.linkedin.venice.utils.NettyUtils; -import com.linkedin.venice.utils.SslUtils; import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; import io.grpc.Metadata; @@ -24,7 +27,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.ssl.SslHandler; import io.netty.util.ReferenceCountUtil; import java.net.URI; import java.security.cert.X509Certificate; @@ -32,9 +34,10 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,33 +71,6 @@ public StoreAclHandler(DynamicAccessController accessController, ReadOnlyStoreRe this.metadataRepository.registerStoreDataChangedListener(new AclCreationDeletionListener(accessController)); } - /** - * Extract the store name from the incoming resource name. - */ - protected String extractStoreName(String resourceName) { - return resourceName; - } - - protected X509Certificate extractClientCert(ChannelHandlerContext ctx) throws SSLPeerUnverifiedException { - SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); - if (sslHandler == null) { - /** - * In HTTP/2, the SSLHandler is k parent channel pipeline and the child channels won't have the SSL Handler. - */ - sslHandler = ctx.channel().parent().pipeline().get(SslHandler.class); - } - return SslUtils.getX509Certificate(sslHandler.engine().getSession().getPeerCertificates()[0]); - } - - protected X509Certificate extractClientCert(ServerCall call) throws SSLPeerUnverifiedException { - SSLSession sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION); - if (sslSession == null) { - throw new SSLPeerUnverifiedException("SSL session not found"); - } - - return SslUtils.getX509Certificate(sslSession.getPeerCertificates()[0]); - } - /** * Verify if client has permission to access. * @@ -104,142 +80,46 @@ protected X509Certificate extractClientCert(ServerCall call) throws SSLPee */ @Override public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws SSLPeerUnverifiedException { - X509Certificate clientCert = extractClientCert(ctx); - String uri = req.uri(); + String method = req.method().name(); + String client = ctx.channel().remoteAddress().toString(); // ip and port + BiConsumer errorHandler = + (status, errorMessage) -> NettyUtils.setupResponseAndFlush(status, errorMessage.getBytes(), false, ctx); + // Parse resource type and store name String[] requestParts = URI.create(uri).getPath().split("/"); - // invalid request if requestParts.length < 3 except for HEALTH check from venice client - if (requestParts.length < 3 - && !(requestParts.length == 2 && requestParts[1].toUpperCase().equals(QueryAction.HEALTH.toString()))) { - NettyUtils.setupResponseAndFlush( - HttpResponseStatus.BAD_REQUEST, - ("Invalid request uri: " + uri).getBytes(), - false, - ctx); + + if (isInvalidRequest(requestParts)) { + errorHandler.accept(HttpResponseStatus.BAD_REQUEST, "Invalid request uri: " + uri); return; } - try { - QueryAction queryAction = QueryAction.valueOf(requestParts[1].toUpperCase()); - if (QUERIES_TO_SKIP_ACL.contains(queryAction)) { - ReferenceCountUtil.retain(req); - ctx.fireChannelRead(req); - return; - } - } catch (IllegalArgumentException illegalArgumentException) { - throw new VeniceException("Unknown query action: " + requestParts[1]); + /* + * Skip request uri validations for store name and certificates due to special actions + * TODO: Identify validations for each query actions and have a flow to perform validations and actions based on + * query actions + */ + QueryAction queryAction = QueryAction.valueOf(requestParts[1].toUpperCase()); + if (QUERIES_TO_SKIP_ACL.contains(queryAction)) { + ReferenceCountUtil.retain(req); + ctx.fireChannelRead(req); + return; } - String storeName = extractStoreName(requestParts[2]); + X509Certificate clientCert = extractClientCert(ctx); + String resourceName = requestParts[2]; + String storeName = extractStoreName(resourceName); - String method = req.method().name(); try { - if (VeniceSystemStoreUtils.isSystemStore(storeName)) { - // Ignore ACL for Venice system stores. System stores should be world readable and only contain public - // information. + // Check ACL in case of non system store as system store contain public information + if (VeniceSystemStoreUtils.isSystemStore(storeName) + || hasAccess(client, uri, clientCert, storeName, method, errorHandler)) { ReferenceCountUtil.retain(req); ctx.fireChannelRead(req); - } else { - try { - /** - * TODO: Consider making this the first check, so that we optimize for the hot path. If rejected, then we - * could check whether the request is for a system store, METADATA, etc. - */ - if (accessController.hasAccess(clientCert, storeName, method)) { - // Client has permission. Proceed - ReferenceCountUtil.retain(req); - ctx.fireChannelRead(req); - } else { - // Fact: - // Request gets rejected. - // Possible Reasons: - // A. ACL not found. OR, - // B. ACL exists but caller does not have permission. - - String client = ctx.channel().remoteAddress().toString(); // ip and port - String errLine = String.format("%s requested %s %s", client, method, req.uri()); - - if (!accessController.isFailOpen() && !accessController.hasAcl(storeName)) { // short circuit, order matters - // Case A - // Conditions: - // 0. (outside) Store exists and is being access controlled. AND, - // 1. (left) The following policy is applied: if ACL not found, reject the request. AND, - // 2. (right) ACL not found. - // Result: - // Request is rejected by DynamicAccessController#hasAccess() - // Root cause: - // Requested resource exists but does not have ACL. - // Action: - // return 401 Unauthorized - LOGGER.warn("Requested store does not have ACL: {}", errLine); - LOGGER.debug( - "Existing stores: {}", - () -> metadataRepository.getAllStores() - .stream() - .map(Store::getName) - .sorted() - .collect(Collectors.toList())); - LOGGER.debug( - "Access-controlled stores: {}", - () -> accessController.getAccessControlledResources().stream().sorted().collect(Collectors.toList())); - NettyUtils.setupResponseAndFlush( - HttpResponseStatus.UNAUTHORIZED, - ("ACL not found!\n" + "Either it has not been created, or can not be loaded.\n" - + "Please create the ACL, or report the error if you know for sure that ACL exists for this store: " - + storeName).getBytes(), - false, - ctx); - } else { - // Case B - // Conditions: - // 1. Fail closed, and ACL found. OR, - // 2. Fail open, and ACL found. OR, - // 3. Fail open, and ACL not found. - // Analyses: - // (1) ACL exists, therefore result is determined by ACL. - // Since the request has been rejected, it must be due to lack of permission. - // (2) ACL exists, therefore result is determined by ACL. - // Since the request has been rejected, it must be due to lack of permission. - // (3) In such case, request would NOT be rejected in the first place, - // according to the definition of hasAccess() in DynamicAccessController interface. - // Contradiction to the fact, therefore this case is impossible. - // Root cause: - // Caller does not have permission to access the resource. - // Action: - // return 403 Forbidden - LOGGER.debug("Unauthorized access rejected: {}", errLine); - NettyUtils.setupResponseAndFlush( - HttpResponseStatus.FORBIDDEN, - ("Access denied!\n" - + "If you are the store owner, add this application (or your own username for Venice shell client) to the store ACL.\n" - + "Otherwise, ask the store owner for read permission.").getBytes(), - false, - ctx); - } - } - } catch (AclException e) { - String client = ctx.channel().remoteAddress().toString(); // ip and port - String errLine = String.format("%s requested %s %s", client, method, req.uri()); - - if (accessController.isFailOpen()) { - LOGGER.warn("Exception occurred! Access granted: {} {}", errLine, e); - ReferenceCountUtil.retain(req); - ctx.fireChannelRead(req); - } else { - LOGGER.warn("Exception occurred! Access rejected: {} {}", errLine, e); - NettyUtils.setupResponseAndFlush(HttpResponseStatus.FORBIDDEN, new byte[0], false, ctx); - } - } } } catch (VeniceNoStoreException noStoreException) { - String client = ctx.channel().remoteAddress().toString(); // ip and port LOGGER.debug("Requested store does not exist: {} requested {} {}", client, method, req.uri()); - NettyUtils.setupResponseAndFlush( - HttpResponseStatus.BAD_REQUEST, - ("Invalid Venice store name: " + storeName).getBytes(), - false, - ctx); + errorHandler.accept(HttpResponseStatus.BAD_REQUEST, "Invalid Venice store name: " + storeName); } } @@ -251,74 +131,158 @@ public ServerCall.Listener interceptCall( return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) { @Override public void onMessage(ReqT message) { - X509Certificate clientCert; - try { - clientCert = extractClientCert(call); - } catch (SSLPeerUnverifiedException e) { - throw new VeniceException(e); - } VeniceClientRequest request = (VeniceClientRequest) message; - String storeName = extractStoreName(request.getResourceName()); String method = request.getMethod(); - if (storeName.equals("") || method.equals("")) { - call.close(Status.INVALID_ARGUMENT.withDescription("Invalid request"), new Metadata()); - } + BiConsumer grpcCloseConsumer = call::close; + BiConsumer errorHandler = ((httpResponseStatus, s) -> grpcCloseConsumer + .accept(httpResponseStatusToGrpcStatus(httpResponseStatus, s), headers)); - if (VeniceSystemStoreUtils.isSystemStore(storeName)) { - super.onMessage(message); + if (StringUtils.isEmpty(storeName) || StringUtils.isEmpty(method)) { + LOGGER.error("Invalid store name {} or method {}", storeName, method); + grpcCloseConsumer.accept(Status.INVALID_ARGUMENT.withDescription("Invalid request"), headers); return; } try { - if (accessController.hasAccess(clientCert, storeName, method)) { - super.onMessage(message); - } else { - String client = - Objects.requireNonNull(call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)).toString(); - String errLine = String.format("%s requested %s %s", client, method, storeName); - - if (!accessController.isFailOpen() && !accessController.hasAcl(storeName)) { - LOGGER.warn("Requested store does not have ACL: {}", errLine); - LOGGER.debug( - "Existing stores: {}", - () -> metadataRepository.getAllStores() - .stream() - .map(Store::getName) - .sorted() - .collect(Collectors.toList())); - LOGGER.debug( - "Access-controlled stores: {}", - () -> accessController.getAccessControlledResources().stream().sorted().collect(Collectors.toList())); - String responseMessage = "ACL not found!\n" + "Either it has not been created, or can not be loaded.\n" - + "Please create the ACL, or report the error if you know for sure that ACL exists for this store: " - + storeName; - call.close(Status.PERMISSION_DENIED.withDescription(responseMessage), new Metadata()); - } else { - String responseMessage = "Access denied!\n" - + "If you are the store owner, add this application (or your own username for Venice shell client) to the store ACL.\n" - + "Otherwise, ask the store owner for read permission."; - call.close(Status.PERMISSION_DENIED.withDescription(responseMessage), new Metadata()); - } - return; - } - } catch (AclException e) { + X509Certificate clientCert = extractGrpcClientCert(call); String client = Objects.requireNonNull(call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)).toString(); - String errLine = String.format("%s requested %s %s", client, method, storeName); - if (accessController.isFailOpen()) { - LOGGER.warn("Exception occurred! Access granted: {} {}", errLine, e); + if (VeniceSystemStoreUtils.isSystemStore(storeName) + || hasAccess(client, call.getAuthority(), clientCert, storeName, method, errorHandler)) { + LOGGER.info("Requested principal has access to resource. Processing request"); super.onMessage(message); - } else { - LOGGER.warn("Exception occurred! Access rejected: {} {}", errLine, e); - call.close(Status.PERMISSION_DENIED.withDescription("Access denied"), new Metadata()); } - return; + } catch (SSLPeerUnverifiedException e) { + LOGGER.error("Cannot verify the certificate.", e); + grpcCloseConsumer.accept(Status.UNAUTHENTICATED.withDescription("Invalid certificate"), headers); + } catch (VeniceException e) { + LOGGER.error("Cannot process request successfully due to", e); + grpcCloseConsumer.accept(Status.INTERNAL.withDescription(e.getMessage()), headers); } - - super.onMessage(message); } }; } + + /** + * Extract the store name from the incoming resource name. + */ + protected String extractStoreName(String resourceName) { + return resourceName; + } + + @VisibleForTesting + boolean isInvalidRequest(String[] requestParts) { + int partsLength = requestParts.length; + boolean invalidRequest = false; + + // Only for HEALTH queries, parts length can be 2 + if (partsLength == 2) { + invalidRequest = !requestParts[1].equalsIgnoreCase(QueryAction.HEALTH.name()); + } else if (partsLength < 3) { // invalid request if parts length < 3 except health queries + invalidRequest = true; + } else { // throw exception to retain current behavior for invalid query actions + try { + QueryAction.valueOf(requestParts[1].toUpperCase()); + } catch (IllegalArgumentException exception) { + throw new VeniceException("Unknown query action: " + requestParts[1]); + } + } + + return invalidRequest; + } + + @VisibleForTesting + boolean hasAccess( + String client, + String uri, + X509Certificate clientCert, + String storeName, + String method, + BiConsumer errorHandler) { + boolean allowRequest = false; + try { + /** + * TODO: Consider making this the first check, so that we optimize for the hot path. If rejected, then we + * could check whether the request is for a system store, METADATA, etc. + */ + allowRequest = accessController.hasAccess(clientCert, storeName, method); + if (!allowRequest) { + // Fact: + // Request gets rejected. + // Possible Reasons: + // A. ACL not found. OR, + // B. ACL exists but caller does not have permission. + + String errLine = String.format("%s requested %s %s", client, method, uri); + + if (!accessController.isFailOpen() && !accessController.hasAcl(storeName)) { // short circuit, order matters + // Case A + // Conditions: + // 0. (outside) Store exists and is being access controlled. AND, + // 1. (left) The following policy is applied: if ACL not found, reject the request. AND, + // 2. (right) ACL not found. + // Result: + // Request is rejected by DynamicAccessController#hasAccess() + // Root cause: + // Requested resource exists but does not have ACL. + // Action: + // return 401 Unauthorized + LOGGER.warn("Requested store does not have ACL: {}", errLine); + LOGGER.debug( + "Existing stores: {}", + () -> metadataRepository.getAllStores() + .stream() + .map(Store::getName) + .sorted() + .collect(Collectors.toList())); + LOGGER.debug( + "Access-controlled stores: {}", + () -> accessController.getAccessControlledResources().stream().sorted().collect(Collectors.toList())); + errorHandler.accept( + HttpResponseStatus.UNAUTHORIZED, + "ACL not found!\n" + "Either it has not been created, or can not be loaded.\n" + + "Please create the ACL, or report the error if you know for sure that ACL exists for this store: " + + storeName); + } else { + // Case B + // Conditions: + // 1. Fail closed, and ACL found. OR, + // 2. Fail open, and ACL found. OR, + // 3. Fail open, and ACL not found. + // Analyses: + // (1) ACL exists, therefore result is determined by ACL. + // Since the request has been rejected, it must be due to lack of permission. + // (2) ACL exists, therefore result is determined by ACL. + // Since the request has been rejected, it must be due to lack of permission. + // (3) In such case, request would NOT be rejected in the first place, + // according to the definition of hasAccess() in DynamicAccessController interface. + // Contradiction to the fact, therefore this case is impossible. + // Root cause: + // Caller does not have permission to access the resource. + // Action: + // return 403 Forbidden + LOGGER.debug("Unauthorized access rejected: {}", errLine); + errorHandler.accept( + HttpResponseStatus.FORBIDDEN, + "Access denied!\n" + + "If you are the store owner, add this application (or your own username for Venice shell client) to the store ACL.\n" + + "Otherwise, ask the store owner for read permission."); + } + } + } catch (AclException e) { + String errLine = String.format("%s requested %s %s", client, method, uri); + + if (accessController.isFailOpen()) { + LOGGER.warn("Exception occurred! Access granted: {} {}", errLine, e); + allowRequest = true; + } else { + LOGGER.warn("Exception occurred! Access rejected: {} {}", errLine, e); + errorHandler.accept(HttpResponseStatus.FORBIDDEN, "Access denied!"); + } + } + + return allowRequest; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcSslUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcUtils.java similarity index 68% rename from internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcSslUtils.java rename to internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcUtils.java index 8bd44f7a64..44020f9eed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcSslUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/grpc/GrpcUtils.java @@ -1,7 +1,13 @@ package com.linkedin.venice.grpc; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.security.SSLConfig; import com.linkedin.venice.security.SSLFactory; +import com.linkedin.venice.utils.SslUtils; +import io.grpc.Grpc; +import io.grpc.ServerCall; +import io.grpc.Status; +import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; @@ -11,13 +17,57 @@ import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; -public class GrpcSslUtils { +public final class GrpcUtils { + private static final Logger LOGGER = LogManager.getLogger(GrpcUtils.class); + + public static KeyManager[] getKeyManagers(SSLFactory sslFactory) + throws UnrecoverableKeyException, CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { + String algorithm = KeyManagerFactory.getDefaultAlgorithm(); + String password = sslFactory.getSSLConfig().getKeyStorePassword(); + KeyStore keyStore = loadKeyStore(sslFactory, sslFactory.getSSLConfig().getKeyStoreType()); + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(algorithm); + keyManagerFactory.init(keyStore, password.toCharArray()); + return keyManagerFactory.getKeyManagers(); + } + + public static TrustManager[] getTrustManagers(SSLFactory sslFactory) + throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { + String algorithm = TrustManagerFactory.getDefaultAlgorithm(); + KeyStore trustStore = loadTrustStore(sslFactory, sslFactory.getSSLConfig().getTrustStoreType()); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(algorithm); + trustManagerFactory.init(trustStore); + return trustManagerFactory.getTrustManagers(); + } + + public static Status httpResponseStatusToGrpcStatus(HttpResponseStatus status, String errorMessage) { + if (status.equals(HttpResponseStatus.FORBIDDEN) || status.equals(HttpResponseStatus.UNAUTHORIZED)) { + return Status.PERMISSION_DENIED.withDescription(errorMessage); + } + + return Status.UNKNOWN.withDescription(errorMessage); + } + + public static X509Certificate extractGrpcClientCert(ServerCall call) throws SSLPeerUnverifiedException { + SSLSession sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION); + if (sslSession == null) { + LOGGER.error("Cannot obtain SSLSession for authority {}", call.getAuthority()); + throw new VeniceException("Failed to obtain SSL session"); + } + + return SslUtils.getX509Certificate(sslSession.getPeerCertificates()[0]); + } + private static KeyStore loadKeyStore(SSLFactory sslFactory, String type) throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { SSLConfig config = sslFactory.getSSLConfig(); @@ -42,43 +92,4 @@ private static KeyStore loadStore(String path, char[] password, String type) } return keyStore; } - - public static KeyManager[] getKeyManagers(SSLFactory sslFactory) - throws UnrecoverableKeyException, CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { - String algorithm = KeyManagerFactory.getDefaultAlgorithm(); - return getKeyManagers(sslFactory, algorithm); - } - - public static TrustManager[] getTrustManagers(SSLFactory sslFactory) - throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { - String algorithm = TrustManagerFactory.getDefaultAlgorithm(); - return getTrustManagers(sslFactory, algorithm); - } - - public static KeyManager[] getKeyManagers(SSLFactory sslFactory, String algorithm) - throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException, UnrecoverableKeyException { - String password = sslFactory.getSSLConfig().getKeyStorePassword(); - KeyStore keyStore = loadKeyStore(sslFactory, sslFactory.getSSLConfig().getKeyStoreType()); - return createKeyManagers(keyStore, algorithm, password); - } - - public static TrustManager[] getTrustManagers(SSLFactory sslFactory, String algorithm) - throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { - KeyStore trustStore = loadTrustStore(sslFactory, sslFactory.getSSLConfig().getTrustStoreType()); - return createTrustManagers(trustStore, algorithm); - } - - private static KeyManager[] createKeyManagers(KeyStore keyStore, String algorithm, String password) - throws NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException { - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(algorithm); - keyManagerFactory.init(keyStore, password.toCharArray()); - return keyManagerFactory.getKeyManagers(); - } - - private static TrustManager[] createTrustManagers(KeyStore trustStore, String algorithm) - throws NoSuchAlgorithmException, KeyStoreException { - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(algorithm); - trustManagerFactory.init(trustStore); - return trustManagerFactory.getTrustManagers(); - } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/listener/ServerHandlerUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/listener/ServerHandlerUtils.java index f7fcc659e8..f1c14b8436 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/listener/ServerHandlerUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/listener/ServerHandlerUtils.java @@ -1,7 +1,11 @@ package com.linkedin.venice.listener; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.utils.SslUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.ssl.SslHandler; +import java.security.cert.X509Certificate; +import javax.net.ssl.SSLPeerUnverifiedException; public class ServerHandlerUtils { @@ -21,4 +25,13 @@ public static SslHandler extractSslHandler(ChannelHandlerContext ctx) { } return sslHandler; } + + public static X509Certificate extractClientCert(ChannelHandlerContext ctx) throws SSLPeerUnverifiedException { + SslHandler sslHandler = ServerHandlerUtils.extractSslHandler(ctx); + if (sslHandler != null) { + return SslUtils.getX509Certificate(sslHandler.engine().getSession().getPeerCertificates()[0]); + } else { + throw new VeniceException("Failed to extract client cert from the incoming request"); + } + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcSslUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcSslUtilsTest.java deleted file mode 100644 index 583365773d..0000000000 --- a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcSslUtilsTest.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.linkedin.venice.grpc; - -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import com.linkedin.venice.security.SSLFactory; -import com.linkedin.venice.utils.SslUtils; -import javax.net.ssl.KeyManager; -import javax.net.ssl.TrustManager; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - - -public class GrpcSslUtilsTest { - private static SSLFactory sslFactory; - - @BeforeTest - public static void setup() { - sslFactory = SslUtils.getVeniceLocalSslFactory(); - } - - @Test - public void testGetTrustManagers() throws Exception { - TrustManager[] trustManagers = GrpcSslUtils.getTrustManagers(sslFactory); - - assertNotNull(trustManagers); - assertTrue(trustManagers.length > 0); - } - - @Test - public void testGetKeyManagers() throws Exception { - KeyManager[] keyManagers = GrpcSslUtils.getKeyManagers(sslFactory); - - assertNotNull(keyManagers); - assertTrue(keyManagers.length > 0); - } -} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcUtilsTest.java new file mode 100644 index 0000000000..e877b4e480 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/grpc/GrpcUtilsTest.java @@ -0,0 +1,74 @@ +package com.linkedin.venice.grpc; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.venice.security.SSLFactory; +import com.linkedin.venice.utils.SslUtils; +import io.grpc.Status; +import io.netty.handler.codec.http.HttpResponseStatus; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +public class GrpcUtilsTest { + private static SSLFactory sslFactory; + + @BeforeTest + public static void setup() { + sslFactory = SslUtils.getVeniceLocalSslFactory(); + } + + @Test + public void testGetTrustManagers() throws Exception { + TrustManager[] trustManagers = GrpcUtils.getTrustManagers(sslFactory); + + assertNotNull(trustManagers); + assertTrue(trustManagers.length > 0); + } + + @Test + public void testGetKeyManagers() throws Exception { + KeyManager[] keyManagers = GrpcUtils.getKeyManagers(sslFactory); + + assertNotNull(keyManagers); + assertTrue(keyManagers.length > 0); + } + + @Test + public void testHttpResponseStatusToGrpcStatus() { + final String permissionDeniedErrorMessage = "permission denied error message"; + Status grpcStatus = + GrpcUtils.httpResponseStatusToGrpcStatus(HttpResponseStatus.FORBIDDEN, permissionDeniedErrorMessage); + + assertEquals( + grpcStatus.getCode(), + Status.PERMISSION_DENIED.getCode(), + "Mismatch in GRPC status for the http response status permission denied"); + assertEquals( + permissionDeniedErrorMessage, + grpcStatus.getDescription(), + "Mismatch in error description for the mapped grpc status"); + + final String unauthorizedErrorMessage = "unauthorized error message"; + grpcStatus = GrpcUtils.httpResponseStatusToGrpcStatus(HttpResponseStatus.UNAUTHORIZED, unauthorizedErrorMessage); + assertEquals( + grpcStatus.getCode(), + Status.PERMISSION_DENIED.getCode(), + "Mismatch in GRPC status for the http response status unauthorized"); + assertEquals( + unauthorizedErrorMessage, + grpcStatus.getDescription(), + "Mismatch in error description for the mapped grpc status"); + + final String badRequestErrorMessage = "bad request error message"; + grpcStatus = GrpcUtils.httpResponseStatusToGrpcStatus(HttpResponseStatus.BAD_REQUEST, badRequestErrorMessage); + assertEquals(grpcStatus.getCode(), Status.UNKNOWN.getCode(), "Expected unknown status for everything else"); + assertEquals( + badRequestErrorMessage, + grpcStatus.getDescription(), + "Mismatch in error description for the mapped grpc status"); + } +} diff --git a/services/venice-server/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java b/services/venice-server/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java index 041343db1d..3e6fc3546a 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/grpc/VeniceGrpcServer.java @@ -8,6 +8,7 @@ import io.grpc.ServerCredentials; import io.grpc.ServerInterceptors; import io.grpc.TlsServerCredentials; +import io.grpc.protobuf.services.ProtoReflectionService; import java.io.IOException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; @@ -40,6 +41,7 @@ public VeniceGrpcServer(VeniceGrpcServerConfig config) { server = Grpc.newServerBuilderForPort(config.getPort(), credentials) .executor(executor) // TODO: experiment with different executors for best performance .addService(ServerInterceptors.intercept(config.getService(), config.getInterceptors())) + .addService(ProtoReflectionService.newInstance()) .build(); } @@ -58,8 +60,8 @@ private void initServerCredentials() { try { credentials = TlsServerCredentials.newBuilder() - .keyManager(GrpcSslUtils.getKeyManagers(sslFactory)) - .trustManager(GrpcSslUtils.getTrustManagers(sslFactory)) + .keyManager(GrpcUtils.getKeyManagers(sslFactory)) + .trustManager(GrpcUtils.getTrustManagers(sslFactory)) .clientAuth(TlsServerCredentials.ClientAuth.REQUIRE) .build(); } catch (UnrecoverableKeyException | KeyStoreException | CertificateException | IOException diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerAclHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerAclHandler.java index 86bfab653d..2b408b71ed 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerAclHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerAclHandler.java @@ -1,14 +1,14 @@ package com.linkedin.venice.listener; +import static com.linkedin.venice.grpc.GrpcUtils.*; +import static com.linkedin.venice.listener.ServerHandlerUtils.*; import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; import static io.grpc.Metadata.Key; import com.linkedin.venice.acl.StaticAccessController; import com.linkedin.venice.acl.VeniceComponent; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.protocols.VeniceClientRequest; import com.linkedin.venice.utils.NettyUtils; -import com.linkedin.venice.utils.SslUtils; import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; import io.grpc.Metadata; @@ -21,13 +21,11 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import java.security.cert.X509Certificate; import java.util.Objects; import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,9 +40,9 @@ */ @ChannelHandler.Sharable public class ServerAclHandler extends SimpleChannelInboundHandler implements ServerInterceptor { + public static final String SERVER_ACL_APPROVED = "SERVER_ACL_APPROVED_ATTRIBUTE_KEY"; public static final AttributeKey SERVER_ACL_APPROVED_ATTRIBUTE_KEY = - AttributeKey.valueOf("SERVER_ACL_APPROVED_ATTRIBUTE_KEY"); - public static final String GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY = "GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY"; + AttributeKey.valueOf(SERVER_ACL_APPROVED); private static final Logger LOGGER = LogManager.getLogger(ServerAclHandler.class); private final StaticAccessController accessController; private final boolean failOnAccessRejection; @@ -67,12 +65,7 @@ public ServerAclHandler(StaticAccessController accessController, boolean failOnA */ @Override public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws SSLPeerUnverifiedException { - SslHandler sslHandler = ServerHandlerUtils.extractSslHandler(ctx); - if (sslHandler == null) { - throw new VeniceException("Failed to extract ssl handler from the incoming request"); - } - - X509Certificate clientCert = SslUtils.getX509Certificate(sslHandler.engine().getSession().getPeerCertificates()[0]); + X509Certificate clientCert = extractClientCert(ctx); String method = req.method().name(); boolean accessApproved = accessController.hasAccess(clientCert, VeniceComponent.SERVER, method); @@ -98,23 +91,14 @@ public ServerCall.Listener interceptCall( return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) { @Override public void onMessage(ReqT message) { - SSLSession sslSession; String method = ((VeniceClientRequest) message).getMethod(); - String clientAddr = Objects.requireNonNull(call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)).toString(); - Key accessApprovedKey = Key.of(GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY, ASCII_STRING_MARSHALLER); + Key accessApprovedKey = Key.of(SERVER_ACL_APPROVED, ASCII_STRING_MARSHALLER); boolean accessApproved = false; - try { - sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION); - if (sslSession == null) { - throw new VeniceException("Failed to extract ssl session from the incoming request"); - } - - X509Certificate clientCert = SslUtils.getX509Certificate(sslSession.getPeerCertificates()[0]); - + X509Certificate clientCert = extractGrpcClientCert(call); accessApproved = accessController.hasAccess(clientCert, VeniceComponent.SERVER, method); headers.put(accessApprovedKey, Boolean.toString(accessApproved)); } catch (SSLPeerUnverifiedException e) { @@ -125,10 +109,7 @@ public void onMessage(ReqT message) { if (accessApproved || !failOnAccessRejection) { super.onMessage(message); } else { - if (LOGGER.isDebugEnabled()) { - String errLine = String.format("%s requested %s", clientAddr, method); - LOGGER.debug("Unauthorized access rejected: {}", errLine); - } + LOGGER.debug("Unauthorized access rejected: {} requested {}", clientAddr, method); call.close(Status.PERMISSION_DENIED, headers); } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java index 071758c125..5ac3ca446d 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStoreAclHandler.java @@ -2,19 +2,15 @@ import com.linkedin.venice.acl.DynamicAccessController; import com.linkedin.venice.acl.handler.StoreAclHandler; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.utils.SslUtils; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.ssl.SslHandler; import io.netty.util.Attribute; import io.netty.util.ReferenceCountUtil; -import java.security.cert.X509Certificate; import javax.net.ssl.SSLPeerUnverifiedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -25,7 +21,6 @@ * 1. Access from Router, and Router request will be validated in {@link ServerAclHandler}, and {@link ServerStoreAclHandler} will be a quick pass-through. * 2. Access from Client directly, and {@link ServerAclHandler} will deny the request, and {@link ServerStoreAclHandler} will * validate the request in store-level, which is exactly same as the access control behavior in Router. - * * If both of them fail, the request will be rejected. */ public class ServerStoreAclHandler extends StoreAclHandler { @@ -35,14 +30,6 @@ public ServerStoreAclHandler(DynamicAccessController accessController, ReadOnlyS super(accessController, metadataRepository); } - /** - * In Venice Server, the resource name is actually a Kafka topic name. - */ - @Override - protected String extractStoreName(String resourceName) { - return Version.parseStoreFromKafkaTopicName(resourceName); - } - @Override public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws SSLPeerUnverifiedException { if (checkWhetherAccessHasAlreadyApproved(ctx)) { @@ -57,15 +44,27 @@ public void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws SSLP } @Override - protected X509Certificate extractClientCert(ChannelHandlerContext ctx) throws SSLPeerUnverifiedException { - SslHandler sslHandler = ServerHandlerUtils.extractSslHandler(ctx); - if (sslHandler != null) { - return SslUtils.getX509Certificate(sslHandler.engine().getSession().getPeerCertificates()[0]); + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + if (checkWhetherAccessHasAlreadyApproved(headers)) { + LOGGER.debug("Access already approved by ServerAclHandler"); + return next.startCall(call, headers); } else { - throw new VeniceException("Failed to extract client cert from the incoming request"); + LOGGER.debug("Delegating access check to StoreAclHandler"); + return super.interceptCall(call, headers, next); } } + /** + * In Venice Server, the resource name is actually a Kafka topic name. + */ + @Override + protected String extractStoreName(String resourceName) { + return Version.parseStoreFromKafkaTopicName(resourceName); + } + protected static boolean checkWhetherAccessHasAlreadyApproved(ChannelHandlerContext ctx) { Attribute serverAclApprovedAttr = ctx.channel().attr(ServerAclHandler.SERVER_ACL_APPROVED_ATTRIBUTE_KEY); return Boolean.TRUE.equals(serverAclApprovedAttr.get()); @@ -73,21 +72,6 @@ protected static boolean checkWhetherAccessHasAlreadyApproved(ChannelHandlerCont protected static boolean checkWhetherAccessHasAlreadyApproved(Metadata headers) { return Boolean.parseBoolean( - headers.get( - Metadata.Key - .of(ServerAclHandler.GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY, Metadata.ASCII_STRING_MARSHALLER))); - } - - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - Metadata headers, - ServerCallHandler next) { - boolean checkWhetherAccessHasAlreadyApproved = checkWhetherAccessHasAlreadyApproved(headers); - if (checkWhetherAccessHasAlreadyApproved) { - return next.startCall(call, headers); - } else { - return super.interceptCall(call, headers, next); - } + headers.get(Metadata.Key.of(ServerAclHandler.SERVER_ACL_APPROVED, Metadata.ASCII_STRING_MARSHALLER))); } } diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java index e1ba14768d..af3c38808d 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ServerStoreAclHandlerTest.java @@ -45,9 +45,7 @@ public void testCheckWhetherAccessHasAlreadyApproved() { @Test public void testCheckWhetherAccessHasAlreadyApprovedGrpc() { Metadata headers = new Metadata(); - headers.put( - Metadata.Key.of(ServerAclHandler.GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY, Metadata.ASCII_STRING_MARSHALLER), - "true"); + headers.put(Metadata.Key.of(ServerAclHandler.SERVER_ACL_APPROVED, Metadata.ASCII_STRING_MARSHALLER), "true"); assertTrue( ServerStoreAclHandler.checkWhetherAccessHasAlreadyApproved(headers), @@ -60,9 +58,7 @@ public void testInterceptor() { ServerCallHandler next = mock(ServerCallHandler.class); Metadata falseHeaders = new Metadata(); - falseHeaders.put( - Metadata.Key.of(ServerAclHandler.GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY, Metadata.ASCII_STRING_MARSHALLER), - "false"); + falseHeaders.put(Metadata.Key.of(ServerAclHandler.SERVER_ACL_APPROVED, Metadata.ASCII_STRING_MARSHALLER), "false"); ServerStoreAclHandler handler = new ServerStoreAclHandler(mock(DynamicAccessController.class), mock(ReadOnlyStoreRepository.class)); @@ -72,9 +68,7 @@ public void testInterceptor() { verify(next, times(1)).startCall(call, falseHeaders); Metadata trueHeaders = new Metadata(); - trueHeaders.put( - Metadata.Key.of(ServerAclHandler.GRPC_SERVER_ACL_APPROVED_ATTRIBUTE_KEY, Metadata.ASCII_STRING_MARSHALLER), - "true"); + trueHeaders.put(Metadata.Key.of(ServerAclHandler.SERVER_ACL_APPROVED, Metadata.ASCII_STRING_MARSHALLER), "true"); // next.intercept call should not have been invoked handler.interceptCall(call, trueHeaders, next);