Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#10343] Removed cluster between web, collector #10350

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,11 @@ java -jar -Dpinpoint.zookeeper.address=$ZOOKEEPER_ADDRESS -Dspring.profiles.acti

## Collector port
## gRPC port
| port | protocol | type
| ---- | ---- | ----
| 9991 | TCP | agent
| 9992 | TCP | span
| 9993 | TCP | stat

## Thrift port
| port | protocol | type
| ---- | ---- | ----
| 9994 | TCP | agent
| 9995 | UDP | span
| 9996 | UDP | stat

| port | protocol | type |
|------|----------|-------|
| 9991 | TCP | agent |
| 9992 | TCP | span |
| 9993 | TCP | stat |

## Configuration for development environment
Use /config directory [External Application Properties](https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.external-config.files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
throw new RuntimeException("Unsupported command: " + command);
}
if (clusterPoint instanceof GrpcAgentConnection) {
return openStream(handler, tCommand);
return openStream(handler, command);

Check warning on line 50 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java#L50

Added line #L50 was not covered by tests
}
throw new RuntimeException("Invalid clusterPoint: " + clusterPoint);
}

private ClientStreamChannel openStream(ClientStreamChannelEventHandler handler, TBase<?, ?> tCommand) {
private ClientStreamChannel openStream(ClientStreamChannelEventHandler handler, GeneratedMessageV3 command) {
try {
return ((GrpcAgentConnection) clusterPoint).openStream(tCommand, handler);
return ((GrpcAgentConnection) clusterPoint).openStream(command, handler);

Check warning on line 57 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java#L57

Added line #L57 was not covered by tests
} catch (StreamException e) {
throw new RuntimeException("Failed to openStream " + tCommand);
throw new RuntimeException("Failed to openStream " + command);

Check warning on line 59 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionImpl.java#L59

Added line #L59 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@
import com.navercorp.pinpoint.thrift.sender.message.CommandGrpcToThriftMessageConverter;

import java.util.Objects;
import java.util.Set;

/**
* @author youngjin.kim2
*/
public class AgentConnectionRepositoryImpl implements AgentConnectionRepository {

private final StreamRouteHandler streamRouteHandler;
private final ClusterPointRepository<?> clusterPointRepository;

private final CommandGrpcToThriftMessageConverter messageConverter = new CommandGrpcToThriftMessageConverter();

public AgentConnectionRepositoryImpl(StreamRouteHandler streamRouteHandler) {
public AgentConnectionRepositoryImpl(
StreamRouteHandler streamRouteHandler,
ClusterPointRepository<?> clusterPointRepository
) {

Check warning on line 40 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java#L40

Added line #L40 was not covered by tests
this.streamRouteHandler = Objects.requireNonNull(streamRouteHandler, "streamRouteHandler");
this.clusterPointRepository = Objects.requireNonNull(clusterPointRepository, "clusterPointRepository");

Check warning on line 42 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java#L42

Added line #L42 was not covered by tests
}

@Override
Expand All @@ -46,4 +52,9 @@
return new AgentConnectionImpl(clusterPoint, this.messageConverter);
}

@Override
public Set<ClusterKey> getClusterKeys() {
return this.clusterPointRepository.getAvailableAgentKeySet();

Check warning on line 57 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/AgentConnectionRepositoryImpl.java#L57

Added line #L57 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,16 @@
this.address = Objects.requireNonNull(address, "address");
}

private void assertAddressNotNull(InetSocketAddress inetSocketAddress) {
if (inetSocketAddress == null) {
throw new IllegalArgumentException("address may not be null");
}
}

@Override
public InetSocketAddress resolve() {
String host = address.getHost();
int port = address.getPort();

InetSocketAddress address = new InetSocketAddress(host, port);
return address;
return new InetSocketAddress(host, port);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ClusterAddressProvider{");
sb.append("address=").append(address);
sb.append('}');
return sb.toString();
return "ClusterAddressProvider{" + "address=" + address + '}';

Check warning on line 46 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterAddressProvider.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterAddressProvider.java#L46

Added line #L46 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public List<T> getClusterPointList() {
}
}

public Set<ClusterKey> getAvailableAgentKeyList() {
public Set<ClusterKey> getAvailableAgentKeySet() {
synchronized (this) {
Set<ClusterKey> availableAgentKeySet = new HashSet<>(clusterPointRepository.size());

Expand All @@ -106,10 +106,4 @@ public Set<ClusterKey> getAvailableAgentKeyList() {
}
}

public void clear() {
synchronized (this) {
clusterPointRepository.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.SerializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;

import org.apache.thrift.TBase;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TBase;

import javax.annotation.PreDestroy;
import java.util.Objects;
Expand Down Expand Up @@ -107,7 +106,7 @@
if (request == null) {
return StreamCode.TYPE_UNKNOWN;
} else if (request instanceof TCommandTransfer) {
return handleStreamRouteCreate((TCommandTransfer) request, packet, streamChannel);
return handleStreamRouteCreate((TCommandTransfer) request, streamChannel);

Check warning on line 109 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterPointRouter.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterPointRouter.java#L109

Added line #L109 was not covered by tests
} else {
return StreamCode.TYPE_UNSUPPORT;
}
Expand All @@ -120,7 +119,7 @@
streamRouteHandler.close(streamChannel);
}

private boolean handleRouteRequest(TCommandTransfer request, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
private void handleRouteRequest(TCommandTransfer request, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
logger.info("handleRouteRequest() request:{}, remote:{}", request, pinpointSocket.getRemoteAddress());

byte[] payload = request.getPayload();
Expand All @@ -130,7 +129,9 @@
TCommandTransferResponse response = routeHandler.onRoute(event);
pinpointSocket.response(requestPacket.getRequestId(), serialize(response));

return response.getRouteResult() == TRouteResult.OK;
if (response.getRouteResult() != TRouteResult.OK) {
throw new RuntimeException("RouteResult is not OK");

Check warning on line 133 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterPointRouter.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterPointRouter.java#L133

Added line #L133 was not covered by tests
}
}

private void handleRouteRequestFail(String message, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
Expand All @@ -140,7 +141,7 @@
pinpointSocket.response(requestPacket.getRequestId(), serialize(tResult));
}

private StreamCode handleStreamRouteCreate(TCommandTransfer request, StreamCreatePacket packet, ServerStreamChannel serverStreamChannel) {
private StreamCode handleStreamRouteCreate(TCommandTransfer request, ServerStreamChannel serverStreamChannel) {
byte[] payload = request.getPayload();
TBase<?, ?> command = deserialize(payload);
if (command == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
*/
public interface ClusterService {

void setUp() throws Exception;
void setup();

void tearDown() throws Exception;

boolean isEnable();
void tearDown();

ProfilerClusterManager getProfilerClusterManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClusterService;
import com.navercorp.pinpoint.collector.config.CollectorClusterProperties;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -28,22 +24,14 @@

public class ClusterServiceFactory implements FactoryBean<ClusterService>, InitializingBean, DisposableBean {

private final Logger logger = LogManager.getLogger(this.getClass());

private CollectorClusterProperties collectorClusterProperties;
private ClusterPointRouter clusterPointRouter;

private ClusterService clusterService;

@Override
public ClusterService getObject() throws Exception {
public ClusterService getObject() {
return this.clusterService;
}

public void setClusterProperties(CollectorClusterProperties collectorClusterProperties) {
this.collectorClusterProperties = Objects.requireNonNull(collectorClusterProperties, "collectorClusterProperties");
}

public void setClusterPointRouter(ClusterPointRouter clusterPointRouter) {
this.clusterPointRouter = Objects.requireNonNull(clusterPointRouter, "clusterPointRouter");
}
Expand All @@ -57,26 +45,16 @@
@Override
public void afterPropertiesSet() throws Exception {
this.clusterService = newClusterService();
this.clusterService.setUp();
this.clusterService.setup();

Check warning on line 48 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceFactory.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceFactory.java#L48

Added line #L48 was not covered by tests
}

private ClusterService newClusterService() {
if (collectorClusterProperties.isClusterEnable()) {
return new ZookeeperClusterService(collectorClusterProperties, clusterPointRouter);
}
logger.info("pinpoint-collector cluster disable");
return new DisableClusterService();
return new ClusterServiceImpl(clusterPointRouter);

Check warning on line 52 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceFactory.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceFactory.java#L52

Added line #L52 was not covered by tests
}

@Override
public Class<?> getObjectType() {
return ClusterService.class;
}

@Override
public boolean isSingleton() {
return FactoryBean.super.isSingleton();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2014 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

/**
* @author koo.taejin
*/
public class ClusterServiceImpl implements ClusterService {

private final Logger logger = LogManager.getLogger(this.getClass());

Check warning on line 31 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L31

Added line #L31 was not covered by tests

private final ClusterPointRouter clusterPointRouter;
private final CommonStateContext serviceState = new CommonStateContext();

Check warning on line 34 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L34

Added line #L34 was not covered by tests

private ProfilerClusterManager profilerClusterManager;

public ClusterServiceImpl(ClusterPointRouter clusterPointRouter) {
this.clusterPointRouter = Objects.requireNonNull(clusterPointRouter, "clusterPointRouter");
}

Check warning on line 40 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L38-L40

Added lines #L38 - L40 were not covered by tests

@Override
public void setup() {
logger.info("Setting up cluster");

Check warning on line 44 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L44

Added line #L44 was not covered by tests

switch (this.serviceState.getCurrentState()) {
case NEW:
if (this.serviceState.changeStateInitializing()) {
logger.info("{} initialization started.", this.getClass().getSimpleName());

Check warning on line 49 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L49

Added line #L49 was not covered by tests

this.profilerClusterManager = new MemoryProfilerClusterManager(clusterPointRouter.getTargetClusterPointRepository());
this.profilerClusterManager.start();

Check warning on line 52 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L51-L52

Added lines #L51 - L52 were not covered by tests

this.serviceState.changeStateStarted();
logger.info("{} initialization completed.", this.getClass().getSimpleName());

Check warning on line 55 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L54-L55

Added lines #L54 - L55 were not covered by tests
}
break;
case INITIALIZING:
logger.info("{} already initializing.", this.getClass().getSimpleName());
break;

Check warning on line 60 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L59-L60

Added lines #L59 - L60 were not covered by tests
case STARTED:
logger.info("{} already started.", this.getClass().getSimpleName());
break;

Check warning on line 63 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L62-L63

Added lines #L62 - L63 were not covered by tests
case DESTROYING:
throw new IllegalStateException("Already destroying.");

Check warning on line 65 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L65

Added line #L65 was not covered by tests
case STOPPED:
throw new IllegalStateException("Already stopped.");

Check warning on line 67 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L67

Added line #L67 was not covered by tests
case ILLEGAL_STATE:
throw new IllegalStateException("Invalid State.");

Check warning on line 69 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L69

Added line #L69 was not covered by tests
}
}

Check warning on line 71 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L71

Added line #L71 was not covered by tests


@Override
public void tearDown() {
logger.info("pinpoint-collector cluster tearDown");

Check warning on line 76 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L76

Added line #L76 was not covered by tests

if (!(this.serviceState.changeStateDestroying())) {
CommonState state = this.serviceState.getCurrentState();

Check warning on line 79 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L79

Added line #L79 was not covered by tests

logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;

Check warning on line 82 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L81-L82

Added lines #L81 - L82 were not covered by tests
}

logger.info("{} destroying started.", this.getClass().getSimpleName());

Check warning on line 85 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L85

Added line #L85 was not covered by tests

if (this.profilerClusterManager != null) {
profilerClusterManager.stop();

Check warning on line 88 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L88

Added line #L88 was not covered by tests
}

this.serviceState.changeStateStopped();
logger.info("{} destroying completed.", this.getClass().getSimpleName());
}

Check warning on line 93 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L91-L93

Added lines #L91 - L93 were not covered by tests

@Override
public ProfilerClusterManager getProfilerClusterManager() {
return profilerClusterManager;

Check warning on line 97 in collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/ClusterServiceImpl.java#L97

Added line #L97 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ public class DisableClusterService implements ClusterService {
private final ProfilerClusterManager profilerClusterManager = new DisabledProfilerClusterManager();

@Override
public void setUp() throws Exception {
public void setup() {

}

@Override
public void tearDown() throws Exception {
public void tearDown() {

}

@Override
public boolean isEnable() {
return false;
}

@Override
public ProfilerClusterManager getProfilerClusterManager() {
return profilerClusterManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public void register(ClusterPoint<?> targetClusterPoint) {
public void unregister(ClusterPoint<?> targetClusterPoint) {
}

@Override
public void refresh() {
}

@Override
public List<String> getClusterData() {
return Collections.emptyList();
Expand Down
Loading