From 84db9f1f2b9a1a94ed563096802fbd11f6367fe3 Mon Sep 17 00:00:00 2001 From: emeroad Date: Wed, 25 Sep 2024 17:38:56 +0900 Subject: [PATCH] [#noissue] Cleanup --- .../receiver/grpc/EmptyStreamObserver.java | 2 +- .../grpc/GrpcActiveThreadCountService.java | 12 ++++++------ .../grpc/GrpcActiveThreadDumpService.java | 2 +- .../client/AbstractChannelServiceClient.java | 2 +- .../service/server/ChannelServiceServerImpl.java | 7 ++++--- .../receiver/grpc/service/AgentService.java | 11 ++++++----- .../log/web/websocket/LogWebSocketHandler.java | 11 +++++++++-- .../receiver/grpc/GrpcCommandService.java | 2 ++ .../service/ActiveThreadCountServiceImpl.java | 11 +++++------ .../RedisActiveThreadCountWebSocketHandler.java | 6 +++++- .../RedisActiveThreadCountHandlerAdaptor.java | 16 ++++++++++++++-- .../web/websocket/ActiveThreadCountHandler.java | 12 +++++++++--- 12 files changed, 63 insertions(+), 31 deletions(-) diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java index 1c97e2ab8be2..ed7b12032c8a 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/EmptyStreamObserver.java @@ -45,7 +45,7 @@ public void onCompleted() { logger.info("onCompleted."); } - static StreamObserver create() { + static StreamObserver create() { return new EmptyStreamObserver(); } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java index 992b0a220220..ecb97b6ad64e 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java @@ -42,8 +42,8 @@ public class GrpcActiveThreadCountService implements ProfilerGrpcCommandService, private static final long DEFAULT_FLUSH_DELAY = 1000; - private static final Logger LOGGER = LogManager.getLogger(GrpcActiveThreadCountService.class); - private final boolean isDebug = LOGGER.isDebugEnabled(); + private final Logger logger = LogManager.getLogger(getClass()); + private final boolean isDebug = logger.isDebugEnabled(); private final ActiveTraceRepository activeTraceRepository; @@ -85,7 +85,7 @@ private PCmdActiveThreadCountRes.Builder getActiveThreadCountResponse() { @Override public void close() throws IOException { - LOGGER.info("close"); + logger.info("close"); grpcStreamService.close(); } @@ -94,7 +94,7 @@ private class ActiveThreadCountTimerTask extends TimerTask { @Override public void run() { if (isDebug) { - LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList())); + logger.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList())); } PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse(); @@ -109,10 +109,10 @@ public void run() { stream.send(activeThreadCount); if (isDebug) { - LOGGER.debug("ActiveThreadCountStreamSocket. {}", stream); + logger.debug("ActiveThreadCountStreamSocket. {}", stream); } } catch (Throwable e) { - LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e); + logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e); streamSocket.close(e); } } diff --git a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java index defbf74c0049..4b7925f80d8a 100644 --- a/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java +++ b/agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java @@ -96,7 +96,7 @@ private List getActiveThreadDumpList(PCmdActiveThreadDump com } private List toPActiveThreadDump(Collection activeTraceInfoList) { - final List result = new ArrayList(activeTraceInfoList.size()); + final List result = new ArrayList<>(activeTraceInfoList.size()); for (ThreadDump threadDump : activeTraceInfoList) { PActiveThreadDump pActiveThreadLightDump = createActiveThreadDump(threadDump); result.add(pActiveThreadLightDump); diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/AbstractChannelServiceClient.java b/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/AbstractChannelServiceClient.java index b386aeaca57e..cd06822552ea 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/AbstractChannelServiceClient.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/AbstractChannelServiceClient.java @@ -73,7 +73,7 @@ protected Subscription subscribe( getProtocol() ); Subscription subscription = supplyChannel.subscribe(subConsumer); - deferredDisposable.setDisposable(() -> subscription.unsubscribe()); + deferredDisposable.setDisposable(subscription::unsubscribe); return subscription; } diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java b/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java index 4c4bc84c1b39..535416558b2c 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java @@ -134,7 +134,8 @@ public FluxDemandHandler(ChannelServiceFluxBackend backend) { @Override public boolean consume(byte[] rawDemand) { try { - return responseToDemand(getProtocol().deserializeDemand(rawDemand)); + D demand = getProtocol().deserializeDemand(rawDemand); + return responseToDemand(demand); } catch (Exception e) { throw new RuntimeException("Failed to supply for demand: " + BytesUtils.toString(rawDemand), e); } @@ -162,7 +163,7 @@ private class PubChannelProxy extends BaseSubscriber { private final Supplier channelSupplier = Suppliers.memoize(this::buildPubChannel); PubChannelProxy(D demand) { - this.demand = demand; + this.demand = Objects.requireNonNull(demand, "demand"); } @Override @@ -171,7 +172,7 @@ public void hookOnNext(@NonNull S supply) { byte[] rawResponse = getProtocol().serializeSupply(supply); this.channelSupplier.get().publish(rawResponse); } catch (Exception e) { - logger.error("Failed to send", e); + logger.warn("Failed to send", e); } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java index 9741fba91a07..4ebd65028527 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java @@ -52,8 +52,9 @@ */ public class AgentService extends AgentGrpc.AgentImplBase { private static final AtomicLong idAllocator = new AtomicLong(); + private final Logger logger = LogManager.getLogger(this.getClass()); - private final boolean isDebug = logger.isDebugEnabled(); + private final SimpleRequestHandlerAdaptor simpleRequestHandlerAdaptor; private final PingEventHandler pingEventHandler; private final Executor executor; @@ -68,7 +69,7 @@ public AgentService(DispatchHandler disp @Override public void requestAgentInfo(PAgentInfo agentInfo, StreamObserver responseObserver) { - if (isDebug) { + if (logger.isDebugEnabled()) { logger.debug("Request PAgentInfo={}", MessageFormatUtils.debugLog(agentInfo)); } @@ -101,14 +102,14 @@ public StreamObserver pingSession(final StreamObserver response) { public void onNext(PPing ping) { if (first.compareAndSet(false, true)) { // Only first - if (isDebug) { + if (logger.isDebugEnabled()) { thLogger.debug("PingSession:{} start:{}", id, MessageFormatUtils.debugLog(ping)); } AgentService.this.pingEventHandler.connect(); } else { AgentService.this.pingEventHandler.ping(); } - if (isDebug) { + if (logger.isDebugEnabled()) { thLogger.debug("PingSession:{} onNext:{}", id, MessageFormatUtils.debugLog(ping)); } if (responseObserver.isReady()) { @@ -136,7 +137,7 @@ public void onError(Throwable t) { @Override public void onCompleted() { - if (isDebug) { + if (logger.isDebugEnabled()) { thLogger.debug("PingSession:{} onCompleted()", id); } responseObserver.onCompleted(); diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java index e8bcd71ca2a7..5357626f6253 100644 --- a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java @@ -128,8 +128,15 @@ private void sendSupply(WebSocketSession session, List pile) { } } - @Override public void start() {} - @Override public void stop() {} + @Override + public void start() { + logger.info("Started"); + } + + @Override + public void stop() { + logger.info("Stopped"); + } @Override public String getRequestMapping() { diff --git a/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java b/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java index 97a4bc43bca4..846e527234e7 100644 --- a/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java +++ b/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java @@ -286,6 +286,8 @@ public void commandActiveThreadLightDump(PCmdActiveThreadLightDumpRes response, @Override public StreamObserver commandStreamActiveThreadCount(StreamObserver responseObserver) { + logger.debug("commandStreamActiveThreadCount started"); + ServerCallStreamObserver serverResponseObserver = (ServerCallStreamObserver) responseObserver; return new ActiveThreadCountResponseStreamObserver(serverResponseObserver, this.activeThreadCountSinkRepo) { @Override diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/service/ActiveThreadCountServiceImpl.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/service/ActiveThreadCountServiceImpl.java index 89fdc57c2b55..6e992fd42019 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/service/ActiveThreadCountServiceImpl.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/service/ActiveThreadCountServiceImpl.java @@ -78,9 +78,8 @@ public Flux getResponses(String applicationName) { Disposable updateDisposable = this.scheduler.schedulePeriodically(() -> { getAgents(taskDecorator, applicationName).subscribe(agents -> { for (ClusterKey agent : agents) { - Disposable disposable = this.atcDao.getSupplies(agent).subscribe(supply -> { - collector.add(supply); - }); + Flux supplies = this.atcDao.getSupplies(agent); + Disposable disposable = supplies.subscribe(collector::add); Disposable prev = disposableMap.put(agent, disposable); if (prev != null) { Mono.delay(Duration.ofSeconds(3), this.scheduler).subscribe(t -> prev.dispose()); @@ -95,7 +94,7 @@ public Flux getResponses(String applicationName) { .doFinally(e -> { updateDisposable.dispose(); Mono.delay(Duration.ofMillis(500)).subscribe(t -> { - for (Disposable d: disposableMap.values()) { + for (Disposable d : disposableMap.values()) { d.dispose(); } }); @@ -107,7 +106,7 @@ private static ClusterKey extractKey(ATCSupply supply) { } private Mono> getAgents(TimerTaskDecorator taskDecorator, String applicationName) { - return Mono.>create(sink -> { + return Mono.create(sink -> { taskDecorator.decorate(new TimerTask() { @Override public void run() { @@ -161,7 +160,7 @@ public ActiveThreadCountResponse compose(Long t) { long now = System.currentTimeMillis(); ActiveThreadCountResponse response = new ActiveThreadCountResponse(applicationName, now); - for (ClusterKey agent: agents) { + for (ClusterKey agent : agents) { putAgent(response, agent, now); } return response; diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/websocket/RedisActiveThreadCountWebSocketHandler.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/websocket/RedisActiveThreadCountWebSocketHandler.java index 505dbd389886..36fb8f0c8d9d 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/websocket/RedisActiveThreadCountWebSocketHandler.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/websocket/RedisActiveThreadCountWebSocketHandler.java @@ -27,6 +27,7 @@ import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import reactor.core.Disposable; +import reactor.core.publisher.Flux; import java.io.IOException; import java.util.Objects; @@ -60,6 +61,8 @@ public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull Cl } public void handleActiveThreadCount(WebSocketSession wsSession, String applicationName) { + Objects.requireNonNull(applicationName, "applicationName"); + logger.info("ATC Requested. session: {}, applicationName: {}", wsSession, applicationName); HandlerSession handlerSession = HandlerSession.get(wsSession); if (handlerSession == null) { @@ -133,7 +136,8 @@ void start(String applicationName) { private void start0(String applicationName) { try { this.applicationName = applicationName; - this.disposable = this.atcService.getResponses(applicationName).subscribe(this::sendMessage); + Flux responses = this.atcService.getResponses(applicationName); + this.disposable = responses.subscribe(this::sendMessage); } catch (Exception e) { logger.error("Failed to start atc session"); throw new RuntimeException(e); diff --git a/web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java b/web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java index 869d3c7c3707..4c8e83a2432d 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java @@ -19,6 +19,8 @@ import com.navercorp.pinpoint.web.security.ServerMapDataFilter; import com.navercorp.pinpoint.web.websocket.ActiveThreadCountHandler; import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessageConverter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; @@ -29,6 +31,7 @@ * @author youngjin.kim2 */ public class RedisActiveThreadCountHandlerAdaptor extends ActiveThreadCountHandler { + private final Logger logger = LogManager.getLogger(this.getClass()); private final RedisActiveThreadCountWebSocketHandler delegate; @@ -42,16 +45,25 @@ public RedisActiveThreadCountHandlerAdaptor( this.delegate = Objects.requireNonNull(delegate, "delegate"); } - @Override public void start() {} - @Override public void stop() {} + @Override + public void start() { + logger.info("Started"); + } + + @Override + public void stop() { + logger.info("Stopped"); + } @Override public void afterConnectionEstablished(@Nonnull WebSocketSession session) { + logger.debug("Connection established: {}", session); this.delegate.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(@Nonnull WebSocketSession session, @Nonnull CloseStatus status) { + logger.debug("Connection closed: {}, {}", session, status); this.delegate.afterConnectionClosed(session, status); } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java b/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java index 15ce4aedd6bf..6c3551ad365d 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java @@ -30,6 +30,7 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; +import java.io.IOException; import java.util.Objects; /** @@ -96,7 +97,9 @@ private void handleRequestMessage0(WebSocketSession webSocketSession, RequestMes if (API_ACTIVE_THREAD_COUNT.equals(command)) { handleActiveThreadCount(webSocketSession, requestMessage); } else { - logger.debug("unknown command:{}", command); + logger.warn("Unknown command:{}", command); + CloseStatus status = CloseStatus.BAD_DATA.withReason("Unknown command"); + closeSession(webSocketSession, status); } } @@ -104,6 +107,9 @@ private void handleActiveThreadCount(WebSocketSession webSocketSession, RequestM final String applicationName = MapUtils.getString(requestMessage.getParameters(), APPLICATION_NAME_KEY); if (applicationName != null) { handleActiveThreadCount(webSocketSession, applicationName); + } else { + CloseStatus status = CloseStatus.BAD_DATA.withReason("applicationName not found"); + closeSession(webSocketSession, status); } } @@ -112,8 +118,8 @@ private void handleActiveThreadCount(WebSocketSession webSocketSession, RequestM private void closeSession(WebSocketSession session, CloseStatus status) { try { session.close(status); - } catch (Exception e) { - logger.warn(e.getMessage(), e); + } catch (IOException e) { + logger.warn("Failed to close session. session:{}, status:{}", session, status, e); } }