Skip to content

Commit

Permalink
core: add ServerTransportFilter
Browse files Browse the repository at this point in the history
Called whenever a ServerTransport is ready and terminated.  Has the
ability to modify transport attributes, which ServerCall.attributes()
are based on.

Related changes:

- Attribute keys for remote address and SSL session are now moved from
ServerCall to a neutral place io.grpc.Grpc, because they can also be
used from ServerTransportFilter, and probably will be used on the
client-side too.  The old keys on ServerCall is marked deprecated and
are equivalent to the new keys.
- Added transportReady() to ServerTransportListener.

Resolves #2132
  • Loading branch information
zhangkun83 committed Aug 31, 2016
1 parent 48c6b3d commit 58d78dd
Show file tree
Hide file tree
Showing 19 changed files with 377 additions and 53 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/Attributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public Set<Key<?>> keys() {
return Collections.unmodifiableSet(data.keySet());
}

/**
* Create a new builder that is pre-populated with the content from a given container.
*/
public static Builder newBuilder(Attributes base) {
return newBuilder().setAll(base);
}

/**
* Create a new builder.
*/
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/java/io/grpc/Grpc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.grpc;

import io.grpc.Attributes;

import java.net.SocketAddress;
import javax.net.ssl.SSLSession;

/**
* Stuff that are part of the public API but are not bound to particular classes, e.g., static
* methods, constants, attribute and context keys.
*/
public final class Grpc {
private Grpc() {
}

/**
* Attribute key for the remote address of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR =
Attributes.Key.of("remote-addr");

/**
* Attribute key for SSL session of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
Attributes.Key.of("ssl-session");
}
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ public static ServerBuilder<?> forPort(int port) {
*/
public abstract T addService(BindableService bindableService);

/**
* Adds a {@link ServerTransportFilter}. The order of filters being added is the order they will
* be executed.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2132")
public abstract T addTransportFilter(ServerTransportFilter filter);

/**
* Sets a fallback handler registry that will be looked up in if a method is not found in the
* primary registry.
Expand Down
18 changes: 14 additions & 4 deletions core/src/main/java/io/grpc/ServerCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,24 @@ public abstract class ServerCall<ReqT, RespT> {
/**
* {@link Attributes.Key} for the remote address of server call attributes
* {@link ServerCall#attributes()}
*
* @deprecated use the equivalent {@link io.grpc.Grpc#TRANSPORT_ATTR_REMOTE_ADDR} instead
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@Deprecated
public static final Attributes.Key<SocketAddress> REMOTE_ADDR_KEY =
Attributes.Key.of("remote-addr");
Grpc.TRANSPORT_ATTR_REMOTE_ADDR;

/**
* {@link Attributes.Key} for the SSL session of server call attributes
* {@link ServerCall#attributes()}
*
* @deprecated use the equivalent {@link io.grpc.Grpc#TRANSPORT_ATTR_SSL_SESSION} instead
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
@Deprecated
public static final Attributes.Key<SSLSession> SSL_SESSION_KEY =
Attributes.Key.of("ssl-session");
Grpc.TRANSPORT_ATTR_SSL_SESSION;

/**
* Callbacks for consuming incoming RPC messages.
Expand Down Expand Up @@ -221,8 +228,11 @@ public void setCompression(String compressor) {
}

/**
* Returns properties of a single call. This is a generic container which can contain any kind of
* information describing call like for example remote address, TLS information (OU etc.)
* Returns properties of a single call.
*
* <p>Attributes originate from the transport and can be altered by {@link ServerTransportFilter}.
* {@link Grpc} defines commonly used attributes, while the availability of them in a particular
* {@code ServerCall} is not guaranteed.
*
* @return Attributes container
*/
Expand Down
73 changes: 73 additions & 0 deletions core/src/main/java/io/grpc/ServerTransportFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.grpc;

/**
* Listens on server transport life-cycle events, with the capability to read and/or change
* transport attributes. Attributes returned by this filter will be merged into {@link
* ServerCall#attributes}.
*
* <p>Multiple filters maybe registered to a server, in which case the output of a filter is the
* input of the next filter. For example, what returned by {@link #transportReady} of a filter is
* passed to the same method of the next filter, and the last filter's return value is the effective
* transport attributes. A filter should modify the passed-in attributes instead of creating one
* from scratch.
*
* <p>{@link Grpc} defines commonly used attributes.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2132")
public abstract class ServerTransportFilter {
/**
* Called when a transport is ready to process streams. All necessary handshakes, e.g., TLS
* handshake, are done at this point.
*
* <p>Note the implementation should always inherit the passed-in attributes using {@code
* Attributes.newBuilder(transportAttrs)}, instead of creating one from scratch.
*
* @param transportAttrs current transport attributes
*
* @return new transport attributes. Default implementation returns the passed-in attributes
* intact.
*/
public Attributes transportReady(Attributes transportAttrs) {
return transportAttrs;
}

/**
* Called when a transport is terminated. Default implementation is no-op.
*
* @param transportAttrs the effective transport attributes, which is what returned by {@link
* #transportReady} of the last executed filter.
*/
public void transportTerminated(Attributes transportAttrs) {
}
}
12 changes: 7 additions & 5 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import io.grpc.CallOptions;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
Expand Down Expand Up @@ -72,7 +72,7 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {

private final String name;
private ServerTransportListener serverTransportListener;
private final Attributes serverStreamAttributes;
private Attributes serverStreamAttributes;
private ManagedClientTransport.Listener clientTransportListener;
@GuardedBy("this")
private boolean shutdown;
Expand All @@ -85,9 +85,6 @@ class InProcessTransport implements ServerTransport, ConnectionClientTransport {

public InProcessTransport(String name) {
this.name = name;
this.serverStreamAttributes = Attributes.newBuilder()
.set(ServerCall.REMOTE_ADDR_KEY, new InProcessSocketAddress(name))
.build();
}

@CheckReturnValue
Expand All @@ -113,8 +110,13 @@ public void run() {
}
return new Runnable() {
@Override
@SuppressWarnings("deprecation")
public void run() {
synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
clientTransportListener.transportReady();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package io.grpc.internal;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.util.concurrent.MoreExecutors;

Expand All @@ -44,7 +45,9 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;

import java.util.ArrayList;
import java.util.concurrent.Executor;

import javax.annotation.Nullable;
Expand All @@ -69,6 +72,9 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
private final InternalHandlerRegistry.Builder registryBuilder =
new InternalHandlerRegistry.Builder();

private final ArrayList<ServerTransportFilter> transportFilters =
new ArrayList<ServerTransportFilter>();

@Nullable
private HandlerRegistry fallbackRegistry;

Expand Down Expand Up @@ -103,6 +109,12 @@ public final T addService(BindableService bindableService) {
return addService(bindableService.bindService());
}

@Override
public final T addTransportFilter(ServerTransportFilter filter) {
transportFilters.add(checkNotNull(filter, "filter"));
return thisT();
}

@Override
public final T fallbackHandlerRegistry(HandlerRegistry registry) {
this.fallbackRegistry = registry;
Expand All @@ -127,7 +139,8 @@ public ServerImpl build() {
return new ServerImpl(executor, registryBuilder.build(),
firstNonNull(fallbackRegistry, EMPTY_FALLBACK_REGISTRY), transportServer,
Context.ROOT, firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()));
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
transportFilters);
}

/**
Expand Down
24 changes: 23 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,24 @@

import com.google.common.base.Preconditions;

import io.grpc.Attributes;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -85,6 +89,7 @@ public final class ServerImpl extends io.grpc.Server {
private boolean usingSharedExecutor;
private final InternalHandlerRegistry registry;
private final HandlerRegistry fallbackRegistry;
private final List<ServerTransportFilter> transportFilters;
@GuardedBy("lock") private boolean started;
@GuardedBy("lock") private boolean shutdown;
/** non-{@code null} if immediate shutdown has been requested. */
Expand Down Expand Up @@ -116,7 +121,8 @@ public final class ServerImpl extends io.grpc.Server {
*/
ServerImpl(Executor executor, InternalHandlerRegistry registry, HandlerRegistry fallbackRegistry,
InternalServer transportServer, Context rootContext,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry) {
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
List<ServerTransportFilter> transportFilters) {
this.executor = executor;
this.registry = Preconditions.checkNotNull(registry, "registry");
this.fallbackRegistry = Preconditions.checkNotNull(fallbackRegistry, "fallbackRegistry");
Expand All @@ -126,6 +132,8 @@ public final class ServerImpl extends io.grpc.Server {
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.transportFilters = Collections.unmodifiableList(
new ArrayList<ServerTransportFilter>(transportFilters));
}

/**
Expand Down Expand Up @@ -315,13 +323,27 @@ public void serverShutdown() {

private class ServerTransportListenerImpl implements ServerTransportListener {
private final ServerTransport transport;
private Attributes attributes;

public ServerTransportListenerImpl(ServerTransport transport) {
this.transport = transport;
}

@Override
public Attributes transportReady(Attributes attributes) {
for (ServerTransportFilter filter : transportFilters) {
attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
"Filter %s returned null", filter);
}
this.attributes = attributes;
return attributes;
}

@Override
public void transportTerminated() {
for (ServerTransportFilter filter : transportFilters) {
filter.transportTerminated(attributes);
}
transportClosed(transport);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/ServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public interface ServerStream extends Stream {
void cancel(Status status);

/**
* Attributes describing stream.
* Attributes describing stream. This is inherited from the transport attributes, and used
* as the basis of {@link io.grpc.ServerCall#attributes}.
*
* @return Attributes container
*/
Expand Down
Loading

0 comments on commit 58d78dd

Please sign in to comment.