From f0a6482a13c8e9850cdf52928f94575684d32dca Mon Sep 17 00:00:00 2001 From: Khor Shu Heng Date: Wed, 23 Dec 2020 11:02:12 +0800 Subject: [PATCH] Apply grpc tracing interceptor on online serving Signed-off-by: Khor Shu Heng --- serving/pom.xml | 13 +- .../serving/config/InstrumentationConfig.java | 6 + .../ServingServiceGRpcController.java | 17 +- .../service/OnlineServingServiceV2.java | 194 +++++++++--------- 4 files changed, 127 insertions(+), 103 deletions(-) diff --git a/serving/pom.xml b/serving/pom.xml index ebc6828ebf..f70a118b21 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -155,21 +155,26 @@ joda-time joda-time - + io.jaegertracing jaeger-client - 0.31.0 + 1.3.2 io.opentracing opentracing-api - 0.31.0 + 0.33.0 io.opentracing opentracing-noop - 0.31.0 + 0.33.0 + + + io.opentracing.contrib + opentracing-grpc + 0.2.3 diff --git a/serving/src/main/java/feast/serving/config/InstrumentationConfig.java b/serving/src/main/java/feast/serving/config/InstrumentationConfig.java index 30269c5d0e..295b263f66 100644 --- a/serving/src/main/java/feast/serving/config/InstrumentationConfig.java +++ b/serving/src/main/java/feast/serving/config/InstrumentationConfig.java @@ -17,6 +17,7 @@ package feast.serving.config; import io.opentracing.Tracer; +import io.opentracing.contrib.grpc.TracingServerInterceptor; import io.opentracing.noop.NoopTracerFactory; import io.prometheus.client.exporter.MetricsServlet; import io.prometheus.client.hotspot.DefaultExports; @@ -54,4 +55,9 @@ public Tracer tracer() { return io.jaegertracing.Configuration.fromEnv(feastProperties.getTracing().getServiceName()) .getTracer(); } + + @Bean + public TracingServerInterceptor tracingInterceptor(Tracer tracer) { + return TracingServerInterceptor.newBuilder().withTracer(tracer).build(); + } } diff --git a/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java b/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java index f0dd4302d3..531be39f9d 100644 --- a/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java +++ b/serving/src/main/java/feast/serving/controller/ServingServiceGRpcController.java @@ -30,16 +30,21 @@ import feast.serving.util.RequestHelper; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; +import io.opentracing.contrib.grpc.TracingServerInterceptor; import net.devh.boot.grpc.server.service.GrpcService; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.AccessDeniedException; import org.springframework.security.core.context.SecurityContextHolder; -@GrpcService(interceptors = {GrpcMessageInterceptor.class, GrpcMonitoringInterceptor.class}) +@GrpcService( + interceptors = { + TracingServerInterceptor.class, + GrpcMessageInterceptor.class, + GrpcMonitoringInterceptor.class + }) public class ServingServiceGRpcController extends ServingServiceImplBase { private static final Logger log = @@ -75,8 +80,7 @@ public void getFeastServingInfo( public void getOnlineFeaturesV2( ServingAPIProto.GetOnlineFeaturesRequestV2 request, StreamObserver responseObserver) { - Span span = tracer.buildSpan("getOnlineFeaturesV2").start(); - try (Scope scope = tracer.scopeManager().activate(span, false)) { + try { // authorize for the project in request object. if (request.getProject() != null && !request.getProject().isEmpty()) { // project set at root level overrides the project set at feature table level @@ -84,7 +88,11 @@ public void getOnlineFeaturesV2( SecurityContextHolder.getContext(), request.getProject()); } RequestHelper.validateOnlineRequest(request); + Span span = tracer.buildSpan("getOnlineFeaturesV2").start(); GetOnlineFeaturesResponse onlineFeatures = servingServiceV2.getOnlineFeatures(request); + if (span != null) { + span.finish(); + } responseObserver.onNext(onlineFeatures); responseObserver.onCompleted(); } catch (SpecRetrievalException e) { @@ -102,6 +110,5 @@ public void getOnlineFeaturesV2( log.warn("Failed to get Online Features", e); responseObserver.onError(e); } - span.finish(); } } diff --git a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java index 39478d0c64..a78a94ab00 100644 --- a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java +++ b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java @@ -32,7 +32,7 @@ import feast.storage.api.retriever.Feature; import feast.storage.api.retriever.OnlineRetrieverV2; import io.grpc.Status; -import io.opentracing.Scope; +import io.opentracing.Span; import io.opentracing.Tracer; import java.util.*; import java.util.stream.Collectors; @@ -71,105 +71,111 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re projectName = "default"; } - try (Scope scope = tracer.buildSpan("getOnlineFeaturesV2").startActive(true)) { - List entityRows = request.getEntityRowsList(); - // Collect the feature/entity value for each entity row in entityValueMap - Map> entityValuesMap = - entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>())); - // Collect the feature/entity status metadata for each entity row in entityValueMap - Map> - entityStatusesMap = - entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>())); - - entityRows.forEach( - entityRow -> { - Map valueMap = entityRow.getFieldsMap(); - entityValuesMap.get(entityRow).putAll(valueMap); - entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false)); - }); - - List>> entityRowsFeatures = - retriever.getOnlineFeatures(projectName, entityRows, featureReferences); - - if (entityRowsFeatures.size() != entityRows.size()) { - throw Status.INTERNAL - .withDescription( - "The no. of FeatureRow obtained from OnlineRetriever" - + "does not match no. of entityRow passed.") - .asRuntimeException(); - } + List entityRows = request.getEntityRowsList(); + // Collect the feature/entity value for each entity row in entityValueMap + Map> entityValuesMap = + entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>())); + // Collect the feature/entity status metadata for each entity row in entityValueMap + Map> + entityStatusesMap = + entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>())); + + entityRows.forEach( + entityRow -> { + Map valueMap = entityRow.getFieldsMap(); + entityValuesMap.get(entityRow).putAll(valueMap); + entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false)); + }); - for (int i = 0; i < entityRows.size(); i++) { - GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i); - List> curEntityRowFeatures = entityRowsFeatures.get(i); - - Map> featureReferenceFeatureMap = - getFeatureRefFeatureMap(curEntityRowFeatures); - - Map allValueMaps = new HashMap<>(); - Map allStatusMaps = new HashMap<>(); - - for (FeatureReferenceV2 featureReference : featureReferences) { - if (featureReferenceFeatureMap.containsKey(featureReference)) { - Optional feature = featureReferenceFeatureMap.get(featureReference); - - FeatureTableSpec featureTableSpec = - specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference()); - FeatureProto.FeatureSpecV2 featureSpec = - specService.getFeatureSpec(projectName, feature.get().getFeatureReference()); - ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType(); - ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase(); - boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase); - - boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature); - Map valueMap = - unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec); - allValueMaps.putAll(valueMap); - - // Generate metadata for feature values and merge into entityFieldsMap - Map statusMap = - getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge); - allStatusMaps.putAll(statusMap); - - // Populate metrics/log request - populateCountMetrics(statusMap, projectName); - } else { - Map valueMap = - new HashMap<>() { - { - put( - FeatureV2.getFeatureStringRef(featureReference), - ValueProto.Value.newBuilder().build()); - } - }; - allValueMaps.putAll(valueMap); + Span onlineRetrievalSpan = tracer.buildSpan("onlineRetrieval").start(); + if (onlineRetrievalSpan != null) { + onlineRetrievalSpan.setTag("entities", entityRows.size()); + onlineRetrievalSpan.setTag("features", featureReferences.size()); + } + List>> entityRowsFeatures = + retriever.getOnlineFeatures(projectName, entityRows, featureReferences); + if (onlineRetrievalSpan != null) { + onlineRetrievalSpan.finish(); + } - Map statusMap = - getMetadataMap(valueMap, true, false); - allStatusMaps.putAll(statusMap); + if (entityRowsFeatures.size() != entityRows.size()) { + throw Status.INTERNAL + .withDescription( + "The no. of FeatureRow obtained from OnlineRetriever" + + "does not match no. of entityRow passed.") + .asRuntimeException(); + } - // Populate metrics/log request - populateCountMetrics(statusMap, projectName); - } + for (int i = 0; i < entityRows.size(); i++) { + GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i); + List> curEntityRowFeatures = entityRowsFeatures.get(i); + + Map> featureReferenceFeatureMap = + getFeatureRefFeatureMap(curEntityRowFeatures); + + Map allValueMaps = new HashMap<>(); + Map allStatusMaps = new HashMap<>(); + + for (FeatureReferenceV2 featureReference : featureReferences) { + if (featureReferenceFeatureMap.containsKey(featureReference)) { + Optional feature = featureReferenceFeatureMap.get(featureReference); + + FeatureTableSpec featureTableSpec = + specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference()); + FeatureProto.FeatureSpecV2 featureSpec = + specService.getFeatureSpec(projectName, feature.get().getFeatureReference()); + ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType(); + ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase(); + boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase); + + boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature); + Map valueMap = + unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec); + allValueMaps.putAll(valueMap); + + // Generate metadata for feature values and merge into entityFieldsMap + Map statusMap = + getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge); + allStatusMaps.putAll(statusMap); + + // Populate metrics/log request + populateCountMetrics(statusMap, projectName); + } else { + Map valueMap = + new HashMap<>() { + { + put( + FeatureV2.getFeatureStringRef(featureReference), + ValueProto.Value.newBuilder().build()); + } + }; + allValueMaps.putAll(valueMap); + + Map statusMap = + getMetadataMap(valueMap, true, false); + allStatusMaps.putAll(statusMap); + + // Populate metrics/log request + populateCountMetrics(statusMap, projectName); } - entityValuesMap.get(entityRow).putAll(allValueMaps); - entityStatusesMap.get(entityRow).putAll(allStatusMaps); } - - // Build response field values from entityValuesMap and entityStatusesMap - // Response field values should be in the same order as the entityRows provided by the user. - List fieldValuesList = - entityRows.stream() - .map( - entityRow -> { - return GetOnlineFeaturesResponse.FieldValues.newBuilder() - .putAllFields(entityValuesMap.get(entityRow)) - .putAllStatuses(entityStatusesMap.get(entityRow)) - .build(); - }) - .collect(Collectors.toList()); - return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build(); + entityValuesMap.get(entityRow).putAll(allValueMaps); + entityStatusesMap.get(entityRow).putAll(allStatusMaps); } + + // Build response field values from entityValuesMap and entityStatusesMap + // Response field values should be in the same order as the entityRows provided by the user. + List fieldValuesList = + entityRows.stream() + .map( + entityRow -> { + return GetOnlineFeaturesResponse.FieldValues.newBuilder() + .putAllFields(entityValuesMap.get(entityRow)) + .putAllStatuses(entityStatusesMap.get(entityRow)) + .build(); + }) + .collect(Collectors.toList()); + return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build(); } private boolean checkSameFeatureSpec(