Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Audit Message Logging Interceptor Race Condition #938

Merged
merged 18 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public GrpcMessageInterceptor(@Nullable SecurityProperties securityProperties) {
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
MessageAuditLogEntry.Builder entryBuilder = MessageAuditLogEntry.newBuilder();
// default response message to empty proto in log entry.
// default response/request message to empty proto in log entry.
// request could be empty when the client closes the connection before sending a request
// message.
// response could be unset when the service encounters an error when processsing the service
// call.
entryBuilder.setRequest(Empty.newBuilder().build());
entryBuilder.setResponse(Empty.newBuilder().build());

// Unpack service & method name from call
Expand Down
232 changes: 232 additions & 0 deletions core/src/test/java/feast/core/logging/CoreLoggingIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.logging;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.common.it.BaseIT;
import feast.common.it.DataGenerator;
import feast.common.logging.entry.AuditLogEntryKind;
import feast.proto.core.CoreServiceGrpc;
import feast.proto.core.CoreServiceGrpc.CoreServiceBlockingStub;
import feast.proto.core.CoreServiceGrpc.CoreServiceFutureStub;
import feast.proto.core.CoreServiceProto.GetFeastCoreVersionRequest;
import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.proto.core.CoreServiceProto.ListStoresRequest;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(
properties = {
"feast.logging.audit.enabled=true",
"feast.logging.audit.messageLoggingEnabled=true",
})
public class CoreLoggingIT extends BaseIT {
private static TestLogAppender testAuditLogAppender;
private static CoreServiceBlockingStub coreService;
private static CoreServiceFutureStub asyncCoreService;

@BeforeAll
public static void globalSetUp(@Value("${grpc.server.port}") int coreGrpcPort)
throws InterruptedException, ExecutionException {
LoggerContext logContext = (LoggerContext) LogManager.getContext(false);
// NOTE: As log appender state is shared across tests use a different method
// for each test and filter by method name to ensure that you only get logs
// for a specific test.
testAuditLogAppender = logContext.getConfiguration().getAppender("TestAuditLogAppender");

// Connect to core service.
Channel channel =
ManagedChannelBuilder.forAddress("localhost", coreGrpcPort).usePlaintext().build();
coreService = CoreServiceGrpc.newBlockingStub(channel);
asyncCoreService = CoreServiceGrpc.newFutureStub(channel);

// Preflight a request to core service stubs to verify connection
coreService.getFeastCoreVersion(GetFeastCoreVersionRequest.getDefaultInstance());
asyncCoreService.getFeastCoreVersion(GetFeastCoreVersionRequest.getDefaultInstance()).get();
}

/** Check that messsage audit log are produced on service call */
@Test
public void shouldProduceMessageAuditLogsOnCall()
throws InterruptedException, InvalidProtocolBufferException {
// Generate artifical load on feast core.
UpdateStoreRequest request =
UpdateStoreRequest.newBuilder().setStore(DataGenerator.getDefaultStore()).build();
UpdateStoreResponse response = coreService.updateStore(request);

// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);
// Check message audit logs are produced for each audit log.
woop marked this conversation as resolved.
Show resolved Hide resolved
JsonFormat.Parser protoJSONParser = JsonFormat.parser();
// Pull message audit logs logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "UpdateStore");
assertEquals(1, logJsonObjects.size());
JsonObject logObj = logJsonObjects.get(0);

// Extract & Check that request/response are returned correctly
String requestJson = logObj.getAsJsonObject("request").toString();
UpdateStoreRequest.Builder gotRequest = UpdateStoreRequest.newBuilder();
protoJSONParser.merge(requestJson, gotRequest);

String responseJson = logObj.getAsJsonObject("response").toString();
UpdateStoreResponse.Builder gotResponse = UpdateStoreResponse.newBuilder();
protoJSONParser.merge(responseJson, gotResponse);

assertThat(gotRequest.build(), equalTo(request));
assertThat(gotResponse.build(), equalTo(response));
}

/** Check that message audit logs are produced when server encounters an error */
@Test
public void shouldProduceMessageAuditLogsOnError() throws InterruptedException {
// Send a bad request which should cause Core to error
ListFeatureSetsRequest request =
ListFeatureSetsRequest.newBuilder()
.setFilter(
ListFeatureSetsRequest.Filter.newBuilder()
.setProject("*")
.setFeatureSetName("nop")
.build())
.build();

boolean hasExpectedException = false;
Code statusCode = null;
try {
coreService.listFeatureSets(request);
} catch (StatusRuntimeException e) {
hasExpectedException = true;
statusCode = e.getStatus().getCode();
}
assertTrue(hasExpectedException);

// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);
// Pull message audit logs logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "ListFeatureSets");

assertEquals(1, logJsonObjects.size());
JsonObject logJsonObject = logJsonObjects.get(0);
// Check correct status code is tracked on error.
assertEquals(logJsonObject.get("statusCode").getAsString(), statusCode.toString());
}

/** Check that expected message audit logs are produced when under load. */
@Test
public void shouldProduceExpectedAuditLogsUnderLoad()
throws InterruptedException, ExecutionException {
// Generate artifical requests on core to simulate load.
int LOAD_SIZE = 40; // Total number of requests to send.
int BURST_SIZE = 5; // Number of requests to send at once.

ListStoresRequest request = ListStoresRequest.getDefaultInstance();
List<ListStoresResponse> responses = new LinkedList<>();
for (int i = 0; i < LOAD_SIZE; i += 5) {
List<ListenableFuture<ListStoresResponse>> futures = new LinkedList<>();
for (int j = 0; j < BURST_SIZE; j++) {
futures.add(asyncCoreService.listStores(request));
}

responses.addAll(Futures.allAsList(futures).get());
}
// Wait required to ensure audit logs are flushed into test audit log appender
Thread.sleep(1000);

// Pull message audit logs from test log appender
List<JsonObject> logJsonObjects =
parseMessageJsonLogObjects(testAuditLogAppender.getLogs(), "ListStores");
assertEquals(responses.size(), logJsonObjects.size());

// Extract & Check that request/response are returned correctly
JsonFormat.Parser protoJSONParser = JsonFormat.parser();
Streams.zip(
responses.stream(),
logJsonObjects.stream(),
(response, logObj) -> Pair.of(response, logObj))
.forEach(
responseLogJsonPair -> {
ListStoresResponse response = responseLogJsonPair.getLeft();
JsonObject logObj = responseLogJsonPair.getRight();

ListStoresRequest.Builder gotRequest = null;
ListStoresResponse.Builder gotResponse = null;
try {
String requestJson = logObj.getAsJsonObject("request").toString();
gotRequest = ListStoresRequest.newBuilder();
protoJSONParser.merge(requestJson, gotRequest);

String responseJson = logObj.getAsJsonObject("response").toString();
gotResponse = ListStoresResponse.newBuilder();
protoJSONParser.merge(responseJson, gotResponse);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}

assertThat(gotRequest.build(), equalTo(request));
assertThat(gotResponse.build(), equalTo(response));
});
}

/**
* Filter and Parse out Message Audit Logs from the given logsStrings for the given method name
*/
private List<JsonObject> parseMessageJsonLogObjects(List<String> logsStrings, String methodName) {
JsonParser jsonParser = new JsonParser();
// copy to prevent concurrent modification.
return logsStrings.stream()
.map(logJSON -> jsonParser.parse(logJSON).getAsJsonObject())
// Filter to only include message audit logs
.filter(
logObj ->
logObj
.getAsJsonPrimitive("kind")
.getAsString()
.equals(AuditLogEntryKind.MESSAGE.toString())
// filter by method name to ensure logs from other tests do not interfere with
// test
&& logObj.get("method").getAsString().equals(methodName))
.collect(Collectors.toList());
}
}
68 changes: 68 additions & 0 deletions core/src/test/java/feast/core/logging/TestLogAppender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.logging;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;

/** Test Log Appender used for collecting logs for testing logging. */
@Plugin(
name = "TestLogAppender",
category = Core.CATEGORY_NAME,
elementType = Appender.ELEMENT_TYPE)
@Getter
public class TestLogAppender extends AbstractAppender {
private List<String> logs;

protected TestLogAppender(String name, Filter filter, Layout<? extends Serializable> layout) {
super(name, filter, layout, false, new Property[] {});
logs = new ArrayList<>();
}

@Override
public void append(LogEvent event) {
getLogs().add(event.getMessage().toString());
}

@PluginFactory
public static TestLogAppender createAppender(
@PluginAttribute("name") String name,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginElement("Filter") final Filter filter) {
if (name == null) {
return null;
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
return new TestLogAppender(name, filter, layout);
}
}
53 changes: 53 additions & 0 deletions core/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018 The Feast Authors
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ https://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<Configuration status="WARN" packages="feast.core.logging">
<Properties>
<Property name="LOG_PATTERN">
%d{yyyy-MM-dd HH:mm:ss.SSS} %5p ${hostName} --- [%15.15t] %-40.40c{1.} : %m%n%ex
</Property>
<Property name="JSON_LOG_PATTERN">
{"time":"%d{yyyy-MM-dd'T'HH:mm:ssXXX}","hostname":"${hostName}","severity":"%p","message":%m}%n%ex
</Property>
</Properties>
<Appenders>
<Console name="ConsoleAppender" target="SYSTEM_OUT" follow="true">
<MarkerFilter marker="AUDIT_MARK" onMatch="DENY" onMismatch="ACCEPT"/>
<PatternLayout pattern="${LOG_PATTERN}"/>
</Console>
<Console name="JSONAppender" target="SYSTEM_OUT" follow="true">
<MarkerFilter marker="AUDIT_MARK" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${JSON_LOG_PATTERN}"/>
</Console>
<TestLogAppender name="TestAuditLogAppender">
<MarkerFilter marker="AUDIT_MARK" onMatch="ACCEPT" onMismatch="DENY"/>
</TestLogAppender>
</Appenders>
<Loggers>
<Logger name="feast.core" level="info" additivity="false">
<AppenderRef ref="ConsoleAppender"/>
<AppenderRef ref="JSONAppender"/>
<AppenderRef ref="TestAuditLogAppender"/>
</Logger>
<Root level="info">
<AppenderRef ref="ConsoleAppender"/>
<AppenderRef ref="JSONAppender"/>
<AppenderRef ref="TestAuditLogAppender"/>
</Root>
</Loggers>
</Configuration>