From 42acf13e4532541337a68098e698e573529b1a8c Mon Sep 17 00:00:00 2001 From: Kim YoungJin Date: Thu, 24 Aug 2023 18:34:01 +0900 Subject: [PATCH] [#9636] Added log livetail feature --- grpc/grpc-idl | 2 +- log/log-collector/pom.xml | 133 ++++++++++++++++ .../log/collector/LogCollectorConfig.java | 37 +++++ .../log/collector/LogCollectorModule.java | 28 ++++ .../LogCollectorPropertySources.java | 41 +++++ .../collector/grpc/EmptyStreamObserver.java | 34 +++++ .../grpc/GrpcLogReceiverProperties.java | 39 +++++ .../grpc/GrpcLogReceiverPropertiesConfig.java | 99 ++++++++++++ .../grpc/LogCollectorGrpcServerConfig.java | 137 +++++++++++++++++ .../collector/grpc/LogConnectionHandler.java | 99 ++++++++++++ .../log/collector/grpc/LogGrpcService.java | 93 ++++++++++++ .../grpc/context/LogAgentHeader.java | 47 ++++++ .../grpc/context/LogAgentHeaderReader.java | 45 ++++++ .../LogHeaderPropagationInterceptor.java | 74 +++++++++ .../redis/LogCollectorRedisServerConfig.java | 81 ++++++++++ .../collector/redis/LogFileListServer.java | 109 ++++++++++++++ .../repository/LogAcceptorRepository.java | 51 +++++++ .../log/collector/repository/LogConsumer.java | 27 ++++ .../repository/LogConsumerRepository.java | 55 +++++++ .../repository/LogDemandAcceptor.java | 27 ++++ .../collector/service/LogConsumerService.java | 34 +++++ .../service/LogConsumerServiceImpl.java | 86 +++++++++++ .../collector/service/LogProviderService.java | 32 ++++ .../service/LogProviderServiceImpl.java | 67 +++++++++ .../collector/service/LogServiceConfig.java | 69 +++++++++ ...inpoint-collector-log-grpc-root.properties | 16 ++ .../pinpoint-collector-log-root.properties | 0 .../pinpoint-collector-log-grpc.properties | 34 +++++ .../pinpoint-collector-log-grpc.properties | 34 +++++ .../repository/LogAcceptorRepositoryTest.java | 60 ++++++++ .../repository/LogConsumerRepositoryTest.java | 56 +++++++ .../log/collector/service/LogServiceTest.java | 75 +++++++++ log/log-common/pom.xml | 35 +++++ .../log/LogServiceProtocolConfig.java | 57 +++++++ .../navercorp/pinpoint/log/dto/LogDemand.java | 51 +++++++ .../navercorp/pinpoint/log/dto/LogSupply.java | 35 +++++ .../navercorp/pinpoint/log/vo/FileKey.java | 72 +++++++++ .../navercorp/pinpoint/log/vo/HostKey.java | 63 ++++++++ .../com/navercorp/pinpoint/log/vo/Log.java | 39 +++++ .../pinpoint/log/vo/LogFileList.java | 48 ++++++ .../navercorp/pinpoint/log/vo/LogPile.java | 41 +++++ log/log-web/pom.xml | 29 ++++ .../pinpoint/log/web/LogWebConfig.java | 15 ++ .../pinpoint/log/web/LogWebModule.java | 29 ++++ .../log/web/LogWebPropertySources.java | 34 +++++ .../log/web/controller/LogController.java | 74 +++++++++ .../web/controller/LogControllerConfig.java | 30 ++++ .../pinpoint/log/web/dao/LiveTailDao.java | 36 +++++ .../pinpoint/log/web/dao/LiveTailDaoImpl.java | 112 ++++++++++++++ .../pinpoint/log/web/dao/LogWebDaoConfig.java | 67 +++++++++ .../log/web/service/LiveTailService.java | 36 +++++ .../log/web/service/LiveTailServiceImpl.java | 53 +++++++ .../log/web/service/LogServiceConfig.java | 37 +++++ .../pinpoint/log/web/vo/LogHost.java | 60 ++++++++ .../log/web/websocket/LogWebSocketConfig.java | 43 ++++++ .../web/websocket/LogWebSocketHandler.java | 142 ++++++++++++++++++ .../log/pinpoint-web-log-root.properties | 0 .../local/pinpoint-web-log.properties | 1 + .../release/pinpoint-web-log.properties | 1 + log/pom.xml | 21 +++ metric-module/collector-starter/pom.xml | 4 + .../multi/application/MultiApplication.java | 12 ++ .../metric/collector/CollectorType.java | 1 + metric-module/web-starter/pom.xml | 4 + .../web/starter/multi/MetricAndWebApp.java | 4 +- pom.xml | 16 ++ .../redis/kv/RedisKVChannelConfig.java | 8 +- .../channel/redis/kv/RedisKVPubChannel.java | 14 +- .../redis/kv/RedisKVPubChannelProvider.java | 16 +- .../channel/redis/kv/RedisKVSubChannel.java | 10 +- .../redis/kv/RedisKVSubChannelProvider.java | 10 +- .../redis/pubsub/RedisPubSubConfig.java | 6 +- .../redis/pubsub/RedisSubChannelProvider.java | 4 +- .../redis/stream/RedisStreamConfig.java | 6 +- .../stream/RedisStreamPubChannelProvider.java | 2 +- .../stream/RedisStreamSubChannelProvider.java | 4 +- .../pinpoint/redis/RedisBasicConfig.java | 10 +- 77 files changed, 3171 insertions(+), 42 deletions(-) create mode 100644 log/log-collector/pom.xml create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorConfig.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorModule.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorPropertySources.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/EmptyStreamObserver.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverProperties.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverPropertiesConfig.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogCollectorGrpcServerConfig.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogConnectionHandler.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogGrpcService.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeader.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeaderReader.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogHeaderPropagationInterceptor.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogCollectorRedisServerConfig.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogFileListServer.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepository.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumer.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepository.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogDemandAcceptor.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerService.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerServiceImpl.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderService.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderServiceImpl.java create mode 100644 log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogServiceConfig.java create mode 100644 log/log-collector/src/main/resources/log/pinpoint-collector-log-grpc-root.properties create mode 100644 log/log-collector/src/main/resources/log/pinpoint-collector-log-root.properties create mode 100644 log/log-collector/src/main/resources/log/profiles/local/pinpoint-collector-log-grpc.properties create mode 100644 log/log-collector/src/main/resources/log/profiles/release/pinpoint-collector-log-grpc.properties create mode 100644 log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepositoryTest.java create mode 100644 log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepositoryTest.java create mode 100644 log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/service/LogServiceTest.java create mode 100644 log/log-common/pom.xml create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/LogServiceProtocolConfig.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogDemand.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogSupply.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/FileKey.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/HostKey.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/Log.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogFileList.java create mode 100644 log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogPile.java create mode 100644 log/log-web/pom.xml create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebConfig.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebModule.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebPropertySources.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogController.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogControllerConfig.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDao.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDaoImpl.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LogWebDaoConfig.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailService.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailServiceImpl.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LogServiceConfig.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/vo/LogHost.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketConfig.java create mode 100644 log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java create mode 100644 log/log-web/src/main/resources/log/pinpoint-web-log-root.properties create mode 100644 log/log-web/src/main/resources/log/profiles/local/pinpoint-web-log.properties create mode 100644 log/log-web/src/main/resources/log/profiles/release/pinpoint-web-log.properties create mode 100644 log/pom.xml diff --git a/grpc/grpc-idl b/grpc/grpc-idl index 060ccca3b0d5c..342c0a9f11f67 160000 --- a/grpc/grpc-idl +++ b/grpc/grpc-idl @@ -1 +1 @@ -Subproject commit 060ccca3b0d5c9fa89a874b13e25d00d5230711b +Subproject commit 342c0a9f11f67fbc2b016772d6111eb0b5ff89e8 diff --git a/log/log-collector/pom.xml b/log/log-collector/pom.xml new file mode 100644 index 0000000000000..3a5752083c1c2 --- /dev/null +++ b/log/log-collector/pom.xml @@ -0,0 +1,133 @@ + + + + pinpoint-log-module + com.navercorp.pinpoint + 2.6.0-SNAPSHOT + + 4.0.0 + + pinpoint-log-collector + pinpoint-log-collector + jar + + + 11 + ${env.JAVA_11_HOME} + + 2.19.0 + + + false + + + ${project.artifactId}-boot-${project.version} + + + + + + javax.annotation + javax.annotation-api + 1.3.2 + + + com.google.guava + guava + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + + com.navercorp.pinpoint + pinpoint-grpc + + + com.navercorp.pinpoint + pinpoint-collector + + + com.navercorp.pinpoint + pinpoint-log-common + + + org.springframework.kafka + spring-kafka + 2.9.4 + + + + org.assertj + assertj-core + 3.23.1 + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.testcontainers + junit-jupiter + 1.17.6 + test + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.springframework.boot + spring-boot-starter-test + ${spring.boot.version} + test + + + org.springframework.boot + spring-boot-starter-logging + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.navercorp.pinpoint.log.collector.LogCollectorApp + ${project.build.directory}/deploy + true + false + ${pinpoint.log.collector.executable.name} + + + + + + \ No newline at end of file diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorConfig.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorConfig.java new file mode 100644 index 0000000000000..8d8dd796e9cff --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorConfig.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector; + +import com.codahale.metrics.MetricRegistry; +import com.navercorp.pinpoint.log.collector.grpc.LogCollectorGrpcServerConfig; +import com.navercorp.pinpoint.log.collector.redis.LogCollectorRedisServerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration(proxyBeanMethods = false) +@Import({ LogCollectorGrpcServerConfig.class, LogCollectorRedisServerConfig.class }) +public class LogCollectorConfig { + + @Bean + MetricRegistry metricRegistry() { + return new MetricRegistry(); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorModule.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorModule.java new file mode 100644 index 0000000000000..a6e85d4d1a360 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorModule.java @@ -0,0 +1,28 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector; + +import com.navercorp.pinpoint.redis.RedisPropertySources; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration(proxyBeanMethods = false) +@Import({ LogCollectorConfig.class, RedisPropertySources.class, LogCollectorPropertySources.class }) +public class LogCollectorModule { +} \ No newline at end of file diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorPropertySources.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorPropertySources.java new file mode 100644 index 0000000000000..72c62d4be8d7b --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/LogCollectorPropertySources.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector; + +import org.springframework.context.annotation.PropertySource; +import org.springframework.context.annotation.PropertySources; + +import static com.navercorp.pinpoint.log.collector.LogCollectorPropertySources.COLLECTOR; +import static com.navercorp.pinpoint.log.collector.LogCollectorPropertySources.GRPC_PROFILE; +import static com.navercorp.pinpoint.log.collector.LogCollectorPropertySources.GRPC_ROOT; + +/** + * @author youngjin.kim2 + */ +@PropertySources({ + @PropertySource(name = "CollectorLogAppPropertySources-GRPC", value = { GRPC_ROOT, GRPC_PROFILE }), + @PropertySource(name = "CollectorLogAppPropertySources", value = { COLLECTOR }) +}) +public class LogCollectorPropertySources { + + private static final String PROFILE = "classpath:log/profiles/${pinpoint.profiles.active:local}/"; + + public static final String GRPC_ROOT = "classpath:log/pinpoint-collector-log-grpc-root.properties"; + public static final String GRPC_PROFILE = PROFILE + "pinpoint-collector-log-grpc.properties"; + + public static final String COLLECTOR = "classpath:log/pinpoint-collector-log-root.properties"; + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/EmptyStreamObserver.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/EmptyStreamObserver.java new file mode 100644 index 0000000000000..394a975c4fdc7 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/EmptyStreamObserver.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; + +/** + * @author youngjin.kim2 + */ +public class EmptyStreamObserver implements StreamObserver { + @Override public void onNext(Empty t) {} + @Override public void onError(Throwable throwable) {} + @Override public void onCompleted() {} + + @SuppressWarnings("unchecked") + static StreamObserver create() { + return (StreamObserver) new EmptyStreamObserver(); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverProperties.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverProperties.java new file mode 100644 index 0000000000000..1d4fb58b2a316 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverProperties.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.navercorp.pinpoint.collector.config.ExecutorProperties; +import com.navercorp.pinpoint.collector.grpc.config.GrpcReceiverProperties; +import com.navercorp.pinpoint.collector.receiver.BindAddress; +import com.navercorp.pinpoint.grpc.server.ServerOption; + +/** + * @author youngjin.kim2 + */ +final class GrpcLogReceiverProperties extends GrpcReceiverProperties { + + GrpcLogReceiverProperties( + boolean enable, + BindAddress bindAddress, + ExecutorProperties serverExecutor, + ExecutorProperties serverCallExecutor, + ExecutorProperties workerExecutor, + ServerOption serverOption + ) { + super(enable, bindAddress, serverExecutor, serverCallExecutor, workerExecutor, serverOption); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverPropertiesConfig.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverPropertiesConfig.java new file mode 100644 index 0000000000000..e5353970b8970 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/GrpcLogReceiverPropertiesConfig.java @@ -0,0 +1,99 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.navercorp.pinpoint.collector.config.ExecutorProperties; +import com.navercorp.pinpoint.collector.grpc.config.GrpcPropertiesServerOptionBuilder; +import com.navercorp.pinpoint.collector.receiver.BindAddress; +import com.navercorp.pinpoint.grpc.server.ServerOption; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; + +/** + * @author youngjin.kim2 + */ +@Configuration +@EnableConfigurationProperties +public class GrpcLogReceiverPropertiesConfig { + + public static final String BIND_ADDRESS = "collector.receiver.grpc.log.bindaddress"; + + public static final String SERVER_EXECUTOR = "collector.receiver.grpc.log.server.executor"; + + public static final String SERVER_CALL_EXECUTOR = "collector.receiver.grpc.log.server-call.executor"; + + public static final String WORKER_EXECUTOR = "collector.receiver.grpc.log.worker.executor"; + + public static final String SERVER_OPTION = "collector.receiver.grpc.log"; + + public GrpcLogReceiverPropertiesConfig() { + } + + @Bean(BIND_ADDRESS) + @ConfigurationProperties(BIND_ADDRESS) + public BindAddress.Builder newBindAddressBuilder() { + return BindAddress.newBuilder(); + } + + @Bean(SERVER_EXECUTOR) + @ConfigurationProperties(SERVER_EXECUTOR) + public ExecutorProperties.Builder newServerExecutorBuilder() { + return ExecutorProperties.newBuilder(); + } + + @Bean(SERVER_CALL_EXECUTOR) + @ConfigurationProperties(SERVER_CALL_EXECUTOR) + public ExecutorProperties.Builder newServerCallExecutorBuilder() { + return ExecutorProperties.newBuilder(); + } + + @Bean(WORKER_EXECUTOR) + @ConfigurationProperties(WORKER_EXECUTOR) + public ExecutorProperties.Builder newWorkerExecutorBuilder() { + return ExecutorProperties.newBuilder(); + } + + @Bean(SERVER_OPTION) + @ConfigurationProperties(SERVER_OPTION) + public GrpcPropertiesServerOptionBuilder newServerOption() { + // Server option + return new GrpcPropertiesServerOptionBuilder(); + } + + @Bean("grpcLogReceiverConfig") + public GrpcLogReceiverProperties newLogReceiverConfig(Environment environment) { + boolean enable = environment.getProperty("collector.receiver.grpc.log.enable", boolean.class, false); + + final ServerOption serverOption = newServerOption().build(); + final BindAddress bindAddress = newBindAddressBuilder().build(); + final ExecutorProperties serverExecutor = newServerExecutorBuilder().build(); + final ExecutorProperties serverCallExecutor = newServerCallExecutorBuilder().build(); + final ExecutorProperties workerExecutor = newWorkerExecutorBuilder().build(); + + return new GrpcLogReceiverProperties( + enable, + bindAddress, + serverExecutor, + serverCallExecutor, + workerExecutor, + serverOption + ); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogCollectorGrpcServerConfig.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogCollectorGrpcServerConfig.java new file mode 100644 index 0000000000000..a8558849adc3b --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogCollectorGrpcServerConfig.java @@ -0,0 +1,137 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.codahale.metrics.MetricRegistry; +import com.navercorp.pinpoint.collector.config.ExecutorProperties; +import com.navercorp.pinpoint.collector.receiver.ExecutorFactoryBean; +import com.navercorp.pinpoint.collector.receiver.grpc.GrpcReceiver; +import com.navercorp.pinpoint.collector.receiver.grpc.SimpleServerCallExecutorSupplier; +import com.navercorp.pinpoint.common.server.util.AddressFilter; +import com.navercorp.pinpoint.grpc.HeaderReader; +import com.navercorp.pinpoint.grpc.log.LogGrpc; +import com.navercorp.pinpoint.log.collector.grpc.context.LogAgentHeader; +import com.navercorp.pinpoint.log.collector.grpc.context.LogAgentHeaderReader; +import com.navercorp.pinpoint.log.collector.grpc.context.LogHeaderPropagationInterceptor; +import com.navercorp.pinpoint.log.collector.service.LogProviderService; +import com.navercorp.pinpoint.log.collector.service.LogServiceConfig; +import io.grpc.ServerInterceptor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author youngjin.kim2 + */ +@Configuration +@Import({ GrpcLogReceiverPropertiesConfig.class, LogServiceConfig.class }) +public class LogCollectorGrpcServerConfig { + + @Bean("abortPolicy") + ThreadPoolExecutor.AbortPolicy abortPolicy() { + return new ThreadPoolExecutor.AbortPolicy(); + } + + @Bean("grpcLogServerExecutor") + ExecutorFactoryBean grpcLogServerExecutor( + ThreadPoolExecutor.AbortPolicy abortPolicy, + @Qualifier("grpcLogReceiverConfig") GrpcLogReceiverProperties receiverConfig + ) { + final ExecutorProperties config = receiverConfig.getServerExecutor(); + final ExecutorFactoryBean factory = new ExecutorFactoryBean(); + factory.setRejectedExecutionHandler(abortPolicy); + factory.setDaemon(true); + factory.setWaitForTasksToCompleteOnShutdown(true); + factory.setAwaitTerminationSeconds(10); + factory.setPreStartAllCoreThreads(true); + factory.setLogRate(100); + factory.setExecutorProperties(config); + factory.setThreadNamePrefix("Pinpoint-GrpcLog-Server-"); + return factory; + } + + @Bean("grpcLogServerCallExecutor") + ExecutorFactoryBean grpcLogServerCallExecutor( + ThreadPoolExecutor.AbortPolicy abortPolicy, + @Qualifier("grpcLogReceiverConfig") GrpcLogReceiverProperties receiverConfig, + @Autowired(required = false) MetricRegistry metricRegistry + ) { + final ExecutorProperties config = receiverConfig.getServerCallExecutor(); + final ExecutorFactoryBean factory = new ExecutorFactoryBean(); + factory.setRejectedExecutionHandler(abortPolicy); + factory.setDaemon(true); + factory.setWaitForTasksToCompleteOnShutdown(true); + factory.setAwaitTerminationSeconds(10); + factory.setPreStartAllCoreThreads(true); + factory.setExecutorProperties(config); + factory.setThreadNamePrefix("Pinpoint-GrpcLog-Server-"); + factory.setLogRate(1); + + if (metricRegistry != null) { + factory.setRegistry(metricRegistry); + } + + return factory; + } + + @Bean("logInterceptorList") + List logInterceptorList() { + final HeaderReader headerReader = new LogAgentHeaderReader(); + final ServerInterceptor interceptor = new LogHeaderPropagationInterceptor(headerReader); + return List.of(interceptor); + } + + @Bean("addressFilter") + @ConditionalOnMissingBean(name = "addressFilter") + AddressFilter allAddressFilter() { + return AddressFilter.ALL; + } + + @Bean + LogGrpc.LogImplBase logService(LogProviderService service) { + return new LogGrpcService(service); + } + + @Bean + GrpcReceiver grpcLogReceiver( + @Qualifier("grpcLogReceiverConfig") GrpcLogReceiverProperties receiverConfig, + @Qualifier("grpcLogServerExecutor") Executor serverExecutor, + @Qualifier("grpcLogServerCallExecutor") Executor serverCallExecutor, + @Qualifier("addressFilter") AddressFilter addressFilter, + @Qualifier("logInterceptorList") List logInterceptorList, + LogGrpc.LogImplBase logService + ) { + final GrpcReceiver receiver = new GrpcReceiver(); + receiver.setBindAddress(receiverConfig.getBindAddress()); + receiver.setExecutor(serverExecutor); + receiver.setServerCallExecutorSupplier(new SimpleServerCallExecutorSupplier(serverCallExecutor)); + receiver.setAddressFilter(addressFilter); + receiver.setBindableServiceList(List.of(logService)); + receiver.setServerInterceptorList(logInterceptorList); + receiver.setEnable(receiverConfig.isEnable()); + receiver.setServerOption(receiverConfig.getServerOption()); + + return receiver; + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogConnectionHandler.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogConnectionHandler.java new file mode 100644 index 0000000000000..c023c2ddfebfc --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogConnectionHandler.java @@ -0,0 +1,99 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.navercorp.pinpoint.grpc.log.PLogDemand; +import com.navercorp.pinpoint.grpc.log.PLogPile; +import com.navercorp.pinpoint.grpc.log.PLogRecord; +import com.navercorp.pinpoint.log.dto.LogDemand; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.Log; +import com.navercorp.pinpoint.log.vo.LogPile; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.Disposable; + +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * @author youngjin.kim2 + */ +class LogConnectionHandler implements StreamObserver, Consumer { + + private final Logger logger = LogManager.getLogger(LogConnectionHandler.class); + + private final Consumer pileConsumer; + private final Disposable disposable; + private final FileKey fileKey; + private final ServerCallStreamObserver responseObserver; + + LogConnectionHandler( + Consumer pileConsumer, + Disposable disposable, + FileKey fileKey, + ServerCallStreamObserver responseObserver + ) { + this.pileConsumer = Objects.requireNonNull(pileConsumer, "pileConsumer"); + this.disposable = Objects.requireNonNull(disposable, "disposable"); + this.fileKey = Objects.requireNonNull(fileKey, "fileKey"); + this.responseObserver = Objects.requireNonNull(responseObserver, "responseObserver"); + } + + @Override + public void onNext(PLogPile pLogPile) { + if (logger.isTraceEnabled()) { + for (final PLogRecord record: pLogPile.getRecordsList()) { + logger.trace("log[{}-{}]: {}", pLogPile.getSeq(), record.getSeq(), record.getMessage()); + } + } + final List logs = pLogPile.getRecordsList().stream() + .map(el -> new Log(el.getSeq(), el.getMessage())) + .collect(Collectors.toUnmodifiableList()); + this.pileConsumer.accept(new LogPile(pLogPile.getSeq(), logs)); + } + + @Override + public void onError(Throwable throwable) { + logger.error("Error on log {}: {}", this.fileKey, throwable.getMessage()); + this.disposable.dispose(); + } + + @Override + public void onCompleted() { + logger.info("Completed on {}", this.fileKey); + this.disposable.dispose(); + this.responseObserver.onCompleted(); + } + + @Override + public void accept(LogDemand logDemand) { + try { + this.responseObserver.onNext(PLogDemand.newBuilder() + .setDurationMillis(logDemand.getDurationMillis()) + .build()); + } catch (Exception e) { + this.responseObserver.onError(e); + this.disposable.dispose(); + logger.error("Failed to send demand for {}", this.fileKey, e); + } + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogGrpcService.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogGrpcService.java new file mode 100644 index 0000000000000..402ef3a269c3d --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/LogGrpcService.java @@ -0,0 +1,93 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc; + +import com.navercorp.pinpoint.grpc.log.LogGrpc; +import com.navercorp.pinpoint.grpc.log.PLogDemand; +import com.navercorp.pinpoint.grpc.log.PLogPile; +import com.navercorp.pinpoint.log.collector.grpc.context.LogAgentHeader; +import com.navercorp.pinpoint.log.collector.service.LogProviderService; +import com.navercorp.pinpoint.log.dto.LogDemand; +import com.navercorp.pinpoint.log.vo.FileKey; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.Disposable; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * @author youngjin.kim2 + */ +class LogGrpcService extends LogGrpc.LogImplBase { + + private final Logger logger = LogManager.getLogger(LogGrpcService.class); + + private final LogProviderService service; + + LogGrpcService(LogProviderService service) { + this.service = Objects.requireNonNull(service, "service"); + } + + @Override + public StreamObserver connect(StreamObserver responseObserver0) { + ServerCallStreamObserver responseObserver = + (ServerCallStreamObserver) responseObserver0; + try { + FileKey fileKey = getFileKey(); + AtomicReference disposableRef = new AtomicReference<>(); + Disposable disposable = this.service.getDemands(fileKey) + .subscribe(getDemandHandler(responseObserver, fileKey, disposableRef)); + disposableRef.set(disposable); + return new LogConnectionHandler( + pile -> this.service.provide(fileKey, pile), + disposable, + fileKey, + responseObserver + ); + } catch (Exception e) { + responseObserver.onError(e); + return EmptyStreamObserver.create(); + } + } + + private Consumer getDemandHandler( + ServerCallStreamObserver responseObserver, + FileKey fileKey, + AtomicReference disposableRef + ) { + return demand -> { + try { + responseObserver.onNext(PLogDemand.newBuilder() + .setDurationMillis(demand.getDurationMillis()) + .build()); + } catch (Exception e) { + responseObserver.onError(e); + disposableRef.get().dispose(); + logger.error("Failed to send demand for {}", fileKey, e); + } + }; + } + + private FileKey getFileKey() { + final LogAgentHeader header = LogAgentHeader.LOG_AGENT_HEADER_KEY.get(); + return header.getFileKey(); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeader.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeader.java new file mode 100644 index 0000000000000..1adedca3c7378 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeader.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc.context; + +import com.navercorp.pinpoint.log.vo.FileKey; +import io.grpc.Context; +import io.grpc.Metadata; + +/** + * @author youngjin.kim2 + */ +public class LogAgentHeader { + + public static Metadata.Key HOST_GROUP_NAME_KEY = keyOf("hostgroupname"); + public static Metadata.Key HOST_NAME_KEY = keyOf("hostname"); + public static Metadata.Key FILE_NAME_KEY = keyOf("filename"); + + public static final Context.Key LOG_AGENT_HEADER_KEY = Context.key("logagentheader"); + + private final FileKey fileKey; + + public LogAgentHeader(FileKey fileKey) { + this.fileKey = fileKey; + } + + public FileKey getFileKey() { + return fileKey; + } + + private static Metadata.Key keyOf(String name) { + return Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeaderReader.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeaderReader.java new file mode 100644 index 0000000000000..072cd0417adfa --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogAgentHeaderReader.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.grpc.context; + +import com.navercorp.pinpoint.grpc.HeaderReader; +import com.navercorp.pinpoint.log.vo.FileKey; +import io.grpc.Metadata; +import io.grpc.Status; + +/** + * @author youngjin.kim2 + */ +public class LogAgentHeaderReader implements HeaderReader { + + @Override + public LogAgentHeader extract(Metadata headers) { + final String hostGroupName = getString(headers, LogAgentHeader.HOST_GROUP_NAME_KEY); + final String hostName = getString(headers, LogAgentHeader.HOST_NAME_KEY); + final String fileName = getString(headers, LogAgentHeader.FILE_NAME_KEY); + final FileKey fileKey = FileKey.of(hostGroupName, hostName, fileName); + return new LogAgentHeader(fileKey); + } + + private static String getString(Metadata headers, Metadata.Key key) { + final String value = headers.get(key); + if (value == null) { + throw Status.INVALID_ARGUMENT.withDescription(key.name() + " header is missing").asRuntimeException(); + } + return value; + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogHeaderPropagationInterceptor.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogHeaderPropagationInterceptor.java new file mode 100644 index 0000000000000..8186e0dba30e8 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/grpc/context/LogHeaderPropagationInterceptor.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.log.collector.grpc.context; + +import com.navercorp.pinpoint.grpc.HeaderReader; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; + +/** + * @author Woonduk Kang(emeroad) + */ +public class LogHeaderPropagationInterceptor implements ServerInterceptor { + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final HeaderReader headerReader; + private final Context.Key contextKey; + + public LogHeaderPropagationInterceptor(HeaderReader headerReader) { + this(headerReader, LogAgentHeader.LOG_AGENT_HEADER_KEY); + } + + public LogHeaderPropagationInterceptor(HeaderReader headerReader, Context.Key contextKey) { + this.headerReader = Objects.requireNonNull(headerReader, "headerReader"); + this.contextKey = Objects.requireNonNull(contextKey, "contextKey"); + } + + @Override + public ServerCall.Listener interceptCall(final ServerCall call, Metadata headers, ServerCallHandler next) { + LogAgentHeader headerObject; + try { + headerObject = headerReader.extract(headers); + } catch (Exception e) { + if (logger.isInfoEnabled()) { + logger.info("Header extract fail cause={}, method={} headers={}, attr={}", + e.getMessage(), call.getMethodDescriptor().getFullMethodName(), headers, call.getAttributes(), e); + } + call.close(Status.INVALID_ARGUMENT.withDescription(e.getMessage()), new Metadata()); + return new ServerCall.Listener<>() { + }; + } + + final Context currentContext = Context.current(); + final Context newContext = currentContext.withValue(contextKey, headerObject); + if (logger.isDebugEnabled()) { + logger.debug("headerPropagation method={}, headers={}, attr={}", call.getMethodDescriptor().getFullMethodName(), headers, call.getAttributes()); + } + + return Contexts.interceptCall(newContext, call, headers, next); + } + +} \ No newline at end of file diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogCollectorRedisServerConfig.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogCollectorRedisServerConfig.java new file mode 100644 index 0000000000000..12feeb634dc78 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogCollectorRedisServerConfig.java @@ -0,0 +1,81 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.redis; + +import com.navercorp.pinpoint.channel.ChannelProviderRepository; +import com.navercorp.pinpoint.channel.ChannelSpringConfig; +import com.navercorp.pinpoint.channel.redis.kv.RedisKVChannelConfig; +import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; +import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; +import com.navercorp.pinpoint.channel.service.server.ChannelServiceServer; +import com.navercorp.pinpoint.log.LogServiceProtocolConfig; +import com.navercorp.pinpoint.log.collector.service.LogConsumerService; +import com.navercorp.pinpoint.log.collector.service.LogServiceConfig; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.time.Duration; +import java.util.concurrent.Executors; + +/** + * @author youngjin.kim2 + */ +@Configuration +@Import({ + LogServiceProtocolConfig.class, + LogServiceConfig.class, + ChannelSpringConfig.class, + RedisPubSubConfig.class, + RedisKVChannelConfig.class, +}) +public class LogCollectorRedisServerConfig { + + @Value("${pinpoint.log.collector.broadcast-connection-period-millis:10000}") + private long broadcastConnectionPeriodMillis; + + @Bean + ChannelServiceServer logPubSubServer( + ChannelProviderRepository channelProviderRepository, + FluxChannelServiceProtocol protocol, + LogConsumerService logConsumerService + ) { + Duration duration = protocol.getDemandInterval().plus(Duration.ofSeconds(2)); + return ChannelServiceServer.buildFlux( + channelProviderRepository, + protocol, + fileKey -> logConsumerService.tail(fileKey, duration) + ); + } + + @Bean + LogFileListServer logConnectionBroadcastingService( + LogConsumerService service, + ChannelProviderRepository channelProviderRepository + ) { + final Duration broadcastConnectionPeriod = Duration.ofMillis(broadcastConnectionPeriodMillis); + return new LogFileListServer( + Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "LogConnection-Broadcaster")), + service, + channelProviderRepository, + broadcastConnectionPeriod + ); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogFileListServer.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogFileListServer.java new file mode 100644 index 0000000000000..fba30c205c110 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/redis/LogFileListServer.java @@ -0,0 +1,109 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.redis; + +import com.navercorp.pinpoint.channel.ChannelProviderRepository; +import com.navercorp.pinpoint.channel.PubChannel; +import com.navercorp.pinpoint.common.util.StringUtils; +import com.navercorp.pinpoint.log.collector.service.LogConsumerService; +import com.navercorp.pinpoint.log.vo.FileKey; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.InitializingBean; + +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author youngjin.kim2 + */ +class LogFileListServer implements InitializingBean { + + private final String HOSTNAME = getHostname(); + + private final Logger logger = LogManager.getLogger(LogFileListServer.class); + private final ScheduledExecutorService broadcastExecutor; + private final LogConsumerService service; + private final Duration connectionBroadcastingPeriod; + private final ChannelProviderRepository channelProviderRepository; + + private PubChannel pubChannel; + + LogFileListServer( + ScheduledExecutorService broadcastExecutor, + LogConsumerService service, + ChannelProviderRepository channelProviderRepository, + Duration connectionBroadcastingPeriod + ) { + this.broadcastExecutor = Objects.requireNonNull(broadcastExecutor, "broadcastExecutor"); + this.service = Objects.requireNonNull(service, "service"); + this.channelProviderRepository = Objects.requireNonNull(channelProviderRepository, "channelProviderRepository"); + this.connectionBroadcastingPeriod = + Objects.requireNonNullElse(connectionBroadcastingPeriod, Duration.ofSeconds(10)); + } + + @Override + public void afterPropertiesSet() { + if (StringUtils.isEmpty(HOSTNAME)) { + throw new RuntimeException("Failed to initialize LogFileListServer: invalid hostname"); + } + this.pubChannel = channelProviderRepository.getPubChannel(URI.create("kv:PT20S:log:files:" + HOSTNAME)); + scheduleNextBroadcasting(); + } + + private void scheduleNextBroadcasting() { + this.broadcastExecutor.schedule( + this::broadcastLogFiles, + this.connectionBroadcastingPeriod.toNanos(), + TimeUnit.NANOSECONDS + ); + } + + private void broadcastLogFiles() { + try { + broadcastLogFiles0(); + } catch (Exception e) { + logger.error("Failed to broadcast log files", e); + } finally { + scheduleNextBroadcasting(); + } + } + + private void broadcastLogFiles0() { + StringBuilder b = new StringBuilder(); + List keys = this.service.getFileKeys(); + for (FileKey key: keys) { + b.append(key).append("\r\n"); + } + this.pubChannel.publish(b.toString().getBytes()); + logger.info("Broadcast {} log files", keys.size()); + } + + private static String getHostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + return ""; + } + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepository.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepository.java new file mode 100644 index 0000000000000..41a90ea2b7bac --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepository.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.navercorp.pinpoint.log.vo.FileKey; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * @author youngjin.kim2 + */ +public class LogAcceptorRepository { + + private final SetMultimap acceptors = Multimaps.synchronizedSetMultimap( + Multimaps.newSetMultimap(new HashMap<>(1024), () -> new LinkedHashSet<>(2)) + ); + + public Set getAcceptableKeys() { + return this.acceptors.keySet(); + } + + public Set getAcceptors(FileKey key) { + return this.acceptors.get(key); + } + + public void addAcceptor(FileKey key, LogDemandAcceptor acceptor) { + this.acceptors.put(key, acceptor); + } + + public void removeAcceptor(FileKey key, LogDemandAcceptor acceptor) { + this.acceptors.remove(key, acceptor); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumer.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumer.java new file mode 100644 index 0000000000000..f56e1460a565a --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumer.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.navercorp.pinpoint.log.vo.LogPile; + +/** + * @author youngjin.kim2 + */ +public interface LogConsumer { + + void consume(LogPile pile); + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepository.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepository.java new file mode 100644 index 0000000000000..824ceee9cd049 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepository.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.navercorp.pinpoint.log.vo.FileKey; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; + +/** + * @author youngjin.kim2 + */ +public class LogConsumerRepository { + + private final SetMultimap consumers = Multimaps.synchronizedSetMultimap( + Multimaps.newSetMultimap(new HashMap<>(32), () -> new LinkedHashSet<>(2)) + ); + + public LogConsumer getConsumer(FileKey key) { + return getLast(this.consumers.get(key).iterator()); + } + + public void addConsumer(FileKey key, LogConsumer consumer) { + this.consumers.put(key, consumer); + } + + public void removeConsumer(FileKey key, LogConsumer consumer) { + this.consumers.remove(key, consumer); + } + + private static T getLast(Iterator iterator) { + T item = null; + while (iterator.hasNext()) { + item = iterator.next(); + } + return item; + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogDemandAcceptor.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogDemandAcceptor.java new file mode 100644 index 0000000000000..d10c9d84644b2 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/repository/LogDemandAcceptor.java @@ -0,0 +1,27 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.navercorp.pinpoint.log.dto.LogDemand; + +/** + * @author youngjin.kim2 + */ +public interface LogDemandAcceptor { + + void accept(LogDemand demand); + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerService.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerService.java new file mode 100644 index 0000000000000..41ead6f32a83c --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerService.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.List; + +/** + * @author youngjin.kim2 + */ +public interface LogConsumerService { + + Flux tail(FileKey demand, Duration duration); + + List getFileKeys(); + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerServiceImpl.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerServiceImpl.java new file mode 100644 index 0000000000000..93ca98bd6df54 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogConsumerServiceImpl.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.navercorp.pinpoint.log.collector.repository.LogAcceptorRepository; +import com.navercorp.pinpoint.log.collector.repository.LogConsumer; +import com.navercorp.pinpoint.log.collector.repository.LogConsumerRepository; +import com.navercorp.pinpoint.log.collector.repository.LogDemandAcceptor; +import com.navercorp.pinpoint.log.dto.LogDemand; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author youngjin.kim2 + */ +class LogConsumerServiceImpl implements LogConsumerService { + + private final ScheduledExecutorService scheduler; + + private final LogAcceptorRepository acceptorRepository; + private final LogConsumerRepository consumerRepository; + + LogConsumerServiceImpl( + ScheduledExecutorService scheduler, + LogAcceptorRepository acceptorRepository, + LogConsumerRepository consumerRepository + ) { + this.acceptorRepository = Objects.requireNonNull(acceptorRepository, "acceptorRepository"); + this.consumerRepository = Objects.requireNonNull(consumerRepository, "consumerRepository"); + this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); + } + + @Override + public Flux tail(FileKey key, Duration duration) { + Flux piles = listen(key, duration); + request(key, duration); + return piles; + } + + @Override + public List getFileKeys() { + return new ArrayList<>(this.acceptorRepository.getAcceptableKeys()); + } + + private Flux listen(FileKey key, Duration duration) { + Sinks.Many sink = Sinks.many().replay().all(); + LogConsumer logConsumer = pile -> sink.emitNext(pile, Sinks.EmitFailureHandler.FAIL_FAST); + this.consumerRepository.addConsumer(key, logConsumer); + schedule(() -> this.consumerRepository.removeConsumer(key, logConsumer), duration); + return sink.asFlux(); + } + + private void schedule(Runnable r, Duration delay) { + this.scheduler.schedule(r, delay.toNanos(), TimeUnit.NANOSECONDS); + } + + private void request(FileKey key, Duration duration) { + LogDemand demand = new LogDemand(key, duration.toMillis()); + for (LogDemandAcceptor acceptor: this.acceptorRepository.getAcceptors(key)) { + acceptor.accept(demand); + } + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderService.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderService.java new file mode 100644 index 0000000000000..ab42877f65a17 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderService.java @@ -0,0 +1,32 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.navercorp.pinpoint.log.dto.LogDemand; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; + +/** + * @author youngjin.kim2 + */ +public interface LogProviderService { + + void provide(FileKey fileKey, LogPile pile); + + Flux getDemands(FileKey fileKey); + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderServiceImpl.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderServiceImpl.java new file mode 100644 index 0000000000000..97f16cec81356 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogProviderServiceImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.navercorp.pinpoint.log.collector.repository.LogAcceptorRepository; +import com.navercorp.pinpoint.log.collector.repository.LogConsumer; +import com.navercorp.pinpoint.log.collector.repository.LogConsumerRepository; +import com.navercorp.pinpoint.log.collector.repository.LogDemandAcceptor; +import com.navercorp.pinpoint.log.dto.LogDemand; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.util.Objects; + +/** + * @author youngjin.kim2 + */ +class LogProviderServiceImpl implements LogProviderService { + + private final LogAcceptorRepository acceptorRepository; + private final LogConsumerRepository consumerRepository; + + LogProviderServiceImpl(LogAcceptorRepository acceptorRepository, LogConsumerRepository consumerRepository) { + this.acceptorRepository = Objects.requireNonNull(acceptorRepository, "acceptorRepository"); + this.consumerRepository = Objects.requireNonNull(consumerRepository, "consumerRepository"); + } + + @Override + public void provide(FileKey fileKey, LogPile pile) { + LogConsumer consumer = this.consumerRepository.getConsumer(fileKey); + if (consumer != null) { + consumer.consume(pile); + } + } + + @Override + public Flux getDemands(FileKey fileKey) { + Sinks.Many sink = Sinks.many().replay().all(); + LogDemandAcceptor acceptor = demand -> sink.emitNext(demand, Sinks.EmitFailureHandler.FAIL_FAST); + registerDemandAcceptor(fileKey, acceptor); + return sink.asFlux().doFinally(t -> unregisterDemandAcceptor(fileKey, acceptor)); + } + + private void registerDemandAcceptor(FileKey fileKey, LogDemandAcceptor acceptor) { + this.acceptorRepository.addAcceptor(fileKey, acceptor); + } + + private void unregisterDemandAcceptor(FileKey fileKey, LogDemandAcceptor acceptor) { + this.acceptorRepository.removeAcceptor(fileKey, acceptor); + } + +} diff --git a/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogServiceConfig.java b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogServiceConfig.java new file mode 100644 index 0000000000000..3471890e3f3a0 --- /dev/null +++ b/log/log-collector/src/main/java/com/navercorp/pinpoint/log/collector/service/LogServiceConfig.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.navercorp.pinpoint.log.collector.repository.LogAcceptorRepository; +import com.navercorp.pinpoint.log.collector.repository.LogConsumerRepository; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +/** + * @author youngjin.kim2 + */ +@Configuration +public class LogServiceConfig { + + @Bean("logConsumerExpiringScheduler") + ScheduledExecutorService logConsumerExpiringScheduler() { + return Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setNameFormat("log-dao-scheduler-%d").build() + ); + } + + @Bean + LogAcceptorRepository logAcceptorRepository() { + return new LogAcceptorRepository(); + } + + @Bean + LogConsumerRepository logConsumerRepository() { + return new LogConsumerRepository(); + } + + @Bean + LogConsumerService logConsumerService( + @Qualifier("logConsumerExpiringScheduler") ScheduledExecutorService scheduler, + LogAcceptorRepository acceptorRepository, + LogConsumerRepository consumerRepository + ) { + return new LogConsumerServiceImpl(scheduler, acceptorRepository, consumerRepository); + } + + @Bean + LogProviderService logProviderService( + LogAcceptorRepository acceptorRepository, + LogConsumerRepository consumerRepository + ) { + return new LogProviderServiceImpl(acceptorRepository, consumerRepository); + } + +} diff --git a/log/log-collector/src/main/resources/log/pinpoint-collector-log-grpc-root.properties b/log/log-collector/src/main/resources/log/pinpoint-collector-log-grpc-root.properties new file mode 100644 index 0000000000000..061264cfb2091 --- /dev/null +++ b/log/log-collector/src/main/resources/log/pinpoint-collector-log-grpc-root.properties @@ -0,0 +1,16 @@ +# gRPC +# Log +# Server Option +collector.receiver.grpc.log.keepalive_time_millis=30000 +collector.receiver.grpc.log.keepalive_timeout_millis=60000 +collector.receiver.grpc.log.permit_keepalive_time_millis=20000 +collector.receiver.grpc.log.connection_idle_timeout_millis=360000 +collector.receiver.grpc.log.concurrent-calls_per-connection_max=128 +collector.receiver.grpc.log.handshake_timeout_millis=120000 +collector.receiver.grpc.log.flow-control_window_size_init=1MB +collector.receiver.grpc.log.header_list_size_max=1KB +collector.receiver.grpc.log.inbound_message_size_max=4MB +collector.receiver.grpc.log.receive_buffer_size=64KB +collector.receiver.grpc.log.grpc_max_term_wait_time_millis=3000 +## AUTO, NIO, EPOLL +collector.receiver.grpc.log.channel-type=AUTO diff --git a/log/log-collector/src/main/resources/log/pinpoint-collector-log-root.properties b/log/log-collector/src/main/resources/log/pinpoint-collector-log-root.properties new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/log/log-collector/src/main/resources/log/profiles/local/pinpoint-collector-log-grpc.properties b/log/log-collector/src/main/resources/log/profiles/local/pinpoint-collector-log-grpc.properties new file mode 100644 index 0000000000000..040c6b2a3d150 --- /dev/null +++ b/log/log-collector/src/main/resources/log/profiles/local/pinpoint-collector-log-grpc.properties @@ -0,0 +1,34 @@ +# gRPC +# Log +collector.receiver.grpc.log.enable=true +collector.receiver.grpc.log.bindaddress.ip=0.0.0.0 +collector.receiver.grpc.log.bindaddress.port=15600 +# Executor of Server +collector.receiver.grpc.log.server.executor.thread_size=8 +collector.receiver.grpc.log.server.executor.queue_size=256 +collector.receiver.grpc.log.server.executor.monitor_enable=false +# Call Executor of Server +collector.receiver.grpc.log.server-call.executor.thread_size=8 +collector.receiver.grpc.log.server-call.executor.queue_size=256 +collector.receiver.grpc.log.server-call.executor.monitor_enable=true +# Executor of Worker +collector.receiver.grpc.log.worker.executor.thread_size=16 +collector.receiver.grpc.log.worker.executor.queue_size=1024 +collector.receiver.grpc.log.worker.executor.monitor_enable=true + + +### For ssl config +collector.receiver.grpc.ssl.enable=false +# please choose openssl/jdk +collector.receiver.grpc.ssl.provider_type=jdk +# please insert .pem file path +# (prefix for claspath = claspath:, prefix for absoulute path = file:) +collector.receiver.grpc.ssl.key_file_path= +# please insert .crt file path +# (prefix for claspath = claspath:, prefix for absoulute path = file:) +collector.receiver.grpc.ssl.key_cert_file_path= + +# Agent +collector.receiver.grpc.log.ssl.enable=false +collector.receiver.grpc.log.ssl.bindaddress.ip=0.0.0.0 +collector.receiver.grpc.log.ssl.bindaddress.port=16600 diff --git a/log/log-collector/src/main/resources/log/profiles/release/pinpoint-collector-log-grpc.properties b/log/log-collector/src/main/resources/log/profiles/release/pinpoint-collector-log-grpc.properties new file mode 100644 index 0000000000000..040c6b2a3d150 --- /dev/null +++ b/log/log-collector/src/main/resources/log/profiles/release/pinpoint-collector-log-grpc.properties @@ -0,0 +1,34 @@ +# gRPC +# Log +collector.receiver.grpc.log.enable=true +collector.receiver.grpc.log.bindaddress.ip=0.0.0.0 +collector.receiver.grpc.log.bindaddress.port=15600 +# Executor of Server +collector.receiver.grpc.log.server.executor.thread_size=8 +collector.receiver.grpc.log.server.executor.queue_size=256 +collector.receiver.grpc.log.server.executor.monitor_enable=false +# Call Executor of Server +collector.receiver.grpc.log.server-call.executor.thread_size=8 +collector.receiver.grpc.log.server-call.executor.queue_size=256 +collector.receiver.grpc.log.server-call.executor.monitor_enable=true +# Executor of Worker +collector.receiver.grpc.log.worker.executor.thread_size=16 +collector.receiver.grpc.log.worker.executor.queue_size=1024 +collector.receiver.grpc.log.worker.executor.monitor_enable=true + + +### For ssl config +collector.receiver.grpc.ssl.enable=false +# please choose openssl/jdk +collector.receiver.grpc.ssl.provider_type=jdk +# please insert .pem file path +# (prefix for claspath = claspath:, prefix for absoulute path = file:) +collector.receiver.grpc.ssl.key_file_path= +# please insert .crt file path +# (prefix for claspath = claspath:, prefix for absoulute path = file:) +collector.receiver.grpc.ssl.key_cert_file_path= + +# Agent +collector.receiver.grpc.log.ssl.enable=false +collector.receiver.grpc.log.ssl.bindaddress.ip=0.0.0.0 +collector.receiver.grpc.log.ssl.bindaddress.port=16600 diff --git a/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepositoryTest.java b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepositoryTest.java new file mode 100644 index 0000000000000..8eba27f00c588 --- /dev/null +++ b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogAcceptorRepositoryTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.navercorp.pinpoint.log.vo.FileKey; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author youngjin.kim2 + */ +public class LogAcceptorRepositoryTest { + + @Test + public void testBasicScenario() throws Exception { + LogDemandAcceptor acceptor1 = System.out::println; + LogDemandAcceptor acceptor2 = System.out::print; + + LogAcceptorRepository repo = new LogAcceptorRepository(); + repo.addAcceptor(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), acceptor1); + repo.addAcceptor(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), acceptor2); + repo.addAcceptor(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"), acceptor1); + + assertThat(repo.getAcceptors(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"))) + .withFailMessage("should return 2 items") + .hasSameElementsAs(List.of(acceptor1, acceptor2)); + + assertThat(repo.getAcceptableKeys()) + .withFailMessage("should return 2 keys") + .hasSameElementsAs(List.of( + FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), + FileKey.parse("hostGroup-1:host1.1:file-1.1.2") + )); + + repo.removeAcceptor(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"), acceptor1); + + assertThat(repo.getAcceptableKeys()) + .withFailMessage("file-1.1.2 item item should have been removed") + .hasSameElementsAs(List.of( + FileKey.parse("hostGroup-1:host1.1:file-1.1.1") + )); + } + +} diff --git a/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepositoryTest.java b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepositoryTest.java new file mode 100644 index 0000000000000..4f2b5ca009fa2 --- /dev/null +++ b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/repository/LogConsumerRepositoryTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.repository; + +import com.navercorp.pinpoint.log.vo.FileKey; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author youngjin.kim2 + */ +public class LogConsumerRepositoryTest { + + @Test + public void testBasicScenario() throws Exception { + LogConsumer consumer1 = System.out::println; + LogConsumer consumer2 = System.out::print; + + LogConsumerRepository repo = new LogConsumerRepository(); + repo.addConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), consumer1); + repo.addConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), consumer2); + repo.addConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"), consumer1); + + assertThat(repo.getConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"))) + .withFailMessage("should return last item") + .isEqualTo(consumer2); + assertThat(repo.getConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"))) + .withFailMessage("should return last item") + .isEqualTo(consumer1); + + repo.removeConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"), consumer2); + repo.removeConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"), consumer1); + + assertThat(repo.getConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.1"))) + .withFailMessage("should return last item") + .isEqualTo(consumer1); + assertThat(repo.getConsumer(FileKey.parse("hostGroup-1:host1.1:file-1.1.2"))) + .withFailMessage("should return last item") + .isNull(); + } + +} diff --git a/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/service/LogServiceTest.java b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/service/LogServiceTest.java new file mode 100644 index 0000000000000..9322d849093f1 --- /dev/null +++ b/log/log-collector/src/test/java/com/navercorp/pinpoint/log/collector/service/LogServiceTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.collector.service; + +import com.navercorp.pinpoint.log.collector.repository.LogAcceptorRepository; +import com.navercorp.pinpoint.log.collector.repository.LogConsumerRepository; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.Log; +import com.navercorp.pinpoint.log.vo.LogPile; +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author youngjin.kim2 + */ +public class LogServiceTest { + + @Test + public void test() throws Exception { + LogAcceptorRepository acceptorRepository = new LogAcceptorRepository(); + LogConsumerRepository consumerRepository = new LogConsumerRepository(); + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + LogConsumerService consumerService = + new LogConsumerServiceImpl(scheduler, acceptorRepository, consumerRepository); + LogProviderService providerService = + new LogProviderServiceImpl(acceptorRepository, consumerRepository); + + FileKey fileKey = FileKey.parse("hostGroup-1:host-1:file-1"); + Log log1 = new Log(0, "log1"); + Log log2 = new Log(1, "log2"); + LogPile pile1 = new LogPile(0, List.of(log1, log2)); + + Disposable providerDisposable = providerService.getDemands(fileKey).subscribe(demand -> { + providerService.provide(fileKey, pile1); + }); + + assertThat(consumerService.getFileKeys()).hasSameElementsAs(List.of(fileKey)); + assertThat(consumerService.tail(fileKey, Duration.ofSeconds(99999)) + .take(Duration.ofMillis(100)) + .collectList() + .block() + ).hasSize(1).hasSameElementsAs(List.of(pile1)); + + providerDisposable.dispose(); + + assertThat(consumerService.getFileKeys()).isNotNull().isEmpty(); + assertThat(consumerService.tail(fileKey, Duration.ofMillis(1)) + .take(Duration.ofMillis(100)) + .collectList() + .block() + ).isNotNull().isEmpty(); + } + +} diff --git a/log/log-common/pom.xml b/log/log-common/pom.xml new file mode 100644 index 0000000000000..b8c33dadd0312 --- /dev/null +++ b/log/log-common/pom.xml @@ -0,0 +1,35 @@ + + + + pinpoint-log-module + com.navercorp.pinpoint + 2.6.0-SNAPSHOT + + 4.0.0 + + pinpoint-log-common + + + 11 + ${env.JAVA_11_HOME} + + + + + com.navercorp.pinpoint + pinpoint-redis + + + com.navercorp.pinpoint + pinpoint-commons-server + + + com.fasterxml.jackson.core + jackson-annotations + compile + + + + \ No newline at end of file diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/LogServiceProtocolConfig.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/LogServiceProtocolConfig.java new file mode 100644 index 0000000000000..f289e3c675daf --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/LogServiceProtocolConfig.java @@ -0,0 +1,57 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.navercorp.pinpoint.channel.ChannelSpringConfig; +import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; +import com.navercorp.pinpoint.channel.serde.JacksonSerde; +import com.navercorp.pinpoint.channel.service.ChannelServiceProtocol; +import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; +import com.navercorp.pinpoint.channel.service.client.ChannelState; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import reactor.core.publisher.Sinks; + +import java.net.URI; +import java.time.Duration; + +/** + * @author youngjin.kim2 + */ +@Configuration(proxyBeanMethods = false) +@Import({ RedisPubSubConfig.class, ChannelSpringConfig.class }) +public class LogServiceProtocolConfig { + + @Bean + FluxChannelServiceProtocol logProtocol(ObjectMapper objectMapper) { + return ChannelServiceProtocol.builder() + .setDemandSerde(JacksonSerde.byClass(objectMapper, FileKey.class)) + .setDemandPubChannelURIProvider(demand -> URI.create("pubsub:log:demand:" + demand)) + .setDemandSubChannelURI(URI.create("pubsub:log:demand:*")) + .setSupplySerde(JacksonSerde.byClass(objectMapper, LogPile.class)) + .setSupplyChannelURIProvider(demand -> URI.create("pubsub:log:supply:" + demand)) + .setDemandInterval(Duration.ofSeconds(5)) + .setBufferSize(4) + .setFailureHandlerEmitError(Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))) + .setChannelStateFn(supply -> ChannelState.ALIVE) + .buildFlux(); + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogDemand.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogDemand.java new file mode 100644 index 0000000000000..61671578e9d77 --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogDemand.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.dto; + +import com.navercorp.pinpoint.log.vo.FileKey; + +/** + * @author youngjin.kim2 + */ +public class LogDemand { + + private FileKey fileKey; + private long durationMillis; + + public LogDemand() {} + + public LogDemand(FileKey fileKey, long durationMillis) { + this.fileKey = fileKey; + this.durationMillis = durationMillis; + } + + public FileKey getFileKey() { + return fileKey; + } + + public void setFileKey(FileKey fileKey) { + this.fileKey = fileKey; + } + + public long getDurationMillis() { + return durationMillis; + } + + public void setDurationMillis(long durationMillis) { + this.durationMillis = durationMillis; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogSupply.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogSupply.java new file mode 100644 index 0000000000000..7f7ce30eaee91 --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/dto/LogSupply.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.dto; + +import com.navercorp.pinpoint.log.vo.LogPile; + +/** + * @author youngjin.kim2 + */ +public class LogSupply { + + private final LogPile pile; + + public LogSupply(LogPile pile) { + this.pile = pile; + } + + public LogPile getPile() { + return pile; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/FileKey.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/FileKey.java new file mode 100644 index 0000000000000..696b3bd8b14d5 --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/FileKey.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.vo; + +import java.text.ParseException; +import java.util.Objects; + +/** + * @author youngjin.kim2 + */ +public class FileKey { + + private final HostKey hostKey; + private final String fileName; + + public FileKey(HostKey hostKey, String fileName) { + this.hostKey = hostKey; + this.fileName = fileName; + } + + public HostKey getHostKey() { + return hostKey; + } + + public String getFileName() { + return fileName; + } + + public static FileKey of(String hostGroupName, String hostName, String fileName) { + return new FileKey(HostKey.of(hostGroupName, hostName), fileName); + } + + public static FileKey parse(String s) throws ParseException { + String[] words = s.split(":"); + if (words.length != 3) { + throw new ParseException(s, s.length()); + } + return new FileKey(HostKey.of(words[0], words[1]), words[2]); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileKey fileKey = (FileKey) o; + return Objects.equals(hostKey, fileKey.hostKey) && Objects.equals(fileName, fileKey.fileName); + } + + @Override + public int hashCode() { + return Objects.hash(hostKey, fileName); + } + + @Override + public String toString() { + return hostKey.toString() + ':' + fileName; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/HostKey.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/HostKey.java new file mode 100644 index 0000000000000..88e1686ebfaa7 --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/HostKey.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.vo; + +import java.util.Objects; + +/** + * @author youngjin.kim2 + */ +public class HostKey { + + private final String hostGroupName; + private final String hostName; + + private HostKey(String hostGroupName, String hostName) { + this.hostGroupName = hostGroupName; + this.hostName = hostName; + } + + public static HostKey of(String hostGroupName, String hostName) { + return new HostKey(hostGroupName, hostName); + } + + public String getHostGroupName() { + return hostGroupName; + } + + public String getHostName() { + return hostName; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + HostKey hostKey = (HostKey) o; + return Objects.equals(hostGroupName, hostKey.hostGroupName) && Objects.equals(hostName, hostKey.hostName); + } + + @Override + public int hashCode() { + return Objects.hash(hostGroupName, hostName); + } + + @Override + public String toString() { + return hostGroupName + ':' + hostName; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/Log.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/Log.java new file mode 100644 index 0000000000000..6fcdf4c22df2e --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/Log.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.vo; + +/** + * @author youngjin.kim2 + */ +public class Log { + + private final long seq; + private final String log; + + public Log(long seq, String log) { + this.seq = seq; + this.log = log; + } + + public long getSeq() { + return seq; + } + + public String getLog() { + return log; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogFileList.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogFileList.java new file mode 100644 index 0000000000000..0b6fe30241dd7 --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogFileList.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.vo; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * @author youngjin.kim2 + */ +public class LogFileList { + + private List fileKeys; + + public LogFileList() {} + + public LogFileList(List fileKeys) { + Objects.requireNonNull(fileKeys, "fileKeys"); + this.fileKeys = fileKeys.stream().map(el -> el.toString()).distinct().collect(Collectors.toUnmodifiableList()); + } + + public static LogFileList of(List fileKeys) { + return new LogFileList(fileKeys); + } + + public List getFileKeys() { + return fileKeys; + } + + public void setFileKeys(List fileKeys) { + this.fileKeys = fileKeys; + } + +} diff --git a/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogPile.java b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogPile.java new file mode 100644 index 0000000000000..81fdcbf3bf7bb --- /dev/null +++ b/log/log-common/src/main/java/com/navercorp/pinpoint/log/vo/LogPile.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.vo; + +import java.util.List; + +/** + * @author youngjin.kim2 + */ +public class LogPile { + + private final long seq; + private final List logs; + + public LogPile(long seq, List logs) { + this.seq = seq; + this.logs = logs; + } + + public long getSeq() { + return seq; + } + + public List getLogs() { + return logs; + } + +} diff --git a/log/log-web/pom.xml b/log/log-web/pom.xml new file mode 100644 index 0000000000000..c508aa3aaf6fa --- /dev/null +++ b/log/log-web/pom.xml @@ -0,0 +1,29 @@ + + + + pinpoint-log-module + com.navercorp.pinpoint + 2.6.0-SNAPSHOT + + 4.0.0 + + pinpoint-log-web + + + 11 + ${env.JAVA_11_HOME} + + + + + com.navercorp.pinpoint + pinpoint-web + + + com.navercorp.pinpoint + pinpoint-log-common + + + \ No newline at end of file diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebConfig.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebConfig.java new file mode 100644 index 0000000000000..a192774e13e4d --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebConfig.java @@ -0,0 +1,15 @@ +package com.navercorp.pinpoint.log.web; + +import com.navercorp.pinpoint.log.web.controller.LogControllerConfig; +import com.navercorp.pinpoint.log.web.websocket.LogWebSocketConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +@Configuration(proxyBeanMethods = false) +@Import({ + LogWebSocketConfig.class, + LogControllerConfig.class +}) +public class LogWebConfig { + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebModule.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebModule.java new file mode 100644 index 0000000000000..e867950647a65 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebModule.java @@ -0,0 +1,29 @@ +package com.navercorp.pinpoint.log.web;/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.navercorp.pinpoint.redis.RedisPropertySources; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(name = "pinpoint.modules.log.enabled", havingValue = "true") +@Import({ LogWebConfig.class, RedisPropertySources.class, LogWebPropertySources.class }) +public class LogWebModule { +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebPropertySources.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebPropertySources.java new file mode 100644 index 0000000000000..0ff728ea0278a --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/LogWebPropertySources.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web; + +import org.springframework.context.annotation.PropertySource; +import org.springframework.context.annotation.PropertySources; + +import static com.navercorp.pinpoint.log.web.LogWebPropertySources.LOG; +import static com.navercorp.pinpoint.log.web.LogWebPropertySources.LOG_ROOT; + +@PropertySources({ + @PropertySource(name = "LogPropertySources", value = { LOG_ROOT, LOG }), +}) +public class LogWebPropertySources { + + private static final String PROFILE = "classpath:log/profiles/${pinpoint.profiles.active:local}/"; + + public static final String LOG_ROOT = "classpath:log/pinpoint-web-log-root.properties"; + public static final String LOG = PROFILE + "pinpoint-web-log.properties"; + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogController.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogController.java new file mode 100644 index 0000000000000..1e2d4f555d65c --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogController.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.controller; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.Log; +import com.navercorp.pinpoint.log.web.service.LiveTailService; +import com.navercorp.pinpoint.log.web.vo.LogHost; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * @author youngjin.kim2 + */ +@RestController +@RequestMapping("/log") +public class LogController { + + private final LiveTailService service; + + public LogController(LiveTailService service) { + this.service = Objects.requireNonNull(service, "service"); + } + + @GetMapping("hostGroups/{hostGroup}/hosts/{host}/files/{file}/recent") + public List test( + @PathVariable("hostGroup") String hostGroupName, + @PathVariable("host") String hostName, + @PathVariable("file") String fileName, + @RequestParam("durationMillis") long durationMillis + ) { + final FileKey fileKey = FileKey.of(hostGroupName, hostName, fileName); + final Duration duration = Duration.ofMillis(durationMillis); + return this.service.tail(fileKey) + .take(duration) + .flatMap(pile -> Flux.fromIterable(pile.getLogs())) + .map(Log::getLog) + .collectList() + .block(); + } + + @GetMapping("hostGroups") + public Set getHostGroups() { + return this.service.getHostGroupNames(); + } + + @GetMapping("hostGroups/{hostGroup}/hosts") + public List getHosts(@PathVariable("hostGroup") String hostGroupName) { + return LogHost.from(this.service.getFileKeys(hostGroupName)); + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogControllerConfig.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogControllerConfig.java new file mode 100644 index 0000000000000..8dcad111588f8 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/controller/LogControllerConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.controller; + +import com.navercorp.pinpoint.log.web.service.LogServiceConfig; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration +@ComponentScan(basePackages = "com.navercorp.pinpoint.log.web.controller") +@Import({ LogServiceConfig.class }) +public class LogControllerConfig { +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDao.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDao.java new file mode 100644 index 0000000000000..0800c59e6476a --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDao.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.dao; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.Set; + +/** + * @author youngjin.kim2 + */ +public interface LiveTailDao { + + Flux tail(FileKey fileKey); + + Set getHostGroupNames(); + + List getFileKeys(String hostGroupName); + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDaoImpl.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDaoImpl.java new file mode 100644 index 0000000000000..b1f64b6ffb98f --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LiveTailDaoImpl.java @@ -0,0 +1,112 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.dao; + +import com.google.common.base.Suppliers; +import com.navercorp.pinpoint.channel.service.client.FluxChannelServiceClient; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import org.springframework.data.redis.core.RedisTemplate; +import reactor.core.publisher.Flux; + +import java.text.ParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * @author youngjin.kim2 + */ +public class LiveTailDaoImpl implements LiveTailDao { + + private final RedisTemplate redis; + private final FluxChannelServiceClient client; + + private final Supplier>> fileKeyMapSupplier = + Suppliers.memoizeWithExpiration(this::getFileKeyMap, 10, TimeUnit.SECONDS); + + public LiveTailDaoImpl( + RedisTemplate redis, + FluxChannelServiceClient client + ) { + this.redis = Objects.requireNonNull(redis, "redis"); + this.client = Objects.requireNonNull(client, "client"); + } + + @Override + public Flux tail(FileKey fileKey) { + return this.client.request(fileKey); + } + + @Override + public Set getHostGroupNames() { + return this.fileKeyMapSupplier.get().keySet(); + } + + @Override + public List getFileKeys(String hostGroupName) { + return this.fileKeyMapSupplier.get().get(hostGroupName); + } + + private Map> getFileKeyMap() { + return groupByHostGroupName(getAllFileKeys()); + } + + private Map> groupByHostGroupName(List fileKeys) { + Map> result = new HashMap<>(fileKeys.size() * 3); + for (FileKey fileKey: fileKeys) { + result.computeIfAbsent(fileKey.getHostKey().getHostGroupName(), k -> new ArrayList<>(2)).add(fileKey); + } + return result; + } + + private List getAllFileKeys() { + Set keys = this.redis.keys("log:files:*"); + if (keys == null) { + return List.of(); + } + List fileKeys = new ArrayList<>(keys.size() * 8); + for (String key: keys) { + fileKeys.addAll(getFileKeysFromRedis(key)); + } + return fileKeys; + } + + private List getFileKeysFromRedis(String redisKey) { + String value = this.redis.opsForValue().get(redisKey); + if (value == null) { + return List.of(); + } + return parseFileKeyList(value); + } + + private List parseFileKeyList(String raw) { + List keys = new ArrayList<>(8); + for (String fileKey: raw.split("\r\n")) { + try { + keys.add(FileKey.parse(fileKey)); + } catch (ParseException ignored) { + } + } + return keys; + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LogWebDaoConfig.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LogWebDaoConfig.java new file mode 100644 index 0000000000000..00b206ab9b919 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/dao/LogWebDaoConfig.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.dao; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.navercorp.pinpoint.channel.ChannelProviderRepository; +import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; +import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; +import com.navercorp.pinpoint.channel.service.client.ChannelServiceClient; +import com.navercorp.pinpoint.channel.service.client.FluxChannelServiceClient; +import com.navercorp.pinpoint.log.LogServiceProtocolConfig; +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.redis.core.RedisTemplate; +import reactor.core.scheduler.Schedulers; + +/** + * @author youngjin.kim2 + */ +@Configuration +@Import({ RedisPubSubConfig.class, LogServiceProtocolConfig.class }) +public class LogWebDaoConfig { + + @Bean + @ConditionalOnMissingBean(ObjectMapper.class) + ObjectMapper objectMapper() { + return new ObjectMapper(); + } + + @Bean + FluxChannelServiceClient liveTailClient( + ChannelProviderRepository channelProviderRepository, + FluxChannelServiceProtocol protocol + ) { + return ChannelServiceClient.buildFlux( + channelProviderRepository, + protocol, + Schedulers.newParallel("liveTail", Runtime.getRuntime().availableProcessors()) + ); + } + + @Bean + LiveTailDao liveTailDao( + RedisTemplate template, + FluxChannelServiceClient client + ) { + return new LiveTailDaoImpl(template, client); + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailService.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailService.java new file mode 100644 index 0000000000000..9ecb23374e465 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailService.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.service; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.Set; + +/** + * @author youngjin.kim2 + */ +public interface LiveTailService { + + Flux tail(FileKey fileKey); + + Set getHostGroupNames(); + + List getFileKeys(String hostGroupName); + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailServiceImpl.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailServiceImpl.java new file mode 100644 index 0000000000000..5b39e9b19b465 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LiveTailServiceImpl.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.service; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import com.navercorp.pinpoint.log.web.dao.LiveTailDao; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * @author youngjin.kim2 + */ +public class LiveTailServiceImpl implements LiveTailService { + + private final LiveTailDao dao; + + public LiveTailServiceImpl(LiveTailDao dao) { + this.dao = Objects.requireNonNull(dao, "dao"); + } + + @Override + public Flux tail(FileKey fileKey) { + return this.dao.tail(fileKey); + } + + @Override + public Set getHostGroupNames() { + return this.dao.getHostGroupNames(); + } + + @Override + public List getFileKeys(String hostGroupName) { + return this.dao.getFileKeys(hostGroupName); + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LogServiceConfig.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LogServiceConfig.java new file mode 100644 index 0000000000000..2f108dd7d46f5 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/service/LogServiceConfig.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.service; + +import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; +import com.navercorp.pinpoint.log.web.dao.LiveTailDao; +import com.navercorp.pinpoint.log.web.dao.LogWebDaoConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration +@Import({ RedisPubSubConfig.class, LogWebDaoConfig.class }) +public class LogServiceConfig { + + @Bean + LiveTailService liveTailService(LiveTailDao dao) { + return new LiveTailServiceImpl(dao); + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/vo/LogHost.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/vo/LogHost.java new file mode 100644 index 0000000000000..dba725929576e --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/vo/LogHost.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.vo; + +import com.navercorp.pinpoint.log.vo.FileKey; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author youngjin.kim2 + */ +public class LogHost { + + private final String name; + private final List files; + + public LogHost(String name, List files) { + this.name = name; + this.files = files; + } + + public static List from(List fileKey) { + final Map> byName = new HashMap<>(); + for (FileKey key : fileKey) { + final String hostName = key.getHostKey().getHostName(); + final List keys = byName.computeIfAbsent(hostName, k -> new ArrayList<>()); + keys.add(key.getFileName()); + } + + final List hosts = new ArrayList<>(); + for (Map.Entry> entry: byName.entrySet()) { + hosts.add(new LogHost(entry.getKey(), entry.getValue())); + } + return hosts; + } + + public String getName() { + return name; + } + + public List getFiles() { + return files; + } +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketConfig.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketConfig.java new file mode 100644 index 0000000000000..046f5aeab6504 --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketConfig.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.navercorp.pinpoint.channel.serde.JacksonSerde; +import com.navercorp.pinpoint.log.vo.LogPile; +import com.navercorp.pinpoint.log.web.service.LiveTailService; +import com.navercorp.pinpoint.log.web.service.LogServiceConfig; +import com.navercorp.pinpoint.web.websocket.PinpointWebSocketHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +/** + * @author youngjin.kim2 + */ +@Configuration +@Import({ LogServiceConfig.class }) +public class LogWebSocketConfig { + + @Bean + PinpointWebSocketHandler logWebSocketHandler( + LiveTailService liveTailService, + ObjectMapper objectMapper + ) { + return new LogWebSocketHandler(liveTailService, JacksonSerde.byClass(objectMapper, LogPile.class)); + } + +} diff --git a/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java new file mode 100644 index 0000000000000..d5036b868705b --- /dev/null +++ b/log/log-web/src/main/java/com/navercorp/pinpoint/log/web/websocket/LogWebSocketHandler.java @@ -0,0 +1,142 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.log.web.websocket; + +import com.navercorp.pinpoint.log.vo.FileKey; +import com.navercorp.pinpoint.log.vo.LogPile; +import com.navercorp.pinpoint.log.web.service.LiveTailService; +import com.navercorp.pinpoint.web.websocket.PinpointWebSocketHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.core.serializer.Serializer; +import org.springframework.util.CollectionUtils; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.adapter.standard.StandardWebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; +import reactor.core.Disposable; + +import javax.annotation.Nonnull; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * @author youngjin.kim2 + */ +class LogWebSocketHandler extends TextWebSocketHandler implements PinpointWebSocketHandler { + + private static final String LIVE_TAIL_DISPOSABLE_ATTR = "pinpoint.live-tail.subscription"; + + private final Logger logger = LogManager.getLogger(this.getClass()); + private final Serializer logPileSerializer; + + private final LiveTailService liveTailService; + + LogWebSocketHandler( + LiveTailService liveTailService, + Serializer logPileSerializer + ) { + this.liveTailService = Objects.requireNonNull(liveTailService, "liveTailService"); + this.logPileSerializer = Objects.requireNonNull(logPileSerializer, "logPileSerializer"); + } + + @Override + public void afterConnectionEstablished(@Nonnull WebSocketSession session) throws Exception { + try { + if (session instanceof StandardWebSocketSession) { + startLiveTail((StandardWebSocketSession) session); + } else { + logger.error("Failed to handle live-tail: session is not an instance of StandardWebSocketSession"); + throw new RuntimeException("Failed to handle live-tail: session is not an instance of " + + "StandardWebSocketSession"); + } + } catch (Exception e) { + logger.error("Failed to handle live-tail", e); + stopLiveTail(session); + session.close(); + return; + } + + super.afterConnectionEstablished(session); + } + + private void startLiveTail(StandardWebSocketSession session) { + logger.info("Starting live-tail, session: {}", session); + + final Map> params = session + .getNativeSession() + .getRequestParameterMap(); + + final String hostGroupName = getUniParam(params, "hostGroupName"); + final String hostName = getUniParam(params, "hostName"); + final String fileName = getUniParam(params, "fileName"); + final FileKey fileKey = FileKey.of(hostGroupName, hostName, fileName); + + final Disposable disposable = this.liveTailService.tail(fileKey) + .subscribe(supply -> sendSupply(session, supply)); + session.getAttributes().put(LIVE_TAIL_DISPOSABLE_ATTR, disposable); + } + + private static String getUniParam(Map> params, String key) { + final List values = params.get(key); + String firstElement = CollectionUtils.firstElement(values); + if (firstElement == null) { + throw new RuntimeException("No parameter: " + key); + } + return firstElement; + } + + private void stopLiveTail(@Nonnull WebSocketSession session) { + logger.info("Stopping live-tail, session: {}", session); + Object disposable = session.getAttributes().get(LIVE_TAIL_DISPOSABLE_ATTR); + if (disposable instanceof Disposable) { + ((Disposable) disposable).dispose(); + } + } + + @Override + public void afterConnectionClosed(@Nonnull WebSocketSession session, @Nonnull CloseStatus status) throws Exception { + stopLiveTail(session); + super.afterConnectionClosed(session, status); + } + + + private void sendSupply(WebSocketSession session, LogPile pile) { + try { + byte[] payload = this.logPileSerializer.serializeToByteArray(pile); + session.sendMessage(new TextMessage(payload)); + } catch (Exception e) { + stopLiveTail(session); + logger.error("Failed to send message", e); + } + } + + @Override public void start() {} + @Override public void stop() {} + + @Override + public String getRequestMapping() { + return "/log/liveTail"; + } + + @Override + public int getPriority() { + return 0; + } + +} diff --git a/log/log-web/src/main/resources/log/pinpoint-web-log-root.properties b/log/log-web/src/main/resources/log/pinpoint-web-log-root.properties new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/log/log-web/src/main/resources/log/profiles/local/pinpoint-web-log.properties b/log/log-web/src/main/resources/log/profiles/local/pinpoint-web-log.properties new file mode 100644 index 0000000000000..0f6e2ff2688af --- /dev/null +++ b/log/log-web/src/main/resources/log/profiles/local/pinpoint-web-log.properties @@ -0,0 +1 @@ +config.show.log=true \ No newline at end of file diff --git a/log/log-web/src/main/resources/log/profiles/release/pinpoint-web-log.properties b/log/log-web/src/main/resources/log/profiles/release/pinpoint-web-log.properties new file mode 100644 index 0000000000000..0f6e2ff2688af --- /dev/null +++ b/log/log-web/src/main/resources/log/profiles/release/pinpoint-web-log.properties @@ -0,0 +1 @@ +config.show.log=true \ No newline at end of file diff --git a/log/pom.xml b/log/pom.xml new file mode 100644 index 0000000000000..3b4d8eb6f5e38 --- /dev/null +++ b/log/pom.xml @@ -0,0 +1,21 @@ + + + + pinpoint + com.navercorp.pinpoint + 2.6.0-SNAPSHOT + + 4.0.0 + + pinpoint-log-module + pom + + + log-collector + log-web + log-common + + + \ No newline at end of file diff --git a/metric-module/collector-starter/pom.xml b/metric-module/collector-starter/pom.xml index 8f3f41fa60936..b19176b761e17 100644 --- a/metric-module/collector-starter/pom.xml +++ b/metric-module/collector-starter/pom.xml @@ -44,6 +44,10 @@ com.navercorp.pinpoint pinpoint-inspector-collector + + com.navercorp.pinpoint + pinpoint-log-collector + diff --git a/metric-module/collector-starter/src/main/java/com/navercorp/pinpoint/collector/starter/multi/application/MultiApplication.java b/metric-module/collector-starter/src/main/java/com/navercorp/pinpoint/collector/starter/multi/application/MultiApplication.java index e06b51c61d518..808481dee741d 100644 --- a/metric-module/collector-starter/src/main/java/com/navercorp/pinpoint/collector/starter/multi/application/MultiApplication.java +++ b/metric-module/collector-starter/src/main/java/com/navercorp/pinpoint/collector/starter/multi/application/MultiApplication.java @@ -7,10 +7,12 @@ import com.navercorp.pinpoint.common.server.env.ProfileResolveListener; import com.navercorp.pinpoint.common.server.util.ServerBootLogger; import com.navercorp.pinpoint.inspector.collector.InspectorCollectorApp; +import com.navercorp.pinpoint.log.collector.LogCollectorModule; import com.navercorp.pinpoint.metric.collector.CollectorType; import com.navercorp.pinpoint.metric.collector.CollectorTypeParser; import com.navercorp.pinpoint.metric.collector.MetricCollectorApp; import com.navercorp.pinpoint.metric.collector.TypeSet; +import com.navercorp.pinpoint.redis.RedisPropertySources; import com.navercorp.pinpoint.uristat.collector.UriStatCollectorConfig; import org.springframework.boot.Banner; import org.springframework.boot.SpringBootConfiguration; @@ -79,6 +81,16 @@ public static void main(String[] args) { metricAppBuilder.listeners(new AdditionalProfileListener("metric")); metricAppBuilder.build().run(args); } + + if (types.hasType(CollectorType.LOG)) { + logger.info(String.format("Start %s collector", CollectorType.LOG)); + SpringApplicationBuilder logAppBuilder = createAppBuilder(builder, 0, + LogCollectorModule.class, + RedisPropertySources.class + ) + .web(WebApplicationType.NONE); + logAppBuilder.build().run(args); + } } diff --git a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/CollectorType.java b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/CollectorType.java index ece5ad76db365..d6f55a40f4c78 100644 --- a/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/CollectorType.java +++ b/metric-module/metric/src/main/java/com/navercorp/pinpoint/metric/collector/CollectorType.java @@ -7,6 +7,7 @@ public enum CollectorType { ALL, BASIC, METRIC, + LOG, BASIC_WITH_INSPECTOR; public boolean hasType(CollectorType type) { diff --git a/metric-module/web-starter/pom.xml b/metric-module/web-starter/pom.xml index fe0c3b707b757..551e2e94d660c 100644 --- a/metric-module/web-starter/pom.xml +++ b/metric-module/web-starter/pom.xml @@ -48,6 +48,10 @@ com.navercorp.pinpoint pinpoint-inspector-web + + com.navercorp.pinpoint + pinpoint-log-web + diff --git a/metric-module/web-starter/src/main/java/com/navercorp/pinpoint/web/starter/multi/MetricAndWebApp.java b/metric-module/web-starter/src/main/java/com/navercorp/pinpoint/web/starter/multi/MetricAndWebApp.java index b6fdca11f1919..7b12ce254c7bb 100644 --- a/metric-module/web-starter/src/main/java/com/navercorp/pinpoint/web/starter/multi/MetricAndWebApp.java +++ b/metric-module/web-starter/src/main/java/com/navercorp/pinpoint/web/starter/multi/MetricAndWebApp.java @@ -19,6 +19,7 @@ import com.navercorp.pinpoint.common.server.util.ServerBootLogger; import com.navercorp.pinpoint.datasource.MainDataSourcePropertySource; import com.navercorp.pinpoint.inspector.web.InspectorWebApp; +import com.navercorp.pinpoint.log.web.LogWebModule; import com.navercorp.pinpoint.login.basic.PinpointBasicLoginConfig; import com.navercorp.pinpoint.metric.web.MetricWebApp; import com.navercorp.pinpoint.redis.RedisPropertySources; @@ -64,7 +65,8 @@ public static void main(String[] args) { AuthorizationConfig.class, MetricWebApp.class, UriStatWebConfig.class, - InspectorWebApp.class + InspectorWebApp.class, + LogWebModule.class ); starter.addProfiles("uri", "metric"); starter.start(args); diff --git a/pom.xml b/pom.xml index 821d474c75007..b90d20c8878bf 100644 --- a/pom.xml +++ b/pom.xml @@ -126,6 +126,7 @@ realtime redis inspector-module + log user batch-alarmsender channel @@ -544,6 +545,21 @@ pinpoint-inspector-web ${project.version} + + com.navercorp.pinpoint + pinpoint-log-web + ${project.version} + + + com.navercorp.pinpoint + pinpoint-log-collector + ${project.version} + + + com.navercorp.pinpoint + pinpoint-log-common + ${project.version} + com.navercorp.pinpoint pinpoint-plugins diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVChannelConfig.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVChannelConfig.java index 935f4ecaa729d..6e2c0d5617fd1 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVChannelConfig.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVChannelConfig.java @@ -34,11 +34,11 @@ @Import(RedisBasicConfig.class) public class RedisKVChannelConfig { - @Bean("redisPubSubChannelProvider") - ChannelProviderRegistry redisPubSubChannelProvider(RedisTemplate template) { + @Bean + public ChannelProviderRegistry redisKeyValueChannelProvider(RedisTemplate template) { Scheduler scheduler = Schedulers.newParallel("kv-channel-poller", Runtime.getRuntime().availableProcessors()); - PubChannelProvider pub = new RedisKVPubChannelProvider(template.opsForValue()); - SubChannelProvider sub = new RedisKVSubChannelProvider(template.opsForValue(), scheduler); + PubChannelProvider pub = new RedisKVPubChannelProvider(template); + SubChannelProvider sub = new RedisKVSubChannelProvider(template, scheduler); return ChannelProviderRegistry.of(RedisKVConstants.SCHEME, ChannelProvider.pair(pub, sub)); } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannel.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannel.java index 24a6264432f8e..4af3800332bfa 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannel.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannel.java @@ -16,26 +16,30 @@ package com.navercorp.pinpoint.channel.redis.kv; import com.navercorp.pinpoint.channel.PubChannel; -import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.RedisTemplate; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * @author youngjin.kim2 */ class RedisKVPubChannel implements PubChannel { - private final ValueOperations ops; + private final RedisTemplate template; + private final long expireMs; private final String key; - RedisKVPubChannel(ValueOperations ops, String key) { - this.ops = Objects.requireNonNull(ops, "ops"); + RedisKVPubChannel(RedisTemplate template, long expireMs, String key) { + this.template = Objects.requireNonNull(template, "template"); + this.expireMs = expireMs; this.key = Objects.requireNonNull(key, "key"); } @Override public void publish(byte[] content) { - this.ops.set(this.key, new String(content)); + this.template.opsForValue().set(this.key, new String(content)); + this.template.expire(this.key, expireMs, TimeUnit.MILLISECONDS); } } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannelProvider.java index 2081c30ef5279..1afa36d4f186e 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVPubChannelProvider.java @@ -17,8 +17,9 @@ import com.navercorp.pinpoint.channel.PubChannel; import com.navercorp.pinpoint.channel.PubChannelProvider; -import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.RedisTemplate; +import java.time.Duration; import java.util.Objects; /** @@ -26,15 +27,20 @@ */ class RedisKVPubChannelProvider implements PubChannelProvider { - private final ValueOperations ops; + private final RedisTemplate template; - RedisKVPubChannelProvider(ValueOperations ops) { - this.ops = Objects.requireNonNull(ops, "ops"); + RedisKVPubChannelProvider(RedisTemplate template) { + this.template = Objects.requireNonNull(template, "template"); } @Override public PubChannel getPubChannel(String key) { - return new RedisKVPubChannel(this.ops, key); + String[] words = key.split(":", 2); + if (words.length != 2) { + throw new IllegalArgumentException("the key must contain expire duration"); + } + Duration expire = Duration.parse(words[0]); + return new RedisKVPubChannel(this.template, expire.toMillis(), words[1]); } } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannel.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannel.java index 1a55b2fbb3c50..437c7b0650af6 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannel.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannel.java @@ -18,7 +18,7 @@ import com.navercorp.pinpoint.channel.SubChannel; import com.navercorp.pinpoint.channel.SubConsumer; import com.navercorp.pinpoint.channel.Subscription; -import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.RedisTemplate; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Scheduler; @@ -33,7 +33,7 @@ */ class RedisKVSubChannel implements SubChannel { - private final ValueOperations ops; + private final RedisTemplate template; private final Scheduler scheduler; private final Duration pollPeriod; private final String key; @@ -41,8 +41,8 @@ class RedisKVSubChannel implements SubChannel { private final List subscriptions = new ArrayList<>(2); private volatile Disposable disposePolling = null; - RedisKVSubChannel(ValueOperations ops, Scheduler scheduler, Duration pollPeriod, String key) { - this.ops = Objects.requireNonNull(ops, "ops"); + RedisKVSubChannel(RedisTemplate template, Scheduler scheduler, Duration pollPeriod, String key) { + this.template = Objects.requireNonNull(template, "template"); this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); this.pollPeriod = Objects.requireNonNull(pollPeriod, "pollPeriod"); this.key = Objects.requireNonNull(key, "key"); @@ -88,7 +88,7 @@ private void stopPolling() { } private void broadcast(long tick) { - String value = this.ops.get(this.key); + String value = this.template.opsForValue().get(this.key); if (value == null) { return; } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannelProvider.java index 5629d986462bc..df2b121239d7a 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/kv/RedisKVSubChannelProvider.java @@ -17,7 +17,7 @@ import com.navercorp.pinpoint.channel.SubChannel; import com.navercorp.pinpoint.channel.SubChannelProvider; -import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.RedisTemplate; import reactor.core.scheduler.Scheduler; import java.time.Duration; @@ -28,11 +28,11 @@ */ class RedisKVSubChannelProvider implements SubChannelProvider { - private final ValueOperations ops; + private final RedisTemplate template; private final Scheduler scheduler; - public RedisKVSubChannelProvider(ValueOperations ops, Scheduler scheduler) { - this.ops = Objects.requireNonNull(ops, "ops"); + RedisKVSubChannelProvider(RedisTemplate template, Scheduler scheduler) { + this.template = Objects.requireNonNull(template, "template"); this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); } @@ -43,7 +43,7 @@ public SubChannel getSubChannel(String key) { throw new IllegalArgumentException("the key must contain period"); } Duration period = Duration.parse(words[0]); - return new RedisKVSubChannel(this.ops, this.scheduler, period, words[1]); + return new RedisKVSubChannel(this.template, this.scheduler, period, words[1]); } } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubSubConfig.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubSubConfig.java index 33906ab1e442a..642c713f317b5 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubSubConfig.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisPubSubConfig.java @@ -44,13 +44,13 @@ public class RedisPubSubConfig { long clientTimeoutMs; @Bean("redisPubSubMessageExecutor") - ExecutorService redisPubSubMessageExecutor() { + public ExecutorService redisPubSubMessageExecutor() { final int processors = Runtime.getRuntime().availableProcessors(); return RedisUtils.newFixedThreadPool(processors, "RedisPubSubMessageExecutor"); } @Bean("redisMessageListenerContainer") - RedisMessageListenerContainer redisMessageListenerContainer( + public RedisMessageListenerContainer redisMessageListenerContainer( RedisConnectionFactory redisConnectionFactory, @Qualifier("redisPubSubMessageExecutor") ExecutorService executor ) { @@ -61,7 +61,7 @@ RedisMessageListenerContainer redisMessageListenerContainer( } @Bean("redisPubSubChannelProvider") - ChannelProviderRegistry redisPubSubChannelProvider( + public ChannelProviderRegistry redisPubSubChannelProvider( ReactiveRedisTemplate reactiveRedisTemplate, RedisMessageListenerContainer redisMessageListenerContainer ) { diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisSubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisSubChannelProvider.java index 5ff4efdf5e858..13dbc63d82d78 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisSubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/pubsub/RedisSubChannelProvider.java @@ -27,11 +27,11 @@ /** * @author youngjin.kim2 */ -public class RedisSubChannelProvider implements SubChannelProvider { +class RedisSubChannelProvider implements SubChannelProvider { private final RedisMessageListenerContainer listenerContainer; - public RedisSubChannelProvider(RedisMessageListenerContainer listenerContainer) { + RedisSubChannelProvider(RedisMessageListenerContainer listenerContainer) { this.listenerContainer = Objects.requireNonNull(listenerContainer, "listenerContainer"); } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamConfig.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamConfig.java index ff3770a3466d8..75600cda8d1df 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamConfig.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamConfig.java @@ -54,12 +54,12 @@ public class RedisStreamConfig { String consumerName; @Bean("redisStreamMessageExecutor") - Executor redisPubSubMessageExecutor() { + public Executor redisPubSubMessageExecutor() { return new SimpleAsyncTaskExecutor("redis-stream-message-executor"); } @Bean - StreamMessageListenerContainer> streamMessageListenerContainer( + public StreamMessageListenerContainer> streamMessageListenerContainer( RedisConnectionFactory redisConnectionFactory, @Qualifier("redisStreamMessageExecutor") Executor executor ) { @@ -78,7 +78,7 @@ StreamMessageListenerContainer> stream } @Bean("redisStreamPubSubChannelProvider") - ChannelProviderRegistry redisStreamPubSubChannelProvider( + public ChannelProviderRegistry redisStreamPubSubChannelProvider( ReactiveRedisTemplate redisTemplate, StreamMessageListenerContainer> listenerContainer ) { diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamPubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamPubChannelProvider.java index 6454b4e530244..b0f920ea4a70b 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamPubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamPubChannelProvider.java @@ -28,7 +28,7 @@ public class RedisStreamPubChannelProvider implements PubChannelProvider { private final ReactiveStreamOperations streamOps; - public RedisStreamPubChannelProvider(ReactiveStreamOperations streamOps) { + RedisStreamPubChannelProvider(ReactiveStreamOperations streamOps) { this.streamOps = Objects.requireNonNull(streamOps, "streamOps"); } diff --git a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamSubChannelProvider.java b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamSubChannelProvider.java index 23efb2f7414ea..b216351698786 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamSubChannelProvider.java +++ b/redis/src/main/java/com/navercorp/pinpoint/channel/redis/stream/RedisStreamSubChannelProvider.java @@ -28,13 +28,13 @@ /** * @author youngjin.kim2 */ -public class RedisStreamSubChannelProvider implements SubChannelProvider, InitializingBean { +class RedisStreamSubChannelProvider implements SubChannelProvider, InitializingBean { private final StreamMessageListenerContainer> listenerContainer; private final ReactiveRedisTemplate redisTemplate; private final Consumer consumer; - public RedisStreamSubChannelProvider( + RedisStreamSubChannelProvider( StreamMessageListenerContainer> listenerContainer, ReactiveRedisTemplate redisTemplate, Consumer consumer diff --git a/redis/src/main/java/com/navercorp/pinpoint/redis/RedisBasicConfig.java b/redis/src/main/java/com/navercorp/pinpoint/redis/RedisBasicConfig.java index a12911b89f895..de89aa1094595 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/redis/RedisBasicConfig.java +++ b/redis/src/main/java/com/navercorp/pinpoint/redis/RedisBasicConfig.java @@ -69,7 +69,7 @@ public class RedisBasicConfig { int lettuceRequestQueueSize; @Bean - RedisTemplate redisStringToStringTemplate(RedisConnectionFactory connectionFactory) { + public RedisTemplate redisStringToStringTemplate(RedisConnectionFactory connectionFactory) { final RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); template.setKeySerializer(RedisSerializer.string()); @@ -78,12 +78,12 @@ RedisTemplate redisStringToStringTemplate(RedisConnectionFactory } @Bean("reactiveRedisTemplate") - ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { + public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { return new ReactiveRedisTemplate<>(connectionFactory, RedisSerializationContext.string()); } @Bean - RedisConfiguration redisConfiguration() { + public RedisConfiguration redisConfiguration() { if (CollectionUtils.isEmpty(clusterNodes)) { Assert.hasText(host, "host is required for redis-standalone mode"); @@ -100,7 +100,7 @@ RedisConfiguration redisConfiguration() { } @Bean - LettuceClientConfiguration lettuceClientConfiguration() { + public LettuceClientConfiguration lettuceClientConfiguration() { final ClientResources clientResources = ClientResources.builder() .ioThreadPoolSize(lettuceIOThreadPoolSize) .computationThreadPoolSize(lettuceComputationThreadPoolSize) @@ -130,7 +130,7 @@ LettuceClientConfiguration lettuceClientConfiguration() { } @Bean - LettuceConnectionFactory redisConnectionFactory( + public LettuceConnectionFactory redisConnectionFactory( RedisConfiguration redisConfig, LettuceClientConfiguration clientConfiguration ) {