From ded5cfec3c3b12b1b996e189ac25b1309d0f5353 Mon Sep 17 00:00:00 2001 From: "youngjin.kim2" Date: Sun, 22 Oct 2023 15:32:14 +0900 Subject: [PATCH] [#10420] Removed PinpointNettyServerBuilder, Tracing gRPC logId --- .../controller/TestController.java | 6 + .../controller/ChannelzController.java | 145 ++-- .../collector/grpc/ssl/GrpcSslModule.java | 18 +- .../collector/receiver/grpc/GrpcReceiver.java | 12 +- .../channelz/DefaultChannelzRegistry.java | 141 ---- .../channelz/DefaultGrpcServerRegistry.java | 44 ++ .../collector/service/ChannelzService.java | 33 + .../service/ChannelzServiceImpl.java | 147 ++++ .../applicationContext-collector-grpc.xml | 9 +- .../channelz/DefaultChannelzRegistryTest.java | 70 -- .../grpc/channelz/ChannelzRegistry.java | 65 -- .../ChannelzServerTransportFilter.java | 67 -- .../pinpoint/grpc/channelz/ChannelzUtils.java | 21 +- .../grpc/channelz/GrpcServerRegistry.java | 16 + .../grpc/server/DefaultTransportMetadata.java | 10 +- .../server/MetadataServerTransportFilter.java | 1 - .../pinpoint/grpc/server/ServerFactory.java | 60 +- .../grpc/server/TransportMetadata.java | 1 - .../grpc/server/TransportMetadataFactory.java | 11 +- .../netty/EmptyServerListenerDelegator.java | 10 - .../io/grpc/netty/LogIdAttachListener.java | 44 -- .../netty/LogIdServerListenerDelegator.java | 39 - .../netty/PinpointNettyServerBuilder.java | 715 ------------------ .../grpc/netty/ServerListenerDelegator.java | 7 - .../pinpoint/grpc/ChannelFactoryTest.java | 3 +- .../receiver/grpc/GrpcCommandServiceTest.java | 2 +- 26 files changed, 357 insertions(+), 1340 deletions(-) delete mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultGrpcServerRegistry.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzService.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzServiceImpl.java delete mode 100644 collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistryTest.java delete mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzRegistry.java delete mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzServerTransportFilter.java create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/GrpcServerRegistry.java delete mode 100644 grpc/src/main/java/io/grpc/netty/EmptyServerListenerDelegator.java delete mode 100644 grpc/src/main/java/io/grpc/netty/LogIdAttachListener.java delete mode 100644 grpc/src/main/java/io/grpc/netty/LogIdServerListenerDelegator.java delete mode 100644 grpc/src/main/java/io/grpc/netty/PinpointNettyServerBuilder.java delete mode 100644 grpc/src/main/java/io/grpc/netty/ServerListenerDelegator.java diff --git a/agent-testweb/spring-boot3-testweb/src/main/java/com/pinpoint/test/springboot3/controller/TestController.java b/agent-testweb/spring-boot3-testweb/src/main/java/com/pinpoint/test/springboot3/controller/TestController.java index 534ef437eb4f4..5bceb65b8893f 100644 --- a/agent-testweb/spring-boot3-testweb/src/main/java/com/pinpoint/test/springboot3/controller/TestController.java +++ b/agent-testweb/spring-boot3-testweb/src/main/java/com/pinpoint/test/springboot3/controller/TestController.java @@ -23,4 +23,10 @@ public String helloworld() { public String async() { return "async " + testService.getHello() + " world"; } + + @GetMapping(value = "/sleep") + public String sleep() throws InterruptedException { + Thread.sleep(5000); + return "sleep " + 5000 + "ms"; + } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java b/collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java index 9d294bc0ca277..941c42f43810e 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/controller/ChannelzController.java @@ -1,134 +1,89 @@ package com.navercorp.pinpoint.collector.controller; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiverNames; -import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; -import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils; -import io.grpc.InternalChannelz; -import io.grpc.InternalInstrumented; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.ArrayList; +import com.navercorp.pinpoint.collector.service.ChannelzService; +import com.navercorp.pinpoint.collector.service.ChannelzService.ServerStatsWithId; +import com.navercorp.pinpoint.collector.service.ChannelzService.SocketStatsWithId; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; + import java.util.List; import java.util.Objects; -import java.util.Set; @RestController @RequestMapping("/channelz") public class ChannelzController { - private final ChannelzRegistry channelzRegistry; - private final InternalChannelz channelz = InternalChannelz.instance(); - private final ObjectMapper mapper; + private final ChannelzService channelzService; - public ChannelzController(ChannelzRegistry channelzRegistry, ObjectMapper objectMapper) { - this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry"); - this.mapper = Objects.requireNonNull(objectMapper, "objectMapper"); + public ChannelzController(ChannelzService channelzService) { + this.channelzService = Objects.requireNonNull(channelzService, "channelzService"); } - @GetMapping("/getSocket") - public String getSocket(long logId) throws JsonProcessingException { - InternalChannelz.SocketStats stats = getSocket0(logId); - - return mapper.writeValueAsString(stats); + @GetMapping(value = "/sockets/{logId}") + public SocketStatsWithId findSocketStatsByLogId(@PathVariable long logId) { + return this.channelzService.getSocketStats(logId); } - @GetMapping("/html/getSocket") - public String getSocketToHtml(long logId) { - InternalChannelz.SocketStats stats = getSocket0(logId); - - return new HTMLBuilder().build(stats); + @GetMapping(value = "/sockets/{logId}", produces = MediaType.TEXT_HTML_VALUE) + public String findSocketStatsByLogIdInHtml(@PathVariable long logId) { + return buildHtml(this.findSocketStatsByLogId(logId)); } - private InternalChannelz.SocketStats getSocket0(long logId) { - InternalInstrumented socket = channelz.getSocket(logId); - if (socket == null) { - return null; - } - return ChannelzUtils.getResult("Socket", socket); + @GetMapping(value = "/sockets") + public List findSocketStats( + @RequestParam(required = false) String remoteAddress, + @RequestParam(required = false) Integer localPort + ) throws Exception { + return this.channelzService.getSocketStatsList(remoteAddress, localPort); } - @GetMapping("/findSocket") - public String findSocket(String remoteAddress, int localPort) throws JsonProcessingException { - - ChannelzRegistry.AddressId addressId = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort); - List stats = findSocket(addressId); - if (stats == null) { - return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort); - } - - return mapper.writeValueAsString(stats); + @GetMapping(value = "/sockets", produces = MediaType.TEXT_HTML_VALUE) + public String findSocketStatInHtml( + @RequestParam(required = false) String remoteAddress, + @RequestParam(required = false) Integer localPort + ) throws Exception { + return buildHtml(this.findSocketStats(remoteAddress, localPort)); } - @GetMapping("/html/findSocket") - public String findSocketStatToHtml(String remoteAddress, int localPort) { - - ChannelzRegistry.AddressId targetAddress = ChannelzRegistry.AddressId.newAddressId(remoteAddress, localPort); - - List stats = findSocket(targetAddress); - if (stats.isEmpty()) { - return notFound("remoteAddress:" + remoteAddress + " localPort:" + localPort); - } - - return buildHtml(stats); + @GetMapping(value = "/servers") + public List getAllServerStats() { + return this.channelzService.getAllServers(); } + @GetMapping(value = "/servers", produces = MediaType.TEXT_HTML_VALUE) + public String getAllServerStatsInHtml() { + return buildHtml(this.getAllServerStats()); + } - private List findSocket(ChannelzRegistry.AddressId targetAddress) { - Set logIdSet = channelzRegistry.getSocketLogId(targetAddress); + @GetMapping(value = "/servers/{name}") + public ServerStatsWithId getServerStat(@PathVariable("name") String name) { + return this.channelzService.getServer(name); + } - List> result = new ArrayList<>(); - for (Long logId : logIdSet) { - InternalInstrumented socket = channelz.getSocket(logId); - if (socket != null) { - result.add(socket); - } - } - return ChannelzUtils.getResults("Socket", result); + @GetMapping(value = "/servers/{name}", produces = MediaType.TEXT_HTML_VALUE) + public String getServerStatInHtml(@PathVariable("name") String name) { + return buildHtml(this.getServerStat(name)); } - @GetMapping("/html/getServer") - public String getServerStatToHtml(String serverName) { - List stats = getServer(serverName); - if (stats == null) { - return notFound("serverName=" + serverName); + private static String buildHtml(List stats) { + if (stats == null || stats.isEmpty()) { + return "Empty"; } - return buildHtml(stats); - } - private String buildHtml(List stats) { StringBuilder buffer = new StringBuilder(); for (T stat : stats) { - String html = new HTMLBuilder().build(stat); - buffer.append(html); + buffer.append(buildHtml(stat)); buffer.append("
"); } return buffer.toString(); } - - @GetMapping("/html/getSpanReceiver") - public String getSpanReceiverl() { - return getServerStatToHtml(GrpcReceiverNames.SPAN); - } - - - private List getServer(String serverName) { - Long logId = channelzRegistry.getServerLogId(serverName); - - InternalChannelz.ServerList serverList = channelz.getServers(logId, 10000); - - return ChannelzUtils.getResults("ServerStats", serverList.servers); - } - - - private String notFound(String target) { - return target + " not Found"; + private static String buildHtml(T stats) { + if (stats == null) { + return "Null"; + } + return new HTMLBuilder().build(stats); } - } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/ssl/GrpcSslModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/ssl/GrpcSslModule.java index aba01b32f87a3..1664a4ed7ca91 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/ssl/GrpcSslModule.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/ssl/GrpcSslModule.java @@ -3,7 +3,7 @@ import com.navercorp.pinpoint.collector.grpc.config.GrpcReceiverProperties; import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiver; import com.navercorp.pinpoint.common.server.util.AddressFilter; -import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; +import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry; import com.navercorp.pinpoint.grpc.security.SslContextFactory; import io.grpc.ServerCallExecutorSupplier; import io.grpc.ServerInterceptor; @@ -36,10 +36,10 @@ public GrpcReceiver grpcAgentSslReceiver(@Qualifier("grpcAgentSslReceiverPropert AddressFilter addressFilter, @Qualifier("agentServiceList") List serviceList, @Qualifier("agentInterceptorList")List serverInterceptorList, - ChannelzRegistry channelzRegistry, + GrpcServerRegistry grpcServerRegistry, @Qualifier("grpcAgentServerExecutor") Executor executor, @Qualifier("grpcAgentServerCallExecutorSupplier") ServerCallExecutorSupplier serverCallExecutorSupplier) throws SSLException { - GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor); + GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor); receiver.setServerCallExecutorSupplier(serverCallExecutorSupplier); return receiver; @@ -51,10 +51,10 @@ public GrpcReceiver grpcSpanSslReceiver(@Qualifier("grpcSpanSslReceiverPropertie AddressFilter addressFilter, @Qualifier("spanServiceList") List serviceList, @Qualifier("spanInterceptorList") List serverInterceptorList, - ChannelzRegistry channelzRegistry, + GrpcServerRegistry grpcServerRegistry, @Qualifier("grpcSpanServerExecutor") Executor executor, @Qualifier("serverTransportFilterList") List transportFilterList) throws SSLException { - GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor); + GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor); receiver.setTransportFilterList(transportFilterList); return receiver; } @@ -65,10 +65,10 @@ public GrpcReceiver grpcStatSslReceiver(@Qualifier("grpcStatSslReceiverPropertie AddressFilter addressFilter, @Qualifier("statServiceList") List serviceList, @Qualifier("statInterceptorList") List serverInterceptorList, - ChannelzRegistry channelzRegistry, + GrpcServerRegistry grpcServerRegistry, @Qualifier("grpcStatServerExecutor") Executor executor, @Qualifier("serverTransportFilterList") List transportFilterList) throws SSLException { - GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, channelzRegistry, executor); + GrpcReceiver receiver = createReceiver(properties, grpcReceiverProperties, addressFilter, serviceList, serverInterceptorList, grpcServerRegistry, executor); receiver.setTransportFilterList(transportFilterList); return receiver; } @@ -78,7 +78,7 @@ private GrpcReceiver createReceiver(GrpcSslReceiverProperties properties, AddressFilter addressFilter, List serviceList, List serverInterceptorList, - ChannelzRegistry channelzRegistry, + GrpcServerRegistry grpcServerRegistry, Executor executor) throws SSLException { GrpcReceiver receiver = new GrpcReceiver(); receiver.setBindAddress(properties.getBindAddress()); @@ -90,7 +90,7 @@ private GrpcReceiver createReceiver(GrpcSslReceiverProperties properties, receiver.setAddressFilter(addressFilter); receiver.setBindableServiceList(serviceList); receiver.setServerInterceptorList(serverInterceptorList); - receiver.setChannelzRegistry(channelzRegistry); + receiver.setGrpcServerRegistry(grpcServerRegistry); SslContext sslContext = newSslContext(properties.getGrpcSslProperties()); receiver.setSslContext(sslContext); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java index e610d309ea87b..2b4f670e6f1bb 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java @@ -20,7 +20,7 @@ import com.navercorp.pinpoint.common.server.util.AddressFilter; import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.common.util.CollectionUtils; -import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; +import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry; import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter; import com.navercorp.pinpoint.grpc.server.ServerFactory; import com.navercorp.pinpoint.grpc.server.ServerOption; @@ -77,7 +77,7 @@ public class GrpcReceiver implements InitializingBean, DisposableBean, BeanNameA private SslContext sslContext; private Server server; - private ChannelzRegistry channelzRegistry; + private GrpcServerRegistry grpcServerRegistry; @Override @@ -122,8 +122,8 @@ public void afterPropertiesSet() throws Exception { this.serverFactory.addInterceptor(serverInterceptor); } } - if (channelzRegistry != null) { - this.serverFactory.setChannelzRegistry(channelzRegistry); + if (grpcServerRegistry != null) { + this.serverFactory.setChannelzRegistry(grpcServerRegistry); } // Add service @@ -276,8 +276,8 @@ public void setServerInterceptorList(List serverInterceptorLi this.serverInterceptorList = serverInterceptorList; } - public void setChannelzRegistry(ChannelzRegistry channelzRegistry) { - this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry"); + public void setGrpcServerRegistry(GrpcServerRegistry grpcServerRegistry) { + this.grpcServerRegistry = Objects.requireNonNull(grpcServerRegistry, "grpcServerRegistry"); } } \ No newline at end of file diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java deleted file mode 100644 index 0e71af6d374b8..0000000000000 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistry.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.navercorp.pinpoint.collector.receiver.grpc.channelz; - -import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiFunction; - -public class DefaultChannelzRegistry implements ChannelzRegistry { - - public static final long NO_EXIST = -1L; - private final Logger logger = LogManager.getLogger(this.getClass()); - - private final ConcurrentMap> socketMap = new ConcurrentHashMap<>(); - private final ConcurrentMap remoteAddressSocketMap = new ConcurrentHashMap<>(); - - private final ConcurrentMap serverMap = new ConcurrentHashMap<>(); - - @Override - public void addSocket(final long logId, InetSocketAddress remoteAddress, InetSocketAddress localAddress) { - Objects.requireNonNull(remoteAddress, "remoteAddress"); - Objects.requireNonNull(localAddress, "localAddress"); - - final ChannelzRegistry.AddressId targetId = ChannelzRegistry.AddressId.newAddressId(localAddress, remoteAddress); - - // WARNING : thread safety of Set - this.socketMap.compute(targetId, new BiFunction, Set>() { - @Override - public Set apply(AddressId addressId, Set logIdSet) { - if (logIdSet == null) { - Set newSet = newSynchronizedSet(); - newSet.add(logId); - return newSet; - } else { - logIdSet.add(logId); - return logIdSet; - } - } - }); - - AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort()); - this.remoteAddressSocketMap.putIfAbsent(remoteAddressId, new RemoteId(logId, targetId)); - } - - - private Set newSynchronizedSet() { - return Collections.synchronizedSet(new HashSet<>()); - } - - // for test - int getRemoteAddressSocketMapSize() { - return remoteAddressSocketMap.size(); - } - - // for test - int getSocketMapSize() { - return socketMap.size(); - } - - @Override - public Long removeSocket(InetSocketAddress remoteAddress) { - Objects.requireNonNull(remoteAddress, "remoteAddress"); - - AddressId remoteAddressId = AddressId.newAddressId(remoteAddress.getHostString(), remoteAddress.getPort()); - - final RemoteId remoteId = this.remoteAddressSocketMap.remove(remoteAddressId); - if (remoteId == null) { - return NO_EXIST; - } - - final Removed removed = new Removed(); - this.socketMap.compute(remoteId.targetAddress, new BiFunction, Set>() { - @Override - public Set apply(AddressId addressId, Set logIdSet) { - if (logIdSet.remove(remoteId.logId)) { - removed.remove = true; - } - if (logIdSet.isEmpty()) { - return null; - } - return logIdSet; - } - }); - if (removed.remove) { - return remoteId.logId; - } else { - return -1L; - } - - } - - private static class Removed { - boolean remove = false; - } - - @Override - public Set getSocketLogId(ChannelzRegistry.AddressId address) { - Objects.requireNonNull(address, "address"); - - Set logIds = this.socketMap.get(address); - if (logIds == null) { - return Collections.emptySet(); - } - return new HashSet<>(logIds); - } - - @Override - public void addServer(long logId, String serverName) { - Objects.requireNonNull(serverName, "serverName"); - - final Long old = this.serverMap.putIfAbsent(serverName, logId); - if (old != null) { - // warning - logger.warn("Already exist logId:{} serverName:{}", logId, serverName); - } - } - - @Override - public Long getServerLogId(String serverName) { - Objects.requireNonNull(serverName, "serverName"); - - return this.serverMap.get(serverName); - } - - private static class RemoteId { - private final long logId; - private final ChannelzRegistry.AddressId targetAddress; - - public RemoteId(long logId, ChannelzRegistry.AddressId targetAddress) { - this.logId = logId; - this.targetAddress = Objects.requireNonNull(targetAddress, "targetAddress"); - } - } -} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultGrpcServerRegistry.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultGrpcServerRegistry.java new file mode 100644 index 0000000000000..f5f2129afa8d4 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultGrpcServerRegistry.java @@ -0,0 +1,44 @@ +package com.navercorp.pinpoint.collector.receiver.grpc.channelz; + +import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class DefaultGrpcServerRegistry implements GrpcServerRegistry { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final ConcurrentMap serverMap = new ConcurrentHashMap<>(); + + @Override + public void addServer(long logId, String serverName) { + Objects.requireNonNull(serverName, "serverName"); + + Long old = this.serverMap.putIfAbsent(serverName, logId); + if (old != null) { + logger.warn("Already exist logId:{} serverName:{}", logId, serverName); + } + } + + @Override + public Long getServerLogId(String serverName) { + Objects.requireNonNull(serverName, "serverName"); + + return this.serverMap.get(serverName); + } + + @Override + public Collection getServerNames() { + return this.serverMap.keySet(); + } + + @Override + public Collection getServerLogIds() { + return this.serverMap.values(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzService.java new file mode 100644 index 0000000000000..782eef857dab7 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzService.java @@ -0,0 +1,33 @@ +package com.navercorp.pinpoint.collector.service; + +import io.grpc.InternalChannelz.ServerStats; +import io.grpc.InternalChannelz.SocketStats; + +import javax.annotation.Nullable; +import java.util.List; + +public interface ChannelzService { + + SocketStatsWithId getSocketStats(long logId); + + List getSocketStatsList( + @Nullable String remoteAddress, + @Nullable Integer localPort + ) throws Exception; + + List getAllServers(); + + ServerStatsWithId getServer(String serverName); + + class SocketStatsWithId { + public Long id; + public SocketStats stats; + } + + class ServerStatsWithId { + public Long id; + public String name; + public ServerStats stats; + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzServiceImpl.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzServiceImpl.java new file mode 100644 index 0000000000000..26e16ead2f9ed --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ChannelzServiceImpl.java @@ -0,0 +1,147 @@ +package com.navercorp.pinpoint.collector.service; + +import com.google.common.base.Suppliers; +import com.navercorp.pinpoint.grpc.channelz.ChannelzUtils; +import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry; +import io.grpc.InternalChannelz; +import io.grpc.InternalChannelz.*; +import io.grpc.InternalInstrumented; +import io.grpc.InternalWithLogId; +import org.springframework.stereotype.Service; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +@Service +public class ChannelzServiceImpl implements ChannelzService { + + private final InternalChannelz channelz = InternalChannelz.instance(); + + private final Supplier> socketStatsCache = + Suppliers.synchronizedSupplier(this::findAllSockets); + + private final GrpcServerRegistry grpcServerRegistry; + + public ChannelzServiceImpl(GrpcServerRegistry grpcServerRegistry) { + this.grpcServerRegistry = Objects.requireNonNull(grpcServerRegistry, "grpcServerRegistry"); + } + + @Override + public SocketStatsWithId getSocketStats(long logId) { + InternalInstrumented socket = channelz.getSocket(logId); + return wrapSocketStats(logId, ChannelzUtils.getResult("Socket", socket)); + } + + @Override + public List getSocketStatsList( + @Nullable String remoteAddress, + @Nullable Integer localPort + ) { + String localPortStr = toString(localPort); + return this.socketStatsCache.get().stream() + .filter(stats -> filterSocketStats(stats, remoteAddress, localPortStr)) + .collect(Collectors.toUnmodifiableList()); + } + + private static String toString(Integer v) { + if (v == null) { + return null; + } + return v.toString(); + } + + private static boolean filterSocketStats(SocketStatsWithId target, String remoteAddr, String localPort) { + SocketStats stats = target.stats; + + if (remoteAddr != null) { + if (stats.remote == null || !stats.remote.toString().substring(1).startsWith(remoteAddr)) { + return false; + } + } + + if (localPort != null) { + return stats.local != null && stats.local.toString().endsWith(localPort); + } + + return true; + } + + + @Override + public List getAllServers() { + Collection serverNames = this.grpcServerRegistry.getServerNames(); + List result = new ArrayList<>(serverNames.size()); + for (String serverName: serverNames) { + Long serverId = this.grpcServerRegistry.getServerLogId(serverName); + InternalInstrumented server = this.channelz.getServer(serverId); + ServerStats serverStats = ChannelzUtils.getResult("ServerStat", server); + result.add(wrapServerStats(serverId, serverName, serverStats)); + } + return result; + } + + @Override + public ServerStatsWithId getServer(String serverName) { + Long serverId = this.grpcServerRegistry.getServerLogId(serverName); + if (serverId == null) { + return null; + } + return wrapServerStats(serverId, serverName, ChannelzUtils.getResult("ServerStat", this.channelz.getServer(serverId))); + } + + private List findAllSockets() { + try { + return findAllSockets0(); + } catch (Exception e) { + throw new RuntimeException("Failed to find all sockets"); + } + } + + private List findAllSockets0() throws InterruptedException, ExecutionException { + List result = new ArrayList<>(); + for (long serverId: this.grpcServerRegistry.getServerLogIds()) { + findServerSockets(serverId, result); + } + return result; + } + + private void findServerSockets( + long serverId, + List result + ) throws InterruptedException, ExecutionException { + InternalChannelz.ServerSocketsList serverSockets = + this.channelz.getServerSockets(serverId, 0, 10000); + if (serverSockets == null) { + return; + } + for (InternalWithLogId socketId: serverSockets.sockets) { + long id = socketId.getLogId().getId(); + InternalInstrumented socket = this.channelz.getSocket(id); + if (socket != null) { + result.add(wrapSocketStats(id, socket.getStats().get())); + } + } + } + + private static SocketStatsWithId wrapSocketStats(Long id, SocketStats stats) { + SocketStatsWithId result = new SocketStatsWithId(); + result.id = id; + result.stats = stats; + return result; + } + + private static ServerStatsWithId wrapServerStats(Long id, String name, ServerStats stats) { + ServerStatsWithId result = new ServerStatsWithId(); + result.id = id; + result.name = name; + result.stats = stats; + return result; + } + +} diff --git a/collector/src/main/resources/applicationContext-collector-grpc.xml b/collector/src/main/resources/applicationContext-collector-grpc.xml index ecca77e3b0c03..e3348c779f4a1 100644 --- a/collector/src/main/resources/applicationContext-collector-grpc.xml +++ b/collector/src/main/resources/applicationContext-collector-grpc.xml @@ -13,7 +13,7 @@ com.navercorp.pinpoint.collector.receiver.grpc"/> - + @@ -96,9 +96,6 @@ - - - @@ -148,7 +145,7 @@ - + @@ -186,7 +183,7 @@ - + diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistryTest.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistryTest.java deleted file mode 100644 index f0ed7cf194d8c..0000000000000 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/channelz/DefaultChannelzRegistryTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.navercorp.pinpoint.collector.receiver.grpc.channelz; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.net.InetSocketAddress; - -public class DefaultChannelzRegistryTest { - @Test - public void testAddSocket() { - InetSocketAddress remote = InetSocketAddress.createUnresolved("1.1.1.1", 80); - InetSocketAddress local = InetSocketAddress.createUnresolved("127.0.0.1", 9991); - long logId = 1; - - DefaultChannelzRegistry registry = new DefaultChannelzRegistry(); - registry.addSocket(logId, remote, local); - - Long removedLogId = registry.removeSocket(remote); - Assertions.assertEquals(logId, removedLogId.longValue()); - } - - @Test - public void testAddSocket_multiple() { - InetSocketAddress remote1 = InetSocketAddress.createUnresolved("1.1.1.1", 80); - InetSocketAddress local1 = InetSocketAddress.createUnresolved("127.0.0.1", 9991); - long logId1 = 1; - - InetSocketAddress remote2 = InetSocketAddress.createUnresolved("2.2.2.2", 90); - InetSocketAddress local2 = InetSocketAddress.createUnresolved("127.0.0.1", 19991); - long logId2 = 2; - - DefaultChannelzRegistry registry = new DefaultChannelzRegistry(); - registry.addSocket(logId1, remote1, local1); - registry.addSocket(logId2, remote2, local2); - - Long removedLogId = registry.removeSocket(remote1); - Assertions.assertEquals(logId1, removedLogId.longValue()); - - Assertions.assertEquals(-1L, registry.removeSocket(remote1).longValue()); - } - - @Test - public void testMemoryleak() { - InetSocketAddress remote1 = InetSocketAddress.createUnresolved("1.1.1.1", 80); - InetSocketAddress local1 = InetSocketAddress.createUnresolved("127.0.0.1", 9991); - long logId1 = 1; - - - DefaultChannelzRegistry registry = new DefaultChannelzRegistry(); - registry.addSocket(logId1, remote1, local1); - long removedLogId = registry.removeSocket(remote1); - - Assertions.assertEquals(logId1, removedLogId); - - Assertions.assertEquals(0, registry.getRemoteAddressSocketMapSize()); - Assertions.assertEquals(0, registry.getSocketMapSize()); - } - - @Test - public void testMemoryleak2() { - InetSocketAddress remote1 = InetSocketAddress.createUnresolved("1.1.1.1", 80); - InetSocketAddress local1 = InetSocketAddress.createUnresolved("127.0.0.1", 9991); - - DefaultChannelzRegistry registry = new DefaultChannelzRegistry(); - registry.addSocket(1, remote1, local1); - - InetSocketAddress unkonwn = InetSocketAddress.createUnresolved("2.2.2.2", 9991); - Assertions.assertEquals(-1L, registry.removeSocket(unkonwn).longValue()); - } -} \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzRegistry.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzRegistry.java deleted file mode 100644 index c406d17700c84..0000000000000 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzRegistry.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.navercorp.pinpoint.grpc.channelz; - - -import java.net.InetSocketAddress; -import java.util.Set; - -public interface ChannelzRegistry { - - void addSocket(long logId, InetSocketAddress remoteAddress, InetSocketAddress localAddress); - - Long removeSocket(InetSocketAddress remoteAddress); - - Set getSocketLogId(AddressId address); - - void addServer(long logId, String serverName); - - Long getServerLogId(String serverName); - - - class AddressId { - private final String address; - private final int port; - - public static AddressId newAddressId(String address, int port) { - return new AddressId(address, port); - } - - public static AddressId newAddressId(InetSocketAddress local, InetSocketAddress remote) { - return new AddressId(remote.getHostString(), local.getPort()); - } - - private AddressId(String address, int localPort) { - this.address = address; - this.port = localPort; - } - - public String getAddress() { - return address; - } - - public int getPort() { - return port; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - AddressId addressId = (AddressId) o; - - if (port != addressId.port) return false; - return address != null ? address.equals(addressId.address) : addressId.address == null; - } - - @Override - public int hashCode() { - int result = port; - result = 31 * result + (address != null ? address.hashCode() : 0); - return result; - } - - } - -} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzServerTransportFilter.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzServerTransportFilter.java deleted file mode 100644 index ad976a1b035b8..0000000000000 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzServerTransportFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.navercorp.pinpoint.grpc.channelz; - -import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter; -import com.navercorp.pinpoint.grpc.server.SocketAddressUtils; -import com.navercorp.pinpoint.grpc.server.TransportMetadata; -import io.grpc.Attributes; -import io.grpc.ServerTransportFilter; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.net.InetSocketAddress; -import java.util.Objects; - - -public class ChannelzServerTransportFilter extends ServerTransportFilter { - - private final Logger logger = LogManager.getLogger(this.getClass()); - private final ChannelzRegistry registry; - - public ChannelzServerTransportFilter(ChannelzRegistry registry) { - this.registry = Objects.requireNonNull(registry, "registry"); - } - - @Override - public Attributes transportReady(Attributes transportAttrs) { - final TransportMetadata transportMetadata = getTransportMetadata(transportAttrs); - - final long logId = transportMetadata.getLogId(); - final InetSocketAddress remoteAddress = transportMetadata.getRemoteAddress(); - final InetSocketAddress localAddress = transportMetadata.getLocalAddress(); - - if (logger.isDebugEnabled()) { - logger.debug("Add logId:{} remoteAddress:{} localAddress:{}", - logId, - SocketAddressUtils.toString(remoteAddress), - SocketAddressUtils.toString(localAddress) - ); - } - - registry.addSocket(logId, remoteAddress, localAddress); - - return transportAttrs; - } - - - private TransportMetadata getTransportMetadata(Attributes transportAttrs) { - return transportAttrs.get(MetadataServerTransportFilter.TRANSPORT_METADATA_KEY); - } - - @Override - public void transportTerminated(Attributes transportAttrs) { - // transport is not ready - if (transportAttrs == null) { - return; - } - final TransportMetadata transportMetadata = getTransportMetadata(transportAttrs); - - final InetSocketAddress remoteAddress = transportMetadata.getRemoteAddress(); - - final Long logId = registry.removeSocket(remoteAddress); - if (logger.isDebugEnabled()) { - logger.debug("Remove logId:{} remoteAddress:{}", logId, remoteAddress); - } - } - - -} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzUtils.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzUtils.java index 340976be831ef..6551b702423c7 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzUtils.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/ChannelzUtils.java @@ -22,17 +22,11 @@ private ChannelzUtils() { } public static T getResult(String name, InternalInstrumented instrumented) { - final ListenableFuture future = instrumented.getStats(); - try { - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - logger.info("ExecutionError {} {}", name, e.getMessage()); - } catch (TimeoutException e) { - logger.info("Timeout {} {}", name, e.getMessage()); + if (instrumented == null) { + return null; } - return null; + final ListenableFuture future = instrumented.getStats(); + return unwrapFuture(name, future); } public static List getResults(String name, List> instrumentedList) { @@ -43,8 +37,12 @@ public static List getResults(String name, List> listenableFutures.add(each.getStats()); } ListenableFuture> listListenableFuture = Futures.allAsList(listenableFutures); + return unwrapFuture(name, listListenableFuture); + } + + private static T unwrapFuture(String name, ListenableFuture future) { try { - return listListenableFuture.get(timeout, TimeUnit.MILLISECONDS); + return future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { @@ -54,4 +52,5 @@ public static List getResults(String name, List> } return null; } + } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/GrpcServerRegistry.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/GrpcServerRegistry.java new file mode 100644 index 0000000000000..9b58488202040 --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/channelz/GrpcServerRegistry.java @@ -0,0 +1,16 @@ +package com.navercorp.pinpoint.grpc.channelz; + + +import java.util.Collection; + +public interface GrpcServerRegistry { + + void addServer(long logId, String serverName); + + Long getServerLogId(String serverName); + + Collection getServerNames(); + + Collection getServerLogIds(); + +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/DefaultTransportMetadata.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/DefaultTransportMetadata.java index ba21aac8d851d..addd25cf7acbc 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/DefaultTransportMetadata.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/DefaultTransportMetadata.java @@ -29,16 +29,14 @@ public class DefaultTransportMetadata implements TransportMetadata { private final InetSocketAddress localAddress; private final Long transportId; private final long connectTime; - private final Long logId; - public DefaultTransportMetadata(String debugString, InetSocketAddress remoteAddress, InetSocketAddress localAddreess, long transportId, long connectTime, Long logId) { + public DefaultTransportMetadata(String debugString, InetSocketAddress remoteAddress, InetSocketAddress localAddreess, long transportId, long connectTime) { this.debugString = Objects.requireNonNull(debugString, "debugString"); this.remoteAddress = Objects.requireNonNull(remoteAddress, "remoteAddress"); this.localAddress = Objects.requireNonNull(localAddreess, "localAddreess"); this.transportId = transportId; this.connectTime = connectTime; - this.logId = Objects.requireNonNull(logId, "logId"); } @Override @@ -61,11 +59,6 @@ public long getConnectTime() { return connectTime; } - @Override - public Long getLogId() { - return logId; - } - @Override public String toString() { return "DefaultTransportMetadata{" + @@ -74,7 +67,6 @@ public String toString() { ", localAddress=" + localAddress + ", transportId=" + transportId + ", connectTime=" + connectTime + - ", logId=" + logId + '}'; } } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/MetadataServerTransportFilter.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/MetadataServerTransportFilter.java index 84c1718fe9d8a..c2d41878b3f29 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/MetadataServerTransportFilter.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/MetadataServerTransportFilter.java @@ -25,7 +25,6 @@ public class MetadataServerTransportFilter extends ServerTransportFilter { - public static final Attributes.Key LOG_ID = Attributes.Key.create("logId"); public static final Attributes.Key TRANSPORT_METADATA_KEY = Attributes.Key.create("transportMetadata"); private final Logger logger = LogManager.getLogger(this.getClass()); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java index ad4465b1302cc..e19a1c3563e6a 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java @@ -19,16 +19,10 @@ import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; import com.navercorp.pinpoint.common.util.CpuUtils; import com.navercorp.pinpoint.grpc.ExecutorUtils; -import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry; -import io.grpc.BindableService; -import io.grpc.InternalWithLogId; -import io.grpc.Server; -import io.grpc.ServerCallExecutorSupplier; -import io.grpc.ServerInterceptor; -import io.grpc.ServerServiceDefinition; -import io.grpc.ServerTransportFilter; -import io.grpc.netty.LogIdServerListenerDelegator; -import io.grpc.netty.PinpointNettyServerBuilder; +import com.navercorp.pinpoint.grpc.channelz.GrpcServerRegistry; +import io.grpc.*; +import io.grpc.internal.ServerImplBuilder; +import io.grpc.netty.NettyServerBuilder; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -39,15 +33,12 @@ import org.apache.logging.log4j.Logger; import javax.net.ssl.SSLException; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @author Woonduk Kang(emeroad) @@ -76,7 +67,7 @@ public class ServerFactory { private final ServerOption serverOption; private final SslContext sslContext; - private ChannelzRegistry channelzRegistry; + private GrpcServerRegistry grpcServerRegistry; public ServerFactory(String name, String hostname, int port, Executor serverExecutor, @@ -121,8 +112,8 @@ private ExecutorService newExecutor(String name) { return Executors.newCachedThreadPool(threadFactory); } - public void setChannelzRegistry(ChannelzRegistry channelzRegistry) { - this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry"); + public void setChannelzRegistry(GrpcServerRegistry grpcServerRegistry) { + this.grpcServerRegistry = Objects.requireNonNull(grpcServerRegistry, "channelzRegistry"); } public void addService(BindableService bindableService) { @@ -145,17 +136,17 @@ public void addInterceptor(ServerInterceptor serverInterceptor) { this.serverInterceptors.add(serverInterceptor); } - public Server build() throws SSLException { + public Server build() throws SSLException, NoSuchFieldException, IllegalAccessException { InetSocketAddress bindAddress = new InetSocketAddress(this.hostname, this.port); - PinpointNettyServerBuilder serverBuilder = PinpointNettyServerBuilder.forAddress(bindAddress); - serverBuilder.serverListenerDelegator(new LogIdServerListenerDelegator()); + NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(bindAddress); logger.info("ChannelType:{}", channelType.getSimpleName()); serverBuilder.channelType(channelType); serverBuilder.bossEventLoopGroup(bossEventLoopGroup); serverBuilder.workerEventLoopGroup(workerEventLoopGroup); - setupInternal(serverBuilder); + ServerImplBuilder serverImplBuilder = extractServerImplBuilder(serverBuilder); + setupInternal(serverImplBuilder); for (Object service : this.bindableServices) { @@ -194,25 +185,28 @@ public Server build() throws SSLException { final InternalWithLogId logId = (InternalWithLogId) server; final long serverLogId = logId.getLogId().getId(); logger.info("{} serverLogId:{}", name, serverLogId); - if (channelzRegistry != null) { - channelzRegistry.addServer(serverLogId, name); + if (grpcServerRegistry != null) { + grpcServerRegistry.addServer(serverLogId, name); } } return server; } + public static ServerImplBuilder extractServerImplBuilder(NettyServerBuilder serverBuilder) + throws NoSuchFieldException, IllegalAccessException { + Field serverImplBuilderField = NettyServerBuilder.class.getDeclaredField("serverImplBuilder"); + serverImplBuilderField.setAccessible(true); + return (ServerImplBuilder) serverImplBuilderField.get(serverBuilder); + } - private void setupInternal(PinpointNettyServerBuilder serverBuilder) { - - serverBuilder.setTracingEnabled(false); - - serverBuilder.setStatsEnabled(false); - serverBuilder.setStatsRecordRealTimeMetrics(false); - serverBuilder.setStatsRecordStartedRpcs(false); - + private void setupInternal(ServerImplBuilder builder) { + builder.setTracingEnabled(false); + builder.setStatsEnabled(false); + builder.setStatsRecordRealTimeMetrics(false); + builder.setStatsRecordStartedRpcs(false); } - private void setupServerOption(final PinpointNettyServerBuilder builder) { + private void setupServerOption(NettyServerBuilder builder) { // TODO @see PinpointServerAcceptor builder.withChildOption(ChannelOption.TCP_NODELAY, true); builder.withChildOption(ChannelOption.SO_REUSEADDR, true); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadata.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadata.java index 0a7d71a3ab6c1..9b466bdf84411 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadata.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadata.java @@ -28,5 +28,4 @@ public interface TransportMetadata { long getConnectTime(); - Long getLogId(); } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadataFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadataFactory.java index bc9da53a5cae2..1f854e738e75e 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadataFactory.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/TransportMetadataFactory.java @@ -51,18 +51,11 @@ public TransportMetadata build(Attributes attributes) { final long transportId = idGenerator.getAndIncrement(); final long connectedTime = System.currentTimeMillis(); - final Long logId = attributes.get(MetadataServerTransportFilter.LOG_ID); - if (logId == null) { - throw Status.INTERNAL.withDescription("LogId not found").asRuntimeException(); - } - return new DefaultTransportMetadata(debugString, remoteSocketAddress, localSocketAddress, transportId, connectedTime, logId); + return new DefaultTransportMetadata(debugString, remoteSocketAddress, localSocketAddress, transportId, connectedTime); } @Override public String toString() { - final StringBuilder sb = new StringBuilder("TransportMetadataFactory{"); - sb.append("debugString='").append(debugString).append('\''); - sb.append('}'); - return sb.toString(); + return "TransportMetadataFactory{" + "debugString='" + debugString + '\'' + '}'; } } diff --git a/grpc/src/main/java/io/grpc/netty/EmptyServerListenerDelegator.java b/grpc/src/main/java/io/grpc/netty/EmptyServerListenerDelegator.java deleted file mode 100644 index 5ed0a34bc1808..0000000000000 --- a/grpc/src/main/java/io/grpc/netty/EmptyServerListenerDelegator.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.grpc.netty; - -import io.grpc.internal.ServerListener; - -public class EmptyServerListenerDelegator implements ServerListenerDelegator { - @Override - public ServerListener wrapServerListener(ServerListener serverListener) { - return serverListener; - } -} diff --git a/grpc/src/main/java/io/grpc/netty/LogIdAttachListener.java b/grpc/src/main/java/io/grpc/netty/LogIdAttachListener.java deleted file mode 100644 index 7f2ad90a3eac1..0000000000000 --- a/grpc/src/main/java/io/grpc/netty/LogIdAttachListener.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.grpc.netty; - -import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter; -import io.grpc.Attributes; -import io.grpc.Metadata; -import io.grpc.internal.ServerStream; -import io.grpc.internal.ServerTransportListener; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Objects; - -public class LogIdAttachListener implements ServerTransportListener { - private final Logger logger = LogManager.getLogger(this.getClass()); - - private final ServerTransportListener delegate; - private final Long logId; - - public LogIdAttachListener(ServerTransportListener delegate, Long logId) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - this.logId = Objects.requireNonNull(logId, "logId"); - } - - @Override - public void streamCreated(ServerStream stream, String method, Metadata headers) { - delegate.streamCreated(stream, method, headers); - } - - @Override - public Attributes transportReady(Attributes attributes) { - Attributes.Builder builder = attributes.toBuilder(); - builder.set(MetadataServerTransportFilter.LOG_ID, logId); - Attributes build = builder.build(); - if (logger.isDebugEnabled()) { - logger.debug("logId:{} transportReady:{} ", logId, attributes); - } - return delegate.transportReady(build); - } - - @Override - public void transportTerminated() { - delegate.transportTerminated(); - } -} diff --git a/grpc/src/main/java/io/grpc/netty/LogIdServerListenerDelegator.java b/grpc/src/main/java/io/grpc/netty/LogIdServerListenerDelegator.java deleted file mode 100644 index 770f80cd04985..0000000000000 --- a/grpc/src/main/java/io/grpc/netty/LogIdServerListenerDelegator.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.grpc.netty; - -import io.grpc.InternalLogId; -import io.grpc.internal.ServerListener; -import io.grpc.internal.ServerTransport; -import io.grpc.internal.ServerTransportListener; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -public class LogIdServerListenerDelegator implements ServerListenerDelegator { - final Logger logger = LogManager.getLogger(this.getClass()); - - @Override - public ServerListener wrapServerListener(final ServerListener serverListener) { - - logger.info("ServerListener serverListener:{}", serverListener); - - final ServerListener delegate = new ServerListener() { - @Override - public ServerTransportListener transportCreated(ServerTransport transport) { - - final InternalLogId logId = transport.getLogId(); - if (logger.isDebugEnabled()) { - logger.debug("transportCreated:{} {}", transport, logId); - } - - final ServerTransportListener serverTransportListener = serverListener.transportCreated(transport); - - return new LogIdAttachListener(serverTransportListener, logId.getId()); - } - - @Override - public void serverShutdown() { - serverListener.serverShutdown(); - } - }; - return delegate; - } -} diff --git a/grpc/src/main/java/io/grpc/netty/PinpointNettyServerBuilder.java b/grpc/src/main/java/io/grpc/netty/PinpointNettyServerBuilder.java deleted file mode 100644 index 4d9ea1c4435fa..0000000000000 --- a/grpc/src/main/java/io/grpc/netty/PinpointNettyServerBuilder.java +++ /dev/null @@ -1,715 +0,0 @@ -/* - * Copyright 2014 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.netty; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; -import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; -import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; -import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; - -import com.google.common.annotations.VisibleForTesting; -import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.grpc.Attributes; -import io.grpc.ExperimentalApi; -import io.grpc.Internal; -import io.grpc.ServerBuilder; -import io.grpc.ServerCredentials; -import io.grpc.ServerStreamTracer; -import io.grpc.internal.AbstractServerImplBuilder; -import io.grpc.internal.FixedObjectPool; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.InternalServer; -import io.grpc.internal.KeepAliveManager; -import io.grpc.internal.ObjectPool; -import io.grpc.internal.ServerImplBuilder; -import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; -import io.grpc.internal.ServerListener; -import io.grpc.internal.SharedResourcePool; -import io.grpc.internal.TransportTracer; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ReflectiveChannelFactory; -import io.netty.channel.ServerChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.ssl.SslContext; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import javax.annotation.CheckReturnValue; -import javax.net.ssl.SSLException; - -/** - * copy & modify : NettyServerBuilder - * A builder to help simplify the construction of a Netty-based GRPC server. - */ -@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784") -@CanIgnoreReturnValue -public final class PinpointNettyServerBuilder extends AbstractServerImplBuilder { - - // 1MiB - public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1024 * 1024; - - static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE; - static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE; - static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE; - - private static final long MIN_KEEPALIVE_TIME_NANO = TimeUnit.MILLISECONDS.toNanos(1L); - private static final long MIN_KEEPALIVE_TIMEOUT_NANO = TimeUnit.MICROSECONDS.toNanos(499L); - private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L); - private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L); - private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); - private static final ObjectPool DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL = - SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP); - private static final ObjectPool DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL = - SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); - - private final ServerImplBuilder serverImplBuilder; - private final List listenAddresses = new ArrayList<>(); - - private TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); - private ChannelFactory channelFactory = - Utils.DEFAULT_SERVER_CHANNEL_FACTORY; - private final Map, Object> channelOptions = new HashMap<>(); - private final Map, Object> childChannelOptions = new HashMap<>(); - private ObjectPool bossEventLoopGroupPool = - DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL; - private ObjectPool workerEventLoopGroupPool = - DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; - private boolean forceHeapBuffer; - private ProtocolNegotiator.ServerFactory protocolNegotiatorFactory; - private final boolean freezeProtocolNegotiatorFactory; - private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE; - private boolean autoFlowControl = true; - private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; - private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; - private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; - private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; - private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; - private long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED; - private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED; - private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; - private boolean permitKeepAliveWithoutCalls; - private long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5); - private Attributes eagAttributes = Attributes.EMPTY; - - /** - * Creates a server builder that will bind to the given port. - * - * @param port the port on which the server is to be bound. - * @return the server builder. - */ - @CheckReturnValue - public static PinpointNettyServerBuilder forPort(int port) { - return forAddress(new InetSocketAddress(port)); - } - - /** - * Creates a server builder that will bind to the given port. - * - * @param port the port on which the server is to be bound. - * @return the server builder. - */ - @CheckReturnValue - public static PinpointNettyServerBuilder forPort(int port, ServerCredentials creds) { - return forAddress(new InetSocketAddress(port), creds); - } - - /** - * Creates a server builder configured with the given {@link SocketAddress}. - * - * @param address the socket address on which the server is to be bound. - * @return the server builder - */ - @CheckReturnValue - public static PinpointNettyServerBuilder forAddress(SocketAddress address) { - return new PinpointNettyServerBuilder(address); - } - - /** - * Creates a server builder configured with the given {@link SocketAddress}. - * - * @param address the socket address on which the server is to be bound. - * @return the server builder - */ - @CheckReturnValue - public static PinpointNettyServerBuilder forAddress(SocketAddress address, ServerCredentials creds) { - ProtocolNegotiators.FromServerCredentialsResult result = ProtocolNegotiators.from(creds); - if (result.error != null) { - throw new IllegalArgumentException(result.error); - } - return new PinpointNettyServerBuilder(address, result.negotiator); - } - - private final class NettyClientTransportServersBuilder implements ClientTransportServersBuilder { - @Override - public InternalServer buildClientTransportServers( - List streamTracerFactories) { - return buildTransportServers(streamTracerFactories); - } - } - - @CheckReturnValue - private PinpointNettyServerBuilder(SocketAddress address) { - serverImplBuilder = new ServerImplBuilder(new NettyClientTransportServersBuilder()); - this.listenAddresses.add(address); - this.protocolNegotiatorFactory = ProtocolNegotiators.serverPlaintextFactory(); - this.freezeProtocolNegotiatorFactory = false; - } - - @CheckReturnValue - PinpointNettyServerBuilder( - SocketAddress address, ProtocolNegotiator.ServerFactory negotiatorFactory) { - serverImplBuilder = new ServerImplBuilder(new NettyClientTransportServersBuilder()); - this.listenAddresses.add(address); - this.protocolNegotiatorFactory = checkNotNull(negotiatorFactory, "negotiatorFactory"); - this.freezeProtocolNegotiatorFactory = true; - } - - @Internal - @Override - protected ServerBuilder delegate() { - return serverImplBuilder; - } - - /** - * Adds an additional address for this server to listen on. Callers must ensure that all socket - * addresses are compatible with the Netty channel type, and that they don't conflict with each - * other. - */ - public PinpointNettyServerBuilder addListenAddress(SocketAddress listenAddress) { - this.listenAddresses.add(checkNotNull(listenAddress, "listenAddress")); - return this; - } - - /** - * Specifies the channel type to use, by default we use {@code EpollServerSocketChannel} if - * available, otherwise using {@link NioServerSocketChannel}. - * - *

You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your - * {@link ServerChannel} implementation has no no-args constructor. - * - *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory - * when the channel is built, the builder will use the default one which is static. - * - *

You must also provide corresponding {@link EventLoopGroup} using {@link - * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For - * example, {@link NioServerSocketChannel} must use {@link - * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start. - */ - public PinpointNettyServerBuilder channelType(Class channelType) { - checkNotNull(channelType, "channelType"); - return channelFactory(new ReflectiveChannelFactory<>(channelType)); - } - - /** - * Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is - * usually only used if the specific {@code ServerChannel} requires complex logic which requires - * additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link - * #channelType(Class)}. - * - *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory - * when the channel is built, the builder will use the default one which is static. - * - *

You must also provide corresponding {@link EventLoopGroup} using {@link - * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For - * example, if the factory creates {@link NioServerSocketChannel} you must use {@link - * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start. - */ - public PinpointNettyServerBuilder channelFactory(ChannelFactory channelFactory) { - this.channelFactory = checkNotNull(channelFactory, "channelFactory"); - return this; - } - - /** - * Specifies a channel option. As the underlying channel as well as network implementation may - * ignore this value applications should consider it a hint. - * - * @since 1.30.0 - */ - public PinpointNettyServerBuilder withOption(ChannelOption option, T value) { - this.channelOptions.put(option, value); - return this; - } - - /** - * Specifies a child channel option. As the underlying channel as well as network implementation - * may ignore this value applications should consider it a hint. - * - * @since 1.9.0 - */ - public PinpointNettyServerBuilder withChildOption(ChannelOption option, T value) { - this.childChannelOptions.put(option, value); - return this; - } - - /** - * Provides the boss EventGroupLoop to the server. - * - *

It's an optional parameter. If the user has not provided one when the server is built, the - * builder will use the default one which is static. - * - *

You must also provide corresponding {@link io.netty.channel.Channel} type using {@link - * #channelType(Class)} and {@link #workerEventLoopGroup(EventLoopGroup)}. For example, {@link - * NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss - * and worker {@link EventLoopGroup}, otherwise your server won't start. - * - *

The server won't take ownership of the given EventLoopGroup. It's caller's responsibility - * to shut it down when it's desired. - * - *

Grpc uses non-daemon {@link Thread}s by default and thus a {@link io.grpc.Server} will - * continue to run even after the main thread has terminated. However, users have to be cautious - * when providing their own {@link EventLoopGroup}s. - * For example, Netty's {@link EventLoopGroup}s use daemon threads by default - * and thus an application with only daemon threads running besides the main thread will exit as - * soon as the main thread completes. - * A simple solution to this problem is to call {@link io.grpc.Server#awaitTermination()} to - * keep the main thread alive until the server has terminated. - */ - public PinpointNettyServerBuilder bossEventLoopGroup(EventLoopGroup group) { - if (group != null) { - return bossEventLoopGroupPool(new FixedObjectPool<>(group)); - } - return bossEventLoopGroupPool(DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL); - } - - PinpointNettyServerBuilder bossEventLoopGroupPool( - ObjectPool bossEventLoopGroupPool) { - this.bossEventLoopGroupPool = checkNotNull(bossEventLoopGroupPool, "bossEventLoopGroupPool"); - return this; - } - - /** - * Provides the worker EventGroupLoop to the server. - * - *

It's an optional parameter. If the user has not provided one when the server is built, the - * builder will create one. - * - *

You must also provide corresponding {@link io.netty.channel.Channel} type using {@link - * #channelType(Class)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For example, {@link - * NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss - * and worker {@link EventLoopGroup}, otherwise your server won't start. - * - *

The server won't take ownership of the given EventLoopGroup. It's caller's responsibility - * to shut it down when it's desired. - * - *

Grpc uses non-daemon {@link Thread}s by default and thus a {@link io.grpc.Server} will - * continue to run even after the main thread has terminated. However, users have to be cautious - * when providing their own {@link EventLoopGroup}s. - * For example, Netty's {@link EventLoopGroup}s use daemon threads by default - * and thus an application with only daemon threads running besides the main thread will exit as - * soon as the main thread completes. - * A simple solution to this problem is to call {@link io.grpc.Server#awaitTermination()} to - * keep the main thread alive until the server has terminated. - */ - public PinpointNettyServerBuilder workerEventLoopGroup(EventLoopGroup group) { - if (group != null) { - return workerEventLoopGroupPool(new FixedObjectPool<>(group)); - } - return workerEventLoopGroupPool(DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL); - } - - PinpointNettyServerBuilder workerEventLoopGroupPool( - ObjectPool workerEventLoopGroupPool) { - this.workerEventLoopGroupPool = - checkNotNull(workerEventLoopGroupPool, "workerEventLoopGroupPool"); - return this; - } - - /** - * Force using heap buffer when custom allocator is enabled. - */ - void setForceHeapBuffer(boolean value) { - forceHeapBuffer = value; - } - - /** - * Sets the TLS context to use for encryption. Providing a context enables encryption. It must - * have been configured with {@link GrpcSslContexts}, but options could have been overridden. - */ - public PinpointNettyServerBuilder sslContext(SslContext sslContext) { - checkState(!freezeProtocolNegotiatorFactory, - "Cannot change security when using ServerCredentials"); - if (sslContext != null) { - checkArgument(sslContext.isServer(), - "Client SSL context can not be used for server"); - GrpcSslContexts.ensureAlpnAndH2Enabled(sslContext.applicationProtocolNegotiator()); - protocolNegotiatorFactory = ProtocolNegotiators.serverTlsFactory(sslContext); - } else { - protocolNegotiatorFactory = ProtocolNegotiators.serverPlaintextFactory(); - } - return this; - } - - /** - * Sets the {@link ProtocolNegotiator} to be used. Overrides the value specified in {@link - * #sslContext(SslContext)}. - */ - @Internal - public final PinpointNettyServerBuilder protocolNegotiator(ProtocolNegotiator protocolNegotiator) { - checkState(!freezeProtocolNegotiatorFactory, - "Cannot change security when using ServerCredentials"); - this.protocolNegotiatorFactory = ProtocolNegotiators.fixedServerFactory(protocolNegotiator); - return this; - } - - public void setTracingEnabled(boolean value) { - this.serverImplBuilder.setTracingEnabled(value); - } - - public void setStatsEnabled(boolean value) { - this.serverImplBuilder.setStatsEnabled(value); - } - - public void setStatsRecordStartedRpcs(boolean value) { - this.serverImplBuilder.setStatsRecordStartedRpcs(value); - } - - public void setStatsRecordRealTimeMetrics(boolean value) { - this.serverImplBuilder.setStatsRecordRealTimeMetrics(value); - } - - /** - * The maximum number of concurrent calls permitted for each incoming connection. Defaults to no - * limit. - */ - public PinpointNettyServerBuilder maxConcurrentCallsPerConnection(int maxCalls) { - checkArgument(maxCalls > 0, "max must be positive: %s", maxCalls); - this.maxConcurrentCallsPerConnection = maxCalls; - return this; - } - - /** - * Sets the initial flow control window in bytes. Setting initial flow control window enables auto - * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control - * tuning, use {@link #flowControlWindow(int)}. By default, auto flow control is enabled with - * initial flow control window size of {@link #DEFAULT_FLOW_CONTROL_WINDOW}. - */ - public PinpointNettyServerBuilder initialFlowControlWindow(int initialFlowControlWindow) { - checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive"); - this.flowControlWindow = initialFlowControlWindow; - this.autoFlowControl = true; - return this; - } - - /** - * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control - * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not - * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}) with auto flow control - * tuning. - */ - public PinpointNettyServerBuilder flowControlWindow(int flowControlWindow) { - checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", - flowControlWindow); - this.flowControlWindow = flowControlWindow; - this.autoFlowControl = false; - return this; - } - - /** - * Sets the maximum message size allowed to be received on the server. If not called, - * defaults to 4 MiB. The default provides protection to services who haven't considered the - * possibility of receiving large messages while trying to be large enough to not be hit in normal - * usage. - * - * @deprecated Call {@link #maxInboundMessageSize} instead. This method will be removed in a - * future release. - */ - @Deprecated - public PinpointNettyServerBuilder maxMessageSize(int maxMessageSize) { - return maxInboundMessageSize(maxMessageSize); - } - - /** {@inheritDoc} */ - @Override - public PinpointNettyServerBuilder maxInboundMessageSize(int bytes) { - checkArgument(bytes >= 0, "bytes must be non-negative: %s", bytes); - this.maxMessageSize = bytes; - return this; - } - - /** - * Sets the maximum size of header list allowed to be received. This is cumulative size of the - * headers with some overhead, as defined for - * - * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE. The default is 8 KiB. - * - * @deprecated Use {@link #maxInboundMetadataSize} instead - */ - @Deprecated - public PinpointNettyServerBuilder maxHeaderListSize(int maxHeaderListSize) { - return maxInboundMetadataSize(maxHeaderListSize); - } - - /** - * Sets the maximum size of metadata allowed to be received. This is cumulative size of the - * entries with some overhead, as defined for - * - * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE. The default is 8 KiB. - * - * @param bytes the maximum size of received metadata - * @return this - * @throws IllegalArgumentException if bytes is non-positive - * @since 1.17.0 - */ - @Override - public PinpointNettyServerBuilder maxInboundMetadataSize(int bytes) { - checkArgument(bytes > 0, "maxInboundMetadataSize must be positive: %s", bytes); - this.maxHeaderListSize = bytes; - return this; - } - - /** - * Sets a custom keepalive time, the delay time for sending next keepalive ping. An unreasonably - * small value might be increased, and {@code Long.MAX_VALUE} nano seconds or an unreasonably - * large value will disable keepalive. - * - * @since 1.3.0 - */ - public PinpointNettyServerBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) { - checkArgument(keepAliveTime > 0L, "keepalive time must be positiveļ¼š%s", keepAliveTime); - keepAliveTimeInNanos = timeUnit.toNanos(keepAliveTime); - keepAliveTimeInNanos = KeepAliveManager.clampKeepAliveTimeInNanos(keepAliveTimeInNanos); - if (keepAliveTimeInNanos >= AS_LARGE_AS_INFINITE) { - // Bump keepalive time to infinite. This disables keep alive. - keepAliveTimeInNanos = SERVER_KEEPALIVE_TIME_NANOS_DISABLED; - } - if (keepAliveTimeInNanos < MIN_KEEPALIVE_TIME_NANO) { - // Bump keepalive time. - keepAliveTimeInNanos = MIN_KEEPALIVE_TIME_NANO; - } - return this; - } - - /** - * Sets a custom keepalive timeout, the timeout for keepalive ping requests. An unreasonably small - * value might be increased. - * - * @since 1.3.0 - */ - public PinpointNettyServerBuilder keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit) { - checkArgument(keepAliveTimeout > 0L, "keepalive timeout must be positive: %s", - keepAliveTimeout); - keepAliveTimeoutInNanos = timeUnit.toNanos(keepAliveTimeout); - keepAliveTimeoutInNanos = - KeepAliveManager.clampKeepAliveTimeoutInNanos(keepAliveTimeoutInNanos); - if (keepAliveTimeoutInNanos < MIN_KEEPALIVE_TIMEOUT_NANO) { - // Bump keepalive timeout. - keepAliveTimeoutInNanos = MIN_KEEPALIVE_TIMEOUT_NANO; - } - return this; - } - - /** - * Sets a custom max connection idle time, connection being idle for longer than which will be - * gracefully terminated. Idleness duration is defined since the most recent time the number of - * outstanding RPCs became zero or the connection establishment. An unreasonably small value might - * be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable - * max connection idle. - * - * @since 1.4.0 - */ - public PinpointNettyServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) { - checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s", - maxConnectionIdle); - maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle); - if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) { - maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED; - } - if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) { - maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO; - } - return this; - } - - /** - * Sets a custom max connection age, connection lasting longer than which will be gracefully - * terminated. An unreasonably small value might be increased. A random jitter of +/-10% will be - * added to it. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable - * max connection age. - * - * @since 1.3.0 - */ - public PinpointNettyServerBuilder maxConnectionAge(long maxConnectionAge, TimeUnit timeUnit) { - checkArgument(maxConnectionAge > 0L, "max connection age must be positive: %s", - maxConnectionAge); - maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge); - if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) { - maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED; - } - if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) { - maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO; - } - return this; - } - - /** - * Sets a custom grace time for the graceful connection termination. Once the max connection age - * is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be - * cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an - * unreasonably large value are considered infinite. - * - * @see #maxConnectionAge(long, TimeUnit) - * @since 1.3.0 - */ - public PinpointNettyServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) { - checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative: %s", - maxConnectionAgeGrace); - maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace); - if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) { - maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; - } - return this; - } - - /** - * Specify the most aggressive keep-alive time clients are permitted to configure. The server will - * try to detect clients exceeding this rate and when detected will forcefully close the - * connection. The default is 5 minutes. - * - *

Even though a default is defined that allows some keep-alives, clients must not use - * keep-alive without approval from the service owner. Otherwise, they may experience failures in - * the future if the service becomes more restrictive. When unthrottled, keep-alives can cause a - * significant amount of traffic and CPU usage, so clients and servers should be conservative in - * what they use and accept. - * - * @see #permitKeepAliveWithoutCalls(boolean) - * @since 1.3.0 - */ - public PinpointNettyServerBuilder permitKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) { - checkArgument(keepAliveTime >= 0, "permit keepalive time must be non-negative: %s", - keepAliveTime); - permitKeepAliveTimeInNanos = timeUnit.toNanos(keepAliveTime); - return this; - } - - /** - * Sets whether to allow clients to send keep-alive HTTP/2 PINGs even if there are no outstanding - * RPCs on the connection. Defaults to {@code false}. - * - * @see #permitKeepAliveTime(long, TimeUnit) - * @since 1.3.0 - */ - public PinpointNettyServerBuilder permitKeepAliveWithoutCalls(boolean permit) { - permitKeepAliveWithoutCalls = permit; - return this; - } - - /** Sets the EAG attributes available to protocol negotiators. Not for general use. */ - void eagAttributes(Attributes eagAttributes) { - this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); - } - - //-------------------------------- modify pinpoint - private ServerListenerDelegator serverListenerDelegator = new EmptyServerListenerDelegator(); - - public void serverListenerDelegator(ServerListenerDelegator serverListenerDelegator) { - this.serverListenerDelegator = Objects.requireNonNull(serverListenerDelegator, "serverListenerDelegator"); - } - //-------------------------------- modify pinpoint - - @CheckReturnValue - NettyServer buildTransportServers( - List streamTracerFactories) { - assertEventLoopsAndChannelType(); - - ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator( - this.serverImplBuilder.getExecutorPool()); - - final NettyServer transportServer = new NettyServer(listenAddresses, channelFactory, channelOptions, childChannelOptions, - bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, - streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection, - autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, - keepAliveTimeInNanos, keepAliveTimeoutInNanos, - maxConnectionIdleInNanos, maxConnectionAgeInNanos, - maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, - eagAttributes, this.serverImplBuilder.getChannelz()) { - //-------------------------------- modify pinpoint - @Override - public void start(final ServerListener serverListener) throws IOException { - ServerListener delegate = serverListenerDelegator.wrapServerListener(serverListener); - super.start(delegate); - } - //-------------------------------- modify pinpoint - }; - - return transportServer; - } - - @VisibleForTesting - void assertEventLoopsAndChannelType() { - boolean allProvided = channelFactory != Utils.DEFAULT_SERVER_CHANNEL_FACTORY - && bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL - && workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; - boolean nonProvided = channelFactory == Utils.DEFAULT_SERVER_CHANNEL_FACTORY - && bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL - && workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; - checkState( - allProvided || nonProvided, - "All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided or " - + "neither should be"); - } - - PinpointNettyServerBuilder setTransportTracerFactory( - TransportTracer.Factory transportTracerFactory) { - this.transportTracerFactory = transportTracerFactory; - return this; - } - - @Override - public PinpointNettyServerBuilder useTransportSecurity(File certChain, File privateKey) { - checkState(!freezeProtocolNegotiatorFactory, - "Cannot change security when using ServerCredentials"); - SslContext sslContext; - try { - sslContext = GrpcSslContexts.forServer(certChain, privateKey).build(); - } catch (SSLException e) { - // This should likely be some other, easier to catch exception. - throw new RuntimeException(e); - } - protocolNegotiatorFactory = ProtocolNegotiators.serverTlsFactory(sslContext); - return this; - } - - @Override - public PinpointNettyServerBuilder useTransportSecurity(InputStream certChain, InputStream privateKey) { - checkState(!freezeProtocolNegotiatorFactory, - "Cannot change security when using ServerCredentials"); - SslContext sslContext; - try { - sslContext = GrpcSslContexts.forServer(certChain, privateKey).build(); - } catch (SSLException e) { - // This should likely be some other, easier to catch exception. - throw new RuntimeException(e); - } - protocolNegotiatorFactory = ProtocolNegotiators.serverTlsFactory(sslContext); - return this; - } -} - - diff --git a/grpc/src/main/java/io/grpc/netty/ServerListenerDelegator.java b/grpc/src/main/java/io/grpc/netty/ServerListenerDelegator.java deleted file mode 100644 index 9cd2362db0145..0000000000000 --- a/grpc/src/main/java/io/grpc/netty/ServerListenerDelegator.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.grpc.netty; - -import io.grpc.internal.ServerListener; - -public interface ServerListenerDelegator { - ServerListener wrapServerListener(ServerListener serverListener); -} diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java index 2db05cedec51e..228c8f5fa6c90 100644 --- a/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java @@ -151,7 +151,8 @@ private PSpan newSpan() { } - private static Server serverStart(ExecutorService executorService) throws SSLException { + private static Server serverStart(ExecutorService executorService) + throws SSLException, NoSuchFieldException, IllegalAccessException { logger.debug("server start"); serverFactory = new ServerFactory(ChannelFactoryTest.class.getSimpleName() + "-server", "127.0.0.1", PORT, executorService, null, ServerOption.newBuilder().build()); diff --git a/realtime/realtime-collector/src/test/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandServiceTest.java b/realtime/realtime-collector/src/test/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandServiceTest.java index d5c1d35b6cd62..9ae391b88a9b1 100644 --- a/realtime/realtime-collector/src/test/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandServiceTest.java +++ b/realtime/realtime-collector/src/test/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandServiceTest.java @@ -320,7 +320,7 @@ public ServerCall.Listener interceptCall(ServerCall