Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#noissue] Cleanup #11526

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void onCompleted() {
logger.info("onCompleted.");
}

static StreamObserver create() {
static StreamObserver<Empty> create() {
return new EmptyStreamObserver();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@

private static final long DEFAULT_FLUSH_DELAY = 1000;

private static final Logger LOGGER = LogManager.getLogger(GrpcActiveThreadCountService.class);
private final boolean isDebug = LOGGER.isDebugEnabled();
private final Logger logger = LogManager.getLogger(getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final ActiveTraceRepository activeTraceRepository;

Expand Down Expand Up @@ -85,7 +85,7 @@

@Override
public void close() throws IOException {
LOGGER.info("close");
logger.info("close");
grpcStreamService.close();
}

Expand All @@ -94,7 +94,7 @@
@Override
public void run() {
if (isDebug) {
LOGGER.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));
logger.debug("ActiveThreadCountTimerTask started. streamSocketList:{}", Arrays.toString(grpcStreamService.getStreamSocketList()));

Check warning on line 97 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L97

Added line #L97 was not covered by tests
}

PCmdActiveThreadCountRes.Builder activeThreadCountResponseBuilder = getActiveThreadCountResponse();
Expand All @@ -109,10 +109,10 @@

stream.send(activeThreadCount);
if (isDebug) {
LOGGER.debug("ActiveThreadCountStreamSocket. {}", stream);
logger.debug("ActiveThreadCountStreamSocket. {}", stream);

Check warning on line 112 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L112

Added line #L112 was not covered by tests
}
} catch (Throwable e) {
LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
logger.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);

Check warning on line 115 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java#L115

Added line #L115 was not covered by tests
streamSocket.close(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
}

private List<PActiveThreadDump> toPActiveThreadDump(Collection<ThreadDump> activeTraceInfoList) {
final List<PActiveThreadDump> result = new ArrayList<PActiveThreadDump>(activeTraceInfoList.size());
final List<PActiveThreadDump> result = new ArrayList<>(activeTraceInfoList.size());

Check warning on line 99 in agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java

View check run for this annotation

Codecov / codecov/patch

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadDumpService.java#L99

Added line #L99 was not covered by tests
for (ThreadDump threadDump : activeTraceInfoList) {
PActiveThreadDump pActiveThreadLightDump = createActiveThreadDump(threadDump);
result.add(pActiveThreadLightDump);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Subscription subscribe(
getProtocol()
);
Subscription subscription = supplyChannel.subscribe(subConsumer);
deferredDisposable.setDisposable(() -> subscription.unsubscribe());
deferredDisposable.setDisposable(subscription::unsubscribe);
return subscription;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@
@Override
public boolean consume(byte[] rawDemand) {
try {
return responseToDemand(getProtocol().deserializeDemand(rawDemand));
D demand = getProtocol().deserializeDemand(rawDemand);
return responseToDemand(demand);
} catch (Exception e) {
throw new RuntimeException("Failed to supply for demand: " + BytesUtils.toString(rawDemand), e);
}
Expand Down Expand Up @@ -162,7 +163,7 @@
private final Supplier<PubChannel> channelSupplier = Suppliers.memoize(this::buildPubChannel);

PubChannelProxy(D demand) {
this.demand = demand;
this.demand = Objects.requireNonNull(demand, "demand");
}

@Override
Expand All @@ -171,7 +172,7 @@
byte[] rawResponse = getProtocol().serializeSupply(supply);
this.channelSupplier.get().publish(rawResponse);
} catch (Exception e) {
logger.error("Failed to send", e);
logger.warn("Failed to send", e);

Check warning on line 175 in channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java

View check run for this annotation

Codecov / codecov/patch

channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerImpl.java#L175

Added line #L175 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@
*/
public class AgentService extends AgentGrpc.AgentImplBase {
private static final AtomicLong idAllocator = new AtomicLong();

private final Logger logger = LogManager.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final SimpleRequestHandlerAdaptor<GeneratedMessageV3, GeneratedMessageV3> simpleRequestHandlerAdaptor;
private final PingEventHandler pingEventHandler;
private final Executor executor;
Expand All @@ -68,7 +69,7 @@ public AgentService(DispatchHandler<GeneratedMessageV3, GeneratedMessageV3> disp

@Override
public void requestAgentInfo(PAgentInfo agentInfo, StreamObserver<PResult> responseObserver) {
if (isDebug) {
if (logger.isDebugEnabled()) {
logger.debug("Request PAgentInfo={}", MessageFormatUtils.debugLog(agentInfo));
}

Expand Down Expand Up @@ -101,14 +102,14 @@ public StreamObserver<PPing> pingSession(final StreamObserver<PPing> response) {
public void onNext(PPing ping) {
if (first.compareAndSet(false, true)) {
// Only first
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} start:{}", id, MessageFormatUtils.debugLog(ping));
}
AgentService.this.pingEventHandler.connect();
} else {
AgentService.this.pingEventHandler.ping();
}
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} onNext:{}", id, MessageFormatUtils.debugLog(ping));
}
if (responseObserver.isReady()) {
Expand Down Expand Up @@ -136,7 +137,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
if (isDebug) {
if (logger.isDebugEnabled()) {
thLogger.debug("PingSession:{} onCompleted()", id);
}
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,15 @@
}
}

@Override public void start() {}
@Override public void stop() {}
@Override
public void start() {
logger.info("Started");
}

Check warning on line 134 in log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java

View check run for this annotation

Codecov / codecov/patch

log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java#L133-L134

Added lines #L133 - L134 were not covered by tests

@Override
public void stop() {
logger.info("Stopped");
}

Check warning on line 139 in log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java

View check run for this annotation

Codecov / codecov/patch

log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java#L138-L139

Added lines #L138 - L139 were not covered by tests

@Override
public String getRequestMapping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ public void commandActiveThreadLightDump(PCmdActiveThreadLightDumpRes response,

@Override
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(StreamObserver<Empty> responseObserver) {
logger.debug("commandStreamActiveThreadCount started");

ServerCallStreamObserver<Empty> serverResponseObserver = (ServerCallStreamObserver<Empty>) responseObserver;
return new ActiveThreadCountResponseStreamObserver(serverResponseObserver, this.activeThreadCountSinkRepo) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ public Flux<ActiveThreadCountResponse> getResponses(String applicationName) {
Disposable updateDisposable = this.scheduler.schedulePeriodically(() -> {
getAgents(taskDecorator, applicationName).subscribe(agents -> {
for (ClusterKey agent : agents) {
Disposable disposable = this.atcDao.getSupplies(agent).subscribe(supply -> {
collector.add(supply);
});
Flux<ATCSupply> supplies = this.atcDao.getSupplies(agent);
Disposable disposable = supplies.subscribe(collector::add);
Disposable prev = disposableMap.put(agent, disposable);
if (prev != null) {
Mono.delay(Duration.ofSeconds(3), this.scheduler).subscribe(t -> prev.dispose());
Expand All @@ -95,7 +94,7 @@ public Flux<ActiveThreadCountResponse> getResponses(String applicationName) {
.doFinally(e -> {
updateDisposable.dispose();
Mono.delay(Duration.ofMillis(500)).subscribe(t -> {
for (Disposable d: disposableMap.values()) {
for (Disposable d : disposableMap.values()) {
d.dispose();
}
});
Expand All @@ -107,7 +106,7 @@ private static ClusterKey extractKey(ATCSupply supply) {
}

private Mono<List<ClusterKey>> getAgents(TimerTaskDecorator taskDecorator, String applicationName) {
return Mono.<List<ClusterKey>>create(sink -> {
return Mono.create(sink -> {
taskDecorator.decorate(new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -161,7 +160,7 @@ public ActiveThreadCountResponse compose(Long t) {

long now = System.currentTimeMillis();
ActiveThreadCountResponse response = new ActiveThreadCountResponse(applicationName, now);
for (ClusterKey agent: agents) {
for (ClusterKey agent : agents) {
putAgent(response, agent, now);
}
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -60,6 +61,8 @@ public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull Cl
}

public void handleActiveThreadCount(WebSocketSession wsSession, String applicationName) {
Objects.requireNonNull(applicationName, "applicationName");

logger.info("ATC Requested. session: {}, applicationName: {}", wsSession, applicationName);
HandlerSession handlerSession = HandlerSession.get(wsSession);
if (handlerSession == null) {
Expand Down Expand Up @@ -133,7 +136,8 @@ void start(String applicationName) {
private void start0(String applicationName) {
try {
this.applicationName = applicationName;
this.disposable = this.atcService.getResponses(applicationName).subscribe(this::sendMessage);
Flux<ActiveThreadCountResponse> responses = this.atcService.getResponses(applicationName);
this.disposable = responses.subscribe(this::sendMessage);
} catch (Exception e) {
logger.error("Failed to start atc session");
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.navercorp.pinpoint.web.security.ServerMapDataFilter;
import com.navercorp.pinpoint.web.websocket.ActiveThreadCountHandler;
import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessageConverter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;

Expand All @@ -29,6 +31,7 @@
* @author youngjin.kim2
*/
public class RedisActiveThreadCountHandlerAdaptor extends ActiveThreadCountHandler {
private final Logger logger = LogManager.getLogger(this.getClass());

Check warning on line 34 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L34

Added line #L34 was not covered by tests

private final RedisActiveThreadCountWebSocketHandler delegate;

Expand All @@ -42,16 +45,25 @@
this.delegate = Objects.requireNonNull(delegate, "delegate");
}

@Override public void start() {}
@Override public void stop() {}
@Override
public void start() {
logger.info("Started");
}

Check warning on line 51 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L50-L51

Added lines #L50 - L51 were not covered by tests

@Override
public void stop() {
logger.info("Stopped");
}

Check warning on line 56 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L55-L56

Added lines #L55 - L56 were not covered by tests

@Override
public void afterConnectionEstablished(@Nonnull WebSocketSession session) {
logger.debug("Connection established: {}", session);

Check warning on line 60 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L60

Added line #L60 was not covered by tests
this.delegate.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(@Nonnull WebSocketSession session, @Nonnull CloseStatus status) {
logger.debug("Connection closed: {}, {}", session, status);

Check warning on line 66 in web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/realtime/RedisActiveThreadCountHandlerAdaptor.java#L66

Added line #L66 was not covered by tests
this.delegate.afterConnectionClosed(session, status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.Objects;

/**
Expand Down Expand Up @@ -96,14 +97,19 @@
if (API_ACTIVE_THREAD_COUNT.equals(command)) {
handleActiveThreadCount(webSocketSession, requestMessage);
} else {
logger.debug("unknown command:{}", command);
logger.warn("Unknown command:{}", command);
CloseStatus status = CloseStatus.BAD_DATA.withReason("Unknown command");
closeSession(webSocketSession, status);

Check warning on line 102 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L100-L102

Added lines #L100 - L102 were not covered by tests
}
}

private void handleActiveThreadCount(WebSocketSession webSocketSession, RequestMessage requestMessage) {
final String applicationName = MapUtils.getString(requestMessage.getParameters(), APPLICATION_NAME_KEY);
if (applicationName != null) {
handleActiveThreadCount(webSocketSession, applicationName);
} else {
CloseStatus status = CloseStatus.BAD_DATA.withReason("applicationName not found");
closeSession(webSocketSession, status);

Check warning on line 112 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L111-L112

Added lines #L111 - L112 were not covered by tests
}
}

Expand All @@ -112,8 +118,8 @@
private void closeSession(WebSocketSession session, CloseStatus status) {
try {
session.close(status);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
} catch (IOException e) {
logger.warn("Failed to close session. session:{}, status:{}", session, status, e);

Check warning on line 122 in web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/websocket/ActiveThreadCountHandler.java#L121-L122

Added lines #L121 - L122 were not covered by tests
}
}

Expand Down