Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#11278] Add gRPC transport and stream monitoring #11279

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package com.navercorp.pinpoint.collector.receiver.grpc;

import com.navercorp.pinpoint.collector.receiver.BindAddress;
import com.navercorp.pinpoint.collector.receiver.grpc.monitor.BasicMonitor;
import com.navercorp.pinpoint.collector.receiver.grpc.monitor.EmptyMonitor;
import com.navercorp.pinpoint.collector.receiver.grpc.monitor.Monitor;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.grpc.channelz.ChannelzRegistry;
import com.navercorp.pinpoint.grpc.server.ConnectionCountServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import com.navercorp.pinpoint.grpc.server.StreamCountInterceptor;
import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory;
import com.navercorp.pinpoint.grpc.server.TransportMetadataServerInterceptor;
import com.navercorp.pinpoint.grpc.util.ServerUtils;
Expand Down Expand Up @@ -78,6 +83,9 @@
private Server server;
private ChannelzRegistry channelzRegistry;

private boolean enableMonitor = true;

Check warning on line 86 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L86

Added line #L86 was not covered by tests
private Monitor monitor;


@Override
public void afterPropertiesSet() throws Exception {
Expand All @@ -91,6 +99,11 @@
Objects.requireNonNull(this.addressFilter, "addressFilter");
Assert.isTrue(CollectionUtils.hasLength(this.serviceList), "serviceList must not be empty");
Objects.requireNonNull(this.serverOption, "serverOption");
if (enableMonitor) {
this.monitor = new BasicMonitor(beanName + "-Monitor");

Check warning on line 103 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L103

Added line #L103 was not covered by tests
} else {
this.monitor = new EmptyMonitor();

Check warning on line 105 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L105

Added line #L105 was not covered by tests
}

if (sslContext != null) {
this.serverFactory = new ServerFactory(beanName, this.bindAddress.getIp(), this.bindAddress.getPort(), this.executor, this.serverCallExecutorSupplier, serverOption, sslContext);
Expand All @@ -101,6 +114,9 @@
ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(this.beanName, addressFilter);
this.serverFactory.addTransportFilter(permissionServerTransportFilter);

ConnectionCountServerTransportFilter countFilter = new ConnectionCountServerTransportFilter();
this.serverFactory.addTransportFilter(countFilter);

Check warning on line 118 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L117-L118

Added lines #L117 - L118 were not covered by tests

TransportMetadataFactory transportMetadataFactory = new TransportMetadataFactory(beanName);
// Mandatory interceptor
final ServerTransportFilter metadataTransportFilter = new MetadataServerTransportFilter(transportMetadataFactory);
Expand All @@ -116,6 +132,9 @@
ServerInterceptor transportMetadataServerInterceptor = new TransportMetadataServerInterceptor();
this.serverFactory.addInterceptor(transportMetadataServerInterceptor);

StreamCountInterceptor streamCountInterceptor = new StreamCountInterceptor();
this.serverFactory.addInterceptor(streamCountInterceptor);

Check warning on line 136 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L135-L136

Added lines #L135 - L136 were not covered by tests

if (CollectionUtils.hasLength(serverInterceptorList)) {
for (ServerInterceptor serverInterceptor : serverInterceptorList) {
this.serverFactory.addInterceptor(serverInterceptor);
Expand All @@ -125,6 +144,10 @@
this.serverFactory.setChannelzRegistry(channelzRegistry);
}

this.monitor.register(() -> {
logger.info("{} CurrentTransport:{}, CurrentGrpcStream:{}", beanName, countFilter.getCurrentConnection(), streamCountInterceptor.getCurrentStream());
});

Check warning on line 149 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L147-L149

Added lines #L147 - L149 were not covered by tests

// Add service
addService();

Expand Down Expand Up @@ -171,6 +194,8 @@
logger.info("Destroy {} server {}", this.beanName, this.server);
}

monitor.close();

Check warning on line 197 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L197

Added line #L197 was not covered by tests

shutdownServer();

for (Object bindableService : serviceList) {
Expand Down Expand Up @@ -242,4 +267,7 @@
this.channelzRegistry = Objects.requireNonNull(channelzRegistry, "channelzRegistry");
}

public void setEnableMonitor(boolean enableMonitor) {
this.enableMonitor = enableMonitor;
}

Check warning on line 272 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java#L271-L272

Added lines #L271 - L272 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.navercorp.pinpoint.collector.receiver.grpc.monitor;

import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;

public class BasicMonitor implements Monitor {
private final Timer timer;

public BasicMonitor(String name) {
this.timer = new Timer(name, true);
}

Check warning on line 12 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java#L10-L12

Added lines #L10 - L12 were not covered by tests

public void register(Runnable job) {
Objects.requireNonNull(job, "job");
this.timer.schedule(new TimerTask() {

Check warning on line 16 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java#L15-L16

Added lines #L15 - L16 were not covered by tests
@Override
public void run() {
job.run();
}

Check warning on line 20 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java#L19-L20

Added lines #L19 - L20 were not covered by tests
}, 60_000, 60_000);
}

Check warning on line 22 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java#L22

Added line #L22 was not covered by tests


@Override
public void close() {
this.timer.purge();
this.timer.cancel();
}

Check warning on line 29 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/BasicMonitor.java#L27-L29

Added lines #L27 - L29 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.navercorp.pinpoint.collector.receiver.grpc.monitor;

public class EmptyMonitor implements Monitor {

public EmptyMonitor() {
}

Check warning on line 6 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java#L5-L6

Added lines #L5 - L6 were not covered by tests

public void register(Runnable job) {
}

Check warning on line 9 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java#L9

Added line #L9 was not covered by tests

@Override
public void close() {
}

Check warning on line 13 in collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/monitor/EmptyMonitor.java#L13

Added line #L13 was not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.navercorp.pinpoint.collector.receiver.grpc.monitor;

import java.io.Closeable;

public interface Monitor extends Closeable {

void register(Runnable job);

void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2019 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.grpc.server;

import io.grpc.Attributes;
import io.grpc.ServerTransportFilter;

import java.util.concurrent.atomic.AtomicLong;

public class ConnectionCountServerTransportFilter extends ServerTransportFilter {

private final AtomicLong currentConnection = new AtomicLong();

Check warning on line 26 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java#L26

Added line #L26 was not covered by tests

public ConnectionCountServerTransportFilter() {
}

Check warning on line 29 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java#L28-L29

Added lines #L28 - L29 were not covered by tests


@Override
public Attributes transportReady(Attributes transportAttrs) {
currentConnection.incrementAndGet();
return transportAttrs;

Check warning on line 35 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java#L34-L35

Added lines #L34 - L35 were not covered by tests
}

@Override
public void transportTerminated(Attributes transportAttrs) {
currentConnection.decrementAndGet();
}

Check warning on line 41 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java#L40-L41

Added lines #L40 - L41 were not covered by tests

public long getCurrentConnection() {
return currentConnection.get();

Check warning on line 44 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ConnectionCountServerTransportFilter.java#L44

Added line #L44 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.navercorp.pinpoint.grpc.server;

import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

import java.util.concurrent.atomic.AtomicLong;

public class StreamCountInterceptor implements ServerInterceptor {
private final AtomicLong currentStream = new AtomicLong();
public StreamCountInterceptor() {
}

Check warning on line 14 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java#L12-L14

Added lines #L12 - L14 were not covered by tests

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
currentStream.incrementAndGet();
ServerCall.Listener<ReqT> listener = next.startCall(call, headers);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {

Check warning on line 20 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java#L18-L20

Added lines #L18 - L20 were not covered by tests
@Override
public void onCancel() {
currentStream.decrementAndGet();
super.onCancel();
}

Check warning on line 25 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java#L23-L25

Added lines #L23 - L25 were not covered by tests

@Override
public void onComplete() {
currentStream.decrementAndGet();
super.onComplete();
}

Check warning on line 31 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java#L29-L31

Added lines #L29 - L31 were not covered by tests

};
}

public long getCurrentStream() {
return currentStream.get();

Check warning on line 37 in grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc/src/main/java/com/navercorp/pinpoint/grpc/server/StreamCountInterceptor.java#L37

Added line #L37 was not covered by tests
}
}