Skip to content

Commit

Permalink
[#noissue] Added logs for responding pubsub demands
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed Aug 7, 2023
1 parent b7a01bd commit 09309d6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ public class ThrottleTest {
@Test
public void shouldHitAround10Times() {
final Throttle throttle = new MinTermThrottle(TimeUnit.MILLISECONDS.toNanos(10));
final long threshold = TimeUnit.MILLISECONDS.toNanos(100);
final long now = System.nanoTime();
final long testDuration = TimeUnit.MILLISECONDS.toNanos(100);
final long startedAt = System.nanoTime();
final AtomicLong numTry = new AtomicLong(0);
final AtomicLong numHit = new AtomicLong(0);
executeParallel(() -> {
while (System.nanoTime() - now < threshold) {
while (System.nanoTime() - startedAt < testDuration) {
numTry.incrementAndGet();
if (throttle.hit()) {
numHit.incrementAndGet();
}
}
});
assertThat(numHit.get()).isLessThanOrEqualTo(10).isGreaterThanOrEqualTo(8);
assertThat(numHit.get()).isLessThanOrEqualTo(11).isGreaterThanOrEqualTo(8);
}

private void executeParallel(Runnable target) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.navercorp.pinpoint.pubsub.PubChannel;
import com.navercorp.pinpoint.pubsub.SubChannel;
import com.navercorp.pinpoint.pubsub.SubConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.lang.NonNull;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand All @@ -33,6 +35,8 @@
*/
class PubSubServerImpl<D, S> implements PubSubServer {

private static final Logger logger = LogManager.getLogger(PubSubServerImpl.class);

private final Function<D, Mono<S>> monoService;
private final Function<D, Flux<S>> fluxService;
private final SubChannel<DemandMessage<D>> demandChannel;
Expand Down Expand Up @@ -73,9 +77,12 @@ public boolean consume(DemandMessage<D> demand) {
private boolean responseToDemand(DemandMessage<D> demand) {
final Mono<S> mono = monoService.apply(demand.getContent());
if (mono != null) {
logger.info("Responding short pubsub demand (id: {})", demand.getId());
final Identifier demandId = demand.getId();
mono.subscribe(new ShortResponseSubscriber(supplyRouter.apply(demandId), demandId));
return true;
} else {
logger.debug("Ignored short pubsub demand (id: {})", demand.getId());
}
return false;
}
Expand Down Expand Up @@ -113,9 +120,12 @@ public boolean consume(DemandMessage<D> demand) {
private boolean responseToDemand(DemandMessage<D> demand) {
final Flux<S> flux = fluxService.apply(demand.getContent());
if (flux != null) {
logger.info("Responding long pubsub demand (id: {})", demand.getId());
final Identifier demandId = demand.getId();
flux.subscribe(new LongResponseSubscriber(supplyRouter.apply(demandId), demandId));
return true;
} else {
logger.debug("Ignored long pubsub demand (id: {})", demand.getId());
}
return false;
}
Expand Down

0 comments on commit 09309d6

Please sign in to comment.