Skip to content

Commit

Permalink
Add modelid and vmodelid labels to metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: Vedant Mahabaleshwarkar <vmahabal@redhat.com>
  • Loading branch information
ScrapCodes authored and VedantMahabaleshwarkar committed May 9, 2023
1 parent 9cc40e3 commit 35df353
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 59 deletions.
109 changes: 80 additions & 29 deletions src/main/java/com/ibm/watson/modelmesh/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,20 @@
import java.lang.reflect.Array;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.util.*;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.ibm.watson.modelmesh.Metric.*;
import static com.ibm.watson.modelmesh.Metric.MetricType.*;
import static com.ibm.watson.modelmesh.ModelMesh.M;
import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_CUSTOM_ENV_VAR;
import static com.ibm.watson.modelmesh.ModelMeshEnvVars.MMESH_METRICS_ENV_VAR;
Expand All @@ -56,14 +62,14 @@
*
*/
interface Metrics extends AutoCloseable {
boolean isPerModelMetricsEnabled();

boolean isEnabled();

void logTimingMetricSince(Metric metric, long prevTime, boolean isNano);

void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano);
void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId);

void logSizeEventMetric(Metric metric, long value);
void logSizeEventMetric(Metric metric, long value, String modelId);

void logGaugeMetric(Metric metric, long value);

Expand Down Expand Up @@ -101,7 +107,7 @@ default void logInstanceStats(final InstanceRecord ir) {
* @param respPayloadSize response payload size in bytes (or -1 if not applicable)
*/
void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize);
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId);

default void registerGlobals() {}

Expand All @@ -111,6 +117,11 @@ default void unregisterGlobals() {}
default void close() {}

Metrics NO_OP_METRICS = new Metrics() {
@Override
public boolean isPerModelMetricsEnabled() {
return false;
}

@Override
public boolean isEnabled() {
return false;
Expand All @@ -120,10 +131,10 @@ public boolean isEnabled() {
public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {}
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId){}

@Override
public void logSizeEventMetric(Metric metric, long value) {}
public void logSizeEventMetric(Metric metric, long value, String modelId){}

@Override
public void logGaugeMetric(Metric metric, long value) {}
Expand All @@ -136,7 +147,7 @@ public void logInstanceStats(InstanceRecord ir) {}

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {}
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {}
};

final class PrometheusMetrics implements Metrics {
Expand All @@ -154,12 +165,14 @@ final class PrometheusMetrics implements Metrics {
private final CollectorRegistry registry;
private final NettyServer metricServer;
private final boolean shortNames;
private final boolean enablePerModelMetrics;
private final EnumMap<Metric, Collector> metricsMap = new EnumMap<>(Metric.class);

public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMetricParams) throws Exception {
int port = 2112;
boolean shortNames = true;
boolean https = true;
boolean enablePerModelMetrics = true;
String memMetrics = "all"; // default to all
for (Entry<String, String> ent : params.entrySet()) {
switch (ent.getKey()) {
Expand All @@ -170,6 +183,9 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
throw new Exception("Invalid metrics port: " + ent.getValue());
}
break;
case "per_model_metrics":
enablePerModelMetrics = "true".equalsIgnoreCase(ent.getValue());
break;
case "fq_names":
shortNames = !"true".equalsIgnoreCase(ent.getValue());
break;
Expand All @@ -188,6 +204,7 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
throw new Exception("Unrecognized metrics config parameter: " + ent.getKey());
}
}
this.enablePerModelMetrics = enablePerModelMetrics;

registry = new CollectorRegistry();
for (Metric m : Metric.values()) {
Expand Down Expand Up @@ -220,10 +237,15 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
}

if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME
|| m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) {
builder.labelNames("method", "code");
|| m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) {
if (this.enablePerModelMetrics && m.type != COUNTER_WITH_HISTO) {
builder.labelNames("method", "code", "modelId");
} else {
builder.labelNames("method", "code");
}
} else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) {
builder.labelNames("modelId");
}

Collector collector = builder.name(m.promName).help(m.description).create();
metricsMap.put(m, collector);
if (!m.global) {
Expand Down Expand Up @@ -251,7 +273,6 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet

this.metricServer = new NettyServer(registry, port, https);
this.shortNames = shortNames;

logger.info("Will expose " + (https ? "https" : "http") + " Prometheus metrics on port " + port
+ " using " + (shortNames ? "short" : "fully-qualified") + " method names");

Expand Down Expand Up @@ -330,6 +351,11 @@ public void close() {
this.metricServer.close();
}

@Override
public boolean isPerModelMetricsEnabled() {
return enablePerModelMetrics;
}

@Override
public boolean isEnabled() {
return true;
Expand All @@ -342,13 +368,21 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {
}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {
((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed);
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) {
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(metric)).labels(modelId).observe(isNano ? elapsed / M : elapsed);
} else {
((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed);
}
}

@Override
public void logSizeEventMetric(Metric metric, long value) {
((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier);
public void logSizeEventMetric(Metric metric, long value, String modelId) {
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(metric)).labels(modelId).observe(value * metric.newMultiplier);
} else {
((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier);
}
}

@Override
Expand All @@ -365,23 +399,35 @@ public void logCounterMetric(Metric metric) {

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {
final long elapsedMillis = elapsedNanos / M;
final Histogram timingHisto = (Histogram) metricsMap
.get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME);

String mId = vModelId == null ? modelId : vModelId;
int idx = shortNames ? name.indexOf('/') : -1;
final String methodName = idx == -1 ? name : name.substring(idx + 1);

timingHisto.labels(methodName, code.name()).observe(elapsedMillis);

String methodName = idx == -1 ? name : name.substring(idx + 1);
if (enablePerModelMetrics) {
timingHisto.labels(methodName, code.name(), mId).observe(elapsedMillis);
} else {
timingHisto.labels(methodName, code.name()).observe(elapsedMillis);
}
if (reqPayloadSize != -1) {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(reqPayloadSize);
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name(), mId).observe(reqPayloadSize);
} else {
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(reqPayloadSize);
}
}
if (respPayloadSize != -1) {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(respPayloadSize);
if (enablePerModelMetrics) {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name(), mId).observe(respPayloadSize);
} else {
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
.labels(methodName, code.name()).observe(respPayloadSize);
}
}
}

Expand Down Expand Up @@ -437,6 +483,11 @@ protected StatsDSender createSender(Callable<SocketAddress> addressLookup, int q
+ (shortNames ? "short" : "fully-qualified") + " method names");
}

@Override
public boolean isPerModelMetricsEnabled() {
return false;
}

@Override
public boolean isEnabled() {
return true;
Expand All @@ -454,12 +505,12 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {
}

@Override
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano) {
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) {
client.recordExecutionTime(name(metric), isNano ? elapsed / M : elapsed);
}

@Override
public void logSizeEventMetric(Metric metric, long value) {
public void logSizeEventMetric(Metric metric, long value, String modelId) {
if (!legacy) {
value *= metric.newMultiplier;
}
Expand Down Expand Up @@ -497,7 +548,7 @@ static String[] getOkTags(String method, boolean shortName) {

@Override
public void logRequestMetrics(boolean external, String name, long elapsedNanos, Code code,
int reqPayloadSize, int respPayloadSize) {
int reqPayloadSize, int respPayloadSize, String modelId, String vModelId) {
final StatsDClient client = this.client;
final long elapsedMillis = elapsedNanos / M;
final String countName = name(external ? API_REQUEST_COUNT : INVOKE_MODEL_COUNT);
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/com/ibm/watson/modelmesh/ModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -1966,7 +1966,7 @@ final synchronized boolean doRemove(final boolean evicted,
// "unload" event if explicit unloading isn't enabled.
// Otherwise, this gets recorded in a callback set in the
// CacheEntry.unload(int) method
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false);
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, 0L, false, modelId);
metrics.logCounterMetric(Metric.UNLOAD_MODEL);
}
}
Expand Down Expand Up @@ -2037,7 +2037,7 @@ public void onSuccess(Boolean reallyHappened) {
//TODO probably only log if took longer than a certain time
long tookMillis = msSince(beforeNanos);
logger.info("Unload of " + modelId + " completed in " + tookMillis + "ms");
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false);
metrics.logTimingMetricDuration(Metric.UNLOAD_MODEL_TIME, tookMillis, false, modelId);
metrics.logCounterMetric(Metric.UNLOAD_MODEL);
}
// else considered trivially succeeded because the corresponding
Expand Down Expand Up @@ -2158,7 +2158,7 @@ public final void run() {
long queueStartTimeNanos = getAndResetLoadingQueueStartTimeNanos();
if (queueStartTimeNanos > 0) {
long queueDelayMillis = (nanoTime() - queueStartTimeNanos) / M;
metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis);
metrics.logSizeEventMetric(Metric.LOAD_MODEL_QUEUE_DELAY, queueDelayMillis, modelId);
// Only log if the priority value is "in the future" which indicates
// that there is or were runtime requests waiting for this load.
// Otherwise we don't care about arbitrary delays here
Expand Down Expand Up @@ -2228,7 +2228,7 @@ public final void run() {
loadingTimeStats(modelType).recordTime(tookMillis);
logger.info("Load of model " + modelId + " type=" + modelType + " completed in " + tookMillis
+ "ms");
metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false);
metrics.logTimingMetricDuration(Metric.LOAD_MODEL_TIME, tookMillis, false, modelId);
metrics.logCounterMetric(Metric.LOAD_MODEL);
} catch (Throwable t) {
loadFuture = null;
Expand Down Expand Up @@ -2388,7 +2388,7 @@ protected final void complete(LoadedRuntime<T> result, Throwable error) {
if (size > 0) {
long sizeBytes = size * UNIT_SIZE;
logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes));
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes);
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId);
} else {
try {
long before = nanoTime();
Expand All @@ -2397,9 +2397,9 @@ protected final void complete(LoadedRuntime<T> result, Throwable error) {
long took = msSince(before), sizeBytes = size * UNIT_SIZE;
logger.info("Model " + modelId + " size = " + size + " units" + ", ~" + mb(sizeBytes)
+ " sizing took " + took + "ms");
metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false);
metrics.logTimingMetricDuration(Metric.MODEL_SIZING_TIME, took, false, modelId);
// this is actually a size (bytes), not a "time"
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes);
metrics.logSizeEventMetric(Metric.LOADED_MODEL_SIZE, sizeBytes, modelId);
}
} catch (Exception e) {
if (!isInterruption(e) && state == SIZING) {
Expand Down Expand Up @@ -2722,7 +2722,7 @@ protected void beforeInvoke(int requestWeight)
//noinspection ThrowFromFinallyBlock
throw new ModelNotHereException(instanceId, modelId);
}
metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false);
metrics.logTimingMetricDuration(Metric.QUEUE_DELAY, tookMillis, false, modelId);
}
}
}
Expand Down Expand Up @@ -2901,7 +2901,7 @@ public void onEviction(String key, CacheEntry<?> ce, long lastUsed) {
logger.info("Evicted " + (failed ? "failed model record" : "model") + " " + key
+ " from local cache, last used " + readableTime(millisSinceLastUsed) + " ago (" + lastUsed
+ "ms), invoked " + ce.getTotalInvocationCount() + " times");
metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false);
metrics.logTimingMetricDuration(Metric.AGE_AT_EVICTION, millisSinceLastUsed, false, ce.modelId);
metrics.logCounterMetric(Metric.EVICT_MODEL);
}

Expand Down Expand Up @@ -3989,9 +3989,10 @@ else if (mr.getInstanceIds().containsKey(instanceId)) {
throw t;
} finally {
if (methodStartNanos > 0L && metrics.isEnabled()) {
String[] extraLabels = new String[]{modelId};
// only logged here in non-grpc (legacy) mode
metrics.logRequestMetrics(true, getRequestMethodName(method, args),
nanoTime() - methodStartNanos, metricStatusCode, -1, -1);
nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, "");
}
curThread.setName(threadNameBefore);
}
Expand Down Expand Up @@ -4450,7 +4451,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
long delayMillis = msSince(beforeNanos);
logger.info("Cache miss for model invocation, held up " + delayMillis + "ms");
metrics.logCounterMetric(Metric.CACHE_MISS);
metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false);
metrics.logTimingMetricDuration(Metric.CACHE_MISS_DELAY, delayMillis, false, ce.modelId);
}
}
} else {
Expand Down Expand Up @@ -4528,7 +4529,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
ce.afterInvoke(weight, tookNanos);
if (code != null && metrics.isEnabled()) {
metrics.logRequestMetrics(false, getRequestMethodName(method, args),
tookNanos, code, -1, -1);
tookNanos, code, -1, -1, ce.modelId, "");
}
}
}
Expand Down
Loading

0 comments on commit 35df353

Please sign in to comment.