From 09309d6479a89352d881b51470137ce62e66c030 Mon Sep 17 00:00:00 2001 From: Kim YoungJin Date: Mon, 7 Aug 2023 11:36:42 +0900 Subject: [PATCH] [#noissue] Added logs for responding pubsub demands --- .../java/com/navercorp/pinpoint/util/ThrottleTest.java | 8 ++++---- .../pinpoint/pubsub/endpoint/PubSubServerImpl.java | 10 ++++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/realtime/realtime-common/src/test/java/com/navercorp/pinpoint/util/ThrottleTest.java b/realtime/realtime-common/src/test/java/com/navercorp/pinpoint/util/ThrottleTest.java index 5b95e2475f4f..026c17d260de 100644 --- a/realtime/realtime-common/src/test/java/com/navercorp/pinpoint/util/ThrottleTest.java +++ b/realtime/realtime-common/src/test/java/com/navercorp/pinpoint/util/ThrottleTest.java @@ -34,19 +34,19 @@ public class ThrottleTest { @Test public void shouldHitAround10Times() { final Throttle throttle = new MinTermThrottle(TimeUnit.MILLISECONDS.toNanos(10)); - final long threshold = TimeUnit.MILLISECONDS.toNanos(100); - final long now = System.nanoTime(); + final long testDuration = TimeUnit.MILLISECONDS.toNanos(100); + final long startedAt = System.nanoTime(); final AtomicLong numTry = new AtomicLong(0); final AtomicLong numHit = new AtomicLong(0); executeParallel(() -> { - while (System.nanoTime() - now < threshold) { + while (System.nanoTime() - startedAt < testDuration) { numTry.incrementAndGet(); if (throttle.hit()) { numHit.incrementAndGet(); } } }); - assertThat(numHit.get()).isLessThanOrEqualTo(10).isGreaterThanOrEqualTo(8); + assertThat(numHit.get()).isLessThanOrEqualTo(11).isGreaterThanOrEqualTo(8); } private void executeParallel(Runnable target) { diff --git a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubServerImpl.java b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubServerImpl.java index 16ba1e9ce4b3..ebbdfe262d0a 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubServerImpl.java +++ b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubServerImpl.java @@ -18,6 +18,8 @@ import com.navercorp.pinpoint.pubsub.PubChannel; import com.navercorp.pinpoint.pubsub.SubChannel; import com.navercorp.pinpoint.pubsub.SubConsumer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.lang.NonNull; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @@ -33,6 +35,8 @@ */ class PubSubServerImpl implements PubSubServer { + private static final Logger logger = LogManager.getLogger(PubSubServerImpl.class); + private final Function> monoService; private final Function> fluxService; private final SubChannel> demandChannel; @@ -73,9 +77,12 @@ public boolean consume(DemandMessage demand) { private boolean responseToDemand(DemandMessage demand) { final Mono mono = monoService.apply(demand.getContent()); if (mono != null) { + logger.info("Responding short pubsub demand (id: {})", demand.getId()); final Identifier demandId = demand.getId(); mono.subscribe(new ShortResponseSubscriber(supplyRouter.apply(demandId), demandId)); return true; + } else { + logger.debug("Ignored short pubsub demand (id: {})", demand.getId()); } return false; } @@ -113,9 +120,12 @@ public boolean consume(DemandMessage demand) { private boolean responseToDemand(DemandMessage demand) { final Flux flux = fluxService.apply(demand.getContent()); if (flux != null) { + logger.info("Responding long pubsub demand (id: {})", demand.getId()); final Identifier demandId = demand.getId(); flux.subscribe(new LongResponseSubscriber(supplyRouter.apply(demandId), demandId)); return true; + } else { + logger.debug("Ignored long pubsub demand (id: {})", demand.getId()); } return false; }