Skip to content

Commit

Permalink
ensure queue timer is started for RunnableFuture types
Browse files Browse the repository at this point in the history
  • Loading branch information
richardstartin committed Mar 20, 2024
1 parent e20dfff commit e893948
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ public static boolean shouldPropagate(
return Boolean.TRUE.equals(contextStore.get(executor));
}

public static void capture(
ContextStore<Runnable, State> contextStore, ThreadPoolExecutor executor, Runnable task) {
public static void capture(ContextStore<Runnable, State> contextStore, Runnable task) {
if (task != null && !exclude(RUNNABLE, task)) {
AdviceUtils.capture(contextStore, task, true);
QueueTimerHelper.startQueuingTimer(contextStore, executor.getClass(), task);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE_FUTURE;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.instrumentation.java.concurrent.AbstractExecutorInstrumentation.EXEC_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
Expand All @@ -20,6 +22,7 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.bootstrap.instrumentation.java.concurrent.TPEHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper;
Expand All @@ -28,7 +31,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;

import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -91,6 +96,7 @@ public Map<String, String> contextStore() {
final Map<String, String> stores = new HashMap<>();
stores.put(TPE, Boolean.class.getName());
stores.put(Runnable.class.getName(), State.class.getName());
stores.put(RunnableFuture.class.getName(), State.class.getName());
return Collections.unmodifiableMap(stores);
}

Expand Down Expand Up @@ -149,7 +155,19 @@ public static void capture(
if (TPEHelper.useWrapping(task)) {
task = Wrapper.wrap(task);
} else {
TPEHelper.capture(InstrumentationContext.get(Runnable.class, State.class), tpe, task);
TPEHelper.capture(InstrumentationContext.get(Runnable.class, State.class), task);
// queue time needs to be handled separately because there are RunnableFutures which are
// excluded as
// Runnables but it is not until now that they will be put on the executor's queue
if (!exclude(RUNNABLE, task)) {
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(Runnable.class, State.class), tpe.getClass(), task);
} else if (!exclude(RUNNABLE_FUTURE, task) && task instanceof RunnableFuture) {
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(RunnableFuture.class, State.class),
tpe.getClass(),
(RunnableFuture<?>) task);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestProfilingContextIntegration
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

class QueueTimingForkedTest extends AgentTestRunner {

@Override
protected void configurePreAgent() {
// required for enabling the unwrapping instrumentation to get the relevant non-carrier class names
injectSysConfig("dd.profiling.enabled", "true")
injectSysConfig("dd.profiling.experimental.queueing.time.enabled", "true")
InstrumentationBasedProfiling.enableInstrumentationBasedProfiling()
super.configurePreAgent()
}

def "test queue timing with submit #executor"() {

when:
runUnderTrace("parent", {
executor.submit(new TestRunnable()).get()
})

then:
verify(executor)

cleanup:
executor.shutdown()
TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.clear()

where:
executor << [Executors.newSingleThreadExecutor(), new ForkJoinPool(1)]
}

void verify(ExecutorService executorService) {
assert TEST_PROFILING_CONTEXT_INTEGRATION.isBalanced()
assert !TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.isEmpty()
int numAsserts = 0
while (!TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.isEmpty()) {
def timing = TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.takeFirst() as TestProfilingContextIntegration.TestQueueTiming
if (!(timing.task as Class).simpleName.isEmpty()) {
assert timing != null
assert timing.task == TestRunnable
assert timing.scheduler != null
assert timing.origin == Thread.currentThread()
numAsserts++
}
}
assert numAsserts > 0
}


class TestRunnable implements Runnable {
@Override
void run() {}
}
}

0 comments on commit e893948

Please sign in to comment.