Skip to content

Commit

Permalink
[pinpoint-apm#10343] Removed cluster between web, collector
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed Sep 19, 2023
1 parent 2b95dd2 commit 4ac7506
Show file tree
Hide file tree
Showing 121 changed files with 1,035 additions and 7,119 deletions.
18 changes: 5 additions & 13 deletions collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,11 @@ java -jar -Dpinpoint.zookeeper.address=$ZOOKEEPER_ADDRESS -Dspring.profiles.acti

## Collector port
## gRPC port
| port | protocol | type
| ---- | ---- | ----
| 9991 | TCP | agent
| 9992 | TCP | span
| 9993 | TCP | stat

## Thrift port
| port | protocol | type
| ---- | ---- | ----
| 9994 | TCP | agent
| 9995 | UDP | span
| 9996 | UDP | stat

| port | protocol | type |
|------|----------|-------|
| 9991 | TCP | agent |
| 9992 | TCP | span |
| 9993 | TCP | stat |

## Configuration for development environment
Use /config directory [External Application Properties](https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.external-config.files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ public ClientStreamChannel requestStream(ClientStreamChannelEventHandler handler
throw new RuntimeException("Unsupported command: " + command);
}
if (clusterPoint instanceof GrpcAgentConnection) {
return openStream(handler, tCommand);
return openStream(handler, command);
}
throw new RuntimeException("Invalid clusterPoint: " + clusterPoint);
}

private ClientStreamChannel openStream(ClientStreamChannelEventHandler handler, TBase<?, ?> tCommand) {
private ClientStreamChannel openStream(ClientStreamChannelEventHandler handler, GeneratedMessageV3 command) {
try {
return ((GrpcAgentConnection) clusterPoint).openStream(tCommand, handler);
return ((GrpcAgentConnection) clusterPoint).openStream(command, handler);
} catch (StreamException e) {
throw new RuntimeException("Failed to openStream " + tCommand);
throw new RuntimeException("Failed to openStream " + command);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@
import com.navercorp.pinpoint.thrift.sender.message.CommandGrpcToThriftMessageConverter;

import java.util.Objects;
import java.util.Set;

/**
* @author youngjin.kim2
*/
public class AgentConnectionRepositoryImpl implements AgentConnectionRepository {

private final StreamRouteHandler streamRouteHandler;
private final ClusterPointRepository<?> clusterPointRepository;

private final CommandGrpcToThriftMessageConverter messageConverter = new CommandGrpcToThriftMessageConverter();

public AgentConnectionRepositoryImpl(StreamRouteHandler streamRouteHandler) {
public AgentConnectionRepositoryImpl(
StreamRouteHandler streamRouteHandler,
ClusterPointRepository<?> clusterPointRepository
) {
this.streamRouteHandler = Objects.requireNonNull(streamRouteHandler, "streamRouteHandler");
this.clusterPointRepository = Objects.requireNonNull(clusterPointRepository, "clusterPointRepository");
}

@Override
Expand All @@ -46,4 +52,9 @@ public AgentConnection getConnection(ClusterKey key) {
return new AgentConnectionImpl(clusterPoint, this.messageConverter);
}

@Override
public Set<ClusterKey> getClusterKeys() {
return this.clusterPointRepository.getAvailableAgentKeySet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,16 @@ public ClusterAddressProvider(Address address) {
this.address = Objects.requireNonNull(address, "address");
}

private void assertAddressNotNull(InetSocketAddress inetSocketAddress) {
if (inetSocketAddress == null) {
throw new IllegalArgumentException("address may not be null");
}
}

@Override
public InetSocketAddress resolve() {
String host = address.getHost();
int port = address.getPort();

InetSocketAddress address = new InetSocketAddress(host, port);
return address;
return new InetSocketAddress(host, port);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ClusterAddressProvider{");
sb.append("address=").append(address);
sb.append('}');
return sb.toString();
return "ClusterAddressProvider{" + "address=" + address + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public List<T> getClusterPointList() {
}
}

public Set<ClusterKey> getAvailableAgentKeyList() {
public Set<ClusterKey> getAvailableAgentKeySet() {
synchronized (this) {
Set<ClusterKey> availableAgentKeySet = new HashSet<>(clusterPointRepository.size());

Expand All @@ -106,10 +106,4 @@ public Set<ClusterKey> getAvailableAgentKeyList() {
}
}

public void clear() {
synchronized (this) {
clusterPointRepository.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
import com.navercorp.pinpoint.thrift.io.SerializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;

import org.apache.thrift.TBase;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TBase;

import javax.annotation.PreDestroy;
import java.util.Objects;
Expand Down Expand Up @@ -107,7 +106,7 @@ public StreamCode handleStreamCreatePacket(ServerStreamChannel streamChannel, St
if (request == null) {
return StreamCode.TYPE_UNKNOWN;
} else if (request instanceof TCommandTransfer) {
return handleStreamRouteCreate((TCommandTransfer) request, packet, streamChannel);
return handleStreamRouteCreate((TCommandTransfer) request, streamChannel);
} else {
return StreamCode.TYPE_UNSUPPORT;
}
Expand All @@ -120,7 +119,7 @@ public void handleStreamClosePacket(ServerStreamChannel streamChannel, StreamClo
streamRouteHandler.close(streamChannel);
}

private boolean handleRouteRequest(TCommandTransfer request, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
private void handleRouteRequest(TCommandTransfer request, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
logger.info("handleRouteRequest() request:{}, remote:{}", request, pinpointSocket.getRemoteAddress());

byte[] payload = request.getPayload();
Expand All @@ -130,7 +129,9 @@ private boolean handleRouteRequest(TCommandTransfer request, RequestPacket reque
TCommandTransferResponse response = routeHandler.onRoute(event);
pinpointSocket.response(requestPacket.getRequestId(), serialize(response));

return response.getRouteResult() == TRouteResult.OK;
if (response.getRouteResult() != TRouteResult.OK) {
throw new RuntimeException("RouteResult is not OK");
}
}

private void handleRouteRequestFail(String message, RequestPacket requestPacket, PinpointSocket pinpointSocket) {
Expand All @@ -140,7 +141,7 @@ private void handleRouteRequestFail(String message, RequestPacket requestPacket,
pinpointSocket.response(requestPacket.getRequestId(), serialize(tResult));
}

private StreamCode handleStreamRouteCreate(TCommandTransfer request, StreamCreatePacket packet, ServerStreamChannel serverStreamChannel) {
private StreamCode handleStreamRouteCreate(TCommandTransfer request, ServerStreamChannel serverStreamChannel) {
byte[] payload = request.getPayload();
TBase<?, ?> command = deserialize(payload);
if (command == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
*/
public interface ClusterService {

void setUp() throws Exception;
void setup();

void tearDown() throws Exception;

boolean isEnable();
void tearDown();

ProfilerClusterManager getProfilerClusterManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClusterService;
import com.navercorp.pinpoint.collector.config.CollectorClusterProperties;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
Expand All @@ -28,22 +24,14 @@

public class ClusterServiceFactory implements FactoryBean<ClusterService>, InitializingBean, DisposableBean {

private final Logger logger = LogManager.getLogger(this.getClass());

private CollectorClusterProperties collectorClusterProperties;
private ClusterPointRouter clusterPointRouter;

private ClusterService clusterService;

@Override
public ClusterService getObject() throws Exception {
public ClusterService getObject() {
return this.clusterService;
}

public void setClusterProperties(CollectorClusterProperties collectorClusterProperties) {
this.collectorClusterProperties = Objects.requireNonNull(collectorClusterProperties, "collectorClusterProperties");
}

public void setClusterPointRouter(ClusterPointRouter clusterPointRouter) {
this.clusterPointRouter = Objects.requireNonNull(clusterPointRouter, "clusterPointRouter");
}
Expand All @@ -57,26 +45,16 @@ public void destroy() throws Exception {
@Override
public void afterPropertiesSet() throws Exception {
this.clusterService = newClusterService();
this.clusterService.setUp();
this.clusterService.setup();
}

private ClusterService newClusterService() {
if (collectorClusterProperties.isClusterEnable()) {
return new ZookeeperClusterService(collectorClusterProperties, clusterPointRouter);
}
logger.info("pinpoint-collector cluster disable");
return new DisableClusterService();
return new ClusterServiceImpl(clusterPointRouter);
}

@Override
public Class<?> getObjectType() {
return ClusterService.class;
}

@Override
public boolean isSingleton() {
return FactoryBean.super.isSingleton();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2014 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.cluster;

import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonState;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.util.CommonStateContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

/**
* @author koo.taejin
*/
public class ClusterServiceImpl implements ClusterService {

private final Logger logger = LogManager.getLogger(this.getClass());

private final ClusterPointRouter clusterPointRouter;
private final CommonStateContext serviceState = new CommonStateContext();

private ProfilerClusterManager profilerClusterManager;

public ClusterServiceImpl(ClusterPointRouter clusterPointRouter) {
this.clusterPointRouter = Objects.requireNonNull(clusterPointRouter, "clusterPointRouter");
}

@Override
public void setup() {
logger.info("Setting up cluster");

switch (this.serviceState.getCurrentState()) {
case NEW:
if (this.serviceState.changeStateInitializing()) {
logger.info("{} initialization started.", this.getClass().getSimpleName());

this.profilerClusterManager = new MemoryProfilerClusterManager(clusterPointRouter.getTargetClusterPointRepository());
this.profilerClusterManager.start();

this.serviceState.changeStateStarted();
logger.info("{} initialization completed.", this.getClass().getSimpleName());
}
break;
case INITIALIZING:
logger.info("{} already initializing.", this.getClass().getSimpleName());
break;
case STARTED:
logger.info("{} already started.", this.getClass().getSimpleName());
break;
case DESTROYING:
throw new IllegalStateException("Already destroying.");
case STOPPED:
throw new IllegalStateException("Already stopped.");
case ILLEGAL_STATE:
throw new IllegalStateException("Invalid State.");
}
}


@Override
public void tearDown() {
logger.info("pinpoint-collector cluster tearDown");

if (!(this.serviceState.changeStateDestroying())) {
CommonState state = this.serviceState.getCurrentState();

logger.info("{} already {}.", this.getClass().getSimpleName(), state);
return;
}

logger.info("{} destroying started.", this.getClass().getSimpleName());

if (this.profilerClusterManager != null) {
profilerClusterManager.stop();
}

this.serviceState.changeStateStopped();
logger.info("{} destroying completed.", this.getClass().getSimpleName());
}

@Override
public ProfilerClusterManager getProfilerClusterManager() {
return profilerClusterManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ public class DisableClusterService implements ClusterService {
private final ProfilerClusterManager profilerClusterManager = new DisabledProfilerClusterManager();

@Override
public void setUp() throws Exception {
public void setup() {

}

@Override
public void tearDown() throws Exception {
public void tearDown() {

}

@Override
public boolean isEnable() {
return false;
}

@Override
public ProfilerClusterManager getProfilerClusterManager() {
return profilerClusterManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public void register(ClusterPoint<?> targetClusterPoint) {
public void unregister(ClusterPoint<?> targetClusterPoint) {
}

@Override
public void refresh() {
}

@Override
public List<String> getClusterData() {
return Collections.emptyList();
Expand Down
Loading

0 comments on commit 4ac7506

Please sign in to comment.