From 000cce779a5bf324ce9f784442415be5c1ac2142 Mon Sep 17 00:00:00 2001 From: Vedant Mahabaleshwarkar Date: Thu, 20 Jul 2023 20:04:44 -0400 Subject: [PATCH] Address PR comments Signed-off-by: Vedant Mahabaleshwarkar --- .../com/ibm/watson/modelmesh/Metrics.java | 27 +++--- .../com/ibm/watson/modelmesh/ModelMesh.java | 6 +- .../ibm/watson/modelmesh/ModelMeshApi.java | 97 +++++++++++-------- .../ibm/watson/modelmesh/VModelManager.java | 12 +-- 4 files changed, 75 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index 715590f07..e8c5e6233 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -64,6 +64,7 @@ interface Metrics extends AutoCloseable { boolean isPerModelMetricsEnabled(); boolean isEnabled(); + void logTimingMetricSince(Metric metric, long prevTime, boolean isNano); void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId); @@ -164,14 +165,14 @@ final class PrometheusMetrics implements Metrics { private final CollectorRegistry registry; private final NettyServer metricServer; private final boolean shortNames; - private final boolean enablePerModelMetrics; + private final boolean perModelMetricsEnabled; private final EnumMap metricsMap = new EnumMap<>(Metric.class); public PrometheusMetrics(Map params, Map infoMetricParams) throws Exception { int port = 2112; boolean shortNames = true; boolean https = true; - boolean enablePerModelMetrics = true; + boolean perModelMetricsEnabled= true; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { @@ -183,7 +184,7 @@ public PrometheusMetrics(Map params, Map infoMet } break; case "per_model_metrics": - enablePerModelMetrics = "true".equalsIgnoreCase(ent.getValue()); + perModelMetricsEnabled= "true".equalsIgnoreCase(ent.getValue()); break; case "fq_names": shortNames = !"true".equalsIgnoreCase(ent.getValue()); @@ -203,7 +204,7 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Unrecognized metrics config parameter: " + ent.getKey()); } } - this.enablePerModelMetrics = enablePerModelMetrics; + this.perModelMetricsEnabled= perModelMetricsEnabled; registry = new CollectorRegistry(); for (Metric m : Metric.values()) { @@ -237,12 +238,12 @@ public PrometheusMetrics(Map params, Map infoMet if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { - if (this.enablePerModelMetrics) { + if (this.perModelMetricsEnabled) { builder.labelNames("method", "code", "modelId", "vModelId"); } else { builder.labelNames("method", "code"); } - } else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { + } else if (this.perModelMetricsEnabled && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { builder.labelNames("modelId", "vModelId"); } Collector collector = builder.name(m.promName).help(m.description).create(); @@ -352,7 +353,7 @@ public void close() { @Override public boolean isPerModelMetricsEnabled() { - return enablePerModelMetrics; + return perModelMetricsEnabled; } @Override @@ -368,7 +369,7 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { @Override public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { - if (enablePerModelMetrics && modelId != null) { + if (perModelMetricsEnabled && modelId != null) { ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(isNano ? elapsed / M : elapsed); } else { ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); @@ -377,7 +378,7 @@ public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, @Override public void logSizeEventMetric(Metric metric, long value, String modelId) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(value * metric.newMultiplier); } else { ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); @@ -404,16 +405,16 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); int idx = shortNames ? name.indexOf('/') : -1; String methodName = idx == -1 ? name : name.substring(idx + 1); - if (enablePerModelMetrics && vModelId == null) { + if (perModelMetricsEnabled&& vModelId == null) { vModelId = ""; } - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis); } else { timingHisto.labels(methodName, code.name()).observe(elapsedMillis); } if (reqPayloadSize != -1) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) .labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); } else { @@ -422,7 +423,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, } } if (respPayloadSize != -1) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) .labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); } else { diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 7422a0870..54716eaee 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -3315,7 +3315,7 @@ protected Map getMap(Object[] arr) { static final String KNOWN_SIZE_CXT_KEY = "tas.known_size"; static final String UNBALANCED_KEY = "mmesh.unbalanced"; static final String DEST_INST_ID_KEY = "tas.dest_iid"; - static final String VMODELID = "vmodelid"; + static final String VMODEL_ID = "vmodelid"; // these are the possible values for the tas.internal context parameter // it won't be set on requests from outside of the cluster, and will @@ -3431,7 +3431,7 @@ protected Object invokeModel(final String modelId, final Method method, } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); - String vModelId = contextMap.getOrDefault(VMODELID, ""); + String vModelId = contextMap.getOrDefault(VMODEL_ID, ""); // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. @@ -4437,7 +4437,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) if (contextMap == null) { vModelId = ""; } else { - vModelId = contextMap.get(VMODELID); + vModelId = contextMap.get(VMODEL_ID); } // The future-waiting timeouts should not be needed, request threads are interrupted when their diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index a72ed9d5c..3f87bc5dd 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -30,6 +30,7 @@ import com.ibm.watson.litelinks.server.ReleaseAfterResponse; import com.ibm.watson.litelinks.server.ServerRequestThread; import com.ibm.watson.modelmesh.DataplaneApiConfig.RpcConfig; +import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener; import com.ibm.watson.modelmesh.ModelMesh.ExtendedStatusInfo; import com.ibm.watson.modelmesh.api.DeleteVModelRequest; import com.ibm.watson.modelmesh.api.DeleteVModelResponse; @@ -68,6 +69,7 @@ import io.grpc.ServerInterceptors; import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; +import io.grpc.Status.Code; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.netty.GrpcSslContexts; @@ -345,7 +347,7 @@ protected static void setUnbalancedLitelinksContextParam() { } protected static void setvModelIdLiteLinksContextParam(String vModelId) { - ThreadContext.addContextEntry(ModelMesh.VMODELID, vModelId); + ThreadContext.addContextEntry(ModelMesh.VMODEL_ID, vModelId); } // ----------------- concrete model management methods @@ -434,38 +436,39 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver idList = new ArrayList<>(); idList.add(modelId); @@ -789,26 +804,19 @@ public void onHalfClose() { call.close(status, emptyMeta()); Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { - Iterator midIt = modelIds.iterator(); - while (midIt.hasNext()) { + if (isSingleModelRequest && metrics.isPerModelMetricsEnabled() && modelId!=null) { if (isVModel) { - String mId = null; - String vmId = midIt.next(); - try { - mId = vmm().resolveVModelId(midIt.next(), mId); - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, mId, vmId); - } - catch (Exception e) { - logger.error("Could not resolve model id for vModelId" + vmId, e); - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, "", vmId); - } + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, resolvedModelId, modelId); } else { metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, midIt.next(), ""); + status.getCode(), reqSize, respSize, modelId, ""); } } + else { + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, "", ""); + } } } } @@ -969,7 +977,10 @@ protected ModelResponse applyParallelMultiModel(List modelIds, boolean i logHeaders.addToMDC(headers); } // need to pass slices of the buffers for threadsafety - return callModel(modelId, isVModel, methodName, balancedMetaVal, headers, data.slice()); + if (isVModel) { + return callModel("", modelId, methodName, balancedMetaVal, headers, data.slice()); + } + return callModel(modelId, "", methodName, balancedMetaVal, headers, data.slice()); } catch (ModelNotFoundException mnfe) { logger.warn("model " + modelId + " not found (from supplied list of " + total + ")"); if (!requireAll) { diff --git a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java index c1867b3f2..058350d06 100644 --- a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java +++ b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java @@ -70,7 +70,7 @@ /** * This class contains logic related to VModels (virtual or versioned models) */ -public final class VModelManager implements Closeable { +public final class VModelManager implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(VModelManager.class); @@ -126,13 +126,9 @@ public ListenableFuture start() { } @Override - public void close() { - try { - vModelTable.close(); - targetScaleupExecutor.shutdown(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public void close() throws Exception { + vModelTable.close(); + targetScaleupExecutor.shutdown(); } /**