Skip to content

Commit

Permalink
[#10420] Removed PinpointNettyServerBuilder, Tracing gRPC logId
Browse files Browse the repository at this point in the history
  • Loading branch information
youngjin.kim2 committed Oct 24, 2023
1 parent 0601f71 commit cc20e5a
Show file tree
Hide file tree
Showing 28 changed files with 896 additions and 1,252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public String helloworld() {
public String async() {
return "async " + testService.getHello() + " world";
}

@GetMapping(value = "/sleep")
public String sleep() throws InterruptedException {
Thread.sleep(5000);
return "sleep " + 5000 + "ms";
}
}
Original file line number Diff line number Diff line change
@@ -1,134 +1,102 @@
package com.navercorp.pinpoint.collector.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiverNames;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils;
import io.grpc.InternalChannelz;
import io.grpc.InternalInstrumented;
import com.navercorp.pinpoint.collector.service.ChannelzService;
import com.navercorp.pinpoint.collector.service.ChannelzService.ServerStatsWithId;
import com.navercorp.pinpoint.collector.service.ChannelzService.SocketStatsWithId;
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup;
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup.SocketEntry;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;


@RestController
@RequestMapping("/channelz")
public class ChannelzController {

private final ChannelzRegistry channelzRegistry;
private final InternalChannelz channelz = InternalChannelz.instance();
private final ObjectMapper mapper;
private final ChannelzService channelzService;
private final ChannelzSocketLookup socketLookup;

public ChannelzController(ChannelzRegistry channelzRegistry, ObjectMapper objectMapper) {
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry");
this.mapper = Objects.requireNonNull(objectMapper, "objectMapper");
public ChannelzController(ChannelzService channelzService, ChannelzSocketLookup socketLookup) {
this.channelzService = Objects.requireNonNull(channelzService, "channelzService");
this.socketLookup = Objects.requireNonNull(socketLookup, "socketLookup");

Check warning on line 29 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L27-L29

Added lines #L27 - L29 were not covered by tests
}

@GetMapping("/getSocket")
public String getSocket(long logId) throws JsonProcessingException {
InternalChannelz.SocketStats stats = getSocket0(logId);

return mapper.writeValueAsString(stats);
@GetMapping(value = "/sockets/{logId}")
public SocketStatsWithId findSocketStatsByLogId(@PathVariable long logId) {
return this.channelzService.getSocketStats(logId);

Check warning on line 34 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L34

Added line #L34 was not covered by tests
}

@GetMapping("/html/getSocket")
public String getSocketToHtml(long logId) {
InternalChannelz.SocketStats stats = getSocket0(logId);

return new HTMLBuilder().build(stats);
@GetMapping(value = "/sockets")
public List<SocketStatsWithId> findSocketStats(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) {
List<Long> ids = this.socketLookup.find(remoteAddress, localPort).stream()
.map(SocketEntry::getSocketId)
.collect(Collectors.toUnmodifiableList());
return this.channelzService.getSocketStatsList(ids);

Check warning on line 45 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L42-L45

Added lines #L42 - L45 were not covered by tests
}

private InternalChannelz.SocketStats getSocket0(long logId) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket == null) {
return null;
}
return ChannelzUtils.getResult("Socket", socket);
@GetMapping(value = "/servers")
public List<ServerStatsWithId> getAllServerStats() {
return this.channelzService.getAllServerStats();

Check warning on line 50 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L50

Added line #L50 was not covered by tests
}

@GetMapping("/findSocket")
public String findSocket(String remoteAddress, int localPort) throws JsonProcessingException {

ChannelzRegistry.AddressId addressId = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);
List<InternalChannelz.SocketStats> stats = findSocket(addressId);
if (stats == null) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}

return mapper.writeValueAsString(stats);
@GetMapping(value = "/servers", produces = MediaType.TEXT_HTML_VALUE)
public String getAllServerStatsInHtml() {
return buildHtml(this.getAllServerStats());

Check warning on line 55 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L55

Added line #L55 was not covered by tests
}

@GetMapping("/html/findSocket")
public String findSocketStatToHtml(String remoteAddress, int localPort) {

ChannelzRegistry.AddressId targetAddress = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort);

List<InternalChannelz.SocketStats> stats = findSocket(targetAddress);
if (stats.isEmpty()) {
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort);
}
@GetMapping(value = "/servers/{name}")
public ServerStatsWithId getServerStat(@PathVariable("name") String name) {
return this.channelzService.getServerStats(name);

Check warning on line 60 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L60

Added line #L60 was not covered by tests
}

return buildHtml(stats);
@GetMapping(value = "/servers/{name}", produces = MediaType.TEXT_HTML_VALUE)
public String getServerStatInHtml(@PathVariable("name") String name) {
return buildHtml(this.getServerStat(name));

Check warning on line 65 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L65

Added line #L65 was not covered by tests
}


private List<InternalChannelz.SocketStats> findSocket(ChannelzRegistry.AddressId targetAddress) {
Set<Long> logIdSet = channelzRegistry.getSocketLogId(targetAddress);
@GetMapping(value = "/sockets/{logId}", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatsByLogIdInHtml(@PathVariable long logId) {
return buildHtml(this.findSocketStatsByLogId(logId));

Check warning on line 71 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L71

Added line #L71 was not covered by tests
}

List<InternalInstrumented<InternalChannelz.SocketStats>> result = new ArrayList<>();
for (Long logId : logIdSet) {
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId);
if (socket != null) {
result.add(socket);
}
}
return ChannelzUtils.getResults("Socket", result);
@GetMapping(value = "/sockets", produces = MediaType.TEXT_HTML_VALUE)
public String findSocketStatInHtml(
@RequestParam(required = false) String remoteAddress,
@RequestParam(required = false) Integer localPort
) throws Exception {
return buildHtml(this.findSocketStats(remoteAddress, localPort));

Check warning on line 79 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L79

Added line #L79 was not covered by tests
}

@GetMapping("/html/getServer")
public String getServerStatToHtml(String serverName) {
List<InternalChannelz.ServerStats> stats = getServer(serverName);
if (stats == null) {
return notFound("serverName=" + serverName);
private static <T> String buildHtml(List<T> stats) {
if (stats == null || stats.isEmpty()) {
return "Empty";

Check warning on line 84 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L84

Added line #L84 was not covered by tests
}
return buildHtml(stats);
}

private <T> String buildHtml(List<T> stats) {
StringBuilder buffer = new StringBuilder();
for (T stat : stats) {
String html = new HTMLBuilder().build(stat);
buffer.append(html);
buffer.append(buildHtml(stat));

Check warning on line 89 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L89

Added line #L89 was not covered by tests
buffer.append("<br>");
}
return buffer.toString();
}


@GetMapping("/html/getSpanReceiver")
public String getSpanReceiverl() {
return getServerStatToHtml(GrpcReceiverNames.SPAN);
}


private List<InternalChannelz.ServerStats> getServer(String serverName) {
Long logId = channelzRegistry.getServerLogId(serverName);

InternalChannelz.ServerList serverList = channelz.getServers(logId, 10000);

return ChannelzUtils.getResults("ServerStats", serverList.servers);
}


private String notFound(String target) {
return target + " not Found";
private static <T> String buildHtml(T stats) {
if (stats == null) {
return "Null";

Check warning on line 97 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L97

Added line #L97 was not covered by tests
}
return new HTMLBuilder().build(stats);

Check warning on line 99 in collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java#L99

Added line #L99 was not covered by tests
}


}
Original file line number Diff line number Diff line change
@@ -1,141 +1,43 @@
package com.navercorp.pinpoint.collector.receiver.grpc.channelz;

import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

public class DefaultChannelzRegistry implements ChannelzRegistry {

public static final long NO_EXIST = -1L;
private final Logger logger = LogManager.getLogger(this.getClass());

private final ConcurrentMap<ChannelzRegistry.AddressId, Set<Long>> socketMap = new ConcurrentHashMap<>();
private final ConcurrentMap<ChannelzRegistry.AddressId, RemoteId> remoteAddressSocketMap = new ConcurrentHashMap<>();

private final ConcurrentMap<String, Long> serverMap = new ConcurrentHashMap<>();

@Override
public void addSocket(final long logId, InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
Objects.requireNonNull(remoteAddress, "remoteAddress");
Objects.requireNonNull(localAddress, "localAddress");

final ChannelzRegistry.AddressId targetId = ChannelzRegistry.AddressId.newAddressId(localAddress, remoteAddress);

// WARNING : thread safety of Set<Long>
this.socketMap.compute(targetId, new BiFunction<AddressId, Set<Long>, Set<Long>>() {
@Override
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) {
if (logIdSet == null) {
Set<Long> newSet = newSynchronizedSet();
newSet.add(logId);
return newSet;
} else {
logIdSet.add(logId);
return logIdSet;
}
}
});

AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort());
this.remoteAddressSocketMap.putIfAbsent(remoteAddressId, new RemoteId(logId, targetId));
}


private Set<Long> newSynchronizedSet() {
return Collections.synchronizedSet(new HashSet<>());
}

// for test
int getRemoteAddressSocketMapSize() {
return remoteAddressSocketMap.size();
}

// for test
int getSocketMapSize() {
return socketMap.size();
}

@Override
public Long removeSocket(InetSocketAddress remoteAddress) {
Objects.requireNonNull(remoteAddress, "remoteAddress");

AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort());

final RemoteId remoteId = this.remoteAddressSocketMap.remove(remoteAddressId);
if (remoteId == null) {
return NO_EXIST;
}

final Removed removed = new Removed();
this.socketMap.compute(remoteId.targetAddress, new BiFunction<AddressId, Set<Long>, Set<Long>>() {
@Override
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) {
if (logIdSet.remove(remoteId.logId)) {
removed.remove = true;
}
if (logIdSet.isEmpty()) {
return null;
}
return logIdSet;
}
});
if (removed.remove) {
return remoteId.logId;
} else {
return -1L;
}

}

private static class Removed {
boolean remove = false;
}
private final ConcurrentMap<Long, String> serverMap = new ConcurrentHashMap<>();

@Override
public Set<Long> getSocketLogId(ChannelzRegistry.AddressId address) {
Objects.requireNonNull(address, "address");
public void register(long logId, String serverName) {
Objects.requireNonNull(serverName, "serverName");

Set<Long> logIds = this.socketMap.get(address);
if (logIds == null) {
return Collections.emptySet();
String old = this.serverMap.putIfAbsent(logId, serverName);
if (old != null) {
logger.warn("Duplicated key: {} -> {}", logId, serverName);

Check warning on line 24 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java#L24

Added line #L24 was not covered by tests
}
return new HashSet<>(logIds);
}

@Override
public void addServer(long logId, String serverName) {
Objects.requireNonNull(serverName, "serverName");

final Long old = this.serverMap.putIfAbsent(serverName, logId);
if (old != null) {
// warning
logger.warn("Already exist logId:{} serverName:{}", logId, serverName);
public Long getLogId(String serverName) {
for (Entry<Long, String> entry: this.serverMap.entrySet()) {
if (entry.getValue().equals(serverName)) {
return entry.getKey();
}
}
return null;

Check warning on line 35 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java#L35

Added line #L35 was not covered by tests
}

@Override
public Long getServerLogId(String serverName) {
Objects.requireNonNull(serverName, "serverName");

return this.serverMap.get(serverName);
public String getServerName(long logId) {
return this.serverMap.get(logId);
}

private static class RemoteId {
private final long logId;
private final ChannelzRegistry.AddressId targetAddress;

public RemoteId(long logId, ChannelzRegistry.AddressId targetAddress) {
this.logId = logId;
this.targetAddress = Objects.requireNonNull(targetAddress, "targetAddress");
}
}
}
Loading

0 comments on commit cc20e5a

Please sign in to comment.