Skip to content

Commit

Permalink
[pinpoint-apm#7654] Async support SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyinhao1234 authored and emeroad committed May 7, 2021
1 parent fa6cd87 commit 5170af7
Show file tree
Hide file tree
Showing 36 changed files with 1,551 additions and 17 deletions.
67 changes: 67 additions & 0 deletions agent-sdk/AsyncPropagationExample.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# AsyncContextProgation

## Runnable instrumentation
Wrap the method you want to trace with `TraceRunnable.asyncEntry()`.

```java
public class AsyncEntryExample {
private final ExecutorService executor = Executors.newSingleThreadExecutor();

@GetMapping(value = "/sdk-async-plugin/asyncEntry-propagation")
public String asyncEntryAndExecute() throws Exception {
CompletableFuture<String> future = new CompletableFuture<>();

Runnable command = TraceRunnable.asyncEntry(() -> future.complete("asyncEntry-execute"));
executor.execute(command);

Thread.sleep(1000);

return future.get();
}
}
```

## Executor instrumentation
Wrap the executor you want to trace to `TraceExecutors.wrapExecutorService(executr, true)`.

```java
public class AutoExample {

private final ExecutorService contextPropagationExecutor
= TraceExecutors.wrapExecutorService(Executors.newSingleThreadExecutor(), true);

@GetMapping(value = "/sdk-async-plugin/auto-context-propagation")
public String autoWrapAndExecute() throws Exception {
CompletableFuture<String> future = new CompletableFuture<>();

contextPropagationExecutor.execute(() -> future.complete("auto-execute"));

Thread.sleep(1000);

return future.get();
}
}
```

## Executor and Runnable instrumentation
This method has high tracing precision.
```java
public class ManualExample {
private final ExecutorService traceExecutor
= TraceExecutors.wrapExecutorService(Executors.newSingleThreadExecutor());

@GetMapping(value = "/sdk-async-plugin/manual-context-propagation")
public String manualWrapAndExecute() throws Exception {

CompletableFuture<String> future = new CompletableFuture<>();

traceExecutor.execute(TraceRunnable.wrap(() -> future.complete("manual-execute")));

traceExecutor.execute(() -> "Not captured");

Thread.sleep(1000);

return future.get();
}
}
```
25 changes: 25 additions & 0 deletions agent-sdk/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>pinpoint</artifactId>
<groupId>com.navercorp.pinpoint</groupId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>pinpoint-agent-sdk</artifactId>
<name>pinpoint-agent-sdk</name>

<properties>

</properties>

<dependencies>

</dependencies>

<build>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.sdk.v1.concurrent;

import java.util.Objects;
import java.util.concurrent.Callable;

/**
* {@link Callable} for TraceContext propagation
* @param <V> return type
*/
public class TraceCallable<V> implements Callable<V> {

public static <V> Callable<V> wrap(Callable<V> delegate) {
return new TraceCallable<>(delegate);
}

public static <V> Callable<V> asyncEntry(Callable<V> delegate) {
return new TraceCallable<>(delegate);
}

protected final Callable<V> delegate;

public TraceCallable(Callable<V> delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}

/**
* the starting point of the async execution
*/
@Override
public V call() throws Exception {
return delegate.call();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2021 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.sdk.v1.concurrent;

import com.navercorp.pinpoint.sdk.v1.concurrent.wrapper.CommandWrapper;

import java.util.Objects;
import java.util.concurrent.Executor;

/**
* {@link Executor} for TraceContext propagation.
* <p>{@link TraceScheduledExecutorService} marks the entry point of the async action.
*/
public class TraceExecutor implements Executor {

protected final Executor delegate;
protected final CommandWrapper wrapper;

public TraceExecutor(Executor delegate, CommandWrapper wrapper) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
this.wrapper = Objects.requireNonNull(wrapper, "wrapper");
}


@Override
public void execute(Runnable command) {
Objects.requireNonNull(command, "command");

command = wrapper.wrap(command);
delegate.execute(command);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2021 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.sdk.v1.concurrent;

import com.navercorp.pinpoint.sdk.v1.concurrent.wrapper.CommandWrapper;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/*
* {@link ExecutorService} for TraceContext propagation.
* <p>{@link TraceExecutorService} marks the entry point of the async action.
*/

public class TraceExecutorService implements ExecutorService {

protected final ExecutorService delegate;
protected final CommandWrapper wrapper;

public TraceExecutorService(ExecutorService delegate, CommandWrapper wrapper) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
this.wrapper = Objects.requireNonNull(wrapper, "wrapper");
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
Objects.requireNonNull(task, "task");

task = wrapper.wrap(task);
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
Objects.requireNonNull(task, "task");

task = wrapper.wrap(task);
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
Objects.requireNonNull(task, "task");

task = wrapper.wrap(task);
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
Objects.requireNonNull(tasks, "tasks");

tasks = wrapper.wrap(tasks);
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(tasks, "tasks");

tasks = wrapper.wrap(tasks);
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
Objects.requireNonNull(tasks, "tasks");

tasks = wrapper.wrap(tasks);
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
Objects.requireNonNull(tasks, "tasks");

tasks = wrapper.wrap(tasks);
return delegate.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
Objects.requireNonNull(command, "command");

command = wrapper.wrap(command);
delegate.execute(command);
}

}
Loading

0 comments on commit 5170af7

Please sign in to comment.