Skip to content

Commit

Permalink
[#noissue] thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 11, 2023
1 parent 89fe796 commit 0d87fb3
Show file tree
Hide file tree
Showing 23 changed files with 180 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.navercorp.pinpoint.profiler.metadata.SqlMetaData;
import com.navercorp.pinpoint.profiler.metadata.StringMetaData;
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;
import com.navercorp.pinpoint.test.util.BiHashMap;
import com.navercorp.pinpoint.test.util.Pair;

Expand All @@ -33,6 +31,8 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* @author Jongho Moon
Expand Down Expand Up @@ -140,20 +140,11 @@ public boolean request(MetaDataType data, int retry) {
}

@Override
public boolean request(MetaDataType data, FutureListener<ResponseMessage> listener) {
public boolean request(MetaDataType data, Consumer<CompletableFuture<ResponseMessage>> listener) {
addData(data);
return true;
}

@Override
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
return false;
}

@Override
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
return false;
}

public String getApiDescription(int id) {
return syncGet(apiIdMap, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@

package com.navercorp.pinpoint.profiler;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.Objects;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.metadata.AgentInfo;
import com.navercorp.pinpoint.profiler.metadata.MetaDataType;
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
import com.navercorp.pinpoint.profiler.sender.ResultResponse;
import com.navercorp.pinpoint.profiler.util.AgentInfoFactory;
import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author emeroad
Expand Down Expand Up @@ -175,31 +175,22 @@ public void run() {
private boolean sendAgentInfo() {
try {
AgentInfo agentInfo = agentInfoFactory.createAgentInfo();
final DefaultFuture<ResponseMessage> future = new DefaultFuture<>();
final CompletableFuture<ResponseMessage> future = new CompletableFuture<>();

logger.info("Sending AgentInfo {}", agentInfo);
dataSender.request(agentInfo, new ResponseMessageFutureListener(future));
if (!future.await()) {
logger.warn("request timed out while waiting for response.");
return false;
}
if (!future.isSuccess()) {
Throwable t = future.getCause();
logger.warn("request failed.", t);
return false;
}
ResponseMessage responseMessage = future.getResult();
if (responseMessage == null) {
ResponseMessage byteMessage = future.get(3000, TimeUnit.MILLISECONDS);
if (byteMessage == null) {
logger.warn("result not set.");
return false;
}
final ResultResponse result = messageConverter.toMessage(responseMessage);
final ResultResponse result = messageConverter.toMessage(byteMessage);
if (!result.isSuccess()) {
logger.warn("request unsuccessful. Cause : {}", result.getMessage());
}
return result.isSuccess();
} catch (Exception e) {
logger.warn("failed to send agent info.", e);
} catch (Throwable th) {
logger.warn("failed to send agent info.", th);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,34 @@

package com.navercorp.pinpoint.profiler;

import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.Future;
import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;

public class ResponseMessageFutureListener implements FutureListener<ResponseMessage> {
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

private final DefaultFuture<ResponseMessage> future;
public class ResponseMessageFutureListener implements Consumer<CompletableFuture<ResponseMessage>> {

public ResponseMessageFutureListener(DefaultFuture<ResponseMessage> future) {
private final CompletableFuture<ResponseMessage> future;

public ResponseMessageFutureListener(CompletableFuture<ResponseMessage> future) {
this.future = future;
}

@Override
public void onComplete(Future<ResponseMessage> future) {
public void accept(CompletableFuture<ResponseMessage> future) {
if (future == null) {
this.future.setFailure(new IllegalStateException("ResponseMessage future is null"));
this.future.completeExceptionally(new IllegalStateException("ResponseMessage future is null"));
return;
}
if (!future.isReady()) {
this.future.setFailure(new IllegalStateException("ResponseMessage future is not complete"));
if (!future.isDone()) {
this.future.completeExceptionally(new IllegalStateException("ResponseMessage future is not complete"));
return;
}

if (future.isSuccess()) {
ResponseMessage responseMessage = future.getResult();
this.future.setResult(responseMessage);
} else {
Throwable cause = future.getCause();
this.future.setFailure(cause);
try {
ResponseMessage byteMessage = future.getNow(null);
this.future.complete(byteMessage);
} catch (Throwable cause) {
this.future.completeExceptionally(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;


/**
Expand Down Expand Up @@ -55,17 +56,7 @@ public boolean request(T data, int retry) {


@Override
public boolean request(T data, FutureListener<ResponseMessage> listener) {
return false;
}

@Override
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
return false;
}

@Override
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;


/**
Expand All @@ -28,9 +29,7 @@ public interface EnhancedDataSender<T> extends DataSender<T> {

boolean request(T data);
boolean request(T data, int retry);
boolean request(T data, FutureListener<ResponseMessage> listener);
boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener);

boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener);
boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@

package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* @author Woonduk Kang(emeroad)
*/
class ListenerableRequestMessage<T> implements RequestMessage<T> {
private final T message;
private final FutureListener<ResponseMessage> futureListener;
private final Consumer<CompletableFuture<ResponseMessage>> futureListener;


ListenerableRequestMessage(T message, FutureListener<ResponseMessage> futureListener) {
ListenerableRequestMessage(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
this.message = message;
this.futureListener = futureListener;
}
Expand All @@ -44,7 +46,7 @@ public int getRetryCount() {
}

@Override
public FutureListener<ResponseMessage> getFutureListener() {
public Consumer<CompletableFuture<ResponseMessage>> getFutureListener() {
return futureListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;


/**
Expand Down Expand Up @@ -58,21 +58,9 @@ public boolean request(T data, int retry) {


@Override
public boolean request(T data, FutureListener<ResponseMessage> listener) {
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
logger.info("request tBase:{} FutureListener:{}", data, listener);
return false;
}

@Override
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
logger.info("addReconnectEventListener eventListener:{}", eventListener);
return false;
}

@Override
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
logger.info("removeReconnectEventListener eventListener:{}", eventListener);
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.navercorp.pinpoint.profiler.sender;

import java.util.function.Consumer;

public interface ReconnectEventSupport<T> {
boolean addEventListener(Consumer<T> eventListener);

boolean removeEventListener(Consumer<T> eventListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.navercorp.pinpoint.profiler.sender;

import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* @author Woonduk Kang(emeroad)
*/
Expand All @@ -28,6 +30,6 @@ public interface RequestMessage<M> {

int getRetryCount();

FutureListener<ResponseMessage> getFutureListener();
Consumer<CompletableFuture<ResponseMessage>> getFutureListener();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.navercorp.pinpoint.profiler.sender;


import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* @author Woonduk Kang(emeroad)
*/
Expand All @@ -32,7 +34,7 @@ public static <T> RequestMessage<T> request(T message, int retryCount) {
return new RetryRequestMessage<>(message, retryCount);
}

public static <T> RequestMessage<T> request(T message, FutureListener<ResponseMessage> futureListener) {
public static <T> RequestMessage<T> request(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
return new ListenerableRequestMessage<>(message, futureListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.navercorp.pinpoint.profiler.sender;


import com.navercorp.pinpoint.rpc.FutureListener;
import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* @author Woonduk Kang(emeroad)
*/
Expand All @@ -45,7 +47,7 @@ public int getRetryCount() {
}

@Override
public FutureListener<ResponseMessage> getFutureListener() {
public Consumer<CompletableFuture<ResponseMessage>> getFutureListener() {
return null;
}
}
Loading

0 comments on commit 0d87fb3

Please sign in to comment.