Skip to content

Commit

Permalink
[#11195] Update reactor onNext
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Jul 1, 2024
1 parent 47d042a commit f5b0e72
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.bootstrap.plugin.reactor;

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventApiIdAwareAroundInterceptor;
import com.navercorp.pinpoint.common.trace.ServiceType;

public class CoreSubscriberOnNextInterceptor extends AsyncContextSpanEventApiIdAwareAroundInterceptor {
private final ServiceType serviceType;

public CoreSubscriberOnNextInterceptor(TraceContext traceContext, ServiceType serviceType) {
super(traceContext);
this.serviceType = serviceType;
}

// AsyncContext must exist in Target for tracking.
public AsyncContext getAsyncContext(Object target, Object[] args) {
return ReactorContextAccessorUtils.getAsyncContext(target);
}

@Override
public void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, int apiId, Object[] args) {
}

public AsyncContext getAsyncContext(Object target, Object[] args, Object result, Throwable throwable) {
return ReactorContextAccessorUtils.getAsyncContext(target);
}

@Override
public void afterTrace(AsyncContext asyncContext, Trace trace, SpanEventRecorder recorder, Object target, int apiId, Object[] args, Object result, Throwable throwable) {
if (trace.canSampled()) {
recorder.recordServiceType(serviceType);
recorder.recordApiId(apiId);
recorder.recordException(throwable);
}
}

@Override
public void doInAfterTrace(SpanEventRecorder recorder, Object target, int apiId, Object[] args, Object result, Throwable throwable) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.bootstrap.plugin.reactor;

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;

public class CoreSubscriberOnSubscribeInterceptor implements AroundInterceptor {

public CoreSubscriberOnSubscribeInterceptor() {
}

@Override
public void before(Object target, Object[] args) {
final AsyncContext thisAsyncContext = ReactorContextAccessorUtils.getAsyncContext(target);
final AsyncContext subscriptionAsyncContext = ReactorContextAccessorUtils.getAsyncContext(args, 0);
if (thisAsyncContext != null) {
if (subscriptionAsyncContext == null) {
ReactorContextAccessorUtils.setAsyncContext(thisAsyncContext, args, 0);
}
} else {
if (subscriptionAsyncContext != null) {
ReactorContextAccessorUtils.setAsyncContext(subscriptionAsyncContext, target);
}
}
}

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.navercorp.pinpoint.bootstrap.plugin.ProfilerPlugin;
import com.navercorp.pinpoint.bootstrap.plugin.ProfilerPluginSetupContext;
import com.navercorp.pinpoint.bootstrap.plugin.reactor.CoreSubscriberConstructorInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.reactor.CoreSubscriberOnNextInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.reactor.CoreSubscriberOnSubscribeInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.reactor.FluxAndMonoSubscribeOrReturnInterceptor;
import com.navercorp.pinpoint.bootstrap.plugin.reactor.ReactorContextAccessor;
import com.navercorp.pinpoint.common.util.ArrayUtils;
Expand Down Expand Up @@ -963,6 +965,15 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
}
}

final InstrumentMethod onSubscribeMethod = target.getDeclaredMethod("onSubscribe", "org.reactivestreams.Subscription");
if (onSubscribeMethod != null) {
onSubscribeMethod.addInterceptor(CoreSubscriberOnSubscribeInterceptor.class);
}
final InstrumentMethod onNextMethod = target.getDeclaredMethod("onNext", "java.lang.Object");
if (onNextMethod != null) {
onNextMethod.addScopedInterceptor(CoreSubscriberOnNextInterceptor.class, va(ReactorConstants.REACTOR), "CoreSubscriberOnNext");
}

return target.toBytecode();
}
}
Expand Down

0 comments on commit f5b0e72

Please sign in to comment.