Skip to content

Commit

Permalink
[#10420] Removed PinpointNettyServerBuilder, Tracing gRPC logId
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 authored and smilu97 committed Oct 24, 2023
1 parent 2cc0627 commit 957f1ba
Show file tree
Hide file tree
Showing 28 changed files with 896 additions and 1,252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public String helloworld() {
public String async() {
return "async " + testService.getHello() + " world";
}

@GetMapping(value = "/sleep")
public String sleep() throws InterruptedException {
Thread.sleep(5000);
return "sleep " + 5000 + "ms";
}
}
Original file line number Diff line number Diff line change
@@ -1,134 +1,102 @@
package com.navercorp.pinpoint.collector.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiverNames;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils;
import io.grpc.InternalChannelz;
import io.grpc.InternalInstrumented;
import com.navercorp.pinpoint.collector.service.ChannelzService;
import com.navercorp.pinpoint.collector.service.ChannelzService.ServerStatsWithId;
import com.navercorp.pinpoint.collector.service.ChannelzService.SocketStatsWithId;
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup;
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup.SocketEntry;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;


@RestController
@RequestMapping("/channelz")
public class ChannelzController {

private final ChannelzRegistry channelzRegistry;
private final InternalChannelz channelz = InternalChannelz.instance();
private final ObjectMapper mapper;
private final ChannelzService channelzService;
private final ChannelzSocketLookup socketLookup;

public ChannelzController(ChannelzRegistry channelzRegistry, ObjectMapper objectMapper) {
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry");
this.mapper = Objects.requireNonNull(objectMapper, "objectMapper");
public ChannelzController(ChannelzService channelzService, ChannelzSocketLookup socketLookup) {
this.channelzService = Objects.requireNonNull(channelzService, "channelzService");
this.socketLookup = Objects.requireNonNull(socketLookup, "socketLookup");
}

@GetMapping("/getSocket")
public String getSocket(long logId) throws JsonProcessingException {
InternalChannelz.SocketStats stats = getSocket0(logId);

return mapper.writeValueAsString(stats);
@GetMapping(value = "/sockets/{logId}")
public SocketStatsWithId findSocketStatsByLogId(@PathVariable long logId) {
return this.channelzService.getSocketStats(logId);
}

@GetMapping("/html/getSocket")
public String getSocketToHtml(long logId) {
InternalChannelz.SocketStats stats = getSocket0(logId);

return new HTMLBuilder().build(stats);
@GetMapping(value = "/sockets")
public List<SocketStatsWithId> findSocketStats(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) {
List<Long> ids = this.socketLookup.find(remoteAddress, localPort).stream()
.map(SocketEntry::getSocketId)
.collect(Collectors.toUnmodifiableList());
return this.channelzService.getSocketStatsList(ids);
}

private InternalChannelz.SocketStats getSocket0(long logId) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket == null) {
return null;
}
return ChannelzUtils.getResult("Socket", socket);
@GetMapping(value = "/servers")
public List<ServerStatsWithId> getAllServerStats() {
return this.channelzService.getAllServerStats();
}

@GetMapping("/findSocket")
public String findSocket(String remoteAddress, int localPort) throws JsonProcessingException {

ChannelzRegistry.AddressId addressId = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);
List<InternalChannelz.SocketStats> stats = findSocket(addressId);
if (stats == null) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}

return mapper.writeValueAsString(stats);
@GetMapping(value = "/servers", produces = MediaType.TEXT_HTML_VALUE)
public String getAllServerStatsInHtml() {
return buildHtml(this.getAllServerStats());
}

@GetMapping("/html/findSocket")
public String findSocketStatToHtml(String remoteAddress, int localPort) {

ChannelzRegistry.AddressId targetAddress = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);

List<InternalChannelz.SocketStats> stats = findSocket(targetAddress);
if (stats.isEmpty()) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}
@GetMapping(value = "/servers/{name}")
public ServerStatsWithId getServerStat(@PathVariable("name") String name) {
return this.channelzService.getServerStats(name);
}

return buildHtml(stats);
@GetMapping(value = "/servers/{name}", produces = MediaType.TEXT_HTML_VALUE)
public String getServerStatInHtml(@PathVariable("name") String name) {
return buildHtml(this.getServerStat(name));
}


private List<InternalChannelz.SocketStats> findSocket(ChannelzRegistry.AddressId targetAddress) {
Set<Long> logIdSet = channelzRegistry.getSocketLogId(targetAddress);
@GetMapping(value = "/sockets/{logId}", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatsByLogIdInHtml(@PathVariable long logId) {
return buildHtml(this.findSocketStatsByLogId(logId));
}

List<InternalInstrumented<InternalChannelz.SocketStats>> result = new ArrayList<>();
for (Long logId : logIdSet) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket != null) {
result.add(socket);
}
}
return ChannelzUtils.getResults("Socket", result);
@GetMapping(value = "/sockets", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatInHtml(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) throws Exception {
return buildHtml(this.findSocketStats(remoteAddress, localPort));
}

@GetMapping("/html/getServer")
public String getServerStatToHtml(String serverName) {
List<InternalChannelz.ServerStats> stats = getServer(serverName);
if (stats == null) {
return notFound("serverName=" + serverName);
private static <T> String buildHtml(List<T> stats) {
if (stats == null || stats.isEmpty()) {
return "Empty";
}
return buildHtml(stats);
}

private <T> String buildHtml(List<T> stats) {
StringBuilder buffer = new StringBuilder();
for (T stat : stats) {
String html = new HTMLBuilder().build(stat);
buffer.append(html);
buffer.append(buildHtml(stat));
buffer.append("<br>");
}
return buffer.toString();
}


@GetMapping("/html/getSpanReceiver")
public String getSpanReceiverl() {
return getServerStatToHtml(GrpcReceiverNames.SPAN);
}


private List<InternalChannelz.ServerStats> getServer(String serverName) {
Long logId = channelzRegistry.getServerLogId(serverName);

InternalChannelz.ServerList serverList = channelz.getServers(logId, 10000);

return ChannelzUtils.getResults("ServerStats", serverList.servers);
}


private String notFound(String target) {
return target + " not Found";
private static <T> String buildHtml(T stats) {
if (stats == null) {
return "Null";
}
return new HTMLBuilder().build(stats);
}


}
Original file line number Diff line number Diff line change
@@ -1,141 +1,43 @@
package com.navercorp.pinpoint.collector.receiver.grpc.channelz;

import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

public class DefaultChannelzRegistry implements ChannelzRegistry {

public static final long NO_EXIST = -1L;
private final Logger logger = LogManager.getLogger(this.getClass());

private final ConcurrentMap<ChannelzRegistry.AddressId, Set<Long>> socketMap = new ConcurrentHashMap<>();
private final ConcurrentMap<ChannelzRegistry.AddressId, RemoteId> remoteAddressSocketMap = new ConcurrentHashMap<>();

private final ConcurrentMap<String, Long> serverMap = new ConcurrentHashMap<>();

@Override
public void addSocket(final long logId, InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
Objects.requireNonNull(remoteAddress, "remoteAddress");
Objects.requireNonNull(localAddress, "localAddress");

final ChannelzRegistry.AddressId targetId = ChannelzRegistry.AddressId.newAddressId(localAddress, remoteAddress);

// WARNING : thread safety of Set<Long>
this.socketMap.compute(targetId, new BiFunction<AddressId, Set<Long>, Set<Long>>() {
@Override
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) {
if (logIdSet == null) {
Set<Long> newSet = newSynchronizedSet();
newSet.add(logId);
return newSet;
} else {
logIdSet.add(logId);
return logIdSet;
}
}
});

AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort());
this.remoteAddressSocketMap.putIfAbsent(remoteAddressId, new RemoteId(logId, targetId));
}


private Set<Long> newSynchronizedSet() {
return Collections.synchronizedSet(new HashSet<>());
}

// for test
int getRemoteAddressSocketMapSize() {
return remoteAddressSocketMap.size();
}

// for test
int getSocketMapSize() {
return socketMap.size();
}

@Override
public Long removeSocket(InetSocketAddress remoteAddress) {
Objects.requireNonNull(remoteAddress, "remoteAddress");

AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort());

final RemoteId remoteId = this.remoteAddressSocketMap.remove(remoteAddressId);
if (remoteId == null) {
return NO_EXIST;
}

final Removed removed = new Removed();
this.socketMap.compute(remoteId.targetAddress, new BiFunction<AddressId, Set<Long>, Set<Long>>() {
@Override
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) {
if (logIdSet.remove(remoteId.logId)) {
removed.remove = true;
}
if (logIdSet.isEmpty()) {
return null;
}
return logIdSet;
}
});
if (removed.remove) {
return remoteId.logId;
} else {
return -1L;
}

}

private static class Removed {
boolean remove = false;
}
private final ConcurrentMap<Long, String> serverMap = new ConcurrentHashMap<>();

@Override
public Set<Long> getSocketLogId(ChannelzRegistry.AddressId address) {
Objects.requireNonNull(address, "address");
public void register(long logId, String serverName) {
Objects.requireNonNull(serverName, "serverName");

Set<Long> logIds = this.socketMap.get(address);
if (logIds == null) {
return Collections.emptySet();
String old = this.serverMap.putIfAbsent(logId, serverName);
if (old != null) {
logger.warn("Duplicated key: {} -> {}", logId, serverName);
}
return new HashSet<>(logIds);
}

@Override
public void addServer(long logId, String serverName) {
Objects.requireNonNull(serverName, "serverName");

final Long old = this.serverMap.putIfAbsent(serverName, logId);
if (old != null) {
// warning
logger.warn("Already exist logId:{} serverName:{}", logId, serverName);
public Long getLogId(String serverName) {
for (Entry<Long, String> entry: this.serverMap.entrySet()) {
if (entry.getValue().equals(serverName)) {
return entry.getKey();
}
}
return null;
}

@Override
public Long getServerLogId(String serverName) {
Objects.requireNonNull(serverName, "serverName");

return this.serverMap.get(serverName);
public String getServerName(long logId) {
return this.serverMap.get(logId);
}

private static class RemoteId {
private final long logId;
private final ChannelzRegistry.AddressId targetAddress;

public RemoteId(long logId, ChannelzRegistry.AddressId targetAddress) {
this.logId = logId;
this.targetAddress = Objects.requireNonNull(targetAddress, "targetAddress");
}
}
}
Loading

0 comments on commit 957f1ba

Please sign in to comment.