diff --git a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java index 58adbfbce6f..43a2108f39d 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/MessageType.java +++ b/core/src/main/java/org/apache/seata/core/protocol/MessageType.java @@ -22,6 +22,11 @@ */ public interface MessageType { + /** + * The constant TYPE_NOT_EXIST. + */ + short TYPE_NOT_EXIST = 0; + /** * The constant TYPE_GLOBAL_BEGIN. */ diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java index 3a5ba56bbde..8e5841a21b6 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java @@ -26,8 +26,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import io.netty.channel.Channel; + import org.apache.commons.lang.time.DateFormatUtils; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.thread.NamedThreadFactory; @@ -65,8 +65,8 @@ import org.apache.seata.core.rpc.netty.ChannelManager; import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.server.AbstractTCInboundHandler; +import org.apache.seata.server.limit.LimitRequestDecorator; import org.apache.seata.server.metrics.MetricsPublisher; -import org.apache.seata.server.ratelimit.RateLimiterHandler; import org.apache.seata.server.session.BranchSession; import org.apache.seata.server.session.GlobalSession; import org.apache.seata.server.session.SessionCondition; @@ -197,8 +197,6 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran private final ThreadPoolExecutor branchRemoveExecutor; - private RateLimiterHandler rateLimiterHandler; - private RemotingServer remotingServer; private final DefaultCore core; @@ -229,8 +227,6 @@ protected DefaultCoordinator(RemotingServer remotingServer) { } else { branchRemoveExecutor = null; } - // create server rate limter - rateLimiterHandler = RateLimiterHandler.getInstance(); } public static DefaultCoordinator getInstance(RemotingServer remotingServer) { @@ -646,11 +642,9 @@ public AbstractResultMessage onRequest(AbstractMessage request, RpcContext conte } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); - AbstractResultMessage resultMessage = processRateLimit(request, context); - if (resultMessage != null) { - return resultMessage; - } - return transactionRequest.handle(context); + + LimitRequestDecorator limitRequestDecorator = new LimitRequestDecorator(transactionRequest); + return limitRequestDecorator.handle(context); } @Override @@ -770,8 +764,4 @@ private void doRemove(BranchSession bt) { } } } - - private AbstractResultMessage processRateLimit(AbstractMessage request, RpcContext context) { - return rateLimiterHandler.handle(request, context); - } } diff --git a/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java new file mode 100644 index 00000000000..eb437ee3f72 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/LimitRequestDecorator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit; + +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; +import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.limit.ratelimit.RateLimiterHandler; + +/** + * LimitRequestDecorator decorate AbstractTransactionRequestToTC to use limiter + */ +public class LimitRequestDecorator extends AbstractTransactionRequestToTC { + + private AbstractTransactionRequestToTC originalRequest; + + private TransactionRequestLimitHandler requestLimitHandler; + + public LimitRequestDecorator(AbstractTransactionRequestToTC originalRequest) { + this.originalRequest = originalRequest; + + // create server rate limter + RateLimiterHandler rateLimiterHandler = RateLimiterHandler.getInstance(); + rateLimiterHandler.setTransactionRequestLimitHandler(null); + requestLimitHandler = rateLimiterHandler; + } + + + @Override + public AbstractTransactionResponse handle(RpcContext rpcContext) { + return requestLimitHandler.handle(originalRequest, rpcContext); + } + + @Override + public short getTypeCode() { + return originalRequest.getTypeCode(); + } +} diff --git a/server/src/main/java/org/apache/seata/server/limit/TransactionRequestLimitHandler.java b/server/src/main/java/org/apache/seata/server/limit/TransactionRequestLimitHandler.java new file mode 100644 index 00000000000..9ed57ff93c9 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/limit/TransactionRequestLimitHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.limit; + +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; +import org.apache.seata.core.rpc.RpcContext; + +/** + * TransactionRequestLimitHandler + */ +public abstract class TransactionRequestLimitHandler { + + /** + * limit handler + */ + protected TransactionRequestLimitHandler transactionRequestLimitHandler; + + public TransactionRequestLimitHandler() { + } + + /** + * next handler handle + * @param context + * @return + */ + protected AbstractTransactionResponse next(AbstractTransactionRequestToTC originRequest, RpcContext context) { + if (transactionRequestLimitHandler != null) { + return transactionRequestLimitHandler.next(originRequest, context); + } + return originRequest.handle(context); + } + + public abstract AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context); + + public void setTransactionRequestLimitHandler(TransactionRequestLimitHandler transactionRequestLimitHandler) { + this.transactionRequestLimitHandler = transactionRequestLimitHandler; + } +} diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java similarity index 98% rename from server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java rename to server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java index 9d46ac3bf77..1d1fd570fd8 100644 --- a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimitInfo.java +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimitInfo.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.server.ratelimit; +package org.apache.seata.server.limit.ratelimit; import org.apache.seata.common.util.UUIDGenerator; diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java similarity index 96% rename from server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java rename to server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java index 2eed21989ce..ffddedd58e0 100644 --- a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiter.java +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.server.ratelimit; +package org.apache.seata.server.limit.ratelimit; /** * RateLimiter diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java similarity index 83% rename from server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java rename to server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java index 6ddb323ae71..0239b978292 100644 --- a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandler.java +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.server.ratelimit; +package org.apache.seata.server.limit.ratelimit; import org.apache.seata.common.XID; import org.apache.seata.common.loader.EnhancedServiceLoader; @@ -25,18 +25,19 @@ import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.exception.TransactionExceptionCode; -import org.apache.seata.core.protocol.AbstractMessage; -import org.apache.seata.core.protocol.AbstractResultMessage; +import org.apache.seata.core.protocol.MessageType; import org.apache.seata.core.protocol.ResultCode; -import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; +import org.apache.seata.core.protocol.transaction.AbstractTransactionResponse; import org.apache.seata.core.protocol.transaction.GlobalBeginResponse; import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.server.limit.TransactionRequestLimitHandler; import org.apache.seata.server.metrics.MetricsPublisher; /** * RateLimiterHandler */ -public class RateLimiterHandler implements CachedConfigurationChangeListener { +public class RateLimiterHandler extends TransactionRequestLimitHandler implements CachedConfigurationChangeListener { /** * The instance of RateLimiterHandler */ @@ -68,6 +69,27 @@ private RateLimiterHandler() { config.addConfigListener(ConfigurationKeys.RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM, this); } + @Override + public AbstractTransactionResponse handle(AbstractTransactionRequestToTC originRequest, RpcContext context) { + if (!rateLimiter.isEnable()) { + return next(originRequest, context); + } + + if (MessageType.TYPE_GLOBAL_BEGIN == originRequest.getTypeCode()) { + if (!rateLimiter.canPass()) { + GlobalBeginResponse response = new GlobalBeginResponse(); + response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailed); + response.setResultCode(ResultCode.Failed); + RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(context.getApplicationId(), + RateLimitInfo.GLOBAL_BEGIN_FAILED, context.getClientId(), XID.getIpAddressAndPort()); + MetricsPublisher.postRateLimitEvent(rateLimitInfo); + response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo)); + return response; + } + } + return next(originRequest, context); + } + public static RateLimiterHandler getInstance() { if (instance == null) { synchronized (RateLimiterHandler.class) { @@ -97,24 +119,4 @@ public void onChangeEvent(ConfigurationChangeEvent event) { } rateLimiter.reInit(config); } - - public AbstractResultMessage handle(AbstractMessage request, RpcContext rpcContext) { - if (!rateLimiter.isEnable()) { - return null; - } - - if (request instanceof GlobalBeginRequest) { - if (!rateLimiter.canPass()) { - GlobalBeginResponse response = new GlobalBeginResponse(); - response.setTransactionExceptionCode(TransactionExceptionCode.BeginFailed); - response.setResultCode(ResultCode.Failed); - RateLimitInfo rateLimitInfo = RateLimitInfo.generateRateLimitInfo(rpcContext.getApplicationId(), - RateLimitInfo.GLOBAL_BEGIN_FAILED, rpcContext.getClientId(), XID.getIpAddressAndPort()); - MetricsPublisher.postRateLimitEvent(rateLimitInfo); - response.setMsg(String.format("TransactionException[rate limit exception, rate limit info: %s]", rateLimitInfo)); - return response; - } - } - return null; - } } diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandlerConfig.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java similarity index 97% rename from server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandlerConfig.java rename to server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java index 91c9bd0cdc0..89db918e2f1 100644 --- a/server/src/main/java/org/apache/seata/server/ratelimit/RateLimiterHandlerConfig.java +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/RateLimiterHandlerConfig.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.server.ratelimit; +package org.apache.seata.server.limit.ratelimit; /** * RateLimiterHandlerConfig diff --git a/server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java similarity index 99% rename from server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java rename to server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java index 194aa301a38..419b4fe7050 100644 --- a/server/src/main/java/org/apache/seata/server/ratelimit/TokenBucketLimiter.java +++ b/server/src/main/java/org/apache/seata/server/limit/ratelimit/TokenBucketLimiter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.server.ratelimit; +package org.apache.seata.server.limit.ratelimit; import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.executor.Initialize; diff --git a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java index 709e99b650d..0f51809986a 100644 --- a/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java +++ b/server/src/main/java/org/apache/seata/server/metrics/MetricsPublisher.java @@ -21,7 +21,7 @@ import org.apache.seata.core.event.RateLimitEvent; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.server.event.EventBusManager; -import org.apache.seata.server.ratelimit.RateLimitInfo; +import org.apache.seata.server.limit.ratelimit.RateLimitInfo; import org.apache.seata.server.session.GlobalSession; /** diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter similarity index 93% rename from server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter rename to server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter index 333d353c12e..ef355142ff1 100644 --- a/server/src/main/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.server.ratelimit.TokenBucketLimiter \ No newline at end of file +org.apache.seata.server.limit.ratelimit.TokenBucketLimiter \ No newline at end of file diff --git a/server/src/main/resources/file.conf b/server/src/main/resources/file.conf deleted file mode 100644 index 3ac184074ce..00000000000 --- a/server/src/main/resources/file.conf +++ /dev/null @@ -1,73 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -#reduce delay for test -## transaction log store, only used in seata-server -store { - ## store mode: file、db - mode = "file" - - ## file store property - file { - ## store location dir - dir = "sessionStore" - } - - ## database store property - db { - ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. - datasource = "dbcp" - ## mysql/oracle/h2/oceanbase etc. - dbType = "mysql" - driverClassName = "com.mysql.jdbc.Driver" - ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param - url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" - user = "mysql" - password = "mysql" - } -} -server { - recovery { - #schedule committing retry period in milliseconds - committingRetryPeriod = 100 - #schedule asyn committing retry period in milliseconds - asynCommittingRetryPeriod = 100 - #schedule rollbacking retry period in milliseconds - rollbackingRetryPeriod = 100 - #schedule timeout retry period in milliseconds - timeoutRetryPeriod = 100 - } - undo { - logSaveDays = 2 - #schedule delete expired undo_log in milliseconds - logDeletePeriod = 86400000 - } - ratelimit { - enable = false # 默认关闭 - bucketTokenNumPerSecond = 1 # 采用令牌桶算法,每秒生成令牌数 - bucketTokenMaxNum = 1 - bucketTokenInitialNum = 1 - } -} -## metrics settings -metrics { - enabled = true - registryType = "compact" - # multi exporters use comma divided - exporterList = "prometheus" - exporterPrometheusPort = 9898 -} \ No newline at end of file diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java index fd772092406..c1cc9819bce 100644 --- a/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/RateLimiterHandlerTest.java @@ -16,11 +16,12 @@ */ package org.apache.seata.server.ratelimiter; +import org.apache.seata.core.protocol.transaction.AbstractTransactionRequestToTC; import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; import org.apache.seata.core.rpc.RpcContext; -import org.apache.seata.server.ratelimit.RateLimiter; -import org.apache.seata.server.ratelimit.RateLimiterHandler; -import org.apache.seata.server.ratelimit.TokenBucketLimiter; +import org.apache.seata.server.limit.ratelimit.RateLimiter; +import org.apache.seata.server.limit.ratelimit.RateLimiterHandler; +import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -48,7 +49,7 @@ public void testHandlePass() { rateLimiterHandler = new RateLimiterHandler(rateLimiter); GlobalBeginRequest request = new GlobalBeginRequest(); RpcContext rpcContext = new RpcContext(); - Assertions.assertNull(rateLimiterHandler.handle(request, rpcContext)); + Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext)); } @Test @@ -58,8 +59,7 @@ public void testHandleNotPass() { rateLimiterHandler = new RateLimiterHandler(rateLimiter); GlobalBeginRequest request = new GlobalBeginRequest(); RpcContext rpcContext = new RpcContext(); - rateLimiterHandler.handle(request, rpcContext); - Assertions.assertNotNull(rateLimiterHandler.handle(request, rpcContext)); + Assertions.assertThrowsExactly(NullPointerException.class, () -> rateLimiterHandler.handle(request, rpcContext)); } } diff --git a/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java index 9bac6220705..7f386665ca5 100644 --- a/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java +++ b/server/src/test/java/org/apache/seata/server/ratelimiter/TokenBucketLimiterTest.java @@ -17,8 +17,8 @@ package org.apache.seata.server.ratelimiter; import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.server.ratelimit.RateLimiter; -import org.apache.seata.server.ratelimit.TokenBucketLimiter; +import org.apache.seata.server.limit.ratelimit.RateLimiter; +import org.apache.seata.server.limit.ratelimit.TokenBucketLimiter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter similarity index 93% rename from server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter rename to server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter index 333d353c12e..ef355142ff1 100644 --- a/server/src/test/resources/META-INF/services/org.apache.seata.server.ratelimit.RateLimiter +++ b/server/src/test/resources/META-INF/services/org.apache.seata.server.limit.ratelimit.RateLimiter @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.server.ratelimit.TokenBucketLimiter \ No newline at end of file +org.apache.seata.server.limit.ratelimit.TokenBucketLimiter \ No newline at end of file