Skip to content

Commit

Permalink
[#noissue] Removed thrift in realtime
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed Oct 5, 2023
1 parent 9db65be commit f39def5
Show file tree
Hide file tree
Showing 164 changed files with 3,943 additions and 6,813 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author youngjin.kim2
Expand Down Expand Up @@ -107,12 +108,13 @@ public boolean consume(byte[] rawDemand) {
private boolean responseToDemand(D demand) {
Mono<S> response = this.backend.demand(demand);
if (response != null) {
logger.info("Responding long pubsub demand ({})", demand);
PubChannel supplyPubChannel = getSupplyPubChannel(demand);
response.subscribe(new PubChannelProxy(supplyPubChannel));
response
.doOnError(e -> logger.debug("Ignored short pubsub demand: {}", demand))
.onErrorComplete(IgnoreDemandException.class)
.subscribe(new PubChannelProxy(demand));
return true;
} else {
logger.debug("Ignored long pubsub demand (id: {})", demand);
logger.debug("Ignored short pubsub demand: {}", demand);
}
return false;
}
Expand All @@ -139,12 +141,13 @@ public boolean consume(byte[] rawDemand) {
private boolean responseToDemand(D demand) {
Flux<S> response = this.backend.demand(demand);
if (response != null) {
logger.info("Responding long pubsub demand ({})", demand);
PubChannel supplyPubChannel = getSupplyPubChannel(demand);
response.subscribe(new PubChannelProxy(supplyPubChannel));
response
.doOnError(e -> logger.debug("Ignored long pubsub demand: {}", demand))
.onErrorComplete(IgnoreDemandException.class)
.subscribe(new PubChannelProxy(demand));
return true;
} else {
logger.debug("Ignored long pubsub demand (id: {})", demand);
logger.debug("Ignored long pubsub demand: {}", demand);
}
return false;
}
Expand All @@ -153,22 +156,35 @@ private boolean responseToDemand(D demand) {

private class PubChannelProxy extends BaseSubscriber<S> {

private final PubChannel channel;
private final D demand;
private final AtomicReference<PubChannel> channelRef = new AtomicReference<>(null);

PubChannelProxy(PubChannel channel) {
this.channel = channel;
PubChannelProxy(D demand) {
this.demand = demand;
}

@Override
public void hookOnNext(@NonNull S supply) {
try {
byte[] rawResponse = getProtocol().serializeSupply(supply);
this.channel.publish(rawResponse);
this.getPubChannel().publish(rawResponse);
} catch (Exception e) {
logger.error("Failed to send", e);
}
}

private PubChannel getPubChannel() {
PubChannel prev = this.channelRef.get();
if (prev == null) {
logger.info("Responding pubsub demand ({})", demand);
PubChannel pub = getSupplyPubChannel(this.demand);
this.channelRef.set(pub);
return pub;
} else {
return prev;
}
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.channel.service.server;

/**
* @author youngjin.kim2
*/
public class IgnoreDemandException extends RuntimeException {

public IgnoreDemandException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.navercorp.pinpoint.collector;


import com.navercorp.pinpoint.collector.cluster.RealtimeCollectorModuleAdaptorConfig;
import com.navercorp.pinpoint.collector.config.ClusterModule;
import com.navercorp.pinpoint.collector.config.CollectorProperties;
import com.navercorp.pinpoint.collector.config.FlinkContextModule;
Expand All @@ -10,6 +9,7 @@
import com.navercorp.pinpoint.collector.grpc.ssl.GrpcSslModule;
import com.navercorp.pinpoint.common.server.CommonsServerConfiguration;
import com.navercorp.pinpoint.common.server.config.TypeLoaderConfiguration;
import com.navercorp.pinpoint.realtime.collector.RealtimeCollectorModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -39,7 +39,7 @@

GrpcSslModule.class,

RealtimeCollectorModuleAdaptorConfig.class,
RealtimeCollectorModule.class,
})
@ComponentScan(basePackages = {
"com.navercorp.pinpoint.collector.handler",
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit f39def5

Please sign in to comment.