Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Apr 15, 2024
1 parent 8936c98 commit 1508e4c
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.eventmesh.common;


public abstract class MetricsConstants {
public class MetricsConstants {

private MetricsConstants() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.common.Pair;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
Expand All @@ -27,7 +28,6 @@
import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
import org.apache.eventmesh.common.utils.AssertUtils;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.runtime.common.Pair;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
Expand Down Expand Up @@ -291,8 +291,8 @@ private void processTcpCommandRequest(final Package pkg, final ChannelHandlerCon
final long startTime, final Command cmd) {

Pair<TcpProcessor, ThreadPoolExecutor> pair = tcpRequestProcessorTable.get(cmd);
pair.getObject2().submit(() -> {
TcpProcessor processor = pair.getObject1();
pair.getRight().submit(() -> {
TcpProcessor processor = pair.getLeft();

processor.process(pkg, ctx, startTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,17 @@ public void init() throws Exception {

if (Objects.nonNull(eventMeshTCPServer)) {
MetricsManager metricsManager = eventMeshTCPServer.getEventMeshTcpMetricsManager();
this.eventMeshMetricsManager.addMetricManager(metricsManager);
this.eventMeshMetricsManager.addMetrics(metricsManager.getMetrics());
addMetricsManagerAndMetrics(metricsManager);
}

if (Objects.nonNull(eventMeshGrpcServer)) {
MetricsManager metricsManager = eventMeshGrpcServer.getEventMeshGrpcMetricsManager();
this.eventMeshMetricsManager.addMetricManager(metricsManager);
this.eventMeshMetricsManager.addMetrics(metricsManager.getMetrics());
addMetricsManagerAndMetrics(metricsManager);
}

if (Objects.nonNull(eventMeshHTTPServer)) {
MetricsManager metricsManager = eventMeshHTTPServer.getEventMeshHttpMetricsManager();
this.eventMeshMetricsManager.addMetricManager(metricsManager);
this.eventMeshMetricsManager.addMetrics(metricsManager.getMetrics());
addMetricsManagerAndMetrics(metricsManager);
}

if (Objects.nonNull(eventMeshMetricsManager)) {
Expand All @@ -188,6 +185,13 @@ public void init() throws Exception {
log.info(SERVER_STATE_MSG, serviceState);
}

private void addMetricsManagerAndMetrics(MetricsManager metricsManager) {
if (Objects.nonNull(metricsManager)) {
this.eventMeshMetricsManager.addMetricManager(metricsManager);
this.eventMeshMetricsManager.addMetrics(metricsManager.getMetrics());
}
}

public void start() throws Exception {
if (Objects.nonNull(configuration)) {
if (configuration.isEventMeshServerSecurityEnable()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void addMetric(Metric metric) {
* Initializes the EventMeshMetricsManager by registering the metrics with the metrics registries.
*/
public void init() {
GeneralMetricsManager.registerMetrics(metricsRegistries);
MetricsUtils.registerMetrics(metricsRegistries);
// Register metrics
metricsRegistries.stream().forEach(metricsRegistry -> metricsRegistry.register(metrics));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@

import java.util.List;

/**
* metric manager interface
*/

/**
* MetricsManager is an interface for managing metrics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* Managing general metrics.
*/
@UtilityClass
public class GeneralMetricsManager {
public class MetricsUtils {

private static Attributes EMPTY_ATTRIBUTES = Attributes.builder().build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.Metric;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.metrics.GeneralMetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -92,42 +92,42 @@ public void recordReceiveMsgFromClient(final String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.GRPC.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementClientToEventMeshMsgNum(attributes);
MetricsUtils.incrementClientToEventMeshMsgNum(attributes);
}

public void recordReceiveMsgFromClient(final int count, String clientAddress) {
grpcMetrics.getClient2EventMeshMsgNum().addAndGet(count);
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.GRPC.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementClientToEventMeshMsgNum(attributes, count);
MetricsUtils.incrementClientToEventMeshMsgNum(attributes, count);
}

public void recordSendMsgToQueue() {
grpcMetrics.getEventMesh2MqMsgNum().incrementAndGet();
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementEventMeshToMQMsgNum(attributes);
MetricsUtils.incrementEventMeshToMQMsgNum(attributes);
}

public void recordReceiveMsgFromQueue() {
grpcMetrics.getMq2EventMeshMsgNum().incrementAndGet();
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementMQToEventMeshMsgNum(attributes);
MetricsUtils.incrementMQToEventMeshMsgNum(attributes);
}

public void recordSendMsgToClient(final String clientAddress) {
grpcMetrics.getEventMesh2ClientMsgNum().incrementAndGet();
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.TCP.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementEventMeshToClientMsgNum(attributes);
MetricsUtils.incrementEventMeshToClientMsgNum(attributes);
}

public void recordGrpcPublishHandleCost(long costTime, String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.TCP.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
grpcMetrics.recordGrpcPublishHandleCost(costTime, GeneralMetricsManager.buildAttributes(attributes));
grpcMetrics.recordGrpcPublishHandleCost(costTime, MetricsUtils.buildAttributes(attributes));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.Metric;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.metrics.GeneralMetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -114,31 +114,31 @@ public void recordReceiveMsgFromClient(final String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.GRPC.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementClientToEventMeshMsgNum(attributes);
MetricsUtils.incrementClientToEventMeshMsgNum(attributes);
}

public void recordReceiveMsgFromClient(final int count, String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.GRPC.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementClientToEventMeshMsgNum(attributes, count);
MetricsUtils.incrementClientToEventMeshMsgNum(attributes, count);
}

public void recordSendMsgToQueue() {
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementEventMeshToMQMsgNum(attributes);
MetricsUtils.incrementEventMeshToMQMsgNum(attributes);
}

public void recordReceiveMsgFromQueue() {
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementMQToEventMeshMsgNum(attributes);
MetricsUtils.incrementMQToEventMeshMsgNum(attributes);
}

public void recordSendMsgToClient(final String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.TCP.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementEventMeshToClientMsgNum(attributes);
MetricsUtils.incrementEventMeshToClientMsgNum(attributes);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.eventmesh.metrics.api.MetricsRegistry;
import org.apache.eventmesh.metrics.api.model.Metric;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.metrics.GeneralMetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsManager;
import org.apache.eventmesh.runtime.metrics.MetricsUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -85,27 +85,27 @@ public void client2eventMeshMsgNumIncrement(final String clientAddress) {
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.TCP.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementClientToEventMeshMsgNum(attributes);
MetricsUtils.incrementClientToEventMeshMsgNum(attributes);
}

public void eventMesh2mqMsgNumIncrement() {
tcpMetrics.getEventMesh2mqMsgNum().getAndIncrement();
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementEventMeshToMQMsgNum(attributes);
MetricsUtils.incrementEventMeshToMQMsgNum(attributes);
}

public void mq2eventMeshMsgNumIncrement() {
tcpMetrics.getMq2eventMeshMsgNum().getAndIncrement();
Map<String, String> attributes = new HashMap<>(labelMap);
GeneralMetricsManager.incrementMQToEventMeshMsgNum(attributes);
MetricsUtils.incrementMQToEventMeshMsgNum(attributes);
}

public void eventMesh2clientMsgNumIncrement(final String clientAddress) {
tcpMetrics.getEventMesh2clientMsgNum().getAndIncrement();
Map<String, String> attributes = new HashMap<>(labelMap);
attributes.put(MetricsConstants.CLIENT_PROTOCOL_TYPE, ProtocolType.TCP.name());
attributes.put(MetricsConstants.CLIENT_ADDRESS, Optional.ofNullable(clientAddress).orElse(MetricsConstants.UNKOWN));
GeneralMetricsManager.incrementEventMeshToClientMsgNum(attributes);
MetricsUtils.incrementEventMeshToClientMsgNum(attributes);
}

public TcpMetrics getTcpMetrics() {
Expand Down

0 comments on commit 1508e4c

Please sign in to comment.