Skip to content

Commit

Permalink
EFK with transform for tabular data (#616)
Browse files Browse the repository at this point in the history
request logging EFK setup
  • Loading branch information
ryandawsonuk authored Jun 19, 2019
1 parent 74ca0b0 commit aacb697
Show file tree
Hide file tree
Showing 32 changed files with 934 additions and 59 deletions.
33 changes: 30 additions & 3 deletions doc/source/graph/annotations.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,43 @@ You can configure aspects of Seldon Core via annotations in the SeldonDeployment

### Service Orchestrator

* ```seldon.io/engine-java-opts``` : Java Opts for Service Orchestrator
* Locations : SeldonDeployment.spec.predictors.annotations
* [Java Opts example](model_engine_java_opts.md)
* ```seldon.io/engine-separate-pod``` : Use a separate pod for the service orchestrator
* Locations : SeldonDeployment.spec.annotations
* [Separate svc-orc pod example](model_svcorch_sep.md)
* ```seldon.io/headless-svc``` : Run main endpoint as headless kubernetes service. This is required for gRPC load balancing via Ambassador.
* Locations : SeldonDeployment.spec.annotations
* [gRPC headless example](grpc_load_balancing_ambassador.md)

Otherwise any annotations starting with `seldon.io/engine-` will be interpreted as specifying environment variables for the engine container. These include:

* ```seldon.io/engine-java-opts``` : Java Opts for Service Orchestrator
* Locations : SeldonDeployment.spec.predictors.annotations
* [Java Opts example](model_engine_java_opts.md)
* Translates to the environment variable JAVA_OPTS
* ```seldon.io/engine-log-requests``` : Whether to log raw requests from engine
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_REQUESTS
* ```seldon.io/engine-log-responses``` : Whether to log raw responses from engine
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_RESPONSES
* ```seldon.io/engine-log-messages-externally``` : Option to turn on logging of requests via a logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_MESSAGES_EXTERNALLY
* ```seldon.io/engine-log-message-type``` : Option to override type set on messages when sending to logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_MESSAGE_TYPE
* ```seldon.io/engine-message-logging-service``` : Option to override url to broker that sends to logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable MESSAGE_LOGGING_SERVICE

More details on logging-related variables can be seen in the [request-logging example](https://github.com/SeldonIO/seldon-core/tree/master/examples/centralised-logging/README.md).

Environment variables for the engine can also be set in the `svcOrchSpec` section of the SeldonDeployment, alongside engine resources. For examples see the helm charts or the [distributed tracing example](./distributed-tracing.md).

If both annotations and `svcOrchSpec` environment variables are used to set an environment variable for the engine container then `svcOrchSpec` environment variables take priority.

The above are the key engine env vars. For a full listing of engine env vars see the application.properties file of the engine source code.

## API OAuth Gateway Annotations
The API OAuth Gateway, if used, can also have the following annotations:

Expand Down
2 changes: 1 addition & 1 deletion doc/source/graph/distributed-tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Add an environment variable: TRACING with value 1 to activate tracing.

You can utilize the default configuration by simply providing the name of the Jaeger agent service by providing JAEGER_AGENT_HOST environment variable. Override default Jaeger agent port `5775` by setting JAEGER_AGENT_PORT environment variable.

To provide a custom configuration following the Jarger Python configuration yaml defined [here](https://github.com/jaegertracing/jaeger-client-python) you can provide a configmap and the path to the YAML file in JAEGER_CONFIG_PATH environment variable.
To provide a custom configuration following the Jaeger Python configuration yaml defined [here](https://github.com/jaegertracing/jaeger-client-python) you can provide a configmap and the path to the YAML file in JAEGER_CONFIG_PATH environment variable.

An example is show below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
public class RestClientController {

private static Logger logger = LoggerFactory.getLogger(RestClientController.class.getName());



@Autowired
private PredictionService predictionService;

Expand Down Expand Up @@ -150,8 +151,8 @@ public ResponseEntity<String> predictions(RequestEntity<String> requestEntity)
try
{
SeldonMessage response = predictionService.predict(request);
String json = ProtoBufUtils.toJson(response);
return new ResponseEntity<String>(json,HttpStatus.OK);
String responseJson = ProtoBufUtils.toJson(response);
return new ResponseEntity<String>(responseJson,HttpStatus.OK);
}
catch (InterruptedException e) {
throw new APIException(ApiExceptionType.ENGINE_INTERRUPTED,e.getMessage());
Expand Down
125 changes: 122 additions & 3 deletions engine/src/main/java/io/seldon/engine/service/PredictionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
*******************************************************************************/
package io.seldon.engine.service;

import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.time.ZonedDateTime;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Message;
import io.seldon.engine.pb.ProtoBufUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.stereotype.Service;

import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -35,6 +42,9 @@
import io.seldon.protos.PredictionProtos.Feedback;
import io.seldon.protos.PredictionProtos.SeldonMessage;
import io.seldon.protos.PredictionProtos.Meta;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

@Service
public class PredictionService {
Expand All @@ -49,6 +59,21 @@ public class PredictionService {

PuidGenerator puidGenerator = new PuidGenerator();

@Value("${log.requests}")
private boolean logRequests;

@Value("${log.responses}")
private boolean logResponses;

@Value("${log.feedback.requests}")
private boolean logFeedbackRequests;

@Value("${log.messages.externally}")
private boolean logMessagesExternally;

@Value("${message.logging.service}")
private String messageLoggingService;

public final class PuidGenerator {
private SecureRandom random = new SecureRandom();

Expand All @@ -62,13 +87,19 @@ public void sendFeedback(Feedback feedback) throws InterruptedException, Executi
PredictorState predictorState = predictorBean.predictorStateFromPredictorSpec(enginePredictor.getPredictorSpec());

predictorBean.sendFeedback(feedback, predictorState);

if(logFeedbackRequests) {
logMessageAsJson(feedback);
}

return;
}

public SeldonMessage predict(SeldonMessage request) throws InterruptedException, ExecutionException, InvalidProtocolBufferException
{

ZonedDateTime requestTime = ZonedDateTime.now();

if (!request.hasMeta())
{
request = request.toBuilder().setMeta(Meta.newBuilder().setPuid(puidGenerator.nextPuidId()).build()).build();
Expand All @@ -85,7 +116,95 @@ else if (StringUtils.isEmpty(request.getMeta().getPuid()))

SeldonMessage.Builder builder = SeldonMessage.newBuilder(predictorReturn).setMeta(Meta.newBuilder(predictorReturn.getMeta()).setPuid(puid));

return builder.build();
SeldonMessage response = builder.build();

//raw logging in engine, if enabled
if(logRequests){
//log json now we've added puid
logMessageAsJson(request);
}
if(logResponses){
logMessageAsJson(response);
}

//enriched logging outside engine, if enabled
if(logMessagesExternally){
ZonedDateTime responseTime = ZonedDateTime.now();
sendMessagePairAsJson(request,response,requestTime,responseTime);
}

return response;

}

private JsonNode combineRequestResponse(String request, String response, ZonedDateTime requestTime, ZonedDateTime responseTime) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode requestNode = mapper.readTree(request);
JsonNode responseNode = mapper.readTree(response);
ObjectNode combined = mapper.createObjectNode();
combined.set("request",requestNode);
combined.set("response",responseNode);
((ObjectNode)combined.get("request")).set("date",mapper.readTree(mapper.writeValueAsString(requestTime.toString())));
((ObjectNode)combined.get("response")).set("date",mapper.readTree(mapper.writeValueAsString(responseTime.toString())));
String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
combined.set("sdepName",mapper.readTree(mapper.writeValueAsString(depName)));
}

String depNamespace = System.getenv().get("DEPLOYMENT_NAMESPACE");
if(depNamespace!=null && depNamespace!=""){
combined.set("namespace",mapper.readTree(mapper.writeValueAsString(depNamespace)));
}

return combined;
}

private void sendMessagePairAsJson(SeldonMessage request, SeldonMessage response, ZonedDateTime requestTime, ZonedDateTime responseTime){
try {
String requestJson = ProtoBufUtils.toJson(request);
String responseJson = ProtoBufUtils.toJson(response);
JsonNode pair = combineRequestResponse(requestJson,responseJson,requestTime,responseTime);

MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
headers.add("Content-Type", "application/json");
headers.add("X-B3-Flags", "1");
headers.add("CE-SpecVersion", "0.2");
headers.add("CE-Type", "seldon.message.pair");
headers.add("CE-Time", requestTime.toString());
headers.add("CE-EventID", request.getMeta().getPuid());

String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
headers.add("CE-Source", "application/json");
} else{
headers.add("CE-Source", "seldon");
}

HttpEntity<?> requestBody = new HttpEntity<Object>(pair.toString(), headers);
RestTemplate restTemplate = new RestTemplate();

restTemplate.postForEntity(messageLoggingService,requestBody,String.class);

}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}

private void logMessageAsJson(Feedback message){
try {
String json = ProtoBufUtils.toJson(message);
System.out.println(json);
}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}

private void logMessageAsJson(Message message){
try {
String json = ProtoBufUtils.toJson(message);
System.out.println(json);
}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}
}
14 changes: 13 additions & 1 deletion engine/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@ spring.jmx.enabled = false

logging.file=
logging.level.root=WARN
logging.level.io.seldon=${SELDON_LOG_LEVEL:INFO}
logging.level.io.seldon=${SELDON_LOG_LEVEL:INFO}

#logging of raw requests in-engine
log.requests=${SELDON_LOG_REQUESTS:false}
log.responses=${SELDON_LOG_RESPONSES:false}
log.feedback.requests=${SELDON_LOG_FEEDBACK_REQUESTS:false}

#namespace in which deployed
deployment.namespace=${DEPLOYMENT_NAMESPACE:default}
#send request-response pairs to be processed and logged outside engine
message.logging.service=${SELDON_MESSAGE_LOGGING_SERVICE:http://default-broker.${deployment.namespace}.svc.cluster.local}
log.messages.externally=${SELDON_LOG_MESSAGES_EXTERNALLY:false}
log.message.type=${SELDON_LOG_MESSAGE_TYPE:seldon.message.pair}
37 changes: 37 additions & 0 deletions engine/src/test/java/io/seldon/engine/pb/TestJsonParse.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
package io.seldon.engine.pb;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;

import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonFactory;
Expand All @@ -27,8 +31,12 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.primitives.Doubles;

import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME;

public class TestJsonParse {

String rawRequest = "{ \"meta\": { \"puid\": \"avodt6jrk9nbgomnco7nhrvpo0\", \"tags\": { }, \"routing\": { }, \"requestPath\": { }, \"metrics\": [] }, \"data\": { \"names\": [\"f0\", \"f1\"], \"ndarray\": [[0.15, 0.74]] }}";
String rawResponse = "{ \"meta\": { \"puid\": \"avodt6jrk9nbgomnco7nhrvpo0\", \"tags\": { }, \"routing\": { }, \"requestPath\": { \"classifier\": \"seldonio/mock_classifier:1.0\" }, \"metrics\": [] }, \"data\": { \"names\": [\"proba\"], \"ndarray\": [[0.07786847593954888]] }}";

@Test
public void multiDimTest() throws JsonProcessingException, IOException
Expand All @@ -49,4 +57,33 @@ public void multiDimTest() throws JsonProcessingException, IOException
((ObjectNode) j.get("request")).set("shape",mapper.valueToTree(shape));
System.out.println(j.toString());
}


private JsonNode combineRequestResponse(String request, String response, ZonedDateTime requestTime, ZonedDateTime responseTime) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode requestNode = mapper.readTree(request);
JsonNode responseNode = mapper.readTree(response);
ObjectNode combined = mapper.createObjectNode();
combined.set("request",requestNode);
combined.set("response",responseNode);
((ObjectNode)combined.get("request")).set("date",mapper.readTree(mapper.writeValueAsString(requestTime.toString())));
((ObjectNode)combined.get("response")).set("date",mapper.readTree(mapper.writeValueAsString(responseTime.toString())));
String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
combined.set("sdepName",mapper.readTree(depName));
}

return combined;
}


@Test
public void combineRequestResponse() throws JsonProcessingException, IOException
{

ZonedDateTime time = ZonedDateTime.parse("2018-04-26T14:48:09.769Z", ISO_ZONED_DATE_TIME);
JsonNode j = combineRequestResponse(rawRequest,rawResponse,time,time);
Assert.assertEquals(j.toString(),"{\"request\":{\"meta\":{\"puid\":\"avodt6jrk9nbgomnco7nhrvpo0\",\"tags\":{},\"routing\":{},\"requestPath\":{},\"metrics\":[]},\"data\":{\"names\":[\"f0\",\"f1\"],\"ndarray\":[[0.15,0.74]]},\"date\":\"2018-04-26T14:48:09.769Z\"},\"response\":{\"meta\":{\"puid\":\"avodt6jrk9nbgomnco7nhrvpo0\",\"tags\":{},\"routing\":{},\"requestPath\":{\"classifier\":\"seldonio/mock_classifier:1.0\"},\"metrics\":[]},\"data\":{\"names\":[\"proba\"],\"ndarray\":[[0.07786847593954888]]},\"date\":\"2018-04-26T14:48:09.769Z\"}}");
}

}
7 changes: 7 additions & 0 deletions engine/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
server.port = 8081

log.requests=${SELDON_LOG_REQUESTS:true}
log.responses=${SELDON_LOG_RESPONSES:true}
log.feedback.requests=${SELDON_LOG_FEEDBACK_REQUESTS:true}

#send request-response pairs to be processed and logged outside engine
message.logging.service=${SELDON_MESSAGE_LOGGING_SERVICE:http://default-broker.default.svc.cluster.local}
log.messages.externally=${SELDON_LOG_MESSAGES_EXTERNALLY:false}
Loading

0 comments on commit aacb697

Please sign in to comment.