diff --git a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java index 08bbf16c3bc9..9b7066e157b8 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java +++ b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java @@ -51,11 +51,13 @@ public Flux request(D demand) { final Identifier id = identifierFactory.get(); final SubChannel> supplyChannel = supplyRouter.apply(id); final SubConsumer> subConsumer = new LongTermSubConsumer<>(sink, id, subscriptionRef); - subscriptionRef.set(supplyChannel.subscribe(subConsumer)); + final Subscription subscription = supplyChannel.subscribe(subConsumer); + subscriptionRef.set(subscription); demandRouter.apply(id).publish(DemandMessage.ok(id, demand)); - return sink.asFlux(); + return sink.asFlux() + .doFinally(e -> subscription.unsubscribe()); } static class LongTermSubConsumer implements SubConsumer> { diff --git a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubMonoClientImpl.java b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubMonoClientImpl.java index d82d95196f6c..691f4bd1adc0 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubMonoClientImpl.java +++ b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubMonoClientImpl.java @@ -47,11 +47,14 @@ public Mono request(D demand) { final Identifier id = identifierFactory.get(); final SubChannel> supplyChannel = supplyRouter.apply(id); final SubConsumer> subConsumer = new ShortTermSubConsumer<>(sink, id, subscriptionRef); - subscriptionRef.set(supplyChannel.subscribe(subConsumer)); + final Subscription subscription = supplyChannel.subscribe(subConsumer); + subscriptionRef.set(subscription); demandRouter.apply(id).publish(DemandMessage.ok(id, demand)); - return sink.asMono().timeout(this.options.getRequestTimeout()); + return sink.asMono() + .doFinally(e -> subscription.unsubscribe()) + .timeout(this.options.getRequestTimeout()); } static class ShortTermSubConsumer implements SubConsumer> {