From 2b1ba713f5b1a909a824e30dc543cd5fc8b85060 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 6 Mar 2023 11:24:59 +0530 Subject: [PATCH 1/6] Add modelid and vmodelid labels to metrics. Signed-off-by: Vedant Mahabaleshwarkar --- .../com/ibm/watson/modelmesh/Metrics.java | 109 +++++++++++++----- .../com/ibm/watson/modelmesh/ModelMesh.java | 25 ++-- .../ibm/watson/modelmesh/ModelMeshApi.java | 10 +- .../watson/prometheus/SimpleCollector.java | 2 +- .../modelmesh/ModelMeshMetricsTest.java | 40 ++++--- 5 files changed, 127 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index b246a5c3..5afc9491 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -39,14 +39,20 @@ import java.lang.reflect.Array; import java.net.SocketAddress; import java.nio.channels.DatagramChannel; -import java.util.*; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static com.ibm.watson.modelmesh.Metric.*; +import static com.ibm.watson.modelmesh.Metric.MetricType.*; import static com.ibm.watson.modelmesh.ModelMesh.M; import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_CUSTOM_ENV_VAR; import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_METRICS_ENV_VAR; @@ -56,14 +62,14 @@ * */ interface Metrics extends AutoCloseable { + boolean isPerModelMetricsEnabled(); boolean isEnabled(); - void logTimingMetricSince(Metric metric, long prevTime, boolean isNano); - void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano); + void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId); - void logSizeEventMetric(Metric metric, long value); + void logSizeEventMetric(Metric metric, long value, String modelId); void logGaugeMetric(Metric metric, long value); @@ -101,7 +107,7 @@ default void logInstanceStats(final InstanceRecord ir) { * @param respPayloadSize response payload size in bytes (or -1 if not applicable) */ void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize); + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId); default void registerGlobals() {} @@ -111,6 +117,11 @@ default void unregisterGlobals() {} default void close() {} Metrics NO_OP_METRICS = new Metrics() { + @Override + public boolean isPerModelMetricsEnabled() { + return false; + } + @Override public boolean isEnabled() { return false; @@ -120,10 +131,10 @@ public boolean isEnabled() { public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {} @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {} + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId){} @Override - public void logSizeEventMetric(Metric metric, long value) {} + public void logSizeEventMetric(Metric metric, long value, String modelId){} @Override public void logGaugeMetric(Metric metric, long value) {} @@ -136,7 +147,7 @@ public void logInstanceStats(InstanceRecord ir) {} @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) {} + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {} }; final class PrometheusMetrics implements Metrics { @@ -154,12 +165,14 @@ final class PrometheusMetrics implements Metrics { private final CollectorRegistry registry; private final NettyServer metricServer; private final boolean shortNames; + private final boolean enablePerModelMetrics; private final EnumMap metricsMap = new EnumMap<>(Metric.class); public PrometheusMetrics(Map params, Map infoMetricParams) throws Exception { int port = 2112; boolean shortNames = true; boolean https = true; + boolean enablePerModelMetrics = true; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { @@ -170,6 +183,9 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Invalid metrics port: " + ent.getValue()); } break; + case "per_model_metrics": + enablePerModelMetrics = "true".equalsIgnoreCase(ent.getValue()); + break; case "fq_names": shortNames = !"true".equalsIgnoreCase(ent.getValue()); break; @@ -188,6 +204,7 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Unrecognized metrics config parameter: " + ent.getKey()); } } + this.enablePerModelMetrics = enablePerModelMetrics; registry = new CollectorRegistry(); for (Metric m : Metric.values()) { @@ -220,10 +237,15 @@ public PrometheusMetrics(Map params, Map infoMet } if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME - || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { - builder.labelNames("method", "code"); + || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { + if (this.enablePerModelMetrics && m.type != COUNTER_WITH_HISTO) { + builder.labelNames("method", "code", "modelId"); + } else { + builder.labelNames("method", "code"); + } + } else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { + builder.labelNames("modelId"); } - Collector collector = builder.name(m.promName).help(m.description).create(); metricsMap.put(m, collector); if (!m.global) { @@ -251,7 +273,6 @@ public PrometheusMetrics(Map params, Map infoMet this.metricServer = new NettyServer(registry, port, https); this.shortNames = shortNames; - logger.info("Will expose " + (https ? "https" : "http") + " Prometheus metrics on port " + port + " using " + (shortNames ? "short" : "fully-qualified") + " method names"); @@ -330,6 +351,11 @@ public void close() { this.metricServer.close(); } + @Override + public boolean isPerModelMetricsEnabled() { + return enablePerModelMetrics; + } + @Override public boolean isEnabled() { return true; @@ -342,13 +368,21 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { } @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) { - ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { + if (enablePerModelMetrics) { + ((Histogram) metricsMap.get(metric)).labels(modelId).observe(isNano ? elapsed / M : elapsed); + } else { + ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); + } } @Override - public void logSizeEventMetric(Metric metric, long value) { - ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); + public void logSizeEventMetric(Metric metric, long value, String modelId) { + if (enablePerModelMetrics) { + ((Histogram) metricsMap.get(metric)).labels(modelId).observe(value * metric.newMultiplier); + } else { + ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); + } } @Override @@ -365,23 +399,35 @@ public void logCounterMetric(Metric metric) { @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) { + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) { final long elapsedMillis = elapsedNanos / M; final Histogram timingHisto = (Histogram) metricsMap .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); - + String mId = vModelId == null ? modelId : vModelId; int idx = shortNames ? name.indexOf('/') : -1; - final String methodName = idx == -1 ? name : name.substring(idx + 1); - - timingHisto.labels(methodName, code.name()).observe(elapsedMillis); - + String methodName = idx == -1 ? name : name.substring(idx + 1); + if (enablePerModelMetrics) { + timingHisto.labels(methodName, code.name(), mId).observe(elapsedMillis); + } else { + timingHisto.labels(methodName, code.name()).observe(elapsedMillis); + } if (reqPayloadSize != -1) { - ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(reqPayloadSize); + if (enablePerModelMetrics) { + ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) + .labels(methodName, code.name(), mId).observe(reqPayloadSize); + } else { + ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) + .labels(methodName, code.name()).observe(reqPayloadSize); + } } if (respPayloadSize != -1) { - ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(respPayloadSize); + if (enablePerModelMetrics) { + ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) + .labels(methodName, code.name(), mId).observe(respPayloadSize); + } else { + ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) + .labels(methodName, code.name()).observe(respPayloadSize); + } } } @@ -437,6 +483,11 @@ protected StatsDSender createSender(Callable addressLookup, int q + (shortNames ? "short" : "fully-qualified") + " method names"); } + @Override + public boolean isPerModelMetricsEnabled() { + return false; + } + @Override public boolean isEnabled() { return true; @@ -454,12 +505,12 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { } @Override - public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) { + public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { client.recordExecutionTime(name(metric), isNano ? elapsed / M : elapsed); } @Override - public void logSizeEventMetric(Metric metric, long value) { + public void logSizeEventMetric(Metric metric, long value, String modelId) { if (!legacy) { value *= metric.newMultiplier; } @@ -497,7 +548,7 @@ static String[] getOkTags(String method, boolean shortName) { @Override public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code, - int reqPayloadSize, int respPayloadSize) { + int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) { final StatsDClient client = this.client; final long elapsedMillis = elapsedNanos / M; final String countName = name(external ? API_REQUEST_COUNT : INVOKE_MODEL_COUNT); diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 9755df49..53b7b918 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -1966,7 +1966,7 @@ final synchronized boolean doRemove(final boolean evicted, // "unload" event if explicit unloading isn't enabled. // Otherwise, this gets recorded in a callback set in the // CacheEntry.unload(int) method - metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false); + metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false, modelId); metrics.logCounterMetric(Metric.UNLOAD_MODEL); } } @@ -2037,7 +2037,7 @@ public void onSuccess(Boolean reallyHappened) { //TODO probably only log if took longer than a certain time long tookMillis = msSince(beforeNanos); logger.info("Unload of " + modelId + " completed in " + tookMillis + "ms"); - metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false); + metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false, modelId); metrics.logCounterMetric(Metric.UNLOAD_MODEL); } // else considered trivially succeeded because the corresponding @@ -2158,7 +2158,7 @@ public final void run() { long queueStartTimeNanos = getAndResetLoadingQueueStartTimeNanos(); if (queueStartTimeNanos > 0) { long queueDelayMillis = (nanoTime() - queueStartTimeNanos) / M; - metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis); + metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis, modelId); // Only log if the priority value is "in the future" which indicates // that there is or were runtime requests waiting for this load. // Otherwise we don't care about arbitrary delays here @@ -2228,7 +2228,7 @@ public final void run() { loadingTimeStats(modelType).recordTime(tookMillis); logger.info("Load of model " + modelId + " type=" + modelType + " completed in " + tookMillis + "ms"); - metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false); + metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false, modelId); metrics.logCounterMetric(Metric.LOAD_MODEL); } catch (Throwable t) { loadFuture = null; @@ -2388,7 +2388,7 @@ protected final void complete(LoadedRuntime result, Throwable error) { if (size > 0) { long sizeBytes = size * UNIT_SIZE; logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes)); - metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes); + metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId); } else { try { long before = nanoTime(); @@ -2397,9 +2397,9 @@ protected final void complete(LoadedRuntime result, Throwable error) { long took = msSince(before), sizeBytes = size * UNIT_SIZE; logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes) + " sizing took " + took + "ms"); - metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false); + metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false, modelId); // this is actually a size (bytes), not a "time" - metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes); + metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId); } } catch (Exception e) { if (!isInterruption(e) && state == SIZING) { @@ -2722,7 +2722,7 @@ protected void beforeInvoke(int requestWeight) //noinspection ThrowFromFinallyBlock throw new ModelNotHereException(instanceId, modelId); } - metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false); + metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false, modelId); } } } @@ -2901,7 +2901,7 @@ public void onEviction(String key, CacheEntry ce, long lastUsed) { logger.info("Evicted " + (failed ? "failed model record" : "model") + " " + key + " from local cache, last used " + readableTime(millisSinceLastUsed) + " ago (" + lastUsed + "ms), invoked " + ce.getTotalInvocationCount() + " times"); - metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false); + metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false, ce.modelId); metrics.logCounterMetric(Metric.EVICT_MODEL); } @@ -3989,9 +3989,10 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { throw t; } finally { if (methodStartNanos > 0L && metrics.isEnabled()) { + String[] extraLabels = new String[]{modelId}; // only logged here in non-grpc (legacy) mode metrics.logRequestMetrics(true, getRequestMethodName(method, args), - nanoTime() - methodStartNanos, metricStatusCode, -1, -1); + nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, ""); } curThread.setName(threadNameBefore); } @@ -4450,7 +4451,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) long delayMillis = msSince(beforeNanos); logger.info("Cache miss for model invocation, held up " + delayMillis + "ms"); metrics.logCounterMetric(Metric.CACHE_MISS); - metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false); + metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false, ce.modelId); } } } else { @@ -4528,7 +4529,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) ce.afterInvoke(weight, tookNanos); if (code != null && metrics.isEnabled()) { metrics.logRequestMetrics(false, getRequestMethodName(method, args), - tookNanos, code, -1, -1); + tookNanos, code, -1, -1, ce.modelId, ""); } } } diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 6f3f8202..8705d4a5 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -87,6 +87,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.thrift.TException; +import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -786,8 +787,13 @@ public void onHalfClose() { call.close(status, emptyMeta()); Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize); + if (isVModel) { + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, "", Iterables.toString(modelIds)); + } else { + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, Iterables.toString(modelIds), ""); + } } } } diff --git a/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java b/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java index ffca070b..c7b25c1f 100644 --- a/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java +++ b/src/main/java/com/ibm/watson/prometheus/SimpleCollector.java @@ -161,7 +161,7 @@ private static int nextIdx(int i, int len) { private void validateCount(int count) { if (count != labelCount) { - throw new IllegalArgumentException("Incorrect number of labels."); + throw new IllegalArgumentException("Incorrect number of labels. Expected: " + labelCount + ", got: " + count); } } diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java index dc6ee35e..4ca4f05e 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java @@ -32,6 +32,7 @@ import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import javax.net.ssl.SSLContext; @@ -76,10 +77,11 @@ protected int requestCount() { @Override protected Map extraEnvVars() { - return ImmutableMap.of("MM_METRICS", "prometheus:port=" + METRICS_PORT + ";scheme=" + SCHEME); + return ImmutableMap.of("MM_METRICS", "prometheus:port=" + METRICS_PORT + ";scheme=" + SCHEME + + ";per_model_metrics=true"); } - @Test + @BeforeAll public void metricsTest() throws Exception { ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 9000).usePlaintext().build(); @@ -150,8 +152,9 @@ public void metricsTest() throws Exception { channel.shutdown(); } } + protected Map metrics; - public void verifyMetrics() throws Exception { + protected void prepareMetrics() throws Exception { // Insecure trust manager - skip TLS verification SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, InsecureTrustManagerFactory.INSTANCE.getTrustManagers(), null); @@ -168,33 +171,40 @@ public void verifyMetrics() throws Exception { final Pattern line = Pattern.compile("([^\\s{]+(?:\\{.+\\})?)\\s+(\\S+)"); - Map metrics = resp.body().filter(s -> !s.startsWith("#")).map(s -> line.matcher(s)) + metrics = resp.body().filter(s -> !s.startsWith("#")).map(s -> line.matcher(s)) .filter(Matcher::matches) .collect(Collectors.toMap(m -> m.group(1), m -> Double.parseDouble(m.group(2)))); + } + + @Test + public void verifyMetrics() throws Exception { + // Insecure trust manager - skip TLS verification + prepareMetrics(); + System.out.println(metrics.size() + " metrics scraped"); // Spot check some expected metrics and values // External response time should all be < 2000ms (includes cache hit loading time) - assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",le=\"2000.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2000.0\",}")); // External response time should all be < 200ms (includes cache hit loading time) - assertEquals(40.0, metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",le=\"200.0\",}")); + assertEquals(40.0, + metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"120000.0\",}")); // Simulated model sizing time is < 200ms - assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{le=\"200.0\",}")); + assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",le=\"60000.0\",}")); // Simulated model sizing time is > 50ms - assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{le=\"50.0\",}")); + assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",le=\"50.0\",}")); // Simulated model size is between 64MiB and 256MiB - assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{le=\"6.7108864E7\",}")); - assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{le=\"2.68435456E8\",}")); + assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",le=\"6.7108864E7\",}")); + assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",le=\"2.68435456E8\",}")); // One model is loaded - assertEquals(1.0, metrics.get("modelmesh_models_loaded_total")); assertEquals(1.0, metrics.get("modelmesh_instance_models_total")); // Histogram counts should reflect the two payload sizes (30 small, 10 large) - assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"2097152.0\",}")); - assertEquals(30.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",le=\"2097152.0\",}")); + assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"128.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2097152.0\",}")); + assertEquals(30.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"128.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2097152.0\",}")); // Memory metrics assertTrue(metrics.containsKey("netty_pool_mem_allocated_bytes{area=\"direct\",}")); From 33829a2e1f4c2b06f789cbd175b2e1a6a32aef79 Mon Sep 17 00:00:00 2001 From: Vedant Mahabaleshwarkar Date: Fri, 2 Jun 2023 03:32:19 -0400 Subject: [PATCH 2/6] addressing PR comments Signed-off-by: Vedant Mahabaleshwarkar --- .../com/ibm/watson/modelmesh/Metrics.java | 28 ++++++++----- .../com/ibm/watson/modelmesh/ModelMesh.java | 40 ++++++++++++------- .../ibm/watson/modelmesh/ModelMeshApi.java | 35 +++++++++++----- .../watson/modelmesh/SidecarModelMesh.java | 6 +-- .../ibm/watson/modelmesh/DummyModelMesh.java | 2 +- .../modelmesh/ModelMeshMetricsTest.java | 28 ++++++------- 6 files changed, 85 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index 5afc9491..2a1be8c5 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -36,7 +36,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.lang.reflect.Array; import java.net.SocketAddress; import java.nio.channels.DatagramChannel; import java.util.Collections; @@ -238,13 +237,13 @@ public PrometheusMetrics(Map params, Map infoMet if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { - if (this.enablePerModelMetrics && m.type != COUNTER_WITH_HISTO) { - builder.labelNames("method", "code", "modelId"); + if (this.enablePerModelMetrics) { + builder.labelNames("method", "code", "modelId", "vModelId"); } else { builder.labelNames("method", "code"); } } else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { - builder.labelNames("modelId"); + builder.labelNames("modelId", "vModelId"); } Collector collector = builder.name(m.promName).help(m.description).create(); metricsMap.put(m, collector); @@ -369,8 +368,8 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { @Override public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { - if (enablePerModelMetrics) { - ((Histogram) metricsMap.get(metric)).labels(modelId).observe(isNano ? elapsed / M : elapsed); + if (enablePerModelMetrics && modelId != null) { + ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(isNano ? elapsed / M : elapsed); } else { ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); } @@ -379,7 +378,7 @@ public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, @Override public void logSizeEventMetric(Metric metric, long value, String modelId) { if (enablePerModelMetrics) { - ((Histogram) metricsMap.get(metric)).labels(modelId).observe(value * metric.newMultiplier); + ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(value * metric.newMultiplier); } else { ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); } @@ -403,18 +402,25 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, final long elapsedMillis = elapsedNanos / M; final Histogram timingHisto = (Histogram) metricsMap .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); - String mId = vModelId == null ? modelId : vModelId; int idx = shortNames ? name.indexOf('/') : -1; String methodName = idx == -1 ? name : name.substring(idx + 1); + if (modelId == null) { + logger.error("invalid ModelId. Label value for ModelId will be left blank"); + modelId = ""; + } + if (vModelId == null) { + logger.debug("vModelId is empty, creating empty label"); + vModelId = ""; + } if (enablePerModelMetrics) { - timingHisto.labels(methodName, code.name(), mId).observe(elapsedMillis); + timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis); } else { timingHisto.labels(methodName, code.name()).observe(elapsedMillis); } if (reqPayloadSize != -1) { if (enablePerModelMetrics) { ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) - .labels(methodName, code.name(), mId).observe(reqPayloadSize); + .labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); } else { ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) .labels(methodName, code.name()).observe(reqPayloadSize); @@ -423,7 +429,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, if (respPayloadSize != -1) { if (enablePerModelMetrics) { ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) - .labels(methodName, code.name(), mId).observe(respPayloadSize); + .labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); } else { ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) .labels(methodName, code.name()).observe(respPayloadSize); diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 53b7b918..723cab00 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -3315,6 +3315,7 @@ protected Map getMap(Object[] arr) { static final String KNOWN_SIZE_CXT_KEY = "tas.known_size"; static final String UNBALANCED_KEY = "mmesh.unbalanced"; static final String DEST_INST_ID_KEY = "tas.dest_iid"; + static final String VMODELID = "vmodelid"; // these are the possible values for the tas.internal context parameter // it won't be set on requests from outside of the cluster, and will @@ -3347,8 +3348,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea List excludeInstances) throws ModelNotFoundException, ModelLoadException, ModelNotHereException, InternalException { try { - return (StatusInfo) invokeModel(modelId, null, internalOpRemoteMeth, - returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args" + return (StatusInfo) invokeModel(modelId, false, null, + internalOpRemoteMeth, returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args" } catch (ModelNotFoundException | ModelLoadException | ModelNotHereException | InternalException e) { throw e; } catch (TException e) { @@ -3416,8 +3417,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea * @throws TException */ @SuppressWarnings("unchecked") - protected Object invokeModel(final String modelId, final Method method, final Method remoteMeth, - final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException { + protected Object invokeModel(final String modelId, Boolean isVModel, final Method method, + final Method remoteMeth, final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException { //verify parameter values if (modelId == null || modelId.isEmpty()) { @@ -3430,6 +3431,10 @@ protected Object invokeModel(final String modelId, final Method method, final Me } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); + String vModelId = ""; + if (isVModel) { + vModelId = contextMap.get(VMODELID); + } // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. @@ -3502,7 +3507,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me throw new ModelNotHereException(instanceId, modelId); } try { - return invokeLocalModel(ce, method, args, modelId); + return invokeLocalModel(ce, method, args, modelId, isVModel); } catch (ModelLoadException mle) { mr = registry.get(modelId); if (mr == null || !mr.loadFailedInInstance(instanceId)) { @@ -3716,7 +3721,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me localInvokesInFlight.incrementAndGet(); } try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } finally { if (!favourSelfForHits) { @@ -3936,7 +3941,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { // invoke model try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } catch (ModelNotHereException e) { if (loadTargetFilter != null) loadTargetFilter.remove(instanceId); @@ -3989,10 +3994,9 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { throw t; } finally { if (methodStartNanos > 0L && metrics.isEnabled()) { - String[] extraLabels = new String[]{modelId}; // only logged here in non-grpc (legacy) mode metrics.logRequestMetrics(true, getRequestMethodName(method, args), - nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, ""); + nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, vModelId); } curThread.setName(threadNameBefore); } @@ -4122,13 +4126,15 @@ private Map filterIfReadOnly(Map instId) { * instances inside and some out, and a request has been sent from outside the * cluster to an instance inside (since it may land on an unintended instance in * that case). - * + * @param isVModel TODO + * @throws TException TODO * @throws ModelNotHereException if the specified destination instance isn't found */ protected Object forwardInvokeModel(String destId, String modelId, Method remoteMeth, Object... args) throws TException { destinationInstance.set(destId); try { + //TODO: not sure what is happening here.. do I need to pass vmodelid to the remoteMeth.invoke? return remoteMeth.invoke(directClient, ObjectArrays.concat(modelId, args)); } catch (Exception e) { if (e instanceof InvocationTargetException) { @@ -4404,9 +4410,9 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args)); } - protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId) + protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId, Boolean isVModel) throws InterruptedException, TException { - Object result = invokeLocalModel(ce, method, args); + Object result = invokeLocalModel(ce, method, false, args); // if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary if (method == null) { triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null); @@ -4414,7 +4420,7 @@ protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args return result; } - private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) + private Object invokeLocalModel(CacheEntry ce, Method method, Boolean isVModel, Object[] args) throws InterruptedException, TException { if (method == null) { @@ -4429,7 +4435,11 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) long now = currentTimeMillis(); ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future) } - + Map contextMap = ThreadContext.getCurrentContext(); + String vModelId = null; + if (isVModel) { + vModelId = contextMap.get(VMODELID); + } // The future-waiting timeouts should not be needed, request threads are interrupted when their // timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout. // But we still set a large one as a safeguard (there can be pathalogical cases where model-loading @@ -4529,7 +4539,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) ce.afterInvoke(weight, tookNanos); if (code != null && metrics.isEnabled()) { metrics.logRequestMetrics(false, getRequestMethodName(method, args), - tookNanos, code, -1, -1, ce.modelId, ""); + tookNanos, code, -1, -1, ce.modelId, vModelId); } } } diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 8705d4a5..5bff237c 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -87,7 +87,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.thrift.TException; -import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -345,6 +344,10 @@ protected static void setUnbalancedLitelinksContextParam() { ThreadContext.addContextEntry(ModelMesh.UNBALANCED_KEY, "true"); // unbalanced } + protected static void setvModelIdLiteLinksContextParam(String vModelId) { + ThreadContext.addContextEntry(ModelMesh.VMODELID, vModelId); + } + // ----------------- concrete model management methods @Override @@ -438,18 +441,19 @@ ModelResponse callModel(String modelId, boolean isVModel, String methodName, Str if (unbalanced) { setUnbalancedLitelinksContextParam(); } - return delegate.callModel(modelId, methodName, headers, data); + return delegate.callModel(modelId, isVModel, methodName, headers, data); } String vModelId = modelId; modelId = null; boolean first = true; while (true) { modelId = vmm().resolveVModelId(vModelId, modelId); + setvModelIdLiteLinksContextParam(vModelId); if (unbalanced) { setUnbalancedLitelinksContextParam(); } try { - return delegate.callModel(modelId, methodName, headers, data); + return delegate.callModel(modelId, true, methodName, headers, data); } catch (ModelNotFoundException mnfe) { if (!first) throw mnfe; } catch (Exception e) { @@ -787,12 +791,25 @@ public void onHalfClose() { call.close(status, emptyMeta()); Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { - if (isVModel) { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, "", Iterables.toString(modelIds)); - } else { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, Iterables.toString(modelIds), ""); + Iterator midIt = modelIds.iterator(); + while (midIt.hasNext()) { + if (isVModel) { + String mId = null; + String vmId = midIt.next(); + try { + mId = vmm().resolveVModelId(midIt.next(), mId); + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, mId, vmId); + } + catch (Exception e) { + logger.error("Could not resolve model id for vModelId" + vmId, e); + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, "", vmId); + } + } else { + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, midIt.next(), ""); + } } } } diff --git a/src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java index b90f6865..5dac73c1 100644 --- a/src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java @@ -1098,12 +1098,12 @@ public List applyModelMulti(String modelId, List input, @SuppressWarnings("unchecked") List applyModel(String modelId, List input, Map metadata) throws TException { - return (List) invokeModel(modelId, localMeth, remoteMeth, input, metadata); + return (List) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata); } // refcount of provided ByteBuf should not be modified - ModelResponse callModel(String modelId, String methodName, Metadata headers, ByteBuf data) throws TException { - return (ModelResponse) invokeModel(modelId, directLocalMeth, remoteMeth, methodName, headers, data); + ModelResponse callModel(String modelId, Boolean isVModel, String methodName, Metadata headers, ByteBuf data) throws TException { + return (ModelResponse) invokeModel(modelId, isVModel, directLocalMeth, remoteMeth, methodName, headers, data); } @Idempotent diff --git a/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java b/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java index 60b7c939..2791d338 100644 --- a/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java +++ b/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java @@ -75,7 +75,7 @@ protected ModelLoader getLoader() { @Override public ByteBuffer applyModel(String modelId, ByteBuffer input, Map metadata) throws TException { - return (ByteBuffer) invokeModel(modelId, localMeth, remoteMeth, input, metadata); + return (ByteBuffer) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata); } @Override diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java index 4ca4f05e..2d9633ec 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java @@ -152,13 +152,12 @@ public void metricsTest() throws Exception { channel.shutdown(); } } - protected Map metrics; - protected void prepareMetrics() throws Exception { + protected Map prepareMetrics() throws Exception { // Insecure trust manager - skip TLS verification SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, InsecureTrustManagerFactory.INSTANCE.getTrustManagers(), null); - + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); HttpRequest metricsRequest = HttpRequest.newBuilder() .uri(URI.create(SCHEME + "://localhost:" + METRICS_PORT + "/metrics")).build(); @@ -171,40 +170,39 @@ protected void prepareMetrics() throws Exception { final Pattern line = Pattern.compile("([^\\s{]+(?:\\{.+\\})?)\\s+(\\S+)"); - metrics = resp.body().filter(s -> !s.startsWith("#")).map(s -> line.matcher(s)) + Map metrics = resp.body().filter(s -> !s.startsWith("#")).map(s -> line.matcher(s)) .filter(Matcher::matches) .collect(Collectors.toMap(m -> m.group(1), m -> Double.parseDouble(m.group(2)))); + return metrics; } @Test public void verifyMetrics() throws Exception { // Insecure trust manager - skip TLS verification - prepareMetrics(); + Map metrics = prepareMetrics(); System.out.println(metrics.size() + " metrics scraped"); // Spot check some expected metrics and values // External response time should all be < 2000ms (includes cache hit loading time) - assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2000.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2000.0\",}")); // External response time should all be < 200ms (includes cache hit loading time) assertEquals(40.0, - metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"120000.0\",}")); + metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"120000.0\",}")); // Simulated model sizing time is < 200ms - assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",le=\"60000.0\",}")); + assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"60000.0\",}")); // Simulated model sizing time is > 50ms - assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",le=\"50.0\",}")); + assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"50.0\",}")); // Simulated model size is between 64MiB and 256MiB - assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",le=\"6.7108864E7\",}")); - assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",le=\"2.68435456E8\",}")); + assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"6.7108864E7\",}")); + assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"2.68435456E8\",}")); // One model is loaded assertEquals(1.0, metrics.get("modelmesh_instance_models_total")); // Histogram counts should reflect the two payload sizes (30 small, 10 large) - assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2097152.0\",}")); - assertEquals(30.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_response_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"\",le=\"2097152.0\",}")); + assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"128.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"2097152.0\",}")); // Memory metrics assertTrue(metrics.containsKey("netty_pool_mem_allocated_bytes{area=\"direct\",}")); From d32b31a00f6d24476064ca3076f2dcb6efa9da72 Mon Sep 17 00:00:00 2001 From: Daniele Zonca Date: Mon, 3 Jul 2023 18:42:34 +0200 Subject: [PATCH 3/6] WIP, fix text + some changes Signed-off-by: Vedant Mahabaleshwarkar --- .../com/ibm/watson/modelmesh/Metrics.java | 7 +---- .../com/ibm/watson/modelmesh/ModelMesh.java | 29 +++++++------------ .../ibm/watson/modelmesh/ModelMeshApi.java | 15 +++++----- .../watson/modelmesh/SidecarModelMesh.java | 6 ++-- .../ibm/watson/modelmesh/VModelManager.java | 16 ++++++---- .../ibm/watson/modelmesh/DummyModelMesh.java | 2 +- .../modelmesh/ModelMeshMetricsTest.java | 14 ++++----- 7 files changed, 41 insertions(+), 48 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index 2a1be8c5..715590f0 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -404,12 +404,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); int idx = shortNames ? name.indexOf('/') : -1; String methodName = idx == -1 ? name : name.substring(idx + 1); - if (modelId == null) { - logger.error("invalid ModelId. Label value for ModelId will be left blank"); - modelId = ""; - } - if (vModelId == null) { - logger.debug("vModelId is empty, creating empty label"); + if (enablePerModelMetrics && vModelId == null) { vModelId = ""; } if (enablePerModelMetrics) { diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 723cab00..54a9ebe4 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -3348,7 +3348,7 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea List excludeInstances) throws ModelNotFoundException, ModelLoadException, ModelNotHereException, InternalException { try { - return (StatusInfo) invokeModel(modelId, false, null, + return (StatusInfo) invokeModel(modelId, null, internalOpRemoteMeth, returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args" } catch (ModelNotFoundException | ModelLoadException | ModelNotHereException | InternalException e) { throw e; @@ -3417,7 +3417,7 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea * @throws TException */ @SuppressWarnings("unchecked") - protected Object invokeModel(final String modelId, Boolean isVModel, final Method method, + protected Object invokeModel(final String modelId, final Method method, final Method remoteMeth, final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException { //verify parameter values @@ -3431,10 +3431,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); - String vModelId = ""; - if (isVModel) { - vModelId = contextMap.get(VMODELID); - } + String vModelId = contextMap.getOrDefault(VMODELID, ""); // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. @@ -3507,7 +3504,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho throw new ModelNotHereException(instanceId, modelId); } try { - return invokeLocalModel(ce, method, args, modelId, isVModel); + return invokeLocalModel(ce, method, args, modelId); } catch (ModelLoadException mle) { mr = registry.get(modelId); if (mr == null || !mr.loadFailedInInstance(instanceId)) { @@ -3721,7 +3718,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho localInvokesInFlight.incrementAndGet(); } try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel); + Object result = invokeLocalModel(cacheEntry, method, args, modelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } finally { if (!favourSelfForHits) { @@ -3941,7 +3938,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { // invoke model try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel); + Object result = invokeLocalModel(cacheEntry, method, args, modelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } catch (ModelNotHereException e) { if (loadTargetFilter != null) loadTargetFilter.remove(instanceId); @@ -4126,15 +4123,12 @@ private Map filterIfReadOnly(Map instId) { * instances inside and some out, and a request has been sent from outside the * cluster to an instance inside (since it may land on an unintended instance in * that case). - * @param isVModel TODO - * @throws TException TODO * @throws ModelNotHereException if the specified destination instance isn't found */ protected Object forwardInvokeModel(String destId, String modelId, Method remoteMeth, Object... args) throws TException { destinationInstance.set(destId); try { - //TODO: not sure what is happening here.. do I need to pass vmodelid to the remoteMeth.invoke? return remoteMeth.invoke(directClient, ObjectArrays.concat(modelId, args)); } catch (Exception e) { if (e instanceof InvocationTargetException) { @@ -4410,9 +4404,9 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args)); } - protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId, Boolean isVModel) + protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId) throws InterruptedException, TException { - Object result = invokeLocalModel(ce, method, false, args); + Object result = invokeLocalModel(ce, method, args); // if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary if (method == null) { triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null); @@ -4420,7 +4414,7 @@ protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args return result; } - private Object invokeLocalModel(CacheEntry ce, Method method, Boolean isVModel, Object[] args) + private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) throws InterruptedException, TException { if (method == null) { @@ -4436,10 +4430,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Boolean isVMode ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future) } Map contextMap = ThreadContext.getCurrentContext(); - String vModelId = null; - if (isVModel) { - vModelId = contextMap.get(VMODELID); - } + String vModelId = contextMap.getOrDefault(VMODELID, ""); // The future-waiting timeouts should not be needed, request threads are interrupted when their // timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout. // But we still set a large one as a safeguard (there can be pathalogical cases where model-loading diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 5bff237c..1bfa1141 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -434,26 +434,27 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver applyModelMulti(String modelId, List input, @SuppressWarnings("unchecked") List applyModel(String modelId, List input, Map metadata) throws TException { - return (List) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata); + return (List) invokeModel(modelId, localMeth, remoteMeth, input, metadata); } // refcount of provided ByteBuf should not be modified - ModelResponse callModel(String modelId, Boolean isVModel, String methodName, Metadata headers, ByteBuf data) throws TException { - return (ModelResponse) invokeModel(modelId, isVModel, directLocalMeth, remoteMeth, methodName, headers, data); + ModelResponse callModel(String modelId, String methodName, Metadata headers, ByteBuf data) throws TException { + return (ModelResponse) invokeModel(modelId, directLocalMeth, remoteMeth, methodName, headers, data); } @Idempotent diff --git a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java index d2706a16..c1867b3f 100644 --- a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java +++ b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java @@ -27,7 +27,6 @@ import com.ibm.watson.kvutils.KVTable.Helper.TableTxn; import com.ibm.watson.kvutils.KVTable.TableView; import com.ibm.watson.kvutils.factory.KVUtilsFactory; -import com.ibm.watson.litelinks.ThreadContext; import com.ibm.watson.litelinks.ThreadPoolHelper; import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener; import com.ibm.watson.modelmesh.api.ModelInfo; @@ -43,6 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -68,7 +70,7 @@ /** * This class contains logic related to VModels (virtual or versioned models) */ -public final class VModelManager implements AutoCloseable { +public final class VModelManager implements Closeable { private static final Logger logger = LoggerFactory.getLogger(VModelManager.class); @@ -124,9 +126,13 @@ public ListenableFuture start() { } @Override - public void close() throws Exception { - vModelTable.close(); - targetScaleupExecutor.shutdown(); + public void close() { + try { + vModelTable.close(); + targetScaleupExecutor.shutdown(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } /** diff --git a/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java b/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java index 2791d338..60b7c939 100644 --- a/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java +++ b/src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java @@ -75,7 +75,7 @@ protected ModelLoader getLoader() { @Override public ByteBuffer applyModel(String modelId, ByteBuffer input, Map metadata) throws TException { - return (ByteBuffer) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata); + return (ByteBuffer) invokeModel(modelId, localMeth, remoteMeth, input, metadata); } @Override diff --git a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java index 2d9633ec..a78cef1c 100644 --- a/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java +++ b/src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java @@ -190,19 +190,19 @@ public void verifyMetrics() throws Exception { assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2000.0\",}")); // External response time should all be < 200ms (includes cache hit loading time) assertEquals(40.0, - metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"120000.0\",}")); + metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"120000.0\",}")); // Simulated model sizing time is < 200ms - assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"60000.0\",}")); + assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"60000.0\",}")); // Simulated model sizing time is > 50ms - assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"50.0\",}")); + assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"50.0\",}")); // Simulated model size is between 64MiB and 256MiB - assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"6.7108864E7\",}")); - assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"2.68435456E8\",}")); + assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"6.7108864E7\",}")); + assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"2.68435456E8\",}")); // One model is loaded assertEquals(1.0, metrics.get("modelmesh_instance_models_total")); // Histogram counts should reflect the two payload sizes (30 small, 10 large) - assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"128.0\",}")); - assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"2097152.0\",}")); + assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"128.0\",}")); + assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2097152.0\",}")); // Memory metrics assertTrue(metrics.containsKey("netty_pool_mem_allocated_bytes{area=\"direct\",}")); From 3410e04377ab2a5d9fb9380cfa3e0ae5e3928273 Mon Sep 17 00:00:00 2001 From: Vedant Mahabaleshwarkar Date: Mon, 3 Jul 2023 15:41:03 -0400 Subject: [PATCH 4/6] add a sanity check while getting key from contextmap Signed-off-by: Vedant Mahabaleshwarkar --- src/main/java/com/ibm/watson/modelmesh/ModelMesh.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 54a9ebe4..7422a087 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -4430,7 +4430,16 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future) } Map contextMap = ThreadContext.getCurrentContext(); - String vModelId = contextMap.getOrDefault(VMODELID, ""); + String vModelId = null; + // We might arrive here from a path where the original call was with a modelid. + // Hence, it is possible to arrive here with a null contextMap because the vModelId was never set + // To avoid catching a null pointer exception we just sanity check instead. + if (contextMap == null) { + vModelId = ""; + } else { + vModelId = contextMap.get(VMODELID); + } + // The future-waiting timeouts should not be needed, request threads are interrupted when their // timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout. // But we still set a large one as a safeguard (there can be pathalogical cases where model-loading From 72e1c8e8f69b4562c513069dc1e1313c68dfe993 Mon Sep 17 00:00:00 2001 From: Vedant Mahabaleshwarkar Date: Thu, 20 Jul 2023 20:04:44 -0400 Subject: [PATCH 5/6] Address PR comments Signed-off-by: Vedant Mahabaleshwarkar --- .../com/ibm/watson/modelmesh/Metrics.java | 27 +++--- .../com/ibm/watson/modelmesh/ModelMesh.java | 6 +- .../ibm/watson/modelmesh/ModelMeshApi.java | 97 +++++++++++-------- .../ibm/watson/modelmesh/VModelManager.java | 12 +-- 4 files changed, 75 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index 715590f0..e8c5e623 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -64,6 +64,7 @@ interface Metrics extends AutoCloseable { boolean isPerModelMetricsEnabled(); boolean isEnabled(); + void logTimingMetricSince(Metric metric, long prevTime, boolean isNano); void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId); @@ -164,14 +165,14 @@ final class PrometheusMetrics implements Metrics { private final CollectorRegistry registry; private final NettyServer metricServer; private final boolean shortNames; - private final boolean enablePerModelMetrics; + private final boolean perModelMetricsEnabled; private final EnumMap metricsMap = new EnumMap<>(Metric.class); public PrometheusMetrics(Map params, Map infoMetricParams) throws Exception { int port = 2112; boolean shortNames = true; boolean https = true; - boolean enablePerModelMetrics = true; + boolean perModelMetricsEnabled= true; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { @@ -183,7 +184,7 @@ public PrometheusMetrics(Map params, Map infoMet } break; case "per_model_metrics": - enablePerModelMetrics = "true".equalsIgnoreCase(ent.getValue()); + perModelMetricsEnabled= "true".equalsIgnoreCase(ent.getValue()); break; case "fq_names": shortNames = !"true".equalsIgnoreCase(ent.getValue()); @@ -203,7 +204,7 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Unrecognized metrics config parameter: " + ent.getKey()); } } - this.enablePerModelMetrics = enablePerModelMetrics; + this.perModelMetricsEnabled= perModelMetricsEnabled; registry = new CollectorRegistry(); for (Metric m : Metric.values()) { @@ -237,12 +238,12 @@ public PrometheusMetrics(Map params, Map infoMet if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME || m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) { - if (this.enablePerModelMetrics) { + if (this.perModelMetricsEnabled) { builder.labelNames("method", "code", "modelId", "vModelId"); } else { builder.labelNames("method", "code"); } - } else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { + } else if (this.perModelMetricsEnabled && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) { builder.labelNames("modelId", "vModelId"); } Collector collector = builder.name(m.promName).help(m.description).create(); @@ -352,7 +353,7 @@ public void close() { @Override public boolean isPerModelMetricsEnabled() { - return enablePerModelMetrics; + return perModelMetricsEnabled; } @Override @@ -368,7 +369,7 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { @Override public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { - if (enablePerModelMetrics && modelId != null) { + if (perModelMetricsEnabled && modelId != null) { ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(isNano ? elapsed / M : elapsed); } else { ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); @@ -377,7 +378,7 @@ public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, @Override public void logSizeEventMetric(Metric metric, long value, String modelId) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(value * metric.newMultiplier); } else { ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); @@ -404,16 +405,16 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); int idx = shortNames ? name.indexOf('/') : -1; String methodName = idx == -1 ? name : name.substring(idx + 1); - if (enablePerModelMetrics && vModelId == null) { + if (perModelMetricsEnabled&& vModelId == null) { vModelId = ""; } - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis); } else { timingHisto.labels(methodName, code.name()).observe(elapsedMillis); } if (reqPayloadSize != -1) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) .labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); } else { @@ -422,7 +423,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, } } if (respPayloadSize != -1) { - if (enablePerModelMetrics) { + if (perModelMetricsEnabled) { ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) .labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); } else { diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 7422a087..54716eae 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -3315,7 +3315,7 @@ protected Map getMap(Object[] arr) { static final String KNOWN_SIZE_CXT_KEY = "tas.known_size"; static final String UNBALANCED_KEY = "mmesh.unbalanced"; static final String DEST_INST_ID_KEY = "tas.dest_iid"; - static final String VMODELID = "vmodelid"; + static final String VMODEL_ID = "vmodelid"; // these are the possible values for the tas.internal context parameter // it won't be set on requests from outside of the cluster, and will @@ -3431,7 +3431,7 @@ protected Object invokeModel(final String modelId, final Method method, } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); - String vModelId = contextMap.getOrDefault(VMODELID, ""); + String vModelId = contextMap.getOrDefault(VMODEL_ID, ""); // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. @@ -4437,7 +4437,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) if (contextMap == null) { vModelId = ""; } else { - vModelId = contextMap.get(VMODELID); + vModelId = contextMap.get(VMODEL_ID); } // The future-waiting timeouts should not be needed, request threads are interrupted when their diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 1bfa1141..335b9d79 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -30,6 +30,7 @@ import com.ibm.watson.litelinks.server.ReleaseAfterResponse; import com.ibm.watson.litelinks.server.ServerRequestThread; import com.ibm.watson.modelmesh.DataplaneApiConfig.RpcConfig; +import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener; import com.ibm.watson.modelmesh.ModelMesh.ExtendedStatusInfo; import com.ibm.watson.modelmesh.api.DeleteVModelRequest; import com.ibm.watson.modelmesh.api.DeleteVModelResponse; @@ -68,6 +69,7 @@ import io.grpc.ServerInterceptors; import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; +import io.grpc.Status.Code; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.netty.GrpcSslContexts; @@ -345,7 +347,7 @@ protected static void setUnbalancedLitelinksContextParam() { } protected static void setvModelIdLiteLinksContextParam(String vModelId) { - ThreadContext.addContextEntry(ModelMesh.VMODELID, vModelId); + ThreadContext.addContextEntry(ModelMesh.VMODEL_ID, vModelId); } // ----------------- concrete model management methods @@ -434,38 +436,39 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver idList = new ArrayList<>(); idList.add(modelId); @@ -792,26 +807,19 @@ public void onHalfClose() { call.close(status, emptyMeta()); Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { - Iterator midIt = modelIds.iterator(); - while (midIt.hasNext()) { + if (isSingleModelRequest && metrics.isPerModelMetricsEnabled() && modelId!=null) { if (isVModel) { - String mId = null; - String vmId = midIt.next(); - try { - mId = vmm().resolveVModelId(midIt.next(), mId); - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, mId, vmId); - } - catch (Exception e) { - logger.error("Could not resolve model id for vModelId" + vmId, e); - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, "", vmId); - } + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, resolvedModelId, modelId); } else { metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, midIt.next(), ""); + status.getCode(), reqSize, respSize, modelId, ""); } } + else { + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, "", ""); + } } } } @@ -972,7 +980,10 @@ protected ModelResponse applyParallelMultiModel(List modelIds, boolean i logHeaders.addToMDC(headers); } // need to pass slices of the buffers for threadsafety - return callModel(modelId, isVModel, methodName, balancedMetaVal, headers, data.slice()); + if (isVModel) { + return callModel("", modelId, methodName, balancedMetaVal, headers, data.slice()); + } + return callModel(modelId, "", methodName, balancedMetaVal, headers, data.slice()); } catch (ModelNotFoundException mnfe) { logger.warn("model " + modelId + " not found (from supplied list of " + total + ")"); if (!requireAll) { diff --git a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java index c1867b3f..058350d0 100644 --- a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java +++ b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java @@ -70,7 +70,7 @@ /** * This class contains logic related to VModels (virtual or versioned models) */ -public final class VModelManager implements Closeable { +public final class VModelManager implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(VModelManager.class); @@ -126,13 +126,9 @@ public ListenableFuture start() { } @Override - public void close() { - try { - vModelTable.close(); - targetScaleupExecutor.shutdown(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public void close() throws Exception { + vModelTable.close(); + targetScaleupExecutor.shutdown(); } /** From 37eed3185898bc1340a7f47d1182c668fdf04637 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Sun, 20 Aug 2023 08:00:10 -0700 Subject: [PATCH 6/6] Update per-model metric changes Use ThreadLocal to store resolved modelId in vModel case Revert/simplify a few things Signed-off-by: Nick Hill --- .../com/ibm/watson/modelmesh/Metrics.java | 37 +++--- .../com/ibm/watson/modelmesh/ModelMesh.java | 37 +++--- .../ibm/watson/modelmesh/ModelMeshApi.java | 118 ++++++++---------- .../ibm/watson/modelmesh/VModelManager.java | 3 - 4 files changed, 89 insertions(+), 106 deletions(-) diff --git a/src/main/java/com/ibm/watson/modelmesh/Metrics.java b/src/main/java/com/ibm/watson/modelmesh/Metrics.java index e8c5e623..7be788fe 100644 --- a/src/main/java/com/ibm/watson/modelmesh/Metrics.java +++ b/src/main/java/com/ibm/watson/modelmesh/Metrics.java @@ -16,6 +16,7 @@ package com.ibm.watson.modelmesh; +import com.google.common.base.Strings; import com.ibm.watson.prometheus.Counter; import com.ibm.watson.prometheus.Gauge; import com.ibm.watson.prometheus.Histogram; @@ -54,7 +55,6 @@ import static com.ibm.watson.modelmesh.Metric.MetricType.*; import static com.ibm.watson.modelmesh.ModelMesh.M; import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_CUSTOM_ENV_VAR; -import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_METRICS_ENV_VAR; import static java.util.concurrent.TimeUnit.*; /** @@ -172,7 +172,7 @@ public PrometheusMetrics(Map params, Map infoMet int port = 2112; boolean shortNames = true; boolean https = true; - boolean perModelMetricsEnabled= true; + boolean perModelMetricsEnabled = true; String memMetrics = "all"; // default to all for (Entry ent : params.entrySet()) { switch (ent.getKey()) { @@ -204,7 +204,7 @@ public PrometheusMetrics(Map params, Map infoMet throw new Exception("Unrecognized metrics config parameter: " + ent.getKey()); } } - this.perModelMetricsEnabled= perModelMetricsEnabled; + this.perModelMetricsEnabled = perModelMetricsEnabled; registry = new CollectorRegistry(); for (Metric m : Metric.values()) { @@ -273,6 +273,7 @@ public PrometheusMetrics(Map params, Map infoMet this.metricServer = new NettyServer(registry, port, https); this.shortNames = shortNames; + logger.info("Will expose " + (https ? "https" : "http") + " Prometheus metrics on port " + port + " using " + (shortNames ? "short" : "fully-qualified") + " method names"); @@ -369,19 +370,21 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) { @Override public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) { + Histogram histogram = (Histogram) metricsMap.get(metric); if (perModelMetricsEnabled && modelId != null) { - ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(isNano ? elapsed / M : elapsed); + histogram.labels(modelId, "").observe(isNano ? elapsed / M : elapsed); } else { - ((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed); + histogram.observe(isNano ? elapsed / M : elapsed); } } @Override public void logSizeEventMetric(Metric metric, long value, String modelId) { + Histogram histogram = (Histogram) metricsMap.get(metric); if (perModelMetricsEnabled) { - ((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(value * metric.newMultiplier); + histogram.labels(modelId, "").observe(value * metric.newMultiplier); } else { - ((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier); + histogram.observe(value * metric.newMultiplier); } } @@ -403,10 +406,12 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, final long elapsedMillis = elapsedNanos / M; final Histogram timingHisto = (Histogram) metricsMap .get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME); + int idx = shortNames ? name.indexOf('/') : -1; String methodName = idx == -1 ? name : name.substring(idx + 1); - if (perModelMetricsEnabled&& vModelId == null) { - vModelId = ""; + if (perModelMetricsEnabled) { + modelId = Strings.nullToEmpty(modelId); + vModelId = Strings.nullToEmpty(vModelId); } if (perModelMetricsEnabled) { timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis); @@ -414,21 +419,19 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos, timingHisto.labels(methodName, code.name()).observe(elapsedMillis); } if (reqPayloadSize != -1) { + Histogram reqPayloadHisto = (Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE); if (perModelMetricsEnabled) { - ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) - .labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); + reqPayloadHisto.labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize); } else { - ((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(reqPayloadSize); + reqPayloadHisto.labels(methodName, code.name()).observe(reqPayloadSize); } } if (respPayloadSize != -1) { + Histogram respPayloadHisto = (Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE); if (perModelMetricsEnabled) { - ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) - .labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); + respPayloadHisto.labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize); } else { - ((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE)) - .labels(methodName, code.name()).observe(respPayloadSize); + respPayloadHisto.labels(methodName, code.name()).observe(respPayloadSize); } } } diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java index 54716eae..78c776b4 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMesh.java @@ -3348,8 +3348,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea List excludeInstances) throws ModelNotFoundException, ModelLoadException, ModelNotHereException, InternalException { try { - return (StatusInfo) invokeModel(modelId, null, - internalOpRemoteMeth, returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args" + return (StatusInfo) invokeModel(modelId, null, internalOpRemoteMeth, + returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args" } catch (ModelNotFoundException | ModelLoadException | ModelNotHereException | InternalException e) { throw e; } catch (TException e) { @@ -3417,8 +3417,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea * @throws TException */ @SuppressWarnings("unchecked") - protected Object invokeModel(final String modelId, final Method method, - final Method remoteMeth, final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException { + protected Object invokeModel(final String modelId, final Method method, final Method remoteMeth, + final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException { //verify parameter values if (modelId == null || modelId.isEmpty()) { @@ -3431,7 +3431,7 @@ protected Object invokeModel(final String modelId, final Method method, } final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY); - String vModelId = contextMap.getOrDefault(VMODEL_ID, ""); + final String vModelId = contextMap.getOrDefault(VMODEL_ID, ""); // Set the external request flag if it's not a tasInternal call or if // tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded // invocation originating from within the cluster. @@ -3504,7 +3504,7 @@ protected Object invokeModel(final String modelId, final Method method, throw new ModelNotHereException(instanceId, modelId); } try { - return invokeLocalModel(ce, method, args, modelId); + return invokeLocalModel(ce, method, args, vModelId); } catch (ModelLoadException mle) { mr = registry.get(modelId); if (mr == null || !mr.loadFailedInInstance(instanceId)) { @@ -3718,7 +3718,7 @@ protected Object invokeModel(final String modelId, final Method method, localInvokesInFlight.incrementAndGet(); } try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, vModelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } finally { if (!favourSelfForHits) { @@ -3938,7 +3938,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) { // invoke model try { - Object result = invokeLocalModel(cacheEntry, method, args, modelId); + Object result = invokeLocalModel(cacheEntry, method, args, vModelId); return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result; } catch (ModelNotHereException e) { if (loadTargetFilter != null) loadTargetFilter.remove(instanceId); @@ -4123,6 +4123,7 @@ private Map filterIfReadOnly(Map instId) { * instances inside and some out, and a request has been sent from outside the * cluster to an instance inside (since it may land on an unintended instance in * that case). + * * @throws ModelNotHereException if the specified destination instance isn't found */ protected Object forwardInvokeModel(String destId, String modelId, Method remoteMeth, Object... args) @@ -4404,17 +4405,17 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args)); } - protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String modelId) + protected Object invokeLocalModel(CacheEntry ce, Method method, Object[] args, String vModelId) throws InterruptedException, TException { - Object result = invokeLocalModel(ce, method, args); + final Object result = _invokeLocalModel(ce, method, args, vModelId); // if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary if (method == null) { - triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null); + triggerChainedLoadIfNecessary(ce.modelId, result, args, ce.getWeight(), null); } return result; } - private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) + private Object _invokeLocalModel(CacheEntry ce, Method method, Object[] args, String vModelId) throws InterruptedException, TException { if (method == null) { @@ -4429,17 +4430,7 @@ private Object invokeLocalModel(CacheEntry ce, Method method, Object[] args) long now = currentTimeMillis(); ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future) } - Map contextMap = ThreadContext.getCurrentContext(); - String vModelId = null; - // We might arrive here from a path where the original call was with a modelid. - // Hence, it is possible to arrive here with a null contextMap because the vModelId was never set - // To avoid catching a null pointer exception we just sanity check instead. - if (contextMap == null) { - vModelId = ""; - } else { - vModelId = contextMap.get(VMODEL_ID); - } - + // The future-waiting timeouts should not be needed, request threads are interrupted when their // timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout. // But we still set a large one as a safeguard (there can be pathalogical cases where model-loading diff --git a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java index 335b9d79..715c0efe 100644 --- a/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java +++ b/src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java @@ -87,6 +87,7 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContext; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -346,7 +347,7 @@ protected static void setUnbalancedLitelinksContextParam() { ThreadContext.addContextEntry(ModelMesh.UNBALANCED_KEY, "true"); // unbalanced } - protected static void setvModelIdLiteLinksContextParam(String vModelId) { + protected static void setVModelIdLiteLinksContextParam(String vModelId) { ThreadContext.addContextEntry(ModelMesh.VMODEL_ID, vModelId); } @@ -433,42 +434,46 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver resolvedModelId = new FastThreadLocal<>(); + // Returned ModelResponse will be released once the request thread exits so // must be retained before transferring. // non-private to avoid synthetic method access - ModelResponse callModel(String modelId, String vModelId, String methodName, String grpcBalancedHeader, + ModelResponse callModel(String modelId, boolean isVModel, String methodName, String grpcBalancedHeader, Metadata headers, ByteBuf data) throws Exception { boolean unbalanced = grpcBalancedHeader == null ? UNBALANCED_DEFAULT : !"true".equals(grpcBalancedHeader); - if (!modelId.isBlank()) { + if (!isVModel) { if (unbalanced) { setUnbalancedLitelinksContextParam(); } return delegate.callModel(modelId, methodName, headers, data); - } else if (!vModelId.isBlank()) { - boolean first = true; - while (true) { - String resolvedModelId = vmm().resolveVModelId(vModelId, vModelId); - if (unbalanced) { - setUnbalancedLitelinksContextParam(); - } - try { - return delegate.callModel(resolvedModelId, methodName, headers, data); - } catch (ModelNotFoundException mnfe) { - if (!first) throw mnfe; - } catch (Exception e) { - logger.error("Exception invoking " + methodName + " method of resolved model " + modelId + " of vmodel " - + vModelId + ": " + e.getClass().getSimpleName() + ": " + e.getMessage()); - throw e; - } - // try again - first = false; - data.readerIndex(0); // rewind buffer + } + String vModelId = modelId; + modelId = null; + if (delegate.metrics.isPerModelMetricsEnabled()) { + setVModelIdLiteLinksContextParam(vModelId); + } + boolean first = true; + while (true) { + modelId = vmm().resolveVModelId(vModelId, modelId); + resolvedModelId.set(modelId); + if (unbalanced) { + setUnbalancedLitelinksContextParam(); } - } else { - throw statusException(DATA_LOSS, - "no valid modelid or vmodelid found for request"); + try { + return delegate.callModel(modelId, methodName, headers, data); + } catch (ModelNotFoundException mnfe) { + if (!first) throw mnfe; + } catch (Exception e) { + logger.error("Exception invoking " + methodName + " method of resolved model " + modelId + " of vmodel " + + vModelId + ": " + e.getClass().getSimpleName() + ": " + e.getMessage()); + throw e; + } + // try again + first = false; + data.readerIndex(0); // rewind buffer } - } // ----- @@ -551,7 +556,7 @@ protected static void respondAndComplete(StreamObserver response, } protected static io.grpc.Status toStatus(Exception e) { - io.grpc.Status s = null; + io.grpc.Status s; String msg = e.getMessage(); if (e instanceof ModelNotFoundException) { return MODEL_NOT_FOUND_STATUS; @@ -664,7 +669,7 @@ public Listener startCall(ServerCall call, Metadata h call.request(2); // request 2 to force failure if streaming method - return new Listener() { + return new Listener<>() { ByteBuf reqMessage; boolean canInvoke = true; Iterable modelIds = mids.modelIds; @@ -716,11 +721,10 @@ public void onHalfClose() { int respReaderIndex = 0; io.grpc.Status status = INTERNAL; - String modelId = null; - String requestId = null; String resolvedModelId = null; + String vModelId = null; + String requestId = null; ModelResponse response = null; - Boolean isSingleModelRequest = null; try (InterruptingListener cancelListener = newInterruptingListener()) { if (logHeaders != null) { logHeaders.addToMDC(headers); // MDC cleared in finally block @@ -732,26 +736,28 @@ public void onHalfClose() { String balancedMetaVal = headers.get(BALANCED_META_KEY); Iterator midIt = modelIds.iterator(); // guaranteed at least one - modelId = validateModelId(midIt.next(), isVModel); + String modelOrVModelId = validateModelId(midIt.next(), isVModel); if (!midIt.hasNext()) { // single model case (most common) - isSingleModelRequest = true; - if (isVModel && delegate.metrics.isEnabled()) { - setvModelIdLiteLinksContextParam(modelId); - resolvedModelId = vmm().resolveVModelId(modelId, modelId); - response = callModel("", modelId, methodName, - balancedMetaVal, headers, reqMessage).retain(); - } else { - response = callModel(modelId, "", methodName, - balancedMetaVal, headers, reqMessage).retain(); + if (isVModel) { + ModelMeshApi.resolvedModelId.set(null); + } + try { + response = callModel(modelOrVModelId, isVModel, methodName, + balancedMetaVal, headers, reqMessage).retain(); + } finally { + if (isVModel) { + vModelId = modelOrVModelId; + resolvedModelId = ModelMeshApi.resolvedModelId.getIfExists(); + } else { + resolvedModelId = modelOrVModelId; + } } - } else { // multi-model case (specialized) - isSingleModelRequest = false; boolean allRequired = "all".equalsIgnoreCase(headers.get(REQUIRED_KEY)); List idList = new ArrayList<>(); - idList.add(modelId); + idList.add(modelOrVModelId); while (midIt.hasNext()) { idList.add(validateModelId(midIt.next(), isVModel)); } @@ -761,7 +767,7 @@ public void onHalfClose() { } finally { if (payloadProcessor != null) { processPayload(reqMessage.readerIndex(reqReaderIndex), - requestId, modelId, methodName, headers, null, true); + requestId, resolvedModelId, methodName, headers, null, true); } else { releaseReqMessage(); } @@ -797,7 +803,7 @@ public void onHalfClose() { data = response.data.readerIndex(respReaderIndex); metadata = response.metadata; } - processPayload(data, requestId, modelId, methodName, metadata, status, releaseResponse); + processPayload(data, requestId, resolvedModelId, methodName, metadata, status, releaseResponse); } else if (releaseResponse && response != null) { response.release(); } @@ -807,19 +813,8 @@ public void onHalfClose() { call.close(status, emptyMeta()); Metrics metrics = delegate.metrics; if (metrics.isEnabled()) { - if (isSingleModelRequest && metrics.isPerModelMetricsEnabled() && modelId!=null) { - if (isVModel) { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, resolvedModelId, modelId); - } else { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, modelId, ""); - } - } - else { - metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, - status.getCode(), reqSize, respSize, "", ""); - } + metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos, + status.getCode(), reqSize, respSize, resolvedModelId, vModelId); } } } @@ -980,10 +975,7 @@ protected ModelResponse applyParallelMultiModel(List modelIds, boolean i logHeaders.addToMDC(headers); } // need to pass slices of the buffers for threadsafety - if (isVModel) { - return callModel("", modelId, methodName, balancedMetaVal, headers, data.slice()); - } - return callModel(modelId, "", methodName, balancedMetaVal, headers, data.slice()); + return callModel(modelId, isVModel, methodName, balancedMetaVal, headers, data.slice()); } catch (ModelNotFoundException mnfe) { logger.warn("model " + modelId + " not found (from supplied list of " + total + ")"); if (!requireAll) { diff --git a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java index 058350d0..7ad5da8a 100644 --- a/src/main/java/com/ibm/watson/modelmesh/VModelManager.java +++ b/src/main/java/com/ibm/watson/modelmesh/VModelManager.java @@ -42,9 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator;