Skip to content

Commit

Permalink
Implement agentless log submission for Log4j2 (#7082)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-tkachenko-datadog authored Jun 7, 2024
1 parent 9121e58 commit edb2c3f
Show file tree
Hide file tree
Showing 21 changed files with 760 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ private HttpUrl getAgentlessUrl(Intake intake) {
}

public enum Intake {
API("api", "v2", Config::isCiVisibilityAgentlessEnabled, Config::getCiVisibilityAgentlessUrl);
API("api", "v2", Config::isCiVisibilityAgentlessEnabled, Config::getCiVisibilityAgentlessUrl),
LOGS(
"http-intake.logs",
"v2",
Config::isAgentlessLogSubmissionEnabled,
Config::getAgentlessLogSubmissionUrl);

public final String urlPrefix;
public final String version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ public static void shutdown(final boolean sync) {
if (telemetryEnabled) {
stopTelemetry();
}
shutdownLogsIntake();
}

public static synchronized Class<?> installAgentCLI() throws Exception {
Expand Down Expand Up @@ -487,6 +488,7 @@ public void execute() {
maybeStartAppSec(scoClass, sco);
maybeStartIast(scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand Down Expand Up @@ -798,6 +800,37 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
}

private static void shutdownLogsIntake() {
if (AGENT_CLASSLOADER == null) {
// It wasn't started, so no need to shut it down
return;
}
try {
Thread.currentThread().setContextClassLoader(AGENT_CLASSLOADER);
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method shutdownMethod = logsIntakeSystemClass.getMethod("shutdown");
shutdownMethod.invoke(null);
} catch (final Throwable ex) {
log.error("Throwable thrown while shutting down logs intake", ex);
}
}

private static void startTelemetry(Instrumentation inst, Class<?> scoClass, Object sco) {
StaticEventLogger.begin("Telemetry");

Expand Down
24 changes: 24 additions & 0 deletions dd-java-agent/agent-logs-intake/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
plugins {
id 'com.github.johnrengelman.shadow'
}

apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/version.gradle"

excludedClassesCoverage += [
"datadog.trace.logging.intake.LogsWriterImpl",
"datadog.trace.logging.intake.LogsIntakeSystem",
]

dependencies {
implementation project(':internal-api')
implementation project(':communication')
}

shadowJar {
dependencies deps.excludeShared
}

jar {
archiveClassifier = 'unbundled'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package datadog.trace.logging.intake;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import datadog.communication.BackendApi;
import datadog.communication.http.OkHttpUtils;
import datadog.communication.util.IOThrowingFunction;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogsDispatcher {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsDispatcher.class);

private static final MediaType JSON = MediaType.get("application/json");
private static final IOThrowingFunction<InputStream, Object> IGNORE_RESPONSE = is -> null;

// Maximum array size if sending multiple logs in an array: 1000 entries
static final int MAX_BATCH_RECORDS = 1000;

// Maximum content size per payload (uncompressed): 5MB
static final int MAX_BATCH_BYTES = 5 * 1024 * 1024;

// Maximum size for a single log: 1MB
static final int MAX_MESSAGE_BYTES = 1024 * 1024;

private final BackendApi backendApi;
private final JsonAdapter<Map> jsonAdapter;
private final int maxBatchRecords;
private final int maxBatchBytes;
private final int maxMessageBytes;

public LogsDispatcher(BackendApi backendApi) {
this(backendApi, MAX_BATCH_RECORDS, MAX_BATCH_BYTES, MAX_MESSAGE_BYTES);
}

LogsDispatcher(
BackendApi backendApi, int maxBatchRecords, int maxBatchBytes, int maxMessageBytes) {
this.backendApi = backendApi;

Moshi moshi = new Moshi.Builder().build();
jsonAdapter = moshi.adapter(Map.class);

this.maxBatchRecords = maxBatchRecords;
this.maxBatchBytes = maxBatchBytes;
this.maxMessageBytes = maxMessageBytes;
}

public void dispatch(List<Map<String, Object>> messages) {
StringBuilder batch = new StringBuilder("[");
int batchCount = 0, batchLength = 0;

for (Map<String, Object> message : messages) {
String json = jsonAdapter.toJson(message);
byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
if (bytes.length > maxMessageBytes) {
LOGGER.debug("Discarding a log message whose size {} exceeds the limit", bytes.length);
continue;
}

if (batchCount + 1 > maxBatchRecords || batchLength + bytes.length >= maxBatchBytes) {
flush(batch.append("]"));
batch = new StringBuilder("[");
batchCount = 0;
batchLength = 0;
}

if (batchCount != 0) {
batch.append(",");
}
batch.append(json);
batchCount += 1;
batchLength += bytes.length;
}

flush(batch.append("]"));
}

private void flush(StringBuilder batch) {
try {
RequestBody requestBody = RequestBody.create(JSON, batch.toString());
RequestBody gzippedRequestBody = OkHttpUtils.gzippedRequestBodyOf(requestBody);
backendApi.post("logs", gzippedRequestBody, IGNORE_RESPONSE, null, true);
} catch (IOException e) {
LOGGER.error("Could not dispatch logs", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsIntake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();

LogsIntake.registerWriter(writer);
}

public static void shutdown() {
LogsIntake.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package datadog.trace.logging.intake;

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogsWriterImpl implements LogsWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsWriterImpl.class);

private static final long POLLING_THREAD_SHUTDOWN_TIMEOUT_MILLIS = 5_000;
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
commonTags.put("ddtags", "datadog.product:" + config.getAgentlessLogSubmissionProduct());
commonTags.put("service", config.getServiceName());
commonTags.put("hostname", config.getHostName());

messageQueue = new ArrayBlockingQueue<>(config.getAgentlessLogSubmissionQueueSize());
messagePollingThread =
AgentThreadFactory.newAgentThread(
AgentThreadFactory.AgentThread.LOGS_INTAKE, this::logPollingLoop);
}

@Override
public void start() {
try {
Runtime.getRuntime()
.addShutdownHook(
new Thread(AGENT_THREAD_GROUP, this::shutdown, "dd-logs-intake-shutdown-hook"));
messagePollingThread.start();
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}

@Override
public void shutdown() {
if (!messagePollingThread.isAlive()) {
return;
}

messagePollingThread.interrupt();
try {
messagePollingThread.join(POLLING_THREAD_SHUTDOWN_TIMEOUT_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.debug("Interrupted while waiting for log polling thread to stop");
}
}

@Override
public void log(Map<String, Object> message) {
try {
message.putAll(commonTags);

if (!messageQueue.offer(message, ENQUEUE_LOG_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Timeout while trying to enqueue log message");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.debug("Interrupted while trying to log");
}
}

private void logPollingLoop() {
while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
batch.add(messageQueue.take());
messageQueue.drainTo(batch);
logsDispatcher.dispatch(batch);

} catch (InterruptedException e) {
break;
}
}

List<Map<String, Object>> batch = new ArrayList<>();
messageQueue.drainTo(batch);
if (!batch.isEmpty()) {
logsDispatcher.dispatch(batch);
}
}
}
Loading

0 comments on commit edb2c3f

Please sign in to comment.