From de107e5d992e304b0490c1ab3a2fbd7ac74e8b2b Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 16 Nov 2023 19:02:14 -0800 Subject: [PATCH] use readwritelock --- .../appender/v1_0/OpenTelemetryAppender.java | 61 ++++++++++++------- .../AbstractOpenTelemetryAppenderTest.java | 2 +- .../LogReplayOpenTelemetryAppenderTest.java | 21 ++++++- 3 files changed, 58 insertions(+), 26 deletions(-) diff --git a/instrumentation/logback/logback-appender-1.0/library/src/main/java/io/opentelemetry/instrumentation/logback/appender/v1_0/OpenTelemetryAppender.java b/instrumentation/logback/logback-appender-1.0/library/src/main/java/io/opentelemetry/instrumentation/logback/appender/v1_0/OpenTelemetryAppender.java index 34552fae0c4f..1b3ed0574c66 100644 --- a/instrumentation/logback/logback-appender-1.0/library/src/main/java/io/opentelemetry/instrumentation/logback/appender/v1_0/OpenTelemetryAppender.java +++ b/instrumentation/logback/logback-appender-1.0/library/src/main/java/io/opentelemetry/instrumentation/logback/appender/v1_0/OpenTelemetryAppender.java @@ -12,11 +12,16 @@ import ch.qos.logback.core.UnsynchronizedAppenderBase; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.logback.appender.v1_0.internal.LoggingEventMapper; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.slf4j.ILoggerFactory; import org.slf4j.LoggerFactory; @@ -37,6 +42,8 @@ public class OpenTelemetryAppender extends UnsynchronizedAppenderBase eventsToReplay = new ArrayBlockingQueue<>(1000); private final AtomicBoolean replayLimitWarningLogged = new AtomicBoolean(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + public OpenTelemetryAppender() {} /** @@ -71,35 +78,32 @@ public void start() { captureMarkerAttribute, captureKeyValuePairAttributes, captureLoggerContext); - if (openTelemetry == null) { - openTelemetry = OpenTelemetry.noop(); - } super.start(); } @SuppressWarnings("SystemOut") @Override protected void append(ILoggingEvent event) { - // TODO(jean): Race condition to fix - // time=1 append() thread gets the no-op instance - // time=2 install() thread updates the instance and flushes the queue - // time=3 append() thread adds the log event to the queue - if (openTelemetry == OpenTelemetry.noop()) { - if (eventsToReplay.remainingCapacity() > 0) { - LoggingEventToReplay logEventToReplay = - new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes); - eventsToReplay.offer(logEventToReplay); - } else if (!replayLimitWarningLogged.getAndSet(true)) { + Lock readLock = lock.readLock(); + readLock.lock(); + try { + OpenTelemetry openTelemetry = this.openTelemetry; + if (openTelemetry != null) { + emit(openTelemetry, event); + return; + } + + LoggingEventToReplay logEventToReplay = + new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes); + + if (!eventsToReplay.offer(logEventToReplay) && !replayLimitWarningLogged.getAndSet(true)) { String message = "Log cache size of the OpenTelemetry appender is too small. firstLogsCacheSize value has to be increased;"; System.err.println(message); } - return; + } finally { + readLock.unlock(); } - mapper.emit( - openTelemetry.getLogsBridge(), - new LoggingEventToReplay(event, captureExperimentalAttributes, captureCodeAttributes), - -1); } /** @@ -173,13 +177,26 @@ public void setNumLogsCapturedBeforeOtelInstall(int size) { * to function. See {@link #install(OpenTelemetry)} for simple installation option. */ public void setOpenTelemetry(OpenTelemetry openTelemetry) { - this.openTelemetry = openTelemetry; - LoggingEventToReplay eventToReplay; - while ((eventToReplay = eventsToReplay.poll()) != null) { - mapper.emit(openTelemetry.getLogsBridge(), eventToReplay, -1); + List eventsToReplay = new ArrayList<>(); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + // minimize scope of write lock + this.openTelemetry = openTelemetry; + this.eventsToReplay.drainTo(eventsToReplay); + } finally { + writeLock.unlock(); + } + // now emit + for (LoggingEventToReplay eventToReplay : eventsToReplay) { + emit(openTelemetry, eventToReplay); } } + private void emit(OpenTelemetry openTelemetry, ILoggingEvent event) { + mapper.emit(openTelemetry.getLogsBridge(), event, -1); + } + // copied from SDK's DefaultConfigProperties private static List filterBlanksAndNulls(String[] values) { return Arrays.stream(values) diff --git a/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/AbstractOpenTelemetryAppenderTest.java b/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/AbstractOpenTelemetryAppenderTest.java index fceab63edbc4..dea7610db9f3 100644 --- a/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/AbstractOpenTelemetryAppenderTest.java +++ b/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/AbstractOpenTelemetryAppenderTest.java @@ -61,7 +61,7 @@ static void setupAll() { resetLoggerContext(); } - private static void resetLoggerContext() { + static void resetLoggerContext() { try { LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); Field field = ContextBase.class.getDeclaredField("propertyMap"); diff --git a/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/LogReplayOpenTelemetryAppenderTest.java b/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/LogReplayOpenTelemetryAppenderTest.java index fdb8674a78b3..f398fe2b0c31 100644 --- a/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/LogReplayOpenTelemetryAppenderTest.java +++ b/instrumentation/logback/logback-appender-1.0/library/src/test/java/io/opentelemetry/instrumentation/logback/appender/v1_0/LogReplayOpenTelemetryAppenderTest.java @@ -7,19 +7,34 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.api.OpenTelemetry; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.util.ContextInitializer; +import ch.qos.logback.core.joran.spi.JoranException; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import java.net.URL; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTest { @BeforeEach - void setup() { + void setup() throws JoranException { generalBeforeEachSetup(); - OpenTelemetryAppender.install(OpenTelemetry.noop()); + // to make sure we start fresh with a new OpenTelemetryAppender for each test + reloadLoggerConfiguration(); + } + + private static void reloadLoggerConfiguration() throws JoranException { + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + ContextInitializer ci = new ContextInitializer(loggerContext); + URL url = ci.findURLOfDefaultConfigurationFile(true); + loggerContext.reset(); + ci.configureByResource(url); + // by default LoggerContext contains HOSTNAME property we clear it to start with empty context + resetLoggerContext(); } @Override