diff --git a/channel/README.md b/channel/README.md new file mode 100644 index 000000000000..e3da681cf6bf --- /dev/null +++ b/channel/README.md @@ -0,0 +1,85 @@ + +## Purpose + +This module abstracts the channel concept which are usually implemented by Redis, or Kafka, etc. + +Regardless of the underlying implementation, +the channel is a way to send messages from one part of the system to another. +Also, the channel can provide the way to broadcast certain events to the unknown multiple subscribers. + +### Usage + +The channel should be obtained from channel repository, which provides the channel by URI. + +Only the single channel repository should be exist in JVM, +and created with some pairs of `ChannelProvider` and its name. + +```java +ChannelRepository repository = new ChannelRepository(List.of( + ChannelProviderRegistry.of("redis", new RedisChannelProvider("redis")), + ChannelProviderRegistry.of("kafka", new KafkaChannelProvider("kafka")) +)) +``` + +Then, the channel can be obtained by the URI. + +**Hello World Example** + +At instance-subscriber, the process should print the message "Hello, world!" which is published by instance-publisher. + +```java +// Instance-subscriber + +URI uri = URI.create("redis://system-out?param=foo"); +SubChannel subChannel = repository.getSubChannel(uri); +subChannel.subscribe(message -> { + System.out.println(new String(message)); +}); +``` + +```java +// Instance-publisher + +URI uri = URI.create("redis://system-out?param=foo"); +PubChannel pubChannel = repository.getPubChannel(uri); +pubChannel.publish("Hello, world!".getBytes()); +``` + +### Channel Service + +This module also contains the ChannelService implementations which are used to manage instant +demand-supply interactions between the different parts of the system. These are very similar to the conventional +RPC calls, but it is designed to send demand to all the servers which are listening to the service. + +There should be ChannelServiceServers, which supplies the demands in network in prior to the ChannelServiceClient +emitting the demand. Each servers catch all demands from the reserved channel for the service, +and exactly 0 or 1 server should supply the data to the supply channel. + +For communication, all servers and clients must have the `ChannelServiceProtocol` which have the information +about the service, and the demand and supply channels. + +```java +ChannelServiceProtocol protocol = ChannelServiceProtocol.builder() + .setDemandSerde(JacksonSerde.byClass(objectMapper, String.class)) + .setDemandPubChannelURIProvider(demand -> URI.create("redis:char-count:demand")) + .setDemandSubChannelURI(URI.create("redis:char-count:demand")) + .setSupplySerde(JacksonSerde.byClass(objectMapper, Long.class)) + .setSupplyChannelURIProvider(demand -> URI.create("redis:char-count:supply:" + demand.hashCode())) + .setRequestTimeout(Duration.ofSeconds(3)) + .buildMono(); +``` + +With the protocol and the channel repository, the ChannelServiceClient and ChannelServiceServer can be created. + +**Server** + +```java +ChannelServiceServer.buildMono(repository, protocol, demand -> demand.length()).listen(); +``` + +**Client** + +```java +MonoChannelServiceClient client = ChannelServiceClient.buildMono(repository, protocol); +Long result = client.demand("Hello, World!").block(); // 13 +``` diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/Channel.java b/channel/src/main/java/com/navercorp/pinpoint/channel/Channel.java index c7beae172f46..c47782441d30 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/Channel.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/Channel.java @@ -17,6 +17,16 @@ /** * @author youngjin.kim2 + * + * Channel is publishable channel, and subscribable channel at the same time. + * If a pair of PubChannel, and SubChannel are bound in a single channel interface, the two channel + * should be able to communicate with each other. + *
+ * In most cases, A paired PubChannel, and SubChannel are located at the different side of the network, and + * implemented with distributed systems like Redis, Kafka, etc. + * + * @see PubChannel + * @see SubChannel */ public interface Channel extends PubChannel, SubChannel { } diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProvider.java b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProvider.java index a18bab4bed92..67015cbf2db6 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProvider.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProvider.java @@ -17,6 +17,12 @@ /** * @author youngjin.kim2 + * + * ChannelProvider provides PubChannel, and SubChannel by the key. + *
+ * In distributed system, A pair of PubChannel, and SubChannel are connected each other if they have the same key. + * In the other word, even if the two processes are located at the different side of the network, they can communicate + * with each other if they have the same key. */ public interface ChannelProvider extends PubChannelProvider, SubChannelProvider { static ChannelProvider pair(PubChannelProvider pub, SubChannelProvider sub) { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRegistry.java b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRegistry.java index 6b3f0ce12962..fa07b72d573e 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRegistry.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRegistry.java @@ -19,6 +19,10 @@ /** * @author youngjin.kim2 + * + * A pair for registration of ChannelProvider. + * + * @see ChannelProviderRepository */ public class ChannelProviderRegistry { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRepository.java b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRepository.java index 997db84070bc..7690a32d4e8c 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRepository.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/ChannelProviderRepository.java @@ -21,6 +21,19 @@ /** * @author youngjin.kim2 + * + * The channels are provided by the URI key with the formats like below: + *
+ *
+ *     scheme://key
+ *     e.g. redis://hello-world-topic?param1=value1¶m2=value2
+ *     e.g. kafka://hello-world-topic?param1=value1¶m2=value2
+ * 
+ * + * If a pair of PubChannel, and SubChannel are obtained by the same key, they can communicate with each other. + * + * @see ChannelProvider + * @see Channel */ public class ChannelProviderRepository { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannel.java b/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannel.java index 8508db9b894d..7ea71931fa54 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannel.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannel.java @@ -17,6 +17,11 @@ /** * @author youngjin.kim2 + * + * PubChannel publishes the byte array parameter into the connected SubChannel. + * + * @see Channel + * @see SubChannel */ public interface PubChannel { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannelProvider.java b/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannelProvider.java index cf932b9f3cea..881c7bf702c3 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannelProvider.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/PubChannelProvider.java @@ -17,6 +17,10 @@ /** * @author youngjin.kim2 + * + * Provides the PubChannel by the key. + * + * @see ChannelProvider */ public interface PubChannelProvider { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannel.java b/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannel.java index 7059315cf9bf..ac8a64445b1b 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannel.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannel.java @@ -17,6 +17,11 @@ /** * @author youngjin.kim2 + * + * SubChannel registers, or de-register the handler for the incoming byte array from PubChannel. + * + * @see Channel + * @see PubChannel */ public interface SubChannel { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannelProvider.java b/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannelProvider.java index 8f91a39a047c..57035d458ee7 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannelProvider.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/SubChannelProvider.java @@ -17,6 +17,10 @@ /** * @author youngjin.kim2 + * + * Provide the SubChannel by the key. + * + * @see ChannelProvider */ public interface SubChannelProvider { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/SubConsumer.java b/channel/src/main/java/com/navercorp/pinpoint/channel/SubConsumer.java index a937636fc875..edf15359dd01 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/SubConsumer.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/SubConsumer.java @@ -17,6 +17,10 @@ /** * @author youngjin.kim2 + * + * Handler object for the byte array from SubChannel + * + * @see SubChannel */ public interface SubConsumer { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/DemandMessage.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/DemandMessage.java deleted file mode 100644 index 6669b89d3437..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/DemandMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -/** - * @author youngjin.kim2 - */ -public class DemandMessage { - - private Identifier id; - private D content; - - public Identifier getId() { - return id; - } - - public void setId(Identifier id) { - this.id = id; - } - - public D getContent() { - return content; - } - - public void setContent(D content) { - this.content = content; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/Identifier.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/Identifier.java deleted file mode 100644 index 0e8651aa7c44..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/Identifier.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -/** - * @author youngjin.kim2 - */ -public class Identifier { - - private long value; - - public long getValue() { - return value; - } - - public void setValue(long value) { - this.value = value; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxBackendAdaptor.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxBackendAdaptor.java deleted file mode 100644 index 23fa2a281f6d..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxBackendAdaptor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -import com.navercorp.pinpoint.channel.service.server.ChannelServiceFluxBackend; -import jakarta.annotation.Nullable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author youngjin.kim2 - */ -public class LegacyFluxBackendAdaptor implements ChannelServiceFluxBackend, SupplyMessage> { - - private final ChannelServiceFluxBackend delegate; - - public LegacyFluxBackendAdaptor(ChannelServiceFluxBackend delegate) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - } - - @Nullable - @Override - public Flux> demand(DemandMessage demand) { - Identifier id = demand.getId(); - Flux supply = this.delegate.demand(demand.getContent()); - if (supply == null) { - return null; - } - AtomicInteger seqGen = new AtomicInteger(0); - return supply.map(el -> wrapSupplyMessage(el, id, seqGen.getAndIncrement(), false)) - .concatWith(Mono.defer(() -> Mono.just(wrapSupplyMessage(null, id, seqGen.getAndIncrement(), true)))); - } - - private static SupplyMessage wrapSupplyMessage(T content, Identifier id, int seq, boolean terminated) { - SupplyMessage msg = new SupplyMessage<>(); - msg.setContent(content); - msg.setTerminated(terminated); - msg.setDemandId(id); - msg.setSequence(seq); - return msg; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxClientAdaptor.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxClientAdaptor.java deleted file mode 100644 index 6ad6a3b7ccef..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyFluxClientAdaptor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -import com.navercorp.pinpoint.channel.service.client.FluxChannelServiceClient; -import reactor.core.publisher.Flux; - -import java.util.Objects; -import java.util.function.Function; - -/** - * @author youngjin.kim2 - */ -public class LegacyFluxClientAdaptor implements FluxChannelServiceClient { - - private final FluxChannelServiceClient, SupplyMessage> delegate; - private final Function idFn; - - public LegacyFluxClientAdaptor( - FluxChannelServiceClient, SupplyMessage> delegate, - Function idFn - ) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - this.idFn = Objects.requireNonNull(idFn, "idFn"); - } - - @Override - public Flux request(D demand) { - return this.delegate.request(wrapDemand(demand)).mapNotNull(SupplyMessage::getContent); - } - - private DemandMessage wrapDemand(D demand) { - Identifier id = new Identifier(); - id.setValue(this.idFn.apply(demand)); - - DemandMessage msg = new DemandMessage<>(); - msg.setId(id); - msg.setContent(demand); - return msg; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoBackendAdaptor.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoBackendAdaptor.java deleted file mode 100644 index 12e7b506dc48..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoBackendAdaptor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -import com.navercorp.pinpoint.channel.service.server.ChannelServiceMonoBackend; -import jakarta.annotation.Nullable; -import reactor.core.publisher.Mono; - -import java.util.Objects; - -/** - * @author youngjin.kim2 - */ -public class LegacyMonoBackendAdaptor implements ChannelServiceMonoBackend, SupplyMessage> { - - private final ChannelServiceMonoBackend delegate; - - public LegacyMonoBackendAdaptor(ChannelServiceMonoBackend delegate) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - } - - @Nullable - @Override - public Mono> demand(DemandMessage demand) { - Identifier id = demand.getId(); - Mono supply = this.delegate.demand(demand.getContent()); - if (supply == null) { - return null; - } - return supply.map(el -> wrapSupplyMessage(el, id)); - } - - private static SupplyMessage wrapSupplyMessage(T content, Identifier id) { - SupplyMessage msg = new SupplyMessage<>(); - msg.setContent(content); - msg.setTerminated(false); - msg.setDemandId(id); - msg.setSequence(0); - return msg; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoClientAdaptor.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoClientAdaptor.java deleted file mode 100644 index e41b4a8b041a..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/LegacyMonoClientAdaptor.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -import com.navercorp.pinpoint.channel.service.client.MonoChannelServiceClient; -import reactor.core.publisher.Mono; - -import java.util.Objects; -import java.util.function.Function; - -/** - * @author youngjin.kim2 - */ -public class LegacyMonoClientAdaptor implements MonoChannelServiceClient { - - private final MonoChannelServiceClient, SupplyMessage> delegate; - private final Function idFn; - - public LegacyMonoClientAdaptor( - MonoChannelServiceClient, SupplyMessage> delegate, - Function idFn - ) { - this.delegate = Objects.requireNonNull(delegate, "delegate"); - this.idFn = Objects.requireNonNull(idFn, "idFn"); - } - - @Override - public Mono request(D demand) { - return this.delegate.request(wrapDemand(demand)).mapNotNull(SupplyMessage::getContent); - } - - private DemandMessage wrapDemand(D demand) { - Identifier id = new Identifier(); - id.setValue(this.idFn.apply(demand)); - - DemandMessage msg = new DemandMessage<>(); - msg.setId(id); - msg.setContent(demand); - return msg; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/SupplyMessage.java b/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/SupplyMessage.java deleted file mode 100644 index afa6f7f72880..000000000000 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/legacy/SupplyMessage.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -/** - * @author youngjin.kim2 - */ -public class SupplyMessage { - - private Identifier demandId; - private int sequence; - private S content; - private boolean terminated; - - public Identifier getDemandId() { - return demandId; - } - - public void setDemandId(Identifier demandId) { - this.demandId = demandId; - } - - public int getSequence() { - return sequence; - } - - public void setSequence(int sequence) { - this.sequence = sequence; - } - - public S getContent() { - return content; - } - - public void setContent(S content) { - this.content = content; - } - - public boolean isTerminated() { - return terminated; - } - - public void setTerminated(boolean terminated) { - this.terminated = terminated; - } - -} diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/reactor/DeferredDisposable.java b/channel/src/main/java/com/navercorp/pinpoint/channel/reactor/DeferredDisposable.java index c391cf298977..04edb878efde 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/reactor/DeferredDisposable.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/reactor/DeferredDisposable.java @@ -21,6 +21,11 @@ /** * @author youngjin.kim2 + * + * Ensure delegated disposable is run at least once. + *
+ * Even if the delegated disposable is set after the DeferredDisposable is disposed, + * the delegated disposable is executed. */ public class DeferredDisposable implements Disposable { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/service/ChannelServiceProtocol.java b/channel/src/main/java/com/navercorp/pinpoint/channel/service/ChannelServiceProtocol.java index 3fbd524cd288..eb91516667af 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/service/ChannelServiceProtocol.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/service/ChannelServiceProtocol.java @@ -20,6 +20,18 @@ /** * @author youngjin.kim2 + * + * This protocol should contain the information which must be shared between server and client + * for communication through channel. + *
+ * At the channel service, there always be a demand from client, and supplies from server. The demand are + * carried through channel, and listened by server. The supplies are sent through channel, and received by client. + *
+ * By the number of supplies, the channel service can be classified into two types: Mono and Flux. + * Mono is a channel service which sends only one supply, and Flux is a channel service which sends multiple supplies. + * + * @see ChannelServiceServerProtocol + * @see ChannelServiceClientProtocol */ public interface ChannelServiceProtocol extends ChannelServiceServerProtocol, ChannelServiceClientProtocol { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/ChannelServiceClientProtocol.java b/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/ChannelServiceClientProtocol.java index edfb28bd7976..1aca43a3fc9a 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/ChannelServiceClientProtocol.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/service/client/ChannelServiceClientProtocol.java @@ -19,6 +19,14 @@ /** * @author youngjin.kim2 + * + * Client-side protocol for the channel service. It must provide following information. + *
+ * 1. The way to serialize and deserialize the demand and supply objects. + * 2. Which channel to send the demand. + * 3. Which channel to listen for the supply. + * + * @see com.navercorp.pinpoint.channel.service.ChannelServiceProtocol */ public interface ChannelServiceClientProtocol { diff --git a/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerProtocol.java b/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerProtocol.java index a6c4b296a454..831004e0d203 100644 --- a/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerProtocol.java +++ b/channel/src/main/java/com/navercorp/pinpoint/channel/service/server/ChannelServiceServerProtocol.java @@ -19,6 +19,14 @@ /** * @author youngjin.kim2 + * + * Server-side protocol for the channel service. It must provide following information. + *
+ * 1. The way to serialize and deserialize the demand and supply objects. + * 2. Which channel to listen for the demand. + * 3. Which channel to send the supply. + * + * @see com.navercorp.pinpoint.channel.service.ChannelServiceProtocol */ public interface ChannelServiceServerProtocol { diff --git a/channel/src/test/java/com/navercorp/pinpoint/channel/legacy/LegacyAdaptorTest.java b/channel/src/test/java/com/navercorp/pinpoint/channel/legacy/LegacyAdaptorTest.java deleted file mode 100644 index 7898edab95b4..000000000000 --- a/channel/src/test/java/com/navercorp/pinpoint/channel/legacy/LegacyAdaptorTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2023 NAVER Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.navercorp.pinpoint.channel.legacy; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.navercorp.pinpoint.channel.ChannelProviderRegistry; -import com.navercorp.pinpoint.channel.ChannelProviderRepository; -import com.navercorp.pinpoint.channel.MemoryChannelProvider; -import com.navercorp.pinpoint.channel.serde.JacksonSerde; -import com.navercorp.pinpoint.channel.service.ChannelServiceProtocol; -import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; -import com.navercorp.pinpoint.channel.service.MonoChannelServiceProtocol; -import com.navercorp.pinpoint.channel.service.client.ChannelServiceClient; -import com.navercorp.pinpoint.channel.service.client.ChannelState; -import com.navercorp.pinpoint.channel.service.server.ChannelServiceServer; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import java.net.URI; -import java.time.Duration; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author youngjin.kim2 - */ -public class LegacyAdaptorTest { - - @Test - @DisplayName("basic flux service scenario") - public void testBasicFluxScenario() { - ChannelProviderRegistry memChannel = ChannelProviderRegistry.of("mem", new MemoryChannelProvider()); - ChannelProviderRepository channelRepo = new ChannelProviderRepository(List.of(memChannel)); - - FluxChannelServiceProtocol, SupplyMessage> protocol = defineFluxProtocol(); - - ChannelServiceServer.buildFlux( - channelRepo, - protocol, - new LegacyFluxBackendAdaptor<>(d -> Flux.fromArray(d.split(","))) - ).listen(); - - List response = new LegacyFluxClientAdaptor<>( - ChannelServiceClient.buildFlux(channelRepo, protocol, Schedulers.immediate()), - el -> 0L - ).request("Foo,Bar,Hello,World,END").collectList().block(Duration.ofSeconds(1)); - assertThat(response).hasSameElementsAs(List.of("Foo", "Bar", "Hello", "World")); - } - - private static FluxChannelServiceProtocol, SupplyMessage> defineFluxProtocol() { - ObjectMapper objectMapper = new ObjectMapper(); - return ChannelServiceProtocol., SupplyMessage>builder() - .setDemandSerde(JacksonSerde.byParameterized(objectMapper, DemandMessage.class, String.class)) - .setDemandPubChannelURIProvider(d -> URI.create("mem:split:demand")) - .setDemandSubChannelURI(URI.create("mem:split:demand")) - .setSupplySerde(JacksonSerde.byParameterized(objectMapper, SupplyMessage.class, String.class)) - .setSupplyChannelURIProvider(d -> URI.create("mem:split:supply")) - .setChannelStateFn( - supply -> supply.getContent().equals("END") ? ChannelState.TERMINATED : ChannelState.ALIVE) - .buildFlux(); - } - - @Test - @DisplayName("basic mono service scenario") - public void testBasicMonoScenario() { - ChannelProviderRegistry memChannel = ChannelProviderRegistry.of("mem", new MemoryChannelProvider()); - ChannelProviderRepository channelRepo = new ChannelProviderRepository(List.of(memChannel)); - - MonoChannelServiceProtocol, SupplyMessage> protocol = defineMonoProtocol(); - - ChannelServiceServer.buildMono( - channelRepo, - protocol, - new LegacyMonoBackendAdaptor<>(d -> Mono.just("Hello, " + d + "!")) - ).listen(); - - String response = new LegacyMonoClientAdaptor<>(ChannelServiceClient.buildMono(channelRepo, protocol), el -> 0L) - .request("Channel").block(Duration.ofSeconds(1)); - assertThat(response).isEqualTo("Hello, Channel!"); - } - - private static MonoChannelServiceProtocol, SupplyMessage> defineMonoProtocol() { - ObjectMapper objectMapper = new ObjectMapper(); - return ChannelServiceProtocol., SupplyMessage>builder() - .setDemandSerde(JacksonSerde.byParameterized(objectMapper, DemandMessage.class, String.class)) - .setDemandPubChannelURIProvider(d -> URI.create("mem:hello:demand")) - .setDemandSubChannelURI(URI.create("mem:hello:demand")) - .setSupplySerde(JacksonSerde.byParameterized(objectMapper, SupplyMessage.class, String.class)) - .setSupplyChannelURIProvider(d -> URI.create("mem:hello:supply")) - .setRequestTimeout(Duration.ofSeconds(1)) - .buildMono(); - } - -} diff --git a/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/RealtimeCollectorServerConfig.java b/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/RealtimeCollectorServerConfig.java index f56268ba491c..2d357fb56787 100644 --- a/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/RealtimeCollectorServerConfig.java +++ b/realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/RealtimeCollectorServerConfig.java @@ -16,10 +16,6 @@ package com.navercorp.pinpoint.realtime.collector; import com.navercorp.pinpoint.channel.ChannelProviderRepository; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.LegacyFluxBackendAdaptor; -import com.navercorp.pinpoint.channel.legacy.LegacyMonoBackendAdaptor; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; import com.navercorp.pinpoint.channel.service.MonoChannelServiceProtocol; import com.navercorp.pinpoint.channel.service.server.ChannelServiceServer; @@ -60,19 +56,6 @@ public RealtimeCollectorServerConfig() { logger.info("RealtimeCollectorServerConfig initialized"); } - @Bean - public ChannelServiceServer legacyATCServer( - ChannelProviderRepository channelProviderRepository, - FluxChannelServiceProtocol, SupplyMessage> protocol, - ActiveThreadCountService service - ) { - return ChannelServiceServer.buildFlux( - channelProviderRepository, - protocol, - new LegacyFluxBackendAdaptor<>(service::requestAsync) - ); - } - @Bean public ChannelServiceServer ATCServer( ChannelProviderRepository channelProviderRepository, @@ -86,19 +69,6 @@ public ChannelServiceServer ATCServer( ); } - @Bean - public ChannelServiceServer legacyATDServer( - ChannelProviderRepository channelProviderRepository, - MonoChannelServiceProtocol, SupplyMessage> protocol, - ActiveThreadDumpService service - ) { - return ChannelServiceServer.buildMono( - channelProviderRepository, - protocol, - new LegacyMonoBackendAdaptor<>(service::getDump) - ); - } - @Bean public ChannelServiceServer ATDServer( ChannelProviderRepository channelProviderRepository, @@ -112,19 +82,6 @@ public ChannelServiceServer ATDServer( ); } - @Bean - public ChannelServiceServer legacyEchoServer( - ChannelProviderRepository channelProviderRepository, - MonoChannelServiceProtocol, SupplyMessage> protocol, - EchoService service - ) { - return ChannelServiceServer.buildMono( - channelProviderRepository, - protocol, - new LegacyMonoBackendAdaptor<>(service::echo) - ); - } - @Bean public ChannelServiceServer echoServer( ChannelProviderRepository channelProviderRepository, diff --git a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATCServiceProtocolConfig.java b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATCServiceProtocolConfig.java index a89d38c9d67c..62fb89f7303e 100644 --- a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATCServiceProtocolConfig.java +++ b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATCServiceProtocolConfig.java @@ -17,8 +17,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.navercorp.pinpoint.channel.ChannelSpringConfig; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConstants; import com.navercorp.pinpoint.channel.serde.JacksonSerde; @@ -41,24 +39,6 @@ @Import({ RedisPubSubConfig.class, ChannelSpringConfig.class }) public class ATCServiceProtocolConfig { - @Bean - FluxChannelServiceProtocol, SupplyMessage> atcLegacyProtocol( - ObjectMapper objectMapper - ) { - return ChannelServiceProtocol., SupplyMessage>builder() - .setDemandSerde(JacksonSerde.byParameterized(objectMapper, DemandMessage.class, ATCDemand.class)) - .setDemandPubChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":demand:atc:" + demand.getId().getValue())) - .setDemandSubChannelURI(URI.create(RedisPubSubConstants.SCHEME + ":demand:atc:*")) - .setSupplySerde(JacksonSerde.byParameterized(objectMapper, SupplyMessage.class, ATCSupply.class)) - .setSupplyChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":supply:atc:" + demand.getId().getValue())) - .setDemandInterval(Duration.ofSeconds(10)) - .setBufferSize(4) - .setChannelStateFn(supply -> ChannelState.ALIVE) - .buildFlux(); - } - @Bean FluxChannelServiceProtocol atcProtocol(ObjectMapper objectMapper) { return ChannelServiceProtocol.builder() diff --git a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATDServiceProtocolConfig.java b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATDServiceProtocolConfig.java index 319538860c42..316f2615be68 100644 --- a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATDServiceProtocolConfig.java +++ b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/ATDServiceProtocolConfig.java @@ -17,8 +17,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConstants; import com.navercorp.pinpoint.channel.serde.JacksonSerde; @@ -42,25 +40,6 @@ @Import(RedisPubSubConfig.class) public class ATDServiceProtocolConfig { - @Bean - MonoChannelServiceProtocol, SupplyMessage> atdLegacyProtocol( - ObjectMapper objectMapper, - @Qualifier("clusterKeyDeserializer") SimpleModule clusterKeyJacksonModule - ) { - objectMapper.registerModule(clusterKeyJacksonModule); - return ChannelServiceProtocol., SupplyMessage>builder() - .setDemandSerde(JacksonSerde.byParameterized(objectMapper, DemandMessage.class, ATDDemand.class)) - .setDemandPubChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":demand:atd:" + demand.getId().getValue())) - .setDemandSubChannelURI(URI.create(RedisPubSubConstants.SCHEME + ":demand:atd:*")) - .setSupplySerde(JacksonSerde.byParameterized(objectMapper, SupplyMessage.class, ATDSupply.class)) - .setSupplyChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":supply:atd:" + demand.getId().getValue())) - .setRequestTimeout(Duration.ofSeconds(3)) - .buildMono(); - - } - @Bean MonoChannelServiceProtocol atdProtocol( ObjectMapper objectMapper, diff --git a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/EchoServiceProtocolConfig.java b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/EchoServiceProtocolConfig.java index 610a997e777d..e5bd529fc3b4 100644 --- a/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/EchoServiceProtocolConfig.java +++ b/realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/config/EchoServiceProtocolConfig.java @@ -17,8 +17,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConfig; import com.navercorp.pinpoint.channel.redis.pubsub.RedisPubSubConstants; import com.navercorp.pinpoint.channel.serde.JacksonSerde; @@ -49,24 +47,6 @@ SimpleModule clusterKeyDeserializer() { return jacksonModule; } - @Bean - MonoChannelServiceProtocol, SupplyMessage> echoLegacyProtocol( - ObjectMapper objectMapper, - @Qualifier("clusterKeyDeserializer") SimpleModule clusterKeyJacksonModule - ) { - objectMapper.registerModule(clusterKeyJacksonModule); - return ChannelServiceProtocol., SupplyMessage>builder() - .setDemandSerde(JacksonSerde.byParameterized(objectMapper, DemandMessage.class, Echo.class)) - .setDemandPubChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":demand:echo:" + demand.getId().getValue())) - .setDemandSubChannelURI(URI.create(RedisPubSubConstants.SCHEME + ":demand:echo:*")) - .setSupplySerde(JacksonSerde.byParameterized(objectMapper, SupplyMessage.class, Echo.class)) - .setSupplyChannelURIProvider( - demand -> URI.create(RedisPubSubConstants.SCHEME + ":supply:echo:" + demand.getId().getValue())) - .setRequestTimeout(Duration.ofSeconds(3)) - .buildMono(); - } - @Bean MonoChannelServiceProtocol echoProtocol( ObjectMapper objectMapper, diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java index 3cd71a6fbab2..46cbb13ca1a5 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java @@ -16,9 +16,6 @@ package com.navercorp.pinpoint.web.realtime.activethread.count.dao; import com.navercorp.pinpoint.channel.ChannelProviderRepository; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.LegacyFluxClientAdaptor; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.service.FluxChannelServiceProtocol; import com.navercorp.pinpoint.channel.service.client.ChannelServiceClient; import com.navercorp.pinpoint.channel.service.client.FluxChannelServiceClient; @@ -29,7 +26,6 @@ import com.navercorp.pinpoint.redis.value.RedisIncrementer; import com.navercorp.pinpoint.web.realtime.RealtimeWebCommonConfig; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -44,20 +40,6 @@ public class ActiveThreadCountWebDaoConfig { @Bean - @ConditionalOnProperty(name = "pinpoint.modules.realtime.atc.version", havingValue = "v1", matchIfMissing = true) - FluxChannelServiceClient atcClientV1( - ChannelProviderRepository channelProviderRepository, - FluxChannelServiceProtocol, SupplyMessage> protocol - ) { - return new LegacyFluxClientAdaptor<>(ChannelServiceClient.buildFlux( - channelProviderRepository, - protocol, - Schedulers.newParallel("atc", Runtime.getRuntime().availableProcessors()) - ), d -> d.getId()); - } - - @Bean - @ConditionalOnProperty(name = "pinpoint.modules.realtime.atc.version", havingValue = "v2") FluxChannelServiceClient atcClientV2( ChannelProviderRepository channelProviderRepository, FluxChannelServiceProtocol protocol diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/dump/WebActiveThreadDumpConfig.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/dump/WebActiveThreadDumpConfig.java index 9e7394052457..c8196306c5de 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/dump/WebActiveThreadDumpConfig.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/dump/WebActiveThreadDumpConfig.java @@ -16,9 +16,6 @@ package com.navercorp.pinpoint.web.realtime.activethread.dump; import com.navercorp.pinpoint.channel.ChannelProviderRepository; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.LegacyMonoClientAdaptor; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.service.MonoChannelServiceProtocol; import com.navercorp.pinpoint.channel.service.client.ChannelServiceClient; import com.navercorp.pinpoint.channel.service.client.MonoChannelServiceClient; @@ -44,12 +41,9 @@ public class WebActiveThreadDumpConfig { @Bean MonoChannelServiceClient atdClient( ChannelProviderRepository channelProviderRepository, - MonoChannelServiceProtocol, SupplyMessage> protocol + MonoChannelServiceProtocol protocol ) { - return new LegacyMonoClientAdaptor<>( - ChannelServiceClient.buildMono(channelProviderRepository, protocol), - d -> d.getId() - ); + return ChannelServiceClient.buildMono(channelProviderRepository, protocol); } @Bean diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/echo/WebEchoConfig.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/echo/WebEchoConfig.java index a3a78b23a381..94d9e69b8d15 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/echo/WebEchoConfig.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/echo/WebEchoConfig.java @@ -16,9 +16,6 @@ package com.navercorp.pinpoint.web.realtime.echo; import com.navercorp.pinpoint.channel.ChannelProviderRepository; -import com.navercorp.pinpoint.channel.legacy.DemandMessage; -import com.navercorp.pinpoint.channel.legacy.LegacyMonoClientAdaptor; -import com.navercorp.pinpoint.channel.legacy.SupplyMessage; import com.navercorp.pinpoint.channel.service.MonoChannelServiceProtocol; import com.navercorp.pinpoint.channel.service.client.ChannelServiceClient; import com.navercorp.pinpoint.channel.service.client.MonoChannelServiceClient; @@ -43,12 +40,9 @@ public class WebEchoConfig { @Bean MonoChannelServiceClient echoClient( ChannelProviderRepository channelProviderRepository, - MonoChannelServiceProtocol, SupplyMessage> protocol + MonoChannelServiceProtocol protocol ) { - return new LegacyMonoClientAdaptor<>( - ChannelServiceClient.buildMono(channelProviderRepository, protocol), - d -> d.getId() - ); + return ChannelServiceClient.buildMono(channelProviderRepository, protocol); } @Bean diff --git a/realtime/realtime-web/src/main/resources/pinpoint-web-realtime.properties b/realtime/realtime-web/src/main/resources/pinpoint-web-realtime.properties index 0543c0189208..3a3ee6bd0043 100644 --- a/realtime/realtime-web/src/main/resources/pinpoint-web-realtime.properties +++ b/realtime/realtime-web/src/main/resources/pinpoint-web-realtime.properties @@ -1,4 +1,4 @@ pinpoint.web.realtime.atc.periods.emit=PT1S pinpoint.web.realtime.atc.periods.refresh=PT10S pinpoint.web.realtime.atc.periods.update=PT30S -pinpoint.web.realtime.agent-recentness=PT5S +pinpoint.web.realtime.agent-recentness=PT5M diff --git a/web/src/main/java/com/navercorp/pinpoint/web/realtime/AgentLookupServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/realtime/AgentLookupServiceImpl.java index 7987b43a7374..493bf1cfc0b2 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/realtime/AgentLookupServiceImpl.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/realtime/AgentLookupServiceImpl.java @@ -46,11 +46,12 @@ class AgentLookupServiceImpl implements AgentLookupService { @Override public List getRecentAgents(String applicationName) { - final long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); + long from = now - recentness.toMillis(); return intoClusterKeyList(this.agentInfoService.getAgentsListByApplicationName( - AgentStatusFilters.running(), + AgentStatusFilters.recentRunning(from), applicationName, - Range.between(now - recentness.toMillis(), now), + Range.between(from, now), SortByAgentInfo.Rules.AGENT_NAME_ASC )); }