Skip to content

Commit

Permalink
Introduces support for client interceptors in low-level, stub-free API.
Browse files Browse the repository at this point in the history
  • Loading branch information
spericas committed Mar 18, 2024
1 parent c11061d commit 478b043
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.grpc.core;

/**
* gRPC interceptor weight classes. Higher weight means higher priority.
*/
public class InterceptorWeights {

/**
* Context weight.
* <p>
* Interceptors with this weight typically <b>only</b> perform tasks
* such as adding state to the call {@link io.grpc.Context}.
*/
public static final int CONTEXT = 5000;

/**
* Tracing weight.
* <p>
* Tracing and metrics interceptors are typically applied after any context
* interceptors so that they can trace and gather metrics on the whole call
* stack of remaining interceptors.
*/
public static final int TRACING = CONTEXT + 1;

/**
* Security authentication weight.
*/
public static final int AUTHENTICATION = 2000;

/**
* Security authorization weight.
*/
public static final int AUTHORIZATION = 2000;

/**
* User-level weight.
* <p>
* This value is also used as a default weight for application-supplied interceptors.
*/
public static final int USER = 1000;

/**
* Cannot create instances.
*/
private InterceptorWeights() {
}
}
9 changes: 9 additions & 0 deletions grpc/core/src/main/java/io/helidon/grpc/core/WeightedBag.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ public static <T> WeightedBag<T> withDefaultWeight(double weight) {
new ArrayList<>(), weight);
}

/**
* Check if bag is empty.
*
* @return outcome of test
*/
public boolean isEmpty() {
return contents.isEmpty() && noWeightedList.isEmpty();
}

/**
* Obtain a copy of this {@link WeightedBag}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package io.helidon.webclient.grpc;

import java.util.Arrays;
import java.util.Objects;

import io.grpc.ClientInterceptor;
import io.helidon.grpc.core.InterceptorWeights;
import io.helidon.grpc.core.MarshallerSupplier;
import io.helidon.grpc.core.MethodHandler;

import io.grpc.CallCredentials;
import io.grpc.MethodDescriptor;
import io.helidon.grpc.core.WeightedBag;

/**
* Encapsulates all metadata necessary to define a gRPC method. In addition to wrapping
Expand All @@ -48,6 +52,11 @@ public final class GrpcClientMethodDescriptor {
*/
private final MethodDescriptor<?, ?> descriptor;

/**
* The list of client interceptors for this method.
*/
private WeightedBag<ClientInterceptor> interceptors;

/**
* The {@link io.grpc.CallCredentials} for this method.
*/
Expand All @@ -60,10 +69,12 @@ public final class GrpcClientMethodDescriptor {

private GrpcClientMethodDescriptor(String name,
MethodDescriptor<?, ?> descriptor,
WeightedBag<ClientInterceptor> interceptors,
CallCredentials callCredentials,
MethodHandler<?, ?> methodHandler) {
this.name = name;
this.descriptor = descriptor;
this.interceptors = interceptors;
this.callCredentials = callCredentials;
this.methodHandler = methodHandler;
}
Expand Down Expand Up @@ -146,6 +157,15 @@ public static Builder bidirectional(String serviceName, String name) {
return builder(serviceName, name, MethodDescriptor.MethodType.BIDI_STREAMING);
}

/**
* Obtain the {@link ClientInterceptor}s to use for this method.
*
* @return the {@link ClientInterceptor}s to use for this method
*/
WeightedBag<ClientInterceptor> interceptors() {
return interceptors.readOnly();
}

/**
* Return the {@link io.grpc.CallCredentials} set on this service.
*
Expand Down Expand Up @@ -234,6 +254,25 @@ public interface Rules {
*/
Rules responseType(Class<?> type);

/**
* Register one or more {@link ClientInterceptor interceptors} for the method.
*
* @param interceptors the interceptor(s) to register
* @return this {@link Rules} instance for fluent call chaining
*/
Rules intercept(ClientInterceptor... interceptors);

/**
* Register one or more {@link ClientInterceptor interceptors} for the method.
* <p>
* The added interceptors will be applied using the specified priority.
*
* @param weight the weight to assign to the interceptors
* @param interceptors one or more {@link ClientInterceptor}s to register
* @return this {@link Rules} to allow fluent method chaining
*/
Rules intercept(double weight, ClientInterceptor... interceptors);

/**
* Register the {@link MarshallerSupplier} for the method.
* <p>
Expand Down Expand Up @@ -276,6 +315,7 @@ public static class Builder
private final MethodDescriptor.Builder<?, ?> descriptor;
private Class<?> requestType;
private Class<?> responseType;
private final WeightedBag<ClientInterceptor> interceptors = WeightedBag.withDefaultWeight(InterceptorWeights.USER);
private MarshallerSupplier defaultMarshallerSupplier = MarshallerSupplier.defaultInstance();
private MarshallerSupplier marshallerSupplier;
private CallCredentials callCredentials;
Expand Down Expand Up @@ -305,6 +345,18 @@ public Builder responseType(Class<?> type) {
return this;
}

@Override
public Builder intercept(ClientInterceptor... interceptors) {
this.interceptors.addAll(Arrays.asList(interceptors));
return this;
}

@Override
public Builder intercept(double weight, ClientInterceptor... interceptors) {
this.interceptors.addAll(Arrays.asList(interceptors), weight);
return this;
}

@Override
public Builder marshallerSupplier(MarshallerSupplier supplier) {
this.marshallerSupplier = supplier;
Expand Down Expand Up @@ -364,6 +416,7 @@ public GrpcClientMethodDescriptor build() {

return new GrpcClientMethodDescriptor(name,
descriptor.build(),
interceptors,
callCredentials,
methodHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,48 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.helidon.grpc.core.WeightedBag;

class GrpcServiceClientImpl implements GrpcServiceClient {
private final GrpcServiceDescriptor descriptor;
private final GrpcServiceDescriptor serviceDescriptor;
private final Channel serviceChannel;
private final GrpcClientImpl grpcClient;
private final Map<String, Channel> methodCache = new ConcurrentHashMap<>();

GrpcServiceClientImpl(GrpcServiceDescriptor descriptor, GrpcClientImpl grpcClient) {
this.descriptor = descriptor;
this.serviceDescriptor = descriptor;
this.grpcClient = grpcClient;

if (descriptor.interceptors().isEmpty()) {
serviceChannel = grpcClient.channel();
} else {
// sort interceptors using a weighted bag
WeightedBag<ClientInterceptor> interceptors = WeightedBag.create();
for (ClientInterceptor interceptor : descriptor.interceptors()) {
interceptors.add(interceptor);
}

// wrap channel to call interceptors -- reversed for composition
List<ClientInterceptor> orderedInterceptors = interceptors.stream().toList().reversed();
serviceChannel = ClientInterceptors.intercept(grpcClient.channel(), orderedInterceptors);
}
}

@Override
public String serviceName() {
return descriptor.serviceName();
return serviceDescriptor.serviceName();
}

@Override
Expand Down Expand Up @@ -152,13 +174,26 @@ public <ReqT, ResT> StreamObserver<ReqT> bidi(String methodName, StreamObserver<
}

private <ReqT, ResT> ClientCall<ReqT, ResT> ensureMethod(String methodName, MethodDescriptor.MethodType methodType) {
GrpcClientMethodDescriptor method = descriptor.method(methodName);
if (!method.type().equals(methodType)) {
throw new IllegalArgumentException("Method " + methodName + " is of type " + method.type()
GrpcClientMethodDescriptor methodDescriptor = serviceDescriptor.method(methodName);
if (!methodDescriptor.type().equals(methodType)) {
throw new IllegalArgumentException("Method " + methodName + " is of type " + methodDescriptor.type()
+ ", yet " + methodType + " was requested.");
}
return methodType == MethodDescriptor.MethodType.UNARY
? new GrpcUnaryClientCall<>(grpcClient, method.descriptor(), CallOptions.DEFAULT)
: new GrpcClientCall<>(grpcClient, method.descriptor(), CallOptions.DEFAULT);

// use channel that contains all service and method interceptors
if (methodDescriptor.interceptors().isEmpty()) {
return serviceChannel.newCall(methodDescriptor.descriptor(), CallOptions.DEFAULT);
} else {
Channel methodChannel = methodCache.computeIfAbsent(methodName, k -> {
WeightedBag<ClientInterceptor> interceptors = WeightedBag.create();
for (ClientInterceptor interceptor : serviceDescriptor.interceptors()) {
interceptors.add(interceptor);
}
interceptors.merge(methodDescriptor.interceptors());
List<ClientInterceptor> orderedInterceptors = interceptors.stream().toList().reversed();
return ClientInterceptors.intercept(grpcClient.channel(), orderedInterceptors);
});
return methodChannel.newCall(methodDescriptor.descriptor(), CallOptions.DEFAULT);
}
}
}
Loading

0 comments on commit 478b043

Please sign in to comment.