forked from pinpoint-apm/pinpoint
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[pinpoint-apm#10420] Removed PinpointNettyServerBuilder, Tracing gRPC…
… logId
- Loading branch information
Showing
28 changed files
with
896 additions
and
1,252 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
148 changes: 58 additions & 90 deletions
148
collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,134 +1,102 @@ | ||
package com.navercorp.pinpoint.collector.controller; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiverNames; | ||
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; | ||
import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils; | ||
import io.grpc.InternalChannelz; | ||
import io.grpc.InternalInstrumented; | ||
import com.navercorp.pinpoint.collector.service.ChannelzService; | ||
import com.navercorp.pinpoint.collector.service.ChannelzService.ServerStatsWithId; | ||
import com.navercorp.pinpoint.collector.service.ChannelzService.SocketStatsWithId; | ||
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup; | ||
import com.navercorp.pinpoint.collector.service.ChannelzSocketLookup.SocketEntry; | ||
import org.springframework.http.MediaType; | ||
import org.springframework.web.bind.annotation.GetMapping; | ||
import org.springframework.web.bind.annotation.PathVariable; | ||
import org.springframework.web.bind.annotation.RequestMapping; | ||
import org.springframework.web.bind.annotation.RequestParam; | ||
import org.springframework.web.bind.annotation.RestController; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
|
||
@RestController | ||
@RequestMapping("/channelz") | ||
public class ChannelzController { | ||
|
||
private final ChannelzRegistry channelzRegistry; | ||
private final InternalChannelz channelz = InternalChannelz.instance(); | ||
private final ObjectMapper mapper; | ||
private final ChannelzService channelzService; | ||
private final ChannelzSocketLookup socketLookup; | ||
|
||
public ChannelzController(ChannelzRegistry channelzRegistry, ObjectMapper objectMapper) { | ||
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry"); | ||
this.mapper = Objects.requireNonNull(objectMapper, "objectMapper"); | ||
public ChannelzController(ChannelzService channelzService, ChannelzSocketLookup socketLookup) { | ||
this.channelzService = Objects.requireNonNull(channelzService, "channelzService"); | ||
this.socketLookup = Objects.requireNonNull(socketLookup, "socketLookup"); | ||
} | ||
|
||
@GetMapping("/getSocket") | ||
public String getSocket(long logId) throws JsonProcessingException { | ||
InternalChannelz.SocketStats stats = getSocket0(logId); | ||
|
||
return mapper.writeValueAsString(stats); | ||
@GetMapping(value = "/sockets/{logId}") | ||
public SocketStatsWithId findSocketStatsByLogId(@PathVariable long logId) { | ||
return this.channelzService.getSocketStats(logId); | ||
} | ||
|
||
@GetMapping("/html/getSocket") | ||
public String getSocketToHtml(long logId) { | ||
InternalChannelz.SocketStats stats = getSocket0(logId); | ||
|
||
return new HTMLBuilder().build(stats); | ||
@GetMapping(value = "/sockets") | ||
public List<SocketStatsWithId> findSocketStats( | ||
@RequestParam(required = false) String remoteAddress, | ||
@RequestParam(required = false) Integer localPort | ||
) { | ||
List<Long> ids = this.socketLookup.find(remoteAddress, localPort).stream() | ||
.map(SocketEntry::getSocketId) | ||
.collect(Collectors.toUnmodifiableList()); | ||
return this.channelzService.getSocketStatsList(ids); | ||
} | ||
|
||
private InternalChannelz.SocketStats getSocket0(long logId) { | ||
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId); | ||
if (socket == null) { | ||
return null; | ||
} | ||
return ChannelzUtils.getResult("Socket", socket); | ||
@GetMapping(value = "/servers") | ||
public List<ServerStatsWithId> getAllServerStats() { | ||
return this.channelzService.getAllServerStats(); | ||
} | ||
|
||
@GetMapping("/findSocket") | ||
public String findSocket(String remoteAddress, int localPort) throws JsonProcessingException { | ||
|
||
ChannelzRegistry.AddressId addressId = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort); | ||
List<InternalChannelz.SocketStats> stats = findSocket(addressId); | ||
if (stats == null) { | ||
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort); | ||
} | ||
|
||
return mapper.writeValueAsString(stats); | ||
@GetMapping(value = "/servers", produces = MediaType.TEXT_HTML_VALUE) | ||
public String getAllServerStatsInHtml() { | ||
return buildHtml(this.getAllServerStats()); | ||
} | ||
|
||
@GetMapping("/html/findSocket") | ||
public String findSocketStatToHtml(String remoteAddress, int localPort) { | ||
|
||
ChannelzRegistry.AddressId targetAddress = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort); | ||
|
||
List<InternalChannelz.SocketStats> stats = findSocket(targetAddress); | ||
if (stats.isEmpty()) { | ||
return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort); | ||
} | ||
@GetMapping(value = "/servers/{name}") | ||
public ServerStatsWithId getServerStat(@PathVariable("name") String name) { | ||
return this.channelzService.getServerStats(name); | ||
} | ||
|
||
return buildHtml(stats); | ||
@GetMapping(value = "/servers/{name}", produces = MediaType.TEXT_HTML_VALUE) | ||
public String getServerStatInHtml(@PathVariable("name") String name) { | ||
return buildHtml(this.getServerStat(name)); | ||
} | ||
|
||
|
||
private List<InternalChannelz.SocketStats> findSocket(ChannelzRegistry.AddressId targetAddress) { | ||
Set<Long> logIdSet = channelzRegistry.getSocketLogId(targetAddress); | ||
@GetMapping(value = "/sockets/{logId}", produces = MediaType.TEXT_HTML_VALUE) | ||
public String findSocketStatsByLogIdInHtml(@PathVariable long logId) { | ||
return buildHtml(this.findSocketStatsByLogId(logId)); | ||
} | ||
|
||
List<InternalInstrumented<InternalChannelz.SocketStats>> result = new ArrayList<>(); | ||
for (Long logId : logIdSet) { | ||
InternalInstrumented<InternalChannelz.SocketStats> socket = channelz.getSocket(logId); | ||
if (socket != null) { | ||
result.add(socket); | ||
} | ||
} | ||
return ChannelzUtils.getResults("Socket", result); | ||
@GetMapping(value = "/sockets", produces = MediaType.TEXT_HTML_VALUE) | ||
public String findSocketStatInHtml( | ||
@RequestParam(required = false) String remoteAddress, | ||
@RequestParam(required = false) Integer localPort | ||
) throws Exception { | ||
return buildHtml(this.findSocketStats(remoteAddress, localPort)); | ||
} | ||
|
||
@GetMapping("/html/getServer") | ||
public String getServerStatToHtml(String serverName) { | ||
List<InternalChannelz.ServerStats> stats = getServer(serverName); | ||
if (stats == null) { | ||
return notFound("serverName=" + serverName); | ||
private static <T> String buildHtml(List<T> stats) { | ||
if (stats == null || stats.isEmpty()) { | ||
return "Empty"; | ||
} | ||
return buildHtml(stats); | ||
} | ||
|
||
private <T> String buildHtml(List<T> stats) { | ||
StringBuilder buffer = new StringBuilder(); | ||
for (T stat : stats) { | ||
String html = new HTMLBuilder().build(stat); | ||
buffer.append(html); | ||
buffer.append(buildHtml(stat)); | ||
buffer.append("<br>"); | ||
} | ||
return buffer.toString(); | ||
} | ||
|
||
|
||
@GetMapping("/html/getSpanReceiver") | ||
public String getSpanReceiverl() { | ||
return getServerStatToHtml(GrpcReceiverNames.SPAN); | ||
} | ||
|
||
|
||
private List<InternalChannelz.ServerStats> getServer(String serverName) { | ||
Long logId = channelzRegistry.getServerLogId(serverName); | ||
|
||
InternalChannelz.ServerList serverList = channelz.getServers(logId, 10000); | ||
|
||
return ChannelzUtils.getResults("ServerStats", serverList.servers); | ||
} | ||
|
||
|
||
private String notFound(String target) { | ||
return target + " not Found"; | ||
private static <T> String buildHtml(T stats) { | ||
if (stats == null) { | ||
return "Null"; | ||
} | ||
return new HTMLBuilder().build(stats); | ||
} | ||
|
||
|
||
} |
130 changes: 16 additions & 114 deletions
130
...java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,141 +1,43 @@ | ||
package com.navercorp.pinpoint.collector.receiver.grpc.channelz; | ||
|
||
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Map.Entry; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.function.BiFunction; | ||
|
||
public class DefaultChannelzRegistry implements ChannelzRegistry { | ||
|
||
public static final long NO_EXIST = -1L; | ||
private final Logger logger = LogManager.getLogger(this.getClass()); | ||
|
||
private final ConcurrentMap<ChannelzRegistry.AddressId, Set<Long>> socketMap = new ConcurrentHashMap<>(); | ||
private final ConcurrentMap<ChannelzRegistry.AddressId, RemoteId> remoteAddressSocketMap = new ConcurrentHashMap<>(); | ||
|
||
private final ConcurrentMap<String, Long> serverMap = new ConcurrentHashMap<>(); | ||
|
||
@Override | ||
public void addSocket(final long logId, InetSocketAddress remoteAddress, InetSocketAddress localAddress) { | ||
Objects.requireNonNull(remoteAddress, "remoteAddress"); | ||
Objects.requireNonNull(localAddress, "localAddress"); | ||
|
||
final ChannelzRegistry.AddressId targetId = ChannelzRegistry.AddressId.newAddressId(localAddress, remoteAddress); | ||
|
||
// WARNING : thread safety of Set<Long> | ||
this.socketMap.compute(targetId, new BiFunction<AddressId, Set<Long>, Set<Long>>() { | ||
@Override | ||
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) { | ||
if (logIdSet == null) { | ||
Set<Long> newSet = newSynchronizedSet(); | ||
newSet.add(logId); | ||
return newSet; | ||
} else { | ||
logIdSet.add(logId); | ||
return logIdSet; | ||
} | ||
} | ||
}); | ||
|
||
AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort()); | ||
this.remoteAddressSocketMap.putIfAbsent(remoteAddressId, new RemoteId(logId, targetId)); | ||
} | ||
|
||
|
||
private Set<Long> newSynchronizedSet() { | ||
return Collections.synchronizedSet(new HashSet<>()); | ||
} | ||
|
||
// for test | ||
int getRemoteAddressSocketMapSize() { | ||
return remoteAddressSocketMap.size(); | ||
} | ||
|
||
// for test | ||
int getSocketMapSize() { | ||
return socketMap.size(); | ||
} | ||
|
||
@Override | ||
public Long removeSocket(InetSocketAddress remoteAddress) { | ||
Objects.requireNonNull(remoteAddress, "remoteAddress"); | ||
|
||
AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort()); | ||
|
||
final RemoteId remoteId = this.remoteAddressSocketMap.remove(remoteAddressId); | ||
if (remoteId == null) { | ||
return NO_EXIST; | ||
} | ||
|
||
final Removed removed = new Removed(); | ||
this.socketMap.compute(remoteId.targetAddress, new BiFunction<AddressId, Set<Long>, Set<Long>>() { | ||
@Override | ||
public Set<Long> apply(AddressId addressId, Set<Long> logIdSet) { | ||
if (logIdSet.remove(remoteId.logId)) { | ||
removed.remove = true; | ||
} | ||
if (logIdSet.isEmpty()) { | ||
return null; | ||
} | ||
return logIdSet; | ||
} | ||
}); | ||
if (removed.remove) { | ||
return remoteId.logId; | ||
} else { | ||
return -1L; | ||
} | ||
|
||
} | ||
|
||
private static class Removed { | ||
boolean remove = false; | ||
} | ||
private final ConcurrentMap<Long, String> serverMap = new ConcurrentHashMap<>(); | ||
|
||
@Override | ||
public Set<Long> getSocketLogId(ChannelzRegistry.AddressId address) { | ||
Objects.requireNonNull(address, "address"); | ||
public void register(long logId, String serverName) { | ||
Objects.requireNonNull(serverName, "serverName"); | ||
|
||
Set<Long> logIds = this.socketMap.get(address); | ||
if (logIds == null) { | ||
return Collections.emptySet(); | ||
String old = this.serverMap.putIfAbsent(logId, serverName); | ||
if (old != null) { | ||
logger.warn("Duplicated key: {} -> {}", logId, serverName); | ||
} | ||
return new HashSet<>(logIds); | ||
} | ||
|
||
@Override | ||
public void addServer(long logId, String serverName) { | ||
Objects.requireNonNull(serverName, "serverName"); | ||
|
||
final Long old = this.serverMap.putIfAbsent(serverName, logId); | ||
if (old != null) { | ||
// warning | ||
logger.warn("Already exist logId:{} serverName:{}", logId, serverName); | ||
public Long getLogId(String serverName) { | ||
for (Entry<Long, String> entry: this.serverMap.entrySet()) { | ||
if (entry.getValue().equals(serverName)) { | ||
return entry.getKey(); | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public Long getServerLogId(String serverName) { | ||
Objects.requireNonNull(serverName, "serverName"); | ||
|
||
return this.serverMap.get(serverName); | ||
public String getServerName(long logId) { | ||
return this.serverMap.get(logId); | ||
} | ||
|
||
private static class RemoteId { | ||
private final long logId; | ||
private final ChannelzRegistry.AddressId targetAddress; | ||
|
||
public RemoteId(long logId, ChannelzRegistry.AddressId targetAddress) { | ||
this.logId = logId; | ||
this.targetAddress = Objects.requireNonNull(targetAddress, "targetAddress"); | ||
} | ||
} | ||
} |
Oops, something went wrong.