Skip to content

Commit

Permalink
WIP, fix text + some changes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielezonca authored and VedantMahabaleshwarkar committed Jul 3, 2023
1 parent 4f1130d commit fbc8b1b
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 48 deletions.
7 changes: 1 addition & 6 deletions src/main/java/com/ibm/watson/modelmesh/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos,
.get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME);
int idx = shortNames ? name.indexOf('/') : -1;
String methodName = idx == -1 ? name : name.substring(idx + 1);
if (modelId == null) {
logger.error("invalid ModelId. Label value for ModelId will be left blank");
modelId = "";
}
if (vModelId == null) {
logger.debug("vModelId is empty, creating empty label");
if (enablePerModelMetrics && vModelId == null) {
vModelId = "";
}
if (enablePerModelMetrics) {
Expand Down
29 changes: 10 additions & 19 deletions src/main/java/com/ibm/watson/modelmesh/ModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -3348,7 +3348,7 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea
List<String> excludeInstances)
throws ModelNotFoundException, ModelLoadException, ModelNotHereException, InternalException {
try {
return (StatusInfo) invokeModel(modelId, false, null,
return (StatusInfo) invokeModel(modelId, null,
internalOpRemoteMeth, returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args"
} catch (ModelNotFoundException | ModelLoadException | ModelNotHereException | InternalException e) {
throw e;
Expand Down Expand Up @@ -3417,7 +3417,7 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea
* @throws TException
*/
@SuppressWarnings("unchecked")
protected Object invokeModel(final String modelId, Boolean isVModel, final Method method,
protected Object invokeModel(final String modelId, final Method method,
final Method remoteMeth, final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException {

//verify parameter values
Expand All @@ -3431,10 +3431,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho
}

final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY);
String vModelId = "";
if (isVModel) {
vModelId = contextMap.get(VMODELID);
}
String vModelId = contextMap.getOrDefault(VMODELID, "");
// Set the external request flag if it's not a tasInternal call or if
// tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded
// invocation originating from within the cluster.
Expand Down Expand Up @@ -3507,7 +3504,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho
throw new ModelNotHereException(instanceId, modelId);
}
try {
return invokeLocalModel(ce, method, args, modelId, isVModel);
return invokeLocalModel(ce, method, args, modelId);
} catch (ModelLoadException mle) {
mr = registry.get(modelId);
if (mr == null || !mr.loadFailedInInstance(instanceId)) {
Expand Down Expand Up @@ -3721,7 +3718,7 @@ protected Object invokeModel(final String modelId, Boolean isVModel, final Metho
localInvokesInFlight.incrementAndGet();
}
try {
Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel);
Object result = invokeLocalModel(cacheEntry, method, args, modelId);
return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result;
} finally {
if (!favourSelfForHits) {
Expand Down Expand Up @@ -3941,7 +3938,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) {

// invoke model
try {
Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel);
Object result = invokeLocalModel(cacheEntry, method, args, modelId);
return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result;
} catch (ModelNotHereException e) {
if (loadTargetFilter != null) loadTargetFilter.remove(instanceId);
Expand Down Expand Up @@ -4126,15 +4123,12 @@ private Map<String, Long> filterIfReadOnly(Map<String, Long> instId) {
* instances inside and some out, and a request has been sent from outside the
* cluster to an instance inside (since it may land on an unintended instance in
* that case).
* @param isVModel TODO
* @throws TException TODO
* @throws ModelNotHereException if the specified destination instance isn't found
*/
protected Object forwardInvokeModel(String destId, String modelId, Method remoteMeth, Object... args)
throws TException {
destinationInstance.set(destId);
try {
//TODO: not sure what is happening here.. do I need to pass vmodelid to the remoteMeth.invoke?
return remoteMeth.invoke(directClient, ObjectArrays.concat(modelId, args));
} catch (Exception e) {
if (e instanceof InvocationTargetException) {
Expand Down Expand Up @@ -4410,17 +4404,17 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met
return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args));
}

protected Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args, String modelId, Boolean isVModel)
protected Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args, String modelId)
throws InterruptedException, TException {
Object result = invokeLocalModel(ce, method, false, args);
Object result = invokeLocalModel(ce, method, args);
// if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary
if (method == null) {
triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null);
}
return result;
}

private Object invokeLocalModel(CacheEntry<?> ce, Method method, Boolean isVModel, Object[] args)
private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
throws InterruptedException, TException {

if (method == null) {
Expand All @@ -4436,10 +4430,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Boolean isVMode
ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future)
}
Map<String, String> contextMap = ThreadContext.getCurrentContext();
String vModelId = null;
if (isVModel) {
vModelId = contextMap.get(VMODELID);
}
String vModelId = contextMap.getOrDefault(VMODELID, "");
// The future-waiting timeouts should not be needed, request threads are interrupted when their
// timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout.
// But we still set a large one as a safeguard (there can be pathological cases where model-loading
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,26 +434,27 @@ public void ensureLoaded(EnsureLoadedRequest request, StreamObserver<ModelStatus
// Returned ModelResponse will be released once the request thread exits so
// must be retained before transferring.
// non-private to avoid synthetic method access
ModelResponse callModel(String modelId, boolean isVModel, String methodName, String grpcBalancedHeader,
ModelResponse callModel(String originalModelId, boolean isVModel, String methodName, String grpcBalancedHeader,
Metadata headers, ByteBuf data) throws Exception {
boolean unbalanced = grpcBalancedHeader == null ? UNBALANCED_DEFAULT : !"true".equals(grpcBalancedHeader);
if (!isVModel) {
if (unbalanced) {
setUnbalancedLitelinksContextParam();
}
return delegate.callModel(modelId, isVModel, methodName, headers, data);
return delegate.callModel(originalModelId, methodName, headers, data);
}
String vModelId = originalModelId;
if (delegate.metrics.isEnabled()) {
setvModelIdLiteLinksContextParam(originalModelId);
}
String vModelId = modelId;
modelId = null;
boolean first = true;
while (true) {
modelId = vmm().resolveVModelId(vModelId, modelId);
setvModelIdLiteLinksContextParam(vModelId);
String modelId = vmm().resolveVModelId(vModelId, originalModelId);
if (unbalanced) {
setUnbalancedLitelinksContextParam();
}
try {
return delegate.callModel(modelId, true, methodName, headers, data);
return delegate.callModel(modelId, methodName, headers, data);
} catch (ModelNotFoundException mnfe) {
if (!first) throw mnfe;
} catch (Exception e) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,12 @@ public List<ByteBuffer> applyModelMulti(String modelId, List<ByteBuffer> input,
@SuppressWarnings("unchecked")
List<ByteBuffer> applyModel(String modelId, List<ByteBuffer> input, Map<String, String> metadata)
throws TException {
return (List<ByteBuffer>) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata);
return (List<ByteBuffer>) invokeModel(modelId, localMeth, remoteMeth, input, metadata);
}

// refcount of provided ByteBuf should not be modified
ModelResponse callModel(String modelId, Boolean isVModel, String methodName, Metadata headers, ByteBuf data) throws TException {
return (ModelResponse) invokeModel(modelId, isVModel, directLocalMeth, remoteMeth, methodName, headers, data);
ModelResponse callModel(String modelId, String methodName, Metadata headers, ByteBuf data) throws TException {
    // Delegates to invokeModel with the direct-local and remote method handles;
    // the method name, headers and payload are forwarded as the invocation args.
    // The refcount of the provided ByteBuf should not be modified.
    final Object response = invokeModel(modelId, directLocalMeth, remoteMeth, methodName, headers, data);
    return (ModelResponse) response;
}

@Idempotent
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/com/ibm/watson/modelmesh/VModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.ibm.watson.kvutils.KVTable.Helper.TableTxn;
import com.ibm.watson.kvutils.KVTable.TableView;
import com.ibm.watson.kvutils.factory.KVUtilsFactory;
import com.ibm.watson.litelinks.ThreadContext;
import com.ibm.watson.litelinks.ThreadPoolHelper;
import com.ibm.watson.modelmesh.GrpcSupport.InterruptingListener;
import com.ibm.watson.modelmesh.api.ModelInfo;
Expand All @@ -43,6 +42,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -68,7 +70,7 @@
/**
* This class contains logic related to VModels (virtual or versioned models)
*/
public final class VModelManager implements AutoCloseable {
public final class VModelManager implements Closeable {

private static final Logger logger = LoggerFactory.getLogger(VModelManager.class);

Expand Down Expand Up @@ -124,9 +126,13 @@ public ListenableFuture<Boolean> start() {
}

@Override
public void close() throws Exception {
vModelTable.close();
targetScaleupExecutor.shutdown();
public void close() {
    // Releases what this manager holds: closes the vModel table and shuts down
    // the target-scaleup executor. This signature declares no checked exception,
    // so an IOException from the table is rethrown as UncheckedIOException with
    // the original exception kept as its cause.
    try {
        vModelTable.close();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    } finally {
        // Runs on both the success and failure paths, so a failed table close
        // cannot leave the executor running.
        // NOTE(review): assumes shutdown() declares no checked exception — confirm.
        targetScaleupExecutor.shutdown();
    }
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected ModelLoader<?> getLoader() {
@Override
public ByteBuffer applyModel(String modelId, ByteBuffer input, Map<String, String> metadata)
throws TException {
return (ByteBuffer) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata);
return (ByteBuffer) invokeModel(modelId, localMeth, remoteMeth, input, metadata);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions src/test/java/com/ibm/watson/modelmesh/ModelMeshMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,19 @@ public void verifyMetrics() throws Exception {
assertEquals(40.0, metrics.get("modelmesh_api_request_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2000.0\",}"));
// External response time should all be < 200ms (includes cache hit loading time)
assertEquals(40.0,
metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"120000.0\",}"));
metrics.get("modelmesh_invoke_model_milliseconds_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"120000.0\",}"));
// Simulated model sizing time is < 200ms
assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"60000.0\",}"));
assertEquals(1.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"60000.0\",}"));
// Simulated model sizing time is > 50ms
assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"50.0\",}"));
assertEquals(0.0, metrics.get("modelmesh_model_sizing_milliseconds_bucket{modelId=\"myModel\",vModelId=\"\",le=\"50.0\",}"));
// Simulated model size is between 64MiB and 256MiB
assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"6.7108864E7\",}"));
assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vmodelId=\"\",le=\"2.68435456E8\",}"));
assertEquals(0.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"6.7108864E7\",}"));
assertEquals(1.0, metrics.get("modelmesh_loaded_model_size_bytes_bucket{modelId=\"myModel\",vModelId=\"\",le=\"2.68435456E8\",}"));
// One model is loaded
assertEquals(1.0, metrics.get("modelmesh_instance_models_total"));
// Histogram counts should reflect the two payload sizes (30 small, 10 large)
assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"128.0\",}"));
assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vmodelId=\"\",le=\"2097152.0\",}"));
assertEquals(30.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"128.0\",}"));
assertEquals(40.0, metrics.get("modelmesh_request_size_bytes_bucket{method=\"predict\",code=\"OK\",modelId=\"myModel\",vModelId=\"\",le=\"2097152.0\",}"));

// Memory metrics
assertTrue(metrics.containsKey("netty_pool_mem_allocated_bytes{area=\"direct\",}"));
Expand Down

0 comments on commit fbc8b1b

Please sign in to comment.