diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index dd320eeb8684..f838d9448484 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -16,11 +16,8 @@ package io.grpc.internal; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Attributes; import io.grpc.BinaryLog; import io.grpc.ClientInterceptor; @@ -38,9 +35,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -86,13 +81,13 @@ public static ManagedChannelBuilder forTarget(String target) { */ static final long IDLE_MODE_MIN_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); - private static final ObjectPool DEFAULT_EXECUTOR_POOL = + protected static final ObjectPool DEFAULT_EXECUTOR_POOL = SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR); - private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = + protected static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = DecompressorRegistry.getDefaultInstance(); - private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY = + protected static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY = CompressorRegistry.getDefaultInstance(); private static final long DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES = 1L << 24; // 16M @@ -102,16 +97,16 @@ public static ManagedChannelBuilder forTarget(String target) { ObjectPool offloadExecutorPool = DEFAULT_EXECUTOR_POOL; - private final List interceptors = new ArrayList<>(); + protected final List interceptors = new ArrayList<>(); final NameResolverRegistry nameResolverRegistry = NameResolverRegistry.getDefaultRegistry(); // Access via getter, which may perform authority override as needed - private NameResolver.Factory nameResolverFactory = nameResolverRegistry.asFactory(); + protected NameResolver.Factory nameResolverFactory = nameResolverRegistry.asFactory(); final String target; @Nullable - private final SocketAddress directServerAddress; + protected final SocketAddress directServerAddress; @Nullable String userAgent; @@ -149,7 +144,7 @@ public static ManagedChannelBuilder forTarget(String target) { protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); - private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; + protected int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; @Nullable BinaryLog binlog; @@ -157,28 +152,11 @@ public static ManagedChannelBuilder forTarget(String target) { @Nullable ProxyDetector proxyDetector; - /** - * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages - * larger than this limit is received it will not be processed and the RPC will fail with - * RESOURCE_EXHAUSTED. - */ - // Can be overridden by subclasses. - @Override - public T maxInboundMessageSize(int max) { - checkArgument(max >= 0, "negative max"); - maxInboundMessageSize = max; - return thisT(); - } - - protected final int maxInboundMessageSize() { - return maxInboundMessageSize; - } - - private boolean statsEnabled = true; - private boolean recordStartedRpcs = true; - private boolean recordFinishedRpcs = true; - private boolean recordRealTimeMetrics = false; - private boolean tracingEnabled = true; + protected boolean statsEnabled = true; + protected boolean recordStartedRpcs = true; + protected boolean recordFinishedRpcs = true; + protected boolean recordRealTimeMetrics = false; + protected boolean tracingEnabled = true; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -206,306 +184,6 @@ protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, S this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority); } - @Override - public final T directExecutor() { - return executor(MoreExecutors.directExecutor()); - } - - @Override - public final T executor(Executor executor) { - if (executor != null) { - this.executorPool = new FixedObjectPool<>(executor); - } else { - this.executorPool = DEFAULT_EXECUTOR_POOL; - } - return thisT(); - } - - @Override - public final T offloadExecutor(Executor executor) { - if (executor != null) { - this.offloadExecutorPool = new FixedObjectPool<>(executor); - } else { - this.offloadExecutorPool = DEFAULT_EXECUTOR_POOL; - } - return thisT(); - } - - @Override - public final T intercept(List interceptors) { - this.interceptors.addAll(interceptors); - return thisT(); - } - - @Override - public final T intercept(ClientInterceptor... interceptors) { - return intercept(Arrays.asList(interceptors)); - } - - @Deprecated - @Override - public final T nameResolverFactory(NameResolver.Factory resolverFactory) { - Preconditions.checkState(directServerAddress == null, - "directServerAddress is set (%s), which forbids the use of NameResolverFactory", - directServerAddress); - if (resolverFactory != null) { - this.nameResolverFactory = resolverFactory; - } else { - this.nameResolverFactory = nameResolverRegistry.asFactory(); - } - return thisT(); - } - - @Override - public final T defaultLoadBalancingPolicy(String policy) { - Preconditions.checkState(directServerAddress == null, - "directServerAddress is set (%s), which forbids the use of load-balancing policy", - directServerAddress); - Preconditions.checkArgument(policy != null, "policy cannot be null"); - this.defaultLbPolicy = policy; - return thisT(); - } - - @Override - public final T enableFullStreamDecompression() { - this.fullStreamDecompression = true; - return thisT(); - } - - @Override - public final T decompressorRegistry(DecompressorRegistry registry) { - if (registry != null) { - this.decompressorRegistry = registry; - } else { - this.decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; - } - return thisT(); - } - - @Override - public final T compressorRegistry(CompressorRegistry registry) { - if (registry != null) { - this.compressorRegistry = registry; - } else { - this.compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; - } - return thisT(); - } - - @Override - public final T userAgent(@Nullable String userAgent) { - this.userAgent = userAgent; - return thisT(); - } - - @Override - public final T overrideAuthority(String authority) { - this.authorityOverride = checkAuthority(authority); - return thisT(); - } - - @Override - public final T idleTimeout(long value, TimeUnit unit) { - checkArgument(value > 0, "idle timeout is %s, but must be positive", value); - // We convert to the largest unit to avoid overflow - if (unit.toDays(value) >= IDLE_MODE_MAX_TIMEOUT_DAYS) { - // This disables idle mode - this.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE; - } else { - this.idleTimeoutMillis = Math.max(unit.toMillis(value), IDLE_MODE_MIN_TIMEOUT_MILLIS); - } - return thisT(); - } - - @Override - public final T maxRetryAttempts(int maxRetryAttempts) { - this.maxRetryAttempts = maxRetryAttempts; - return thisT(); - } - - @Override - public final T maxHedgedAttempts(int maxHedgedAttempts) { - this.maxHedgedAttempts = maxHedgedAttempts; - return thisT(); - } - - @Override - public final T retryBufferSize(long bytes) { - checkArgument(bytes > 0L, "retry buffer size must be positive"); - retryBufferSize = bytes; - return thisT(); - } - - @Override - public final T perRpcBufferLimit(long bytes) { - checkArgument(bytes > 0L, "per RPC buffer limit must be positive"); - perRpcBufferLimit = bytes; - return thisT(); - } - - @Override - public final T disableRetry() { - retryEnabled = false; - return thisT(); - } - - @Override - public final T enableRetry() { - retryEnabled = true; - statsEnabled = false; - tracingEnabled = false; - return thisT(); - } - - @Override - public final T setBinaryLog(BinaryLog binlog) { - this.binlog = binlog; - return thisT(); - } - - @Override - public T maxTraceEvents(int maxTraceEvents) { - checkArgument(maxTraceEvents >= 0, "maxTraceEvents must be non-negative"); - this.maxTraceEvents = maxTraceEvents; - return thisT(); - } - - @Override - public T proxyDetector(@Nullable ProxyDetector proxyDetector) { - this.proxyDetector = proxyDetector; - return thisT(); - } - - @Override - public T defaultServiceConfig(@Nullable Map serviceConfig) { - // TODO(notcarl): use real parsing - defaultServiceConfig = checkMapEntryTypes(serviceConfig); - return thisT(); - } - - @Nullable - private static Map checkMapEntryTypes(@Nullable Map map) { - if (map == null) { - return null; - } - // Not using ImmutableMap.Builder because of extra guava dependency for Android. - Map parsedMap = new LinkedHashMap<>(); - for (Map.Entry entry : map.entrySet()) { - checkArgument( - entry.getKey() instanceof String, - "The key of the entry '%s' is not of String type", entry); - - String key = (String) entry.getKey(); - Object value = entry.getValue(); - if (value == null) { - parsedMap.put(key, null); - } else if (value instanceof Map) { - parsedMap.put(key, checkMapEntryTypes((Map) value)); - } else if (value instanceof List) { - parsedMap.put(key, checkListEntryTypes((List) value)); - } else if (value instanceof String) { - parsedMap.put(key, value); - } else if (value instanceof Double) { - parsedMap.put(key, value); - } else if (value instanceof Boolean) { - parsedMap.put(key, value); - } else { - throw new IllegalArgumentException( - "The value of the map entry '" + entry + "' is of type '" + value.getClass() - + "', which is not supported"); - } - } - return Collections.unmodifiableMap(parsedMap); - } - - private static List checkListEntryTypes(List list) { - List parsedList = new ArrayList<>(list.size()); - for (Object value : list) { - if (value == null) { - parsedList.add(null); - } else if (value instanceof Map) { - parsedList.add(checkMapEntryTypes((Map) value)); - } else if (value instanceof List) { - parsedList.add(checkListEntryTypes((List) value)); - } else if (value instanceof String) { - parsedList.add(value); - } else if (value instanceof Double) { - parsedList.add(value); - } else if (value instanceof Boolean) { - parsedList.add(value); - } else { - throw new IllegalArgumentException( - "The entry '" + value + "' is of type '" + value.getClass() - + "', which is not supported"); - } - } - return Collections.unmodifiableList(parsedList); - } - - @Override - public T disableServiceConfigLookUp() { - this.lookUpServiceConfig = false; - return thisT(); - } - - /** - * Disable or enable stats features. Enabled by default. - * - *

For the current release, calling {@code setStatsEnabled(true)} may have a side effect that - * disables retry. - */ - protected void setStatsEnabled(boolean value) { - statsEnabled = value; - } - - /** - * Disable or enable stats recording for RPC upstarts. Effective only if {@link - * #setStatsEnabled} is set to true. Enabled by default. - */ - protected void setStatsRecordStartedRpcs(boolean value) { - recordStartedRpcs = value; - } - - /** - * Disable or enable stats recording for RPC completions. Effective only if {@link - * #setStatsEnabled} is set to true. Enabled by default. - */ - protected void setStatsRecordFinishedRpcs(boolean value) { - recordFinishedRpcs = value; - } - - /** - * Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} is - * set to true. Disabled by default. - */ - protected void setStatsRecordRealTimeMetrics(boolean value) { - recordRealTimeMetrics = value; - } - - /** - * Disable or enable tracing features. Enabled by default. - * - *

For the current release, calling {@code setTracingEnabled(true)} may have a side effect that - * disables retry. - */ - protected void setTracingEnabled(boolean value) { - tracingEnabled = value; - } - - @VisibleForTesting - final long getIdleTimeoutMillis() { - return idleTimeoutMillis; - } - - /** - * Verifies the authority is valid. This method exists as an escape hatch for putting in an - * authority that is valid, but would fail the default validation provided by this - * implementation. - */ - protected String checkAuthority(String authority) { - return GrpcUtil.checkAuthority(authority); - } - // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know // what should be the desired behavior for retry + stats/tracing. // TODO(zdapeng): FIX IT @@ -639,15 +317,6 @@ public String getDefaultScheme() { } } - /** - * Returns the correctly typed version of the builder. - */ - private T thisT() { - @SuppressWarnings("unchecked") - T thisT = (T) this; - return thisT; - } - /** * Returns the internal offload executor pool for offloading tasks. */ diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java index 1043cc359848..38a50be86095 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImplBuilder.java @@ -16,11 +16,26 @@ package io.grpc.internal; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.BinaryLog; +import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolver; +import io.grpc.ProxyDetector; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** @@ -145,49 +160,331 @@ public ManagedChannel build() { TimeProvider.SYSTEM_TIME_PROVIDER)); } - /** Disable the check whether the authority is valid. */ - public ManagedChannelImplBuilder disableCheckAuthority() { - authorityCheckerDisabled = true; + @Override + public ManagedChannelImplBuilder directExecutor() { + return executor(MoreExecutors.directExecutor()); + } + + @Override + public ManagedChannelImplBuilder executor(Executor executor) { + if (executor != null) { + this.executorPool = new FixedObjectPool<>(executor); + } else { + this.executorPool = DEFAULT_EXECUTOR_POOL; + } return this; } - /** Enable previously disabled authority check. */ - public ManagedChannelImplBuilder enableCheckAuthority() { - authorityCheckerDisabled = false; + @Override + public ManagedChannelImplBuilder offloadExecutor(Executor executor) { + if (executor != null) { + this.offloadExecutorPool = new FixedObjectPool<>(executor); + } else { + this.offloadExecutorPool = DEFAULT_EXECUTOR_POOL; + } return this; } @Override - protected String checkAuthority(String authority) { - if (authorityCheckerDisabled) { - return authority; + public ManagedChannelImplBuilder intercept(List interceptors) { + this.interceptors.addAll(interceptors); + return this; + } + + @Override + public ManagedChannelImplBuilder intercept(ClientInterceptor... interceptors) { + return intercept(Arrays.asList(interceptors)); + } + + @Override + public ManagedChannelImplBuilder userAgent(@Nullable String userAgent) { + this.userAgent = userAgent; + return this; + } + + @Override + public ManagedChannelImplBuilder overrideAuthority(String authority) { + this.authorityOverride = checkAuthority(authority); + return this; + } + + @Deprecated + @Override + public ManagedChannelImplBuilder nameResolverFactory(NameResolver.Factory resolverFactory) { + Preconditions.checkState(directServerAddress == null, + "directServerAddress is set (%s), which forbids the use of NameResolverFactory", + directServerAddress); + if (resolverFactory != null) { + this.nameResolverFactory = resolverFactory; + } else { + this.nameResolverFactory = nameResolverRegistry.asFactory(); } - return super.checkAuthority(authority); + return this; } @Override - public void setStatsEnabled(boolean value) { - super.setStatsEnabled(value); + public ManagedChannelImplBuilder defaultLoadBalancingPolicy(String policy) { + Preconditions.checkState(directServerAddress == null, + "directServerAddress is set (%s), which forbids the use of load-balancing policy", + directServerAddress); + Preconditions.checkArgument(policy != null, "policy cannot be null"); + this.defaultLbPolicy = policy; + return this; } @Override - public void setStatsRecordStartedRpcs(boolean value) { - super.setStatsRecordStartedRpcs(value); + public ManagedChannelImplBuilder enableFullStreamDecompression() { + this.fullStreamDecompression = true; + return this; } @Override - public void setStatsRecordFinishedRpcs(boolean value) { - super.setStatsRecordFinishedRpcs(value); + public ManagedChannelImplBuilder decompressorRegistry(DecompressorRegistry registry) { + if (registry != null) { + this.decompressorRegistry = registry; + } else { + this.decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; + } + return this; } @Override - public void setStatsRecordRealTimeMetrics(boolean value) { - super.setStatsRecordRealTimeMetrics(value); + public ManagedChannelImplBuilder compressorRegistry(CompressorRegistry registry) { + if (registry != null) { + this.compressorRegistry = registry; + } else { + this.compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; + } + return this; + } + + @Override + public ManagedChannelImplBuilder idleTimeout(long value, TimeUnit unit) { + Preconditions.checkArgument(value > 0, "idle timeout is %s, but must be positive", value); + // We convert to the largest unit to avoid overflow + if (unit.toDays(value) >= IDLE_MODE_MAX_TIMEOUT_DAYS) { + // This disables idle mode + this.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE; + } else { + this.idleTimeoutMillis = Math.max(unit.toMillis(value), IDLE_MODE_MIN_TIMEOUT_MILLIS); + } + return this; + } + + /** + * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages + * larger than this limit is received it will not be processed and the RPC will fail with + * RESOURCE_EXHAUSTED. + */ + // Can be overridden by subclasses. + @Override + public ManagedChannelImplBuilder maxInboundMessageSize(int max) { + Preconditions.checkArgument(max >= 0, "negative max"); + maxInboundMessageSize = max; + return this; + } + + @Override + public ManagedChannelImplBuilder maxRetryAttempts(int maxRetryAttempts) { + this.maxRetryAttempts = maxRetryAttempts; + return this; + } + + @Override + public ManagedChannelImplBuilder maxHedgedAttempts(int maxHedgedAttempts) { + this.maxHedgedAttempts = maxHedgedAttempts; + return this; + } + + @Override + public ManagedChannelImplBuilder retryBufferSize(long bytes) { + Preconditions.checkArgument(bytes > 0L, "retry buffer size must be positive"); + retryBufferSize = bytes; + return this; + } + + @Override + public ManagedChannelImplBuilder perRpcBufferLimit(long bytes) { + Preconditions.checkArgument(bytes > 0L, "per RPC buffer limit must be positive"); + perRpcBufferLimit = bytes; + return this; + } + + @Override + public ManagedChannelImplBuilder disableRetry() { + retryEnabled = false; + return this; + } + + @Override + public ManagedChannelImplBuilder enableRetry() { + retryEnabled = true; + statsEnabled = false; + tracingEnabled = false; + return this; + } + + @Override + public ManagedChannelImplBuilder setBinaryLog(BinaryLog binlog) { + this.binlog = binlog; + return this; + } + + @Override + public ManagedChannelImplBuilder maxTraceEvents(int maxTraceEvents) { + Preconditions.checkArgument(maxTraceEvents >= 0, "maxTraceEvents must be non-negative"); + this.maxTraceEvents = maxTraceEvents; + return this; + } + + @Override + public ManagedChannelImplBuilder proxyDetector(@Nullable ProxyDetector proxyDetector) { + this.proxyDetector = proxyDetector; + return this; + } + + @Override + public ManagedChannelImplBuilder defaultServiceConfig(@Nullable Map serviceConfig) { + // TODO(notcarl): use real parsing + defaultServiceConfig = checkMapEntryTypes(serviceConfig); + return this; } @Override + public ManagedChannelImplBuilder disableServiceConfigLookUp() { + this.lookUpServiceConfig = false; + return this; + } + + @Nullable + private static Map checkMapEntryTypes(@Nullable Map map) { + if (map == null) { + return null; + } + // Not using ImmutableMap.Builder because of extra guava dependency for Android. + Map parsedMap = new LinkedHashMap<>(); + for (Map.Entry entry : map.entrySet()) { + Preconditions.checkArgument( + entry.getKey() instanceof String, + "The key of the entry '%s' is not of String type", entry); + + String key = (String) entry.getKey(); + Object value = entry.getValue(); + if (value == null) { + parsedMap.put(key, null); + } else if (value instanceof Map) { + parsedMap.put(key, checkMapEntryTypes((Map) value)); + } else if (value instanceof List) { + parsedMap.put(key, checkListEntryTypes((List) value)); + } else if (value instanceof String) { + parsedMap.put(key, value); + } else if (value instanceof Double) { + parsedMap.put(key, value); + } else if (value instanceof Boolean) { + parsedMap.put(key, value); + } else { + throw new IllegalArgumentException( + "The value of the map entry '" + entry + "' is of type '" + value.getClass() + + "', which is not supported"); + } + } + return Collections.unmodifiableMap(parsedMap); + } + + private static List checkListEntryTypes(List list) { + List parsedList = new ArrayList<>(list.size()); + for (Object value : list) { + if (value == null) { + parsedList.add(null); + } else if (value instanceof Map) { + parsedList.add(checkMapEntryTypes((Map) value)); + } else if (value instanceof List) { + parsedList.add(checkListEntryTypes((List) value)); + } else if (value instanceof String) { + parsedList.add(value); + } else if (value instanceof Double) { + parsedList.add(value); + } else if (value instanceof Boolean) { + parsedList.add(value); + } else { + throw new IllegalArgumentException( + "The entry '" + value + "' is of type '" + value.getClass() + + "', which is not supported"); + } + } + return Collections.unmodifiableList(parsedList); + } + + /** + * Disable or enable stats features. Enabled by default. + * + *

For the current release, calling {@code setStatsEnabled(true)} may have a side effect that + * disables retry. + */ + public void setStatsEnabled(boolean value) { + statsEnabled = value; + } + + /** + * Disable or enable stats recording for RPC upstarts. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. + */ + public void setStatsRecordStartedRpcs(boolean value) { + recordStartedRpcs = value; + } + + /** + * Disable or enable stats recording for RPC completions. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. + */ + public void setStatsRecordFinishedRpcs(boolean value) { + recordFinishedRpcs = value; + } + + /** + * Disable or enable real-time metrics recording. Effective only if {@link #setStatsEnabled} is + * set to true. Disabled by default. + */ + public void setStatsRecordRealTimeMetrics(boolean value) { + recordRealTimeMetrics = value; + } + + /** + * Disable or enable tracing features. Enabled by default. + * + *

For the current release, calling {@code setTracingEnabled(true)} may have a side effect that + * disables retry. + */ public void setTracingEnabled(boolean value) { - super.setTracingEnabled(value); + tracingEnabled = value; + } + + /** Disable the check whether the authority is valid. */ + public ManagedChannelImplBuilder disableCheckAuthority() { + authorityCheckerDisabled = true; + return this; + } + + /** Enable previously disabled authority check. */ + public ManagedChannelImplBuilder enableCheckAuthority() { + authorityCheckerDisabled = false; + return this; + } + + @VisibleForTesting + final long getIdleTimeoutMillis() { + return idleTimeoutMillis; + } + + /** + * Verifies the authority is valid. + */ + @VisibleForTesting + String checkAuthority(String authority) { + if (authorityCheckerDisabled) { + return authority; + } + return GrpcUtil.checkAuthority(authority); } @Override