Skip to content

Commit

Permalink
[#noissue] Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 25, 2024
1 parent da8f975 commit 6ff610e
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void onCompleted() {
logger.info("onCompleted.");
}

static StreamObserver create() {
static StreamObserver<Empty> create() {
return new EmptyStreamObserver();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class GrpcActiveThreadCountService implements ProfilerGrpcCommandService,
private static final long DEFAULT_FLUSH_DELAY = 1000;

private static final Logger LOGGER = LogManager.getLogger(GrpcActiveThreadCountService.class);
private final boolean isDebug = LOGGER.isDebugEnabled();
private static final boolean isDebug = LOGGER.isDebugEnabled();

private final ActiveTraceRepository activeTraceRepository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private List<PActiveThreadDump> getActiveThreadDumpList(PCmdActiveThreadDump com
}

private List<PActiveThreadDump> toPActiveThreadDump(Collection<ThreadDump> activeTraceInfoList) {
final List<PActiveThreadDump> result = new ArrayList<PActiveThreadDump>(activeTraceInfoList.size());
final List<PActiveThreadDump> result = new ArrayList<>(activeTraceInfoList.size());

Check warning on line 99 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java#L99

Added line #L99 was not covered by tests
for (ThreadDump threadDump : activeTraceInfoList) {
PActiveThreadDump pActiveThreadLightDump = createActiveThreadDump(threadDump);
result.add(pActiveThreadLightDump);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Subscription subscribe(
getProtocol()
);
Subscription subscription = supplyChannel.subscribe(subConsumer);
deferredDisposable.setDisposable(() -> subscription.unsubscribe());
deferredDisposable.setDisposable(subscription::unsubscribe);
return subscription;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public FluxDemandHandler(ChannelServiceFluxBackend<D, S> backend) {
@Override
public boolean consume(byte[] rawDemand) {
try {
return responseToDemand(getProtocol().deserializeDemand(rawDemand));
D demand = getProtocol().deserializeDemand(rawDemand);
return responseToDemand(demand);
} catch (Exception e) {
throw new RuntimeException("Failed to supply for demand: " + BytesUtils.toString(rawDemand), e);
}
Expand Down Expand Up @@ -162,7 +163,7 @@ private class PubChannelProxy extends BaseSubscriber<S> {
private final Supplier<PubChannel> channelSupplier = Suppliers.memoize(this::buildPubChannel);

PubChannelProxy(D demand) {
this.demand = demand;
this.demand = Objects.requireNonNull(demand, "demand");
}

@Override
Expand All @@ -171,7 +172,7 @@ public void hookOnNext(@NonNull S supply) {
byte[] rawResponse = getProtocol().serializeSupply(supply);
this.channelSupplier.get().publish(rawResponse);
} catch (Exception e) {
logger.error("Failed to send", e);
logger.warn("Failed to send", e);

Check warning on line 175 in channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java

View check run for this annotation

Codecov / codecov/patch

channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java#L175

Added line #L175 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@
*/
public class AgentService extends AgentGrpc.AgentImplBase {
private static final AtomicLong idAllocator = new AtomicLong();

private final Logger logger = LogManager.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final SimpleRequestHandlerAdaptor<GeneratedMessageV3, GeneratedMessageV3> simpleRequestHandlerAdaptor;
private final PingEventHandler pingEventHandler;
private final Executor executor;
Expand All @@ -68,7 +69,7 @@ public AgentService(DispatchHandler<GeneratedMessageV3, GeneratedMessageV3> disp

@Override
public void requestAgentInfo(PAgentInfo agentInfo, StreamObserver<PResult> responseObserver) {
if (isDebug) {
if (logger.isDebugEnabled()) {
logger.debug("Request PAgentInfo={}", MessageFormatUtils.debugLog(agentInfo));
}

Expand Down Expand Up @@ -101,14 +102,14 @@ public StreamObserver<PPing> pingSession(final StreamObserver<PPing> response) {
public void onNext(PPing ping) {
if (first.compareAndSet(false, true)) {
// Only first
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} start:{}", id, MessageFormatUtils.debugLog(ping));
}
AgentService.this.pingEventHandler.connect();
} else {
AgentService.this.pingEventHandler.ping();
}
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} onNext:{}", id, MessageFormatUtils.debugLog(ping));
}
if (responseObserver.isReady()) {
Expand Down Expand Up @@ -136,7 +137,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} onCompleted()", id);
}
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,15 @@ private void sendSupply(WebSocketSession session, List<LiveTailBatch> pile) {
}
}

@Override public void start() {}
@Override public void stop() {}
@Override
public void start() {
logger.info("Started");
}

Check warning on line 134 in log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java

View check run for this annotation

Codecov / codecov/patch

log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java#L133-L134

Added lines #L133 - L134 were not covered by tests

@Override
public void stop() {
logger.info("Stopped");
}

Check warning on line 139 in log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java

View check run for this annotation

Codecov / codecov/patch

log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java#L138-L139

Added lines #L138 - L139 were not covered by tests

@Override
public String getRequestMapping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public void commandActiveThreadLightDump(
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(
StreamObserver<Empty> responseObserver
) {
logger.info("commandStreamActiveThreadCount started");
ServerCallStreamObserver<Empty> serverResponseObserver = (ServerCallStreamObserver<Empty>) responseObserver;
return new FluxCommandResponseObserver<>(serverResponseObserver, this.activeThreadCountSinkRepo) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ public Flux<ActiveThreadCountResponse> getResponses(String applicationName) {
Disposable updateDisposable = this.scheduler.schedulePeriodically(() -> {
getAgents(taskDecorator, applicationName).subscribe(agents -> {
for (ClusterKey agent : agents) {
Disposable disposable = this.atcDao.getSupplies(agent).subscribe(supply -> {
collector.add(supply);
});
Flux<ATCSupply> supplies = this.atcDao.getSupplies(agent);
Disposable disposable = supplies.subscribe(collector::add);
Disposable prev = disposableMap.put(agent, disposable);
if (prev != null) {
Mono.delay(Duration.ofSeconds(3), this.scheduler).subscribe(t -> prev.dispose());
Expand All @@ -95,7 +94,7 @@ public Flux<ActiveThreadCountResponse> getResponses(String applicationName) {
.doFinally(e -> {
updateDisposable.dispose();
Mono.delay(Duration.ofMillis(500)).subscribe(t -> {
for (Disposable d: disposableMap.values()) {
for (Disposable d : disposableMap.values()) {
d.dispose();
}
});
Expand All @@ -107,7 +106,7 @@ private static ClusterKey extractKey(ATCSupply supply) {
}

private Mono<List<ClusterKey>> getAgents(TimerTaskDecorator taskDecorator, String applicationName) {
return Mono.<List<ClusterKey>>create(sink -> {
return Mono.create(sink -> {
taskDecorator.decorate(new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -161,7 +160,7 @@ public ActiveThreadCountResponse compose(Long t) {

long now = System.currentTimeMillis();
ActiveThreadCountResponse response = new ActiveThreadCountResponse(applicationName, now);
for (ClusterKey agent: agents) {
for (ClusterKey agent : agents) {
putAgent(response, agent, now);
}
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -60,6 +61,8 @@ public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull Cl
}

public void handleActiveThreadCount(WebSocketSession wsSession, String applicationName) {
Objects.requireNonNull(applicationName, "applicationName");

logger.info("ATC Requested. session: {}, applicationName: {}", wsSession, applicationName);
HandlerSession handlerSession = HandlerSession.get(wsSession);
if (handlerSession == null) {
Expand Down Expand Up @@ -133,7 +136,8 @@ void start(String applicationName) {
private void start0(String applicationName) {
try {
this.applicationName = applicationName;
this.disposable = this.atcService.getResponses(applicationName).subscribe(this::sendMessage);
Flux<ActiveThreadCountResponse> responses = this.atcService.getResponses(applicationName);
this.disposable = responses.subscribe(this::sendMessage);
} catch (Exception e) {
logger.error("Failed to start atc session");
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.navercorp.pinpoint.web.security.ServerMapDataFilter;
import com.navercorp.pinpoint.web.websocket.ActiveThreadCountHandler;
import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessageConverter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;

Expand All @@ -29,6 +31,7 @@
* @author youngjin.kim2
*/
public class RedisActiveThreadCountHandlerAdaptor extends ActiveThreadCountHandler {
private final Logger logger = LogManager.getLogger(this.getClass());

Check warning on line 34 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L34

Added line #L34 was not covered by tests

private final RedisActiveThreadCountWebSocketHandler delegate;

Expand All @@ -42,16 +45,25 @@ public RedisActiveThreadCountHandlerAdaptor(
this.delegate = Objects.requireNonNull(delegate, "delegate");
}

@Override public void start() {}
@Override public void stop() {}
@Override
public void start() {
logger.info("Started");
}

Check warning on line 51 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L50-L51

Added lines #L50 - L51 were not covered by tests

@Override
public void stop() {
logger.info("Stopped");
}

Check warning on line 56 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L55-L56

Added lines #L55 - L56 were not covered by tests

@Override
public void afterConnectionEstablished(@Nonnull WebSocketSession session) {
logger.debug("Connection established: {}", session);

Check warning on line 60 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L60

Added line #L60 was not covered by tests
this.delegate.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(@Nonnull WebSocketSession session, @Nonnull CloseStatus status) {
logger.debug("Connection closed: {}, {}", session, status);

Check warning on line 66 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L66

Added line #L66 was not covered by tests
this.delegate.afterConnectionClosed(session, status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.Objects;

/**
Expand Down Expand Up @@ -96,14 +97,19 @@ private void handleRequestMessage0(WebSocketSession webSocketSession, RequestMes
if (API_ACTIVE_THREAD_COUNT.equals(command)) {
handleActiveThreadCount(webSocketSession, requestMessage);
} else {
logger.debug("unknown command:{}", command);
logger.warn("Unknown command:{}", command);
CloseStatus status = CloseStatus.BAD_DATA.withReason("Unknown command");
closeSession(webSocketSession, status);

Check warning on line 102 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L100-L102

Added lines #L100 - L102 were not covered by tests
}
}

private void handleActiveThreadCount(WebSocketSession webSocketSession, RequestMessage requestMessage) {
final String applicationName = MapUtils.getString(requestMessage.getParameters(), APPLICATION_NAME_KEY);
if (applicationName != null) {
handleActiveThreadCount(webSocketSession, applicationName);
} else {
CloseStatus status = CloseStatus.BAD_DATA.withReason("applicationName not found");
closeSession(webSocketSession, status);

Check warning on line 112 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L111-L112

Added lines #L111 - L112 were not covered by tests
}
}

Expand All @@ -112,8 +118,8 @@ private void handleActiveThreadCount(WebSocketSession webSocketSession, RequestM
private void closeSession(WebSocketSession session, CloseStatus status) {
try {
session.close(status);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} catch (IOException e) {
logger.warn("Failed to close session. session:{}, status:{}", session, status, e);

Check warning on line 122 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L121-L122

Added lines #L121 - L122 were not covered by tests
}
}

Expand Down

0 comments on commit 6ff610e

Please sign in to comment.