diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/ApplicationContextHandler.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/ApplicationContextHandler.java index f0476d58480f..8e80a8fdc890 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/ApplicationContextHandler.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/ApplicationContextHandler.java @@ -21,7 +21,6 @@ import com.google.inject.TypeLiteral; import com.navercorp.pinpoint.common.profiler.message.DataSender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.loader.service.AnnotationKeyRegistryService; import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService; import com.navercorp.pinpoint.profiler.context.ServerMetaDataRegistryService; @@ -42,7 +41,7 @@ public class ApplicationContextHandler { private final OrderedSpanRecorder orderedSpanRecorder; - private final TestTcpDataSender tcpDataSender; + private final TestDataSender testDataSender; private final ServerMetaDataRegistryService serverMetaDataRegistryService; private final AnnotationKeyRegistryService annotationKeyRegistryService; private final ServiceTypeRegistryService serviceTypeRegistry; @@ -50,7 +49,7 @@ public class ApplicationContextHandler { public ApplicationContextHandler(DefaultApplicationContext defaultApplicationContext) { Injector injector = defaultApplicationContext.getInjector(); this.orderedSpanRecorder = findRecorder(injector); - this.tcpDataSender = findTestTcpDataSender(injector); + this.testDataSender = findTestTcpDataSender(injector); this.serverMetaDataRegistryService = findServerMetaDataRegistryService(injector); this.annotationKeyRegistryService = findAnnotationKeyRegistryService(injector); this.serviceTypeRegistry = findServiceTypeRegistry(injector); @@ -71,13 +70,13 @@ private OrderedSpanRecorder findRecorder(Injector injector) { throw new IllegalStateException("unexpected dataSender:" + dataSender); } - private TestTcpDataSender findTestTcpDataSender(Injector injector) { - TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() { + private TestDataSender findTestTcpDataSender(Injector injector) { + TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() { }; - Key> dataSenderKey = Key.get(dataSenderTypeLiteral); - EnhancedDataSender dataSender = injector.getInstance(dataSenderKey); - if (dataSender instanceof TestTcpDataSender) { - return (TestTcpDataSender) dataSender; + Key> dataSenderKey = Key.get(dataSenderTypeLiteral); + EnhancedDataSender dataSender = injector.getInstance(dataSenderKey); + if (dataSender instanceof TestDataSender) { + return (TestDataSender) dataSender; } throw new IllegalStateException("unexpected dataSender" + dataSender); } @@ -114,7 +113,7 @@ public List getExecutedMethod() { private void addApiDescription(List list, List spanEventList) { for (SpanEvent spanEvent : spanEventList) { int apiId = spanEvent.getApiId(); - String apiDescription = this.tcpDataSender.getApiDescription(apiId); + String apiDescription = this.testDataSender.getApiDescription(apiId); list.add(apiDescription); } } @@ -124,8 +123,8 @@ public OrderedSpanRecorder getOrderedSpanRecorder() { return orderedSpanRecorder; } - public TestTcpDataSender getTcpDataSender() { - return tcpDataSender; + public TestDataSender getTestDataSender() { + return testDataSender; } public ServerMetaDataRegistryService getServerMetaDataRegistryService() { diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/AsyncDataSenderDelegator.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/AsyncDataSenderDelegator.java new file mode 100644 index 000000000000..3c490a9b01ca --- /dev/null +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/AsyncDataSenderDelegator.java @@ -0,0 +1,41 @@ +package com.navercorp.pinpoint.profiler.test; + +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; +import com.navercorp.pinpoint.common.profiler.message.DefaultResultResponse; +import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; +import com.navercorp.pinpoint.profiler.metadata.MetaDataType; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class AsyncDataSenderDelegator implements AsyncDataSender { + + private final EnhancedDataSender dataSender; + + private final ResultResponse success = new DefaultResultResponse(true, "success"); + private final ResultResponse failed = new DefaultResultResponse(false, "failed"); + + public AsyncDataSenderDelegator(EnhancedDataSender dataSender) { + this.dataSender = Objects.requireNonNull(dataSender, "dataSender"); + } + + @Override + public CompletableFuture request(MetaDataType data) { + if (this.dataSender.request(data)) { + return CompletableFuture.completedFuture(success); + } else { + return CompletableFuture.completedFuture(failed); + } + } + + @Override + public boolean send(MetaDataType data) { + return this.dataSender.send(data); + } + + @Override + public void stop() { + // empty + } +} diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataService.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataService.java index 1136c7586105..1fbcb1f47344 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataService.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataService.java @@ -18,7 +18,6 @@ import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.IdAllocator; import com.navercorp.pinpoint.profiler.cache.Result; import com.navercorp.pinpoint.profiler.cache.SimpleCache; @@ -35,9 +34,9 @@ public class MockApiMetaDataService implements ApiMetaDataService { private final SimpleCache apiCache = new SimpleCache<>(new IdAllocator.ZigZagAllocator()); - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; - public MockApiMetaDataService(EnhancedDataSender enhancedDataSender) { + public MockApiMetaDataService(EnhancedDataSender enhancedDataSender) { this.enhancedDataSender = Objects.requireNonNull(enhancedDataSender, "enhancedDataSender"); } diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataServiceProvider.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataServiceProvider.java index 5eb61f853f4e..307cb85726a0 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataServiceProvider.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/MockApiMetaDataServiceProvider.java @@ -19,7 +19,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.metadata.ApiMetaDataService; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; @@ -30,16 +29,16 @@ */ public class MockApiMetaDataServiceProvider implements Provider { - private final Provider> enhancedDataSenderProvider; + private final Provider> enhancedDataSenderProvider; @Inject - public MockApiMetaDataServiceProvider(Provider> enhancedDataSenderProvider) { + public MockApiMetaDataServiceProvider(Provider> enhancedDataSenderProvider) { this.enhancedDataSenderProvider = Objects.requireNonNull(enhancedDataSenderProvider, "enhancedDataSenderProvider"); } @Override public ApiMetaDataService get() { - final EnhancedDataSender enhancedDataSender = this.enhancedDataSenderProvider.get(); + final EnhancedDataSender enhancedDataSender = this.enhancedDataSenderProvider.get(); return new MockApiMetaDataService(enhancedDataSender); } diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginApplicationContextModule.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginApplicationContextModule.java index cb951e811ffd..2d988d5f5550 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginApplicationContextModule.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginApplicationContextModule.java @@ -22,7 +22,6 @@ import com.google.inject.TypeLiteral; import com.navercorp.pinpoint.common.profiler.message.DataSender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.DefaultServerMetaDataRegistryService; import com.navercorp.pinpoint.profiler.context.ServerMetaDataRegistryService; import com.navercorp.pinpoint.profiler.context.SpanType; @@ -66,9 +65,9 @@ protected void configure() { bind(StorageFactory.class).to(TestSpanStorageFactory.class); - EnhancedDataSender enhancedDataSender = newTcpDataSender(); + EnhancedDataSender enhancedDataSender = newTcpDataSender(); logger.debug("enhancedDataSender:{}", enhancedDataSender); - TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() {}; + TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() {}; bind(dataSenderTypeLiteral).toInstance(enhancedDataSender); ServerMetaDataRegistryService serverMetaDataRegistryService = newServerMetaDataRegistryService(); @@ -89,8 +88,8 @@ private DataSender newUdpSpanDataSender() { return sender; } - private EnhancedDataSender newTcpDataSender() { - return new TestTcpDataSender(); + private EnhancedDataSender newTcpDataSender() { + return new TestDataSender(); } private ServerMetaDataRegistryService newServerMetaDataRegistryService() { diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginVerifierExternalAdaptor.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginVerifierExternalAdaptor.java index 8c484d885524..b57e54b1893b 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginVerifierExternalAdaptor.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/PluginVerifierExternalAdaptor.java @@ -406,7 +406,7 @@ private void verifySpan(final ResolvedExpectedTrace expected, ActualTrace actual if (expected.exception != null) { final IntStringValue actualExceptionInfo = actual.getExceptionInfo(); if (actualExceptionInfo != null) { - String actualExceptionClassName = this.handler.getTcpDataSender().getString(actualExceptionInfo.getIntValue()); + String actualExceptionClassName = this.handler.getTestDataSender().getString(actualExceptionInfo.getIntValue()); String actualExceptionMessage = actualExceptionInfo.getStringValue(); verifyException(expected.exception, actualExceptionClassName, actualExceptionMessage); } else { @@ -459,7 +459,7 @@ private void annotationCompare(int index, ResolvedExpectedTrace expected, Actual } if (AnnotationKeyUtils.isCachedArgsKey(expectedAnnotationKey.getCode())) { - expectedValue = this.handler.getTcpDataSender().getStringId(expectedValue.toString()); + expectedValue = this.handler.getTestDataSender().getStringId(expectedValue.toString()); } if (!Objects.equals(expectedValue, actualAnnotation.getValue())) { @@ -505,11 +505,11 @@ private void verifyException(Exception expectedException, String actualException } private void verifySql(int index, ExpectedSql expected, Annotation actual) { - int id = this.handler.getTcpDataSender().getSqlId(expected.getQuery()); + int id = this.handler.getTestDataSender().getSqlId(expected.getQuery()); IntStringStringValue actualSql = (IntStringStringValue) actual.getValue(); if (actualSql.getIntValue() != id) { - String actualQuery = this.handler.getTcpDataSender().getSql(actualSql.getIntValue()); + String actualQuery = this.handler.getTestDataSender().getSql(actualSql.getIntValue()); AssertionErrorBuilder builder = new AssertionErrorBuilder(String.format("Annotation[%s].sqlId", index), id + ":" + expected.getQuery(), actualSql.getIntValue() + ": " + actualQuery); @@ -533,11 +533,11 @@ private void verifySql(int index, ExpectedSql expected, Annotation actual) { } private void verifySqlUid(int index, ExpectedSql expected, Annotation actual) { - byte[] uid = this.handler.getTcpDataSender().getSqlUid(expected.getQuery()); + byte[] uid = this.handler.getTestDataSender().getSqlUid(expected.getQuery()); BytesStringStringValue actualSql = (BytesStringStringValue) actual.getValue(); if (!Arrays.equals(actualSql.getBytesValue(), uid)) { - String actualQuery = this.handler.getTcpDataSender().getSql(actualSql.getBytesValue()); + String actualQuery = this.handler.getTestDataSender().getSql(actualSql.getBytesValue()); AssertionErrorBuilder builder = new AssertionErrorBuilder(String.format("Annotation[%s].sqlUid", index), Arrays.toString(uid) + ":" + expected.getQuery(), Arrays.toString(actualSql.getBytesValue()) + ": " + actualQuery); @@ -591,7 +591,7 @@ private String getConstructorInfo(Constructor constructor) { private int findApiId(String desc) throws AssertionError { try { - return this.handler.getTcpDataSender().getApiId(desc); + return this.handler.getTestDataSender().getApiId(desc); } catch (NoSuchElementException e) { throw new AssertionError("Cannot find apiId of [" + desc + "]"); } @@ -635,7 +635,7 @@ public List getExecutedMethod() { @Override public void printCache(PrintStream out) { this.handler.getOrderedSpanRecorder().print(out); - this.handler.getTcpDataSender().printDatas(out); + this.handler.getTestDataSender().printDatas(out); } @Override @@ -651,7 +651,7 @@ public void initialize(boolean createTraceObject) { } this.handler.getOrderedSpanRecorder().clear(); - this.handler.getTcpDataSender().clear(); + this.handler.getTestDataSender().clear(); ignoredServiceTypes.clear(); } @@ -663,7 +663,7 @@ public void cleanUp(boolean detachTraceObject) { } this.handler.getOrderedSpanRecorder().clear(); - this.handler.getTcpDataSender().clear(); + this.handler.getTestDataSender().clear(); ignoredServiceTypes.clear(); } diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestTcpDataSender.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestDataSender.java similarity index 95% rename from agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestTcpDataSender.java rename to agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestDataSender.java index afe49af15910..b01528e9f5b6 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestTcpDataSender.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/TestDataSender.java @@ -17,7 +17,6 @@ import com.google.common.primitives.UnsignedBytes; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.metadata.ApiMetaData; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.metadata.SqlMetaData; @@ -33,13 +32,12 @@ import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.function.BiConsumer; /** * @author Jongho Moon * @author jaehong.kim */ -public class TestTcpDataSender implements EnhancedDataSender { +public class TestDataSender implements EnhancedDataSender { private final List datas = Collections.synchronizedList(new ArrayList<>()); @@ -150,12 +148,6 @@ public boolean request(MetaDataType data, int retry) { return true; } - @Override - public boolean request(MetaDataType data, BiConsumer listener) { - addData(data); - return true; - } - public String getApiDescription(int id) { return syncGet(apiIdMap, id); diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockMessageConverter.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockMessageConverter.java deleted file mode 100644 index 6721231ebaab..000000000000 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockMessageConverter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.pinpoint.profiler.test.rpc; - -import com.google.inject.Inject; -import com.navercorp.pinpoint.common.profiler.message.DefaultResultResponse; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; -import com.navercorp.pinpoint.common.profiler.message.ResultResponse; - -public class MockMessageConverter implements MessageConverter { - - @Inject - public MockMessageConverter() { - } - - @Override - public ResultResponse toMessage(Object message) { - return new DefaultResultResponse(true, "success by mocking"); - } -} diff --git a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockRpcModule.java b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockRpcModule.java index fddaf03d0007..ec6741a6d4bd 100644 --- a/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockRpcModule.java +++ b/agent-module/profiler-test/src/main/java/com/navercorp/pinpoint/profiler/test/rpc/MockRpcModule.java @@ -21,23 +21,22 @@ import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.DataSender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; import com.navercorp.pinpoint.common.profiler.message.ResultResponse; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.SpanType; import com.navercorp.pinpoint.profiler.context.module.AgentDataSender; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.context.module.ModuleLifeCycle; -import com.navercorp.pinpoint.profiler.context.module.ResultConverter; import com.navercorp.pinpoint.profiler.context.module.SpanDataSender; import com.navercorp.pinpoint.profiler.context.module.StatDataSender; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.monitor.metric.MetricType; +import com.navercorp.pinpoint.profiler.test.AsyncDataSenderDelegator; import com.navercorp.pinpoint.profiler.test.ListenableDataSender; import com.navercorp.pinpoint.profiler.test.OrderedSpanRecorder; -import com.navercorp.pinpoint.profiler.test.TestTcpDataSender; +import com.navercorp.pinpoint.profiler.test.TestDataSender; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -72,28 +71,33 @@ protected void configure() { bind(statDataSenderKey).toInstance(statDataSender); expose(statDataSenderKey); - EnhancedDataSender enhancedDataSender = new TestTcpDataSender(); + EnhancedDataSender enhancedDataSender = new TestDataSender(); logger.debug("enhancedDataSender:{}", enhancedDataSender); - TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() { + TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() { }; bind(dataSenderTypeLiteral).toInstance(enhancedDataSender); expose(dataSenderTypeLiteral); - Key> agentDataSender = Key.get(dataSenderTypeLiteral, AgentDataSender.class); - bind(agentDataSender).to(dataSenderTypeLiteral).in(Scopes.SINGLETON); + AsyncDataSender asyncDataSender = new AsyncDataSenderDelegator(enhancedDataSender); + logger.debug("asyncDataSender:{}", asyncDataSender); + TypeLiteral> asyncDataSenderTypeLiteral = new TypeLiteral>() { + }; + bind(asyncDataSenderTypeLiteral).toInstance(asyncDataSender); + expose(asyncDataSenderTypeLiteral); + + + Key> agentDataSender = Key.get(asyncDataSenderTypeLiteral, AgentDataSender.class); + bind(agentDataSender).to(asyncDataSenderTypeLiteral).in(Scopes.SINGLETON); expose(agentDataSender); - Key> metadataDataSender = Key.get(dataSenderTypeLiteral, MetadataDataSender.class); + logger.debug("enhancedDataSender:{}", enhancedDataSender); + TypeLiteral> enhancedDataSenderTypeLiteral = new TypeLiteral>() { + }; + Key> metadataDataSender = Key.get(enhancedDataSenderTypeLiteral, MetadataDataSender.class); bind(metadataDataSender).to(dataSenderTypeLiteral).in(Scopes.SINGLETON); expose(metadataDataSender); - TypeLiteral> resultMessageConverter = new TypeLiteral>() {}; - Key> resultMessageConverterKey = Key.get(resultMessageConverter, ResultConverter.class); - bind(resultMessageConverterKey).to(MockMessageConverter.class).in(Scopes.SINGLETON); - expose(resultMessageConverterKey); - - Key rpcModuleLifeCycleKey = Key.get(ModuleLifeCycle.class, Names.named("RPC-MODULE")); bind(rpcModuleLifeCycleKey).to(MockModuleLifeCycle.class).in(Scopes.SINGLETON); expose(rpcModuleLifeCycleKey); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java index 74e232cb1693..06ac30d98875 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java @@ -16,10 +16,8 @@ package com.navercorp.pinpoint.profiler; -import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.ResultResponse; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.metadata.AgentInfo; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.util.AgentInfoFactory; @@ -29,6 +27,7 @@ import java.util.Objects; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -49,13 +48,12 @@ public class AgentInfoSender { private final Logger logger = LogManager.getLogger(this.getClass()); - private final EnhancedDataSender dataSender; + private final AsyncDataSender dataSender; private final AgentInfoFactory agentInfoFactory; private final long refreshIntervalMs; private final long sendIntervalMs; private final int maxTryPerAttempt; private final Scheduler scheduler; - private final MessageConverter messageConverter; private AgentInfoSender(Builder builder) { this.dataSender = builder.dataSender; @@ -64,7 +62,6 @@ private AgentInfoSender(Builder builder) { this.sendIntervalMs = builder.sendIntervalMs; this.maxTryPerAttempt = builder.maxTryPerAttempt; this.scheduler = new Scheduler(); - this.messageConverter = builder.messageConverter; } public void start() { @@ -179,10 +176,9 @@ private boolean sendAgentInfo() { agentInfo = agentInfoFactory.createAgentInfo(); logger.info("Sending AgentInfo={}", agentInfo); - ResponseFutureListener listener = new ResponseFutureListener<>(); - dataSender.request(agentInfo, listener); - ResponseMessage responseMessage = listener.getResponseFuture().get(3000, TimeUnit.MILLISECONDS); - if (responseMessage == null) { + CompletableFuture future = dataSender.request(agentInfo); + ResultResponse result = future.get(3000, TimeUnit.MILLISECONDS); + if (result == null) { if (agentInfo != null && agentInfo.getAgentInformation() != null) { logger.warn("Failed to send agentInfo={}. result not set", agentInfo.getAgentInformation()); } else { @@ -190,7 +186,6 @@ private boolean sendAgentInfo() { } return false; } - final ResultResponse result = messageConverter.toMessage(responseMessage); if (!result.isSuccess()) { if (agentInfo != null && agentInfo.getAgentInformation() != null) { logger.warn("Failed to send agentInfo={}. request unsuccessful, response={}", agentInfo.getAgentInformation(), result.getMessage()); @@ -220,14 +215,13 @@ private void logError(AgentInfo agentInfo, Throwable cause) { } public static class Builder { - private final EnhancedDataSender dataSender; + private final AsyncDataSender dataSender; private final AgentInfoFactory agentInfoFactory; private long refreshIntervalMs = DEFAULT_AGENT_INFO_REFRESH_INTERVAL_MS; private long sendIntervalMs = DEFAULT_AGENT_INFO_SEND_INTERVAL_MS; private int maxTryPerAttempt = DEFAULT_MAX_TRY_COUNT_PER_ATTEMPT; - private MessageConverter messageConverter; - public Builder(EnhancedDataSender dataSender, AgentInfoFactory agentInfoFactory) { + public Builder(AsyncDataSender dataSender, AgentInfoFactory agentInfoFactory) { this.dataSender = Objects.requireNonNull(dataSender, "dataSender"); this.agentInfoFactory = Objects.requireNonNull(agentInfoFactory, "agentInfoFactory"); } @@ -247,10 +241,6 @@ public Builder maxTryPerAttempt(int maxTryCountPerAttempt) { return this; } - public Builder setMessageConverter(MessageConverter messageConverter) { - this.messageConverter = messageConverter; - return this; - } public AgentInfoSender build() { if (this.refreshIntervalMs <= 0) { diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/BufferedExceptionStorage.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/BufferedExceptionStorage.java index 7042c96f6cd0..c476ff0b0a43 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/BufferedExceptionStorage.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/BufferedExceptionStorage.java @@ -17,7 +17,6 @@ import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; import com.navercorp.pinpoint.common.util.CollectionUtils; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.exception.model.ExceptionMetaData; import com.navercorp.pinpoint.profiler.context.exception.model.ExceptionMetaDataFactory; import com.navercorp.pinpoint.profiler.context.exception.model.ExceptionWrapper; @@ -38,12 +37,12 @@ public class BufferedExceptionStorage implements ExceptionStorage { private static final boolean isDebug = logger.isDebugEnabled(); private final ArrayBuffer buffer; - private final EnhancedDataSender dataSender; + private final EnhancedDataSender dataSender; private final ExceptionMetaDataFactory factory; public BufferedExceptionStorage( int bufferSize, - EnhancedDataSender dataSender, + EnhancedDataSender dataSender, ExceptionMetaDataFactory exceptionMetaDataFactory ) { this.dataSender = Objects.requireNonNull(dataSender, "dataSender"); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/ExceptionStorageFactory.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/ExceptionStorageFactory.java index dc1267a39716..8cf39f49e68e 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/ExceptionStorageFactory.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/exception/storage/ExceptionStorageFactory.java @@ -16,7 +16,6 @@ package com.navercorp.pinpoint.profiler.context.exception.storage; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.exception.model.ExceptionMetaDataFactory; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; @@ -27,10 +26,10 @@ */ public class ExceptionStorageFactory { - private final EnhancedDataSender dataSender; + private final EnhancedDataSender dataSender; private final int bufferSize; - public ExceptionStorageFactory(EnhancedDataSender dataSender, int bufferSize) { + public ExceptionStorageFactory(EnhancedDataSender dataSender, int bufferSize) { this.dataSender = Objects.requireNonNull(dataSender, "dataSender"); this.bufferSize = bufferSize; } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverter.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverter.java deleted file mode 100644 index 07475a5e87d4..000000000000 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2019 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.pinpoint.profiler.context.grpc; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.navercorp.pinpoint.common.profiler.message.DefaultResultResponse; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; -import com.navercorp.pinpoint.common.profiler.message.ResultResponse; -import com.navercorp.pinpoint.grpc.trace.PResult; -import com.navercorp.pinpoint.io.ResponseMessage; - -/** - * @author jaehong.kim - */ -public class GrpcMessageToResultConverter implements MessageConverter { - @Override - public ResultResponse toMessage(Object object) { - if (object instanceof ResponseMessage) { - final ResponseMessage responseMessage = (ResponseMessage) object; - final byte[] byteMessage = responseMessage.getMessage(); - try { - final PResult pResult = PResult.parseFrom(byteMessage); - return new DefaultResultResponse(pResult.getSuccess(), pResult.getMessage()); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException("invalid message data. response message=" + responseMessage, e); - } - } - return null; - } -} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverterProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverterProvider.java deleted file mode 100644 index f284fde5ac05..000000000000 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcMessageToResultConverterProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2019 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.pinpoint.profiler.context.grpc; - -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; -import com.navercorp.pinpoint.common.profiler.message.ResultResponse; - -/** - * @author jaehong.kim - */ -public class GrpcMessageToResultConverterProvider implements Provider> { - @Inject - public GrpcMessageToResultConverterProvider() { - } - - @Override - public MessageConverter get() { - return new GrpcMessageToResultConverter(); - } -} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModule.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModule.java index dc4a999872f0..ae390d209d8d 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModule.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModule.java @@ -23,6 +23,7 @@ import com.google.inject.name.Names; import com.google.protobuf.GeneratedMessageV3; import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.DataSender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; @@ -30,10 +31,8 @@ import com.navercorp.pinpoint.grpc.client.HeaderFactory; import com.navercorp.pinpoint.grpc.trace.PSpan; import com.navercorp.pinpoint.grpc.trace.PSpanChunk; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.SpanType; import com.navercorp.pinpoint.profiler.context.compress.SpanProcessor; -import com.navercorp.pinpoint.profiler.context.grpc.GrpcMessageToResultConverterProvider; import com.navercorp.pinpoint.profiler.context.grpc.GrpcMetadataMessageConverterProvider; import com.navercorp.pinpoint.profiler.context.grpc.GrpcSpanMessageConverterProvider; import com.navercorp.pinpoint.profiler.context.grpc.GrpcStatMessageConverterProvider; @@ -165,19 +164,15 @@ private void bindAgentDataSender() { Key> metadataMessageConverterKey = Key.get(metadataMessageConverter, MetadataDataSender.class); bind(metadataMessageConverterKey).toProvider(GrpcMetadataMessageConverterProvider.class).in(Scopes.SINGLETON); - TypeLiteral> resultMessageConverter = new TypeLiteral>() { + TypeLiteral> agentDataSenderTypeLiteral = new TypeLiteral>() { }; - Key> resultMessageConverterKey = Key.get(resultMessageConverter, ResultConverter.class); - bind(resultMessageConverterKey).toProvider(GrpcMessageToResultConverterProvider.class).in(Scopes.SINGLETON); - expose(resultMessageConverterKey); - - TypeLiteral> dataSenderTypeLiteral = new TypeLiteral>() { - }; - Key> agentDataSender = Key.get(dataSenderTypeLiteral, AgentDataSender.class); + Key> agentDataSender = Key.get(agentDataSenderTypeLiteral, AgentDataSender.class); bind(agentDataSender).toProvider(AgentGrpcDataSenderProvider.class).in(Scopes.SINGLETON); expose(agentDataSender); - Key> metadataDataSender = Key.get(dataSenderTypeLiteral, MetadataDataSender.class); + TypeLiteral> metaDataSenderTypeLiteral = new TypeLiteral>() { + }; + Key> metadataDataSender = Key.get(metaDataSenderTypeLiteral, MetadataDataSender.class); bind(metadataDataSender).toProvider(MetadataGrpcDataSenderProvider.class).in(Scopes.SINGLETON); expose(metadataDataSender); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModuleLifeCycle.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModuleLifeCycle.java index 2a8911518eff..07f0bbf2b87b 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModuleLifeCycle.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/module/GrpcModuleLifeCycle.java @@ -20,10 +20,12 @@ import com.google.inject.Provider; import com.navercorp.pinpoint.bootstrap.logging.PluginLogManager; import com.navercorp.pinpoint.bootstrap.logging.PluginLogger; +import com.navercorp.pinpoint.common.profiler.Stoppable; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.DataSender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; import com.navercorp.pinpoint.grpc.ExecutorUtils; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.SpanType; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.monitor.metric.MetricType; @@ -40,16 +42,16 @@ public class GrpcModuleLifeCycle implements ModuleLifeCycle { private final PluginLogger logger = PluginLogManager.getLogger(this.getClass()); - private final Provider> agentDataSenderProvider; - private final Provider> metadataDataSenderProvider; + private final Provider> agentDataSenderProvider; + private final Provider> metadataDataSenderProvider; private final Provider> spanDataSenderProvider; private final Provider> statDataSenderProvider; private final Provider dnsExecutorServiceProvider; private final Provider reconnectScheduledExecutorProvider; - private EnhancedDataSender agentDataSender; - private EnhancedDataSender metadataDataSender; + private AsyncDataSender agentDataSender; + private EnhancedDataSender metadataDataSender; private DataSender spanDataSender; private DataSender statDataSender; @@ -61,8 +63,8 @@ public class GrpcModuleLifeCycle implements ModuleLifeCycle { @Inject public GrpcModuleLifeCycle( - @AgentDataSender Provider> agentDataSenderProvider, - @MetadataDataSender Provider> metadataDataSenderProvider, + @AgentDataSender Provider> agentDataSenderProvider, + @MetadataDataSender Provider> metadataDataSenderProvider, @SpanDataSender Provider> spanDataSenderProvider, @StatDataSender Provider> statDataSenderProvider, Provider dnsExecutorServiceProvider, @@ -104,20 +106,10 @@ public void start() { @Override public void shutdown() { logger.info("shutdown()"); - if (spanDataSender != null) { - this.spanDataSender.stop(); - } - if (statDataSender != null) { - this.statDataSender.stop(); - } - - if (agentDataSender != null) { - this.agentDataSender.stop(); - } - - if (metadataDataSender != null) { - this.metadataDataSender.stop(); - } + Stoppable.stopQuietly(spanDataSender); + Stoppable.stopQuietly(statDataSender); + Stoppable.stopQuietly(agentDataSender); + Stoppable.stopQuietly(metadataDataSender); if (dnsExecutorService != null) { ExecutorUtils.shutdownExecutorService("dnsExecutor", dnsExecutorService); @@ -125,21 +117,18 @@ public void shutdown() { if (reconnectScheduledExecutorService != null) { ExecutorUtils.shutdownExecutorService("reconnectScheduledExecutor", reconnectScheduledExecutorService); } - if (reporter != null) { - reporter.stop(); - } + Stoppable.stopQuietly(reporter); } @Override public String toString() { - final StringBuilder sb = new StringBuilder("GrpcModuleLifeCycle{"); - sb.append(", agentDataSender=").append(agentDataSender); - sb.append(", metadataDataSender=").append(metadataDataSender); - sb.append(", spanDataSender=").append(spanDataSender); - sb.append(", statDataSender=").append(statDataSender); - sb.append(", dnsExecutorService=").append(dnsExecutorService); - sb.append(", reconnectScheduledExecutorService=" + reconnectScheduledExecutorService); - sb.append('}'); - return sb.toString(); + return "GrpcModuleLifeCycle{" + + ", agentDataSender=" + agentDataSender + + ", metadataDataSender=" + metadataDataSender + + ", spanDataSender=" + spanDataSender + + ", statDataSender=" + statDataSender + + ", dnsExecutorService=" + dnsExecutorService + + ", reconnectScheduledExecutorService=" + reconnectScheduledExecutorService + + '}'; } } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/AgentInfoSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/AgentInfoSenderProvider.java index 523b25949815..52f39f9e88d6 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/AgentInfoSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/AgentInfoSenderProvider.java @@ -18,15 +18,12 @@ import com.google.inject.Inject; import com.google.inject.Provider; -import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.ResultResponse; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.AgentInfoSender; import com.navercorp.pinpoint.profiler.context.ServerMetaDataRegistryService; import com.navercorp.pinpoint.profiler.context.config.ContextConfig; import com.navercorp.pinpoint.profiler.context.module.AgentDataSender; -import com.navercorp.pinpoint.profiler.context.module.ResultConverter; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.util.AgentInfoFactory; @@ -39,32 +36,28 @@ public class AgentInfoSenderProvider implements Provider { private final ContextConfig contextConfig; - private final Provider> enhancedDataSenderProvider; + private final Provider> enhancedDataSenderProvider; private final Provider agentInfoFactoryProvider; private final ServerMetaDataRegistryService serverMetaDataRegistryService; - private final MessageConverter messageConverter; @Inject public AgentInfoSenderProvider( ContextConfig contextConfig, - @AgentDataSender Provider> enhancedDataSenderProvider, + @AgentDataSender Provider> asyncDataSenderProvider, Provider agentInfoFactoryProvider, - ServerMetaDataRegistryService serverMetaDataRegistryService, - @ResultConverter MessageConverter messageConverter) { + ServerMetaDataRegistryService serverMetaDataRegistryService) { this.contextConfig = Objects.requireNonNull(contextConfig, "contextConfig"); - this.enhancedDataSenderProvider = Objects.requireNonNull(enhancedDataSenderProvider, "enhancedDataSenderProvider"); + this.enhancedDataSenderProvider = Objects.requireNonNull(asyncDataSenderProvider, "asyncDataSenderProvider"); this.agentInfoFactoryProvider = Objects.requireNonNull(agentInfoFactoryProvider, "agentInfoFactoryProvider"); this.serverMetaDataRegistryService = Objects.requireNonNull(serverMetaDataRegistryService, "serverMetaDataRegistryService"); - this.messageConverter = Objects.requireNonNull(messageConverter, "messageConverter"); } @Override public AgentInfoSender get() { - final EnhancedDataSender enhancedDataSender = this.enhancedDataSenderProvider.get(); + final AsyncDataSender enhancedDataSender = this.enhancedDataSenderProvider.get(); final AgentInfoFactory agentInfoFactory = this.agentInfoFactoryProvider.get(); final AgentInfoSender agentInfoSender = new AgentInfoSender.Builder(enhancedDataSender, agentInfoFactory) .sendInterval(contextConfig.getAgentInfoSendRetryInterval()) - .setMessageConverter(this.messageConverter) .build(); serverMetaDataRegistryService.addListener(new ServerMetaDataRegistryService.OnChangeListener() { @Override diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/exception/ExceptionStorageFactoryProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/exception/ExceptionStorageFactoryProvider.java index ca5799ece228..627be9fe4f4d 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/exception/ExceptionStorageFactoryProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/exception/ExceptionStorageFactoryProvider.java @@ -18,7 +18,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.exception.storage.ExceptionStorageFactory; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.context.monitor.config.ExceptionTraceConfig; @@ -32,12 +31,12 @@ public class ExceptionStorageFactoryProvider implements Provider { private final ExceptionTraceConfig exceptionTraceConfig; - private final EnhancedDataSender spanTypeDataSender; + private final EnhancedDataSender spanTypeDataSender; @Inject public ExceptionStorageFactoryProvider( ExceptionTraceConfig exceptionTraceConfig, - @MetadataDataSender EnhancedDataSender metadataDataSender + @MetadataDataSender EnhancedDataSender metadataDataSender ) { this.exceptionTraceConfig = Objects.requireNonNull(exceptionTraceConfig, "exceptionTraceConfig"); this.spanTypeDataSender = metadataDataSender; diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java index 4aa85b091e8c..a387e82b1711 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java @@ -19,15 +19,15 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.protobuf.GeneratedMessageV3; -import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; import com.navercorp.pinpoint.grpc.client.ChannelFactory; import com.navercorp.pinpoint.grpc.client.ChannelFactoryBuilder; import com.navercorp.pinpoint.grpc.client.DefaultChannelFactoryBuilder; import com.navercorp.pinpoint.grpc.client.HeaderFactory; import com.navercorp.pinpoint.grpc.client.UnaryCallDeadlineInterceptor; import com.navercorp.pinpoint.grpc.client.config.ClientOption; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig; import com.navercorp.pinpoint.profiler.context.grpc.mapper.ThreadDumpMapper; @@ -56,7 +56,7 @@ /** * @author jaehong.kim */ -public class AgentGrpcDataSenderProvider implements Provider> { +public class AgentGrpcDataSenderProvider implements Provider> { private final Logger logger = LogManager.getLogger(this.getClass()); @@ -109,7 +109,7 @@ public void setClientInterceptor(@AgentDataSender List client } @Override - public EnhancedDataSender get() { + public AsyncDataSender get() { final String collectorIp = grpcTransportConfig.getAgentCollectorIp(); final int collectorPort = grpcTransportConfig.getAgentCollectorPort(); final boolean sslEnable = grpcTransportConfig.isAgentSslEnable(); @@ -127,11 +127,11 @@ public EnhancedDataSender get() { channelFactory, reconnectExecutor, retransmissionExecutor, profilerCommandServiceLocator); } - protected EnhancedDataSender newAgentGrpcDataSender(String collectorIp, int collectorPort, int senderExecutorQueueSize, - MessageConverter messageConverter, - ChannelFactory channelFactory, ReconnectExecutor reconnectExecutor, - ScheduledExecutorService retransmissionExecutor, - ProfilerCommandServiceLocator profilerCommandServiceLocator) { + protected AsyncDataSender newAgentGrpcDataSender(String collectorIp, int collectorPort, int senderExecutorQueueSize, + MessageConverter messageConverter, + ChannelFactory channelFactory, ReconnectExecutor reconnectExecutor, + ScheduledExecutorService retransmissionExecutor, + ProfilerCommandServiceLocator profilerCommandServiceLocator) { return new AgentGrpcDataSender(collectorIp, collectorPort, senderExecutorQueueSize, messageConverter, reconnectExecutor, retransmissionExecutor, channelFactory, profilerCommandServiceLocator); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java index 94777f774552..293537ca938d 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/MetadataGrpcDataSenderProvider.java @@ -30,7 +30,6 @@ import com.navercorp.pinpoint.grpc.client.config.ClientRetryOption; import com.navercorp.pinpoint.grpc.client.retry.HedgingServiceConfigBuilder; import com.navercorp.pinpoint.grpc.client.retry.RetryHeaderFactory; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.grpc.config.GrpcTransportConfig; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.metadata.MetaDataType; @@ -48,7 +47,7 @@ /** * @author jaehong.kim */ -public class MetadataGrpcDataSenderProvider implements Provider> { +public class MetadataGrpcDataSenderProvider implements Provider> { private final Logger logger = LogManager.getLogger(this.getClass()); @@ -79,7 +78,7 @@ public void setClientInterceptor(@MetadataDataSender List cli } @Override - public EnhancedDataSender get() { + public EnhancedDataSender get() { final String collectorIp = grpcTransportConfig.getMetadataCollectorIp(); final int collectorPort = grpcTransportConfig.getMetadataCollectorPort(); final boolean sslEnable = grpcTransportConfig.isMetadataSslEnable(); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/ApiMetaDataServiceProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/ApiMetaDataServiceProvider.java index a13a29e3c264..f07e1e674985 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/ApiMetaDataServiceProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/ApiMetaDataServiceProvider.java @@ -19,7 +19,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.metadata.ApiMetaDataService; @@ -33,11 +32,11 @@ */ public class ApiMetaDataServiceProvider implements Provider { - private final Provider> enhancedDataSenderProvider; + private final Provider> enhancedDataSenderProvider; private final SimpleCacheFactory simpleCacheFactory; @Inject - public ApiMetaDataServiceProvider(@MetadataDataSender Provider> enhancedDataSenderProvider, SimpleCacheFactory simpleCacheFactory) { + public ApiMetaDataServiceProvider(@MetadataDataSender Provider> enhancedDataSenderProvider, SimpleCacheFactory simpleCacheFactory) { this.enhancedDataSenderProvider = Objects.requireNonNull(enhancedDataSenderProvider, "enhancedDataSenderProvider"); this.simpleCacheFactory = Objects.requireNonNull(simpleCacheFactory, "simpleCacheFactory"); @@ -45,7 +44,7 @@ public ApiMetaDataServiceProvider(@MetadataDataSender Provider enhancedDataSender = this.enhancedDataSenderProvider.get(); + final EnhancedDataSender enhancedDataSender = this.enhancedDataSenderProvider.get(); final SimpleCache simpleCache = simpleCacheFactory.newSimpleCache(); return new DefaultApiMetaDataService(enhancedDataSender, simpleCache); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/SqlMetadataServiceProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/SqlMetadataServiceProvider.java index b20289a565b2..7254ea35e2d8 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/SqlMetadataServiceProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/SqlMetadataServiceProvider.java @@ -20,7 +20,6 @@ import com.google.inject.Provider; import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.context.monitor.config.MonitorConfig; @@ -40,13 +39,13 @@ public class SqlMetadataServiceProvider implements Provider { private final ProfilerConfig profilerConfig; private final MonitorConfig monitorConfig; - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; private final SimpleCacheFactory simpleCacheFactory; @Inject public SqlMetadataServiceProvider(ProfilerConfig profilerConfig, MonitorConfig monitorConfig, - @MetadataDataSender EnhancedDataSender enhancedDataSender, + @MetadataDataSender EnhancedDataSender enhancedDataSender, SimpleCacheFactory simpleCacheFactory) { this.profilerConfig = Objects.requireNonNull(profilerConfig, "profilerConfig"); this.monitorConfig = Objects.requireNonNull(monitorConfig, "monitorConfig"); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/StringMetadataServiceProvider.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/StringMetadataServiceProvider.java index 97c070983a99..57f4e81be97d 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/StringMetadataServiceProvider.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/metadata/StringMetadataServiceProvider.java @@ -19,7 +19,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import com.navercorp.pinpoint.profiler.context.module.MetadataDataSender; import com.navercorp.pinpoint.profiler.metadata.DefaultStringMetaDataService; @@ -34,11 +33,11 @@ */ public class StringMetadataServiceProvider implements Provider { - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; private final SimpleCacheFactory simpleCacheFactory; @Inject - public StringMetadataServiceProvider(@MetadataDataSender EnhancedDataSender enhancedDataSender, SimpleCacheFactory simpleCacheFactory) { + public StringMetadataServiceProvider(@MetadataDataSender EnhancedDataSender enhancedDataSender, SimpleCacheFactory simpleCacheFactory) { this.enhancedDataSender = Objects.requireNonNull(enhancedDataSender, "enhancedDataSender"); this.simpleCacheFactory = Objects.requireNonNull(simpleCacheFactory, "simpleCacheFactory"); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataService.java index 9f88a57d766e..ae237335e82b 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataService.java @@ -18,7 +18,6 @@ import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.Result; import com.navercorp.pinpoint.profiler.cache.SimpleCache; @@ -31,9 +30,9 @@ public class DefaultApiMetaDataService implements ApiMetaDataService { private final SimpleCache apiCache; - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; - public DefaultApiMetaDataService(EnhancedDataSender enhancedDataSender, SimpleCache apiCache) { + public DefaultApiMetaDataService(EnhancedDataSender enhancedDataSender, SimpleCache apiCache) { this.enhancedDataSender = Objects.requireNonNull(enhancedDataSender, "enhancedDataSender"); this.apiCache = Objects.requireNonNull(apiCache, "apiCache"); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataService.java index 93dc0c0e3875..260fd62052c6 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataService.java @@ -17,7 +17,6 @@ package com.navercorp.pinpoint.profiler.metadata; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.Result; import com.navercorp.pinpoint.profiler.cache.SimpleCache; @@ -30,9 +29,9 @@ public class DefaultStringMetaDataService implements StringMetaDataService { private final SimpleCache stringCache; - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; - public DefaultStringMetaDataService(EnhancedDataSender enhancedDataSender, SimpleCache stringCache) { + public DefaultStringMetaDataService(EnhancedDataSender enhancedDataSender, SimpleCache stringCache) { this.enhancedDataSender = Objects.requireNonNull(enhancedDataSender, "enhancedDataSender"); this.stringCache = Objects.requireNonNull(stringCache, "stringCache"); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheService.java index 5df535a4783d..612bcbfb2876 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheService.java @@ -1,7 +1,6 @@ package com.navercorp.pinpoint.profiler.metadata; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,9 +13,9 @@ public class SqlCacheService { private final CachingSqlNormalizer> cachingSqlNormalizer; - private final EnhancedDataSender enhancedDataSender; + private final EnhancedDataSender enhancedDataSender; - public SqlCacheService(EnhancedDataSender enhancedDataSender, CachingSqlNormalizer> cachingSqlNormalizer) { + public SqlCacheService(EnhancedDataSender enhancedDataSender, CachingSqlNormalizer> cachingSqlNormalizer) { this.enhancedDataSender = Objects.requireNonNull(enhancedDataSender, "enhancedDataSender"); this.cachingSqlNormalizer = Objects.requireNonNull(cachingSqlNormalizer, "cachingSqlNormalizer"); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java index 5e6082f1c113..601c8d475a6c 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java @@ -17,26 +17,26 @@ package com.navercorp.pinpoint.profiler.sender.grpc; import com.google.protobuf.GeneratedMessageV3; -import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; +import com.navercorp.pinpoint.common.profiler.message.AsyncDataSender; import com.navercorp.pinpoint.common.profiler.message.MessageConverter; +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; import com.navercorp.pinpoint.grpc.client.ChannelFactory; import com.navercorp.pinpoint.grpc.client.SocketIdClientInterceptor; import com.navercorp.pinpoint.grpc.trace.AgentGrpc; import com.navercorp.pinpoint.grpc.trace.PAgentInfo; import com.navercorp.pinpoint.grpc.trace.PResult; -import com.navercorp.pinpoint.io.ResponseMessage; +import com.navercorp.pinpoint.profiler.metadata.MetaDataType; import com.navercorp.pinpoint.profiler.receiver.ProfilerCommandServiceLocator; import com.navercorp.pinpoint.profiler.receiver.grpc.CommandServiceStubFactory; import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcCommandService; -import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; /** * @author jaehong.kim */ -public class AgentGrpcDataSender extends GrpcDataSender implements EnhancedDataSender { +public class AgentGrpcDataSender extends GrpcDataSender implements AsyncDataSender { private final AgentGrpc.AgentStub agentInfoStub; private final AgentGrpc.AgentStub agentPingStub; private final GrpcCommandService grpcCommandService; @@ -47,7 +47,7 @@ public class AgentGrpcDataSender extends GrpcDataSender implements Enhance private final Reconnector reconnector; public AgentGrpcDataSender(String host, int port, int executorQueueSize, - MessageConverter messageConverter, + MessageConverter messageConverter, ReconnectExecutor reconnectExecutor, final ScheduledExecutorService retransmissionExecutor, ChannelFactory channelFactory, @@ -84,33 +84,21 @@ private PingStreamContext newPingStream(AgentGrpc.AgentStub agentStub, Scheduled return pingStreamContext; } - @Override - public boolean request(T data) { - throw new UnsupportedOperationException("unsupported operation request(data)"); - } - - @Override - public boolean request(T data, int retryCount) { - throw new UnsupportedOperationException("unsupported operation request(data, retryCount)"); - } @Override - public boolean request(T data, final BiConsumer listener) { + public CompletableFuture request(MetaDataType data) { final GeneratedMessageV3 message = this.messageConverter.toMessage(data); if (!(message instanceof PAgentInfo)) { throw new IllegalArgumentException("unsupported message " + data); } final PAgentInfo pAgentInfo = (PAgentInfo) message; - this.agentInfoStub.requestAgentInfo(pAgentInfo, new FutureListenerStreamObserver(listener)); - return true; + CompletableFutureObserver observer = new CompletableFutureObserver<>(PResults::toResponse); + this.agentInfoStub.requestAgentInfo(pAgentInfo, observer); + return observer.future(); } - @Override - public boolean send(Object data) { - throw new UnsupportedOperationException("unsupported operation send(data)"); - } @Override public void stop() { @@ -136,27 +124,4 @@ public void stop() { this.release(); } - - private static class FutureListenerStreamObserver implements StreamObserver { - private final BiConsumer listener; - - private FutureListenerStreamObserver(BiConsumer listener) { - this.listener = listener; - } - - @Override - public void onNext(PResult result) { - final ResponseMessage response = ResponseMessage.wrap(result.toByteArray()); - listener.accept(response, null); - } - - @Override - public void onError(Throwable throwable) { - listener.accept(null, throwable); - } - - @Override - public void onCompleted() { - } - } } \ No newline at end of file diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserver.java new file mode 100644 index 000000000000..3974df35d4aa --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserver.java @@ -0,0 +1,39 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import io.grpc.stub.StreamObserver; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class CompletableFutureObserver implements StreamObserver { + private final CompletableFuture future = new CompletableFuture<>(); + private final Function converter; + + CompletableFutureObserver(Function converter) { + this.converter = Objects.requireNonNull(converter, "converter"); + } + + @Override + public void onNext(T value) { + R response = converter.apply(value); + this.future.complete(response); + } + + @Override + public void onError(Throwable throwable) { + this.future.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + final CompletableFuture future = this.future; + if (!future.isDone()) { + future.completeExceptionally(new Exception("Response is not arrived")); + } + } + + public CompletableFuture future() { + return future; + } +} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java index 2fd0d7bdeb0a..274fe703a2dc 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcDataSender.java @@ -29,7 +29,6 @@ import com.navercorp.pinpoint.grpc.trace.PSqlMetaData; import com.navercorp.pinpoint.grpc.trace.PSqlUidMetaData; import com.navercorp.pinpoint.grpc.trace.PStringMetaData; -import com.navercorp.pinpoint.io.ResponseMessage; import io.grpc.stub.StreamObserver; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; @@ -39,12 +38,11 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; /** * @author jaehong.kim */ -public class MetadataGrpcDataSender extends GrpcDataSender implements EnhancedDataSender { +public class MetadataGrpcDataSender extends GrpcDataSender implements EnhancedDataSender { // private final MetadataGrpc.MetadataStub metadataStub; private final int maxAttempts; @@ -90,11 +88,6 @@ public boolean request(T data, int retry) { throw new UnsupportedOperationException("unsupported operation request(data, retry)"); } - @Override - public boolean request(T data, BiConsumer listener) { - throw new UnsupportedOperationException("unsupported operation request(data, listener)"); - } - @Override public boolean send(T data) { throw new UnsupportedOperationException("unsupported operation send(data)"); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java index 24a6fbe4bc33..ef9d3fdd45a6 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/MetadataGrpcHedgingDataSender.java @@ -31,18 +31,16 @@ import com.navercorp.pinpoint.grpc.trace.PSqlMetaData; import com.navercorp.pinpoint.grpc.trace.PSqlUidMetaData; import com.navercorp.pinpoint.grpc.trace.PStringMetaData; -import com.navercorp.pinpoint.io.ResponseMessage; import io.grpc.stub.StreamObserver; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; /** */ -public class MetadataGrpcHedgingDataSender extends AbstractGrpcDataSender implements EnhancedDataSender { +public class MetadataGrpcHedgingDataSender extends AbstractGrpcDataSender implements EnhancedDataSender { private final MetadataGrpc.MetadataStub metadataStub; @@ -71,11 +69,6 @@ public boolean request(T data, int retry) { throw new UnsupportedOperationException("unsupported operation request(data, retry)"); } - @Override - public boolean request(T data, BiConsumer listener) { - throw new UnsupportedOperationException("unsupported operation request(data, listener)"); - } - @Override public boolean send(T data) { throw new UnsupportedOperationException("unsupported operation send(data)"); diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PResults.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PResults.java new file mode 100644 index 000000000000..e29f8b94395b --- /dev/null +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/PResults.java @@ -0,0 +1,11 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import com.navercorp.pinpoint.common.profiler.message.DefaultResultResponse; +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; +import com.navercorp.pinpoint.grpc.trace.PResult; + +public final class PResults { + public static ResultResponse toResponse(PResult result) { + return new DefaultResultResponse(result.getSuccess(), result.getMessage()); + } +} diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/ChannelzScheduledReporter.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/ChannelzScheduledReporter.java index f77dc5ac7687..70564c8e4016 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/ChannelzScheduledReporter.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/ChannelzScheduledReporter.java @@ -1,7 +1,10 @@ package com.navercorp.pinpoint.profiler.sender.grpc.metric; -public interface ChannelzScheduledReporter { +import com.navercorp.pinpoint.common.profiler.Stoppable; + +public interface ChannelzScheduledReporter extends Stoppable { void registerRootChannel(long id, ChannelzReporter reporter); + @Override void stop(); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/EmptyChannelzScheduledReporter.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/EmptyChannelzScheduledReporter.java index 2a7c7d856e7e..e7a4d4c0ad4f 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/EmptyChannelzScheduledReporter.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/metric/EmptyChannelzScheduledReporter.java @@ -1,6 +1,6 @@ package com.navercorp.pinpoint.profiler.sender.grpc.metric; -public class EmptyChannelzScheduledReporter implements ChannelzScheduledReporter{ +public class EmptyChannelzScheduledReporter implements ChannelzScheduledReporter { @Override public void registerRootChannel(long id, ChannelzReporter reporter) { diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataServiceTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataServiceTest.java index 922b8d9afa0b..8a2d4ad0daa5 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataServiceTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultApiMetaDataServiceTest.java @@ -18,7 +18,6 @@ import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.IdAllocator; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import com.navercorp.pinpoint.profiler.context.DefaultMethodDescriptor; @@ -36,7 +35,7 @@ public class DefaultApiMetaDataServiceTest { @Test public void cacheApi() { - EnhancedDataSender dataSender = mock(EnhancedDataSender.class); + EnhancedDataSender dataSender = mock(EnhancedDataSender.class); SimpleCache cache = new SimpleCache<>(new IdAllocator.ZigZagAllocator(1)); ApiMetaDataService apiMetaDataService = new DefaultApiMetaDataService(dataSender, cache); diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataServiceTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataServiceTest.java index 06fa2e538645..d3cb95e76ef5 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataServiceTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/DefaultStringMetaDataServiceTest.java @@ -17,7 +17,6 @@ package com.navercorp.pinpoint.profiler.metadata; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.IdAllocator; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import org.junit.jupiter.api.Assertions; @@ -34,7 +33,7 @@ public class DefaultStringMetaDataServiceTest { @Test public void cacheString() { - EnhancedDataSender dataSender = mock(EnhancedDataSender.class); + EnhancedDataSender dataSender = mock(EnhancedDataSender.class); SimpleCache stringCache = new SimpleCache<>(new IdAllocator.ZigZagAllocator()); StringMetaDataService stringMetaDataService = new DefaultStringMetaDataService(dataSender, stringCache); diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheServiceTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheServiceTest.java index daad6867d258..386de6ef73e4 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheServiceTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlCacheServiceTest.java @@ -17,7 +17,6 @@ package com.navercorp.pinpoint.profiler.metadata; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.cache.IdAllocator; import com.navercorp.pinpoint.profiler.cache.SimpleCache; import org.junit.jupiter.api.Assertions; @@ -34,7 +33,7 @@ public class SqlCacheServiceTest { @Test public void cacheSql() { - final EnhancedDataSender dataSender = mock(EnhancedDataSender.class); + final EnhancedDataSender dataSender = mock(EnhancedDataSender.class); SimpleCache sqlCache = new SimpleCache<>(new IdAllocator.ZigZagAllocator(), 100); SimpleCachingSqlNormalizer simpleCachingSqlNormalizer = new SimpleCachingSqlNormalizer(sqlCache); final SqlCacheService sqlMetaDataService = new SqlCacheService<>(dataSender, simpleCachingSqlNormalizer); diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlUidMetaDataServiceTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlUidMetaDataServiceTest.java index 740b2adf0b87..da54193adcc5 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlUidMetaDataServiceTest.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/metadata/SqlUidMetaDataServiceTest.java @@ -1,7 +1,6 @@ package com.navercorp.pinpoint.profiler.metadata; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,7 +21,7 @@ class SqlUidMetaDataServiceTest { SqlUidMetaDataService sut; @Mock - EnhancedDataSender dataSender; + EnhancedDataSender dataSender; AutoCloseable autoCloseable; diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/CountingDataSender.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/CountingDataSender.java index 8bfe0adabe00..4c78121fd25b 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/CountingDataSender.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/CountingDataSender.java @@ -17,18 +17,16 @@ package com.navercorp.pinpoint.profiler.sender; import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender; -import com.navercorp.pinpoint.io.ResponseMessage; import com.navercorp.pinpoint.profiler.context.Span; import com.navercorp.pinpoint.profiler.context.SpanChunk; import com.navercorp.pinpoint.profiler.context.SpanType; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; /** * @author emeroad */ -public class CountingDataSender implements EnhancedDataSender { +public class CountingDataSender implements EnhancedDataSender { private final AtomicInteger requestCounter = new AtomicInteger(); private final AtomicInteger requestRetryCounter = new AtomicInteger(); @@ -51,11 +49,6 @@ public boolean request(SpanType data, int retry) { return false; } - @Override - public boolean request(SpanType data, BiConsumer listener) { - return false; - } - @Override public boolean send(SpanType data) { diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java index 7d37c71b2532..ac8ef9764f37 100644 --- a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java @@ -70,7 +70,7 @@ public void request() throws Exception { channelFactoryBuilder.setClientOption(new ClientOption()); ChannelFactory channelFactory = channelFactoryBuilder.build(); - AgentGrpcDataSender sender = new AgentGrpcDataSender<>("localhost", 9997, 1, messageConverter, + AgentGrpcDataSender sender = new AgentGrpcDataSender("localhost", 9997, 1, messageConverter, reconnectExecutor, scheduledExecutorService, channelFactory, null); AgentInfo agentInfo = newAgentInfo(); diff --git a/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserverTest.java b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserverTest.java new file mode 100644 index 000000000000..c998b3edc78c --- /dev/null +++ b/agent-module/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/CompletableFutureObserverTest.java @@ -0,0 +1,51 @@ +package com.navercorp.pinpoint.profiler.sender.grpc; + +import com.navercorp.pinpoint.common.profiler.message.ResultResponse; +import com.navercorp.pinpoint.grpc.trace.PResult; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class CompletableFutureObserverTest { + @Test + void testFuture_response_not_arrive() { + CompletableFutureObserver observer = new CompletableFutureObserver<>(PResults::toResponse); + observer.onCompleted(); + + CompletableFuture future = observer.future(); + assertThrows(Exception.class, future::get); + } + + @Test + void testFuture_response_arrive() { + CompletableFutureObserver observer = new CompletableFutureObserver<>(PResults::toResponse); + PResult result = PResult.newBuilder() + .setSuccess(true) + .setMessage("hello") + .build(); + observer.onNext(result); + + CompletableFuture future = observer.future(); + String message = future.join().getMessage(); + Assertions.assertEquals("hello", message); + } + + + @Test + void testFuture_response_arrive_and_complete() { + CompletableFutureObserver observer = new CompletableFutureObserver<>(PResults::toResponse); + PResult result = PResult.newBuilder() + .setSuccess(true) + .setMessage("hello") + .build(); + observer.onNext(result); + observer.onCompleted(); + + CompletableFuture future = observer.future(); + String message = future.join().getMessage(); + Assertions.assertEquals("hello", message); + } +} \ No newline at end of file diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/Stoppable.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/Stoppable.java new file mode 100644 index 000000000000..0e085dbdd370 --- /dev/null +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/Stoppable.java @@ -0,0 +1,14 @@ +package com.navercorp.pinpoint.common.profiler; + +public interface Stoppable { + void stop(); + + static void stopQuietly(Stoppable stoppable) { + if (stoppable != null) { + try { + stoppable.stop(); + } catch (Exception ignore) { + } + } + } +} diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/AsyncDataSender.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/AsyncDataSender.java new file mode 100644 index 000000000000..abb18fe4fdc0 --- /dev/null +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/AsyncDataSender.java @@ -0,0 +1,11 @@ +package com.navercorp.pinpoint.common.profiler.message; + +import java.util.concurrent.CompletableFuture; + +/** + * @author emeroad + */ +public interface AsyncDataSender extends DataSender { + + CompletableFuture request(REQ data); +} diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/DataSender.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/DataSender.java index 8f2bcf5d12d5..fe22fc32b16d 100644 --- a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/DataSender.java +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/DataSender.java @@ -16,14 +16,17 @@ package com.navercorp.pinpoint.common.profiler.message; +import com.navercorp.pinpoint.common.profiler.Stoppable; + /** * @author emeroad * @author netspider */ -public interface DataSender { +public interface DataSender extends Stoppable { boolean send(REQ data); + @Override void stop(); } diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EmptyDataSender.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EmptyDataSender.java index 89c9d4dff426..b919b83ffe35 100644 --- a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EmptyDataSender.java +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EmptyDataSender.java @@ -17,13 +17,10 @@ package com.navercorp.pinpoint.common.profiler.message; -import java.util.function.BiConsumer; - - /** * @author Woonduk Kang(emeroad) */ -public class EmptyDataSender implements EnhancedDataSender { +public class EmptyDataSender implements EnhancedDataSender { private static final DataSender INSTANCE = new EmptyDataSender<>(); @@ -52,10 +49,4 @@ public boolean request(REQ data, int retry) { return false; } - - @Override - public boolean request(REQ data, BiConsumer listener) { - return false; - } - } \ No newline at end of file diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EnhancedDataSender.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EnhancedDataSender.java index 3120e2e4abf8..3ffab5d91675 100644 --- a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EnhancedDataSender.java +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/EnhancedDataSender.java @@ -1,16 +1,12 @@ package com.navercorp.pinpoint.common.profiler.message; -import java.util.function.BiConsumer; - /** * @author emeroad */ -public interface EnhancedDataSender extends DataSender { +public interface EnhancedDataSender extends DataSender { boolean request(REQ data); boolean request(REQ data, int retry); - boolean request(REQ data, BiConsumer listener); - } diff --git a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/LoggingDataSender.java b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/LoggingDataSender.java index d493397e0863..f26a12b54aca 100644 --- a/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/LoggingDataSender.java +++ b/commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/message/LoggingDataSender.java @@ -19,14 +19,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.function.BiConsumer; - /** * @author emeroad * @author netspider */ -public class LoggingDataSender implements EnhancedDataSender { +public class LoggingDataSender implements EnhancedDataSender { private final Logger logger = LogManager.getLogger(this.getClass()); @@ -55,10 +53,4 @@ public boolean request(REQ data, int retry) { } - @Override - public boolean request(REQ data, BiConsumer listener) { - logger.info("request tBase:{} FutureListener:{}", data, listener); - return false; - } - } \ No newline at end of file diff --git a/thrift-datasender/src/main/java/com/navercorp/pinpoint/thrift/sender/TcpDataSender.java b/thrift-datasender/src/main/java/com/navercorp/pinpoint/thrift/sender/TcpDataSender.java index 34316d55565f..af7ae948c67d 100644 --- a/thrift-datasender/src/main/java/com/navercorp/pinpoint/thrift/sender/TcpDataSender.java +++ b/thrift-datasender/src/main/java/com/navercorp/pinpoint/thrift/sender/TcpDataSender.java @@ -54,7 +54,7 @@ * @author koo.taejin * @author netspider */ -public class TcpDataSender implements EnhancedDataSender, ReconnectEventListenerRegistry { +public class TcpDataSender implements EnhancedDataSender, ReconnectEventListenerRegistry { private static final int DEFAULT_QUEUE_SIZE = 1024 * 5; @@ -168,11 +168,6 @@ public boolean request(T data, int retryCount) { return executor.execute(message); } - @Override - public boolean request(T data, BiConsumer listener) { - final RequestMessage message = RequestMessageFactory.request(data, 3, listener); - return executor.execute(message); - } public boolean isConnected() { return client.isConnected();