From e629e20d893d29d354084e1e00bf921824ec75ba Mon Sep 17 00:00:00 2001 From: hecy7 Date: Fri, 11 Aug 2023 16:20:38 +0800 Subject: [PATCH 01/35] =?UTF-8?q?=E6=8E=A8=E7=90=86=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=97=B6caseId=E7=9A=84=E6=A4=8D=E5=85=A5=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BA=E4=BB=8E=E8=AF=B7=E6=B1=82=E5=A4=B4?= =?UTF-8?q?=E4=B8=AD=E6=A4=8D=E5=85=A5=EF=BC=8C=E6=9B=B4=E5=8A=A0=E7=AC=A6?= =?UTF-8?q?=E5=90=88caseId=E5=AD=97=E6=AE=B5=E5=90=AB=E4=B9=89=E4=B8=94?= =?UTF-8?q?=E4=BE=BF=E4=BA=8E=E7=94=A8=E6=88=B7=E5=81=9A=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E8=BF=BD=E8=B8=AA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../proxy/controller/ProxyController.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/ProxyController.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/ProxyController.java index 15e9cd275..4a3df6f6a 100644 --- a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/ProxyController.java +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/ProxyController.java @@ -105,21 +105,29 @@ public String call() throws Exception { if (logger.isDebugEnabled()) { logger.debug("receive : {} headers {}", data, headers.toSingleValueMap()); } + + String caseId = headers.getFirst("caseId"); final ServiceAdaptor serviceAdaptor = proxyServiceRegister.getServiceAdaptor(callName); + Context context = new BaseContext(); context.setCallName(callName); context.setVersion(version); + context.setCaseId(caseId); + if (null == context.getCaseId() || context.getCaseId().isEmpty()) { + context.setCaseId(UUID.randomUUID().toString().replaceAll("-", "")); + } + InboundPackage inboundPackage = buildInboundPackageFederation(context, data, httpServletRequest); OutboundPackage result = serviceAdaptor.service(context, inboundPackage); + if (result != null && result.getData() != null) { result.getData().remove("log"); result.getData().remove("warn"); result.getData().remove("caseid"); return JsonUtil.object2Json(result.getData()); } - return ""; - + return ""; } }; } @@ -134,10 +142,6 @@ private InboundPackage buildInboundPackageFederation(Context context, Strin Map head = (Map) jsonObject.getOrDefault(Dict.HEAD, new HashMap<>()); Map body = (Map) jsonObject.getOrDefault(Dict.BODY, new HashMap<>()); context.setHostAppid((String) head.getOrDefault(Dict.APP_ID, "")); - context.setCaseId((String) head.getOrDefault(Dict.CASE_ID, "")); - if (null == context.getCaseId() || context.getCaseId().isEmpty()) { - context.setCaseId(UUID.randomUUID().toString().replaceAll("-", "")); - } InboundPackage inboundPackage = new InboundPackage(); inboundPackage.setBody(body); From 553731a158ad3a6723f8c7c7eddb79dd8acdfe55 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Mon, 14 Aug 2023 10:00:20 +0800 Subject: [PATCH 02/35] add caseId param handle on ValidateServiceProvider Signed-off-by: hecy7 --- .../admin/services/provider/ValidateServiceProvider.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java index 8cfdf96d7..919e050d7 100644 --- a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java +++ b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java @@ -123,6 +123,10 @@ public Object inference(Context context, InboundPackage data) throws Exception { inferenceRequest.setApplyId(params.get("applyId").toString()); } + if(params.get("caseId")!=null) { + inferenceRequest.setCaseId(params.get("caseId").toString()); + } + for (Map.Entry entry : featureData.entrySet()) { inferenceRequest.getFeatureData().put(entry.getKey(), entry.getValue()); } From 5c8431dd0aedb1af85a9d6a51f1f67aded91f049 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Mon, 14 Aug 2023 10:31:23 +0800 Subject: [PATCH 03/35] add caseId param handle from header on ValidateServiceProvider Signed-off-by: hecy7 --- .../services/provider/ValidateServiceProvider.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java index 919e050d7..54f351e27 100644 --- a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java +++ b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/services/provider/ValidateServiceProvider.java @@ -42,7 +42,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; +import javax.servlet.http.HttpServletRequest; import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; @@ -99,6 +102,10 @@ public Object publishBind(Context context, InboundPackage data) throws Exception @FateServiceMethod(name = "inference") public Object inference(Context context, InboundPackage data) throws Exception { + ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); + HttpServletRequest request = attributes.getRequest(); + String caseId = request.getHeader("caseId"); + Map params = (Map) data.getBody(); String host = (String) params.get(Dict.HOST); int port = (int) params.get(Dict.PORT); @@ -123,8 +130,8 @@ public Object inference(Context context, InboundPackage data) throws Exception { inferenceRequest.setApplyId(params.get("applyId").toString()); } - if(params.get("caseId")!=null) { - inferenceRequest.setCaseId(params.get("caseId").toString()); + if(caseId != null && !caseId.isEmpty()) { + inferenceRequest.setCaseId(caseId); } for (Map.Entry entry : featureData.entrySet()) { From 00d357e2264bcbd078bace5a7de9532433ca0c15 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Mon, 14 Aug 2023 11:39:59 +0800 Subject: [PATCH 04/35] Bugfix and some code adjustments Signed-off-by: hecy7 --- .../common/health/HealthCheckItemEnum.java | 13 +++++---- .../common/health/HealthCheckRecord.java | 1 + .../common/health/HealthCheckResult.java | 1 + .../common/health/HealthCheckStatus.java | 27 ++++++++++++++++--- .../common/health/HealthCheckUtil.java | 4 +-- .../ai/fate/serving/common/model/Model.java | 9 ++++--- 6 files changed, 38 insertions(+), 17 deletions(-) diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckItemEnum.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckItemEnum.java index 20fbe2d5a..07181c4cd 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckItemEnum.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckItemEnum.java @@ -10,21 +10,20 @@ public enum HealthCheckItemEnum { CHECK_FATEFLOW_IN_ZK("check fateflow in zookeeper",HealthCheckComponent.SERVINGSERVER), CHECK_MODEL_LOADED("check model loaded",HealthCheckComponent.SERVINGSERVER); -// CHECK_MODEL_LOADED("check model loaded"), -// CHECK_MODEL_VALIDATE() + private String itemName; + private HealthCheckComponent component; - private String itemName; - private HealthCheckComponent component; - private HealthCheckItemEnum(String name,HealthCheckComponent healthCheckComponent ){ + HealthCheckItemEnum(String name, HealthCheckComponent healthCheckComponent){ this.component = healthCheckComponent; this.itemName= name; } public String getItemName(){ - return itemName; + return itemName; } + public HealthCheckComponent getComponent(){ - return this.component; + return this.component; } diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckRecord.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckRecord.java index a2e8d6608..2b8fe53ec 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckRecord.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckRecord.java @@ -40,6 +40,7 @@ public HealthCheckRecord(String checkItemName, String msg, HealthCheckStatus hea this.healthCheckStatus = healthCheckStatus; } + @Override public String toString(){ return JsonUtil.object2Json(this); } diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckResult.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckResult.java index 489f2f5cc..847532f71 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckResult.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckResult.java @@ -15,6 +15,7 @@ public void setRecords(List records) { List records = Lists.newArrayList(); + @Override public String toString(){ return JsonUtil.object2Json(this); } diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckStatus.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckStatus.java index d883d894f..32f9258bd 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckStatus.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckStatus.java @@ -1,7 +1,28 @@ package com.webank.ai.fate.serving.common.health; public enum HealthCheckStatus { - ok, - warn, - error + /** + * 健康 + */ + ok("状态健康"), + + /** + * 异常 + */ + warn("状态异常"), + + /** + * 错误 + */ + error("状态错误"); + + private final String desc; + + HealthCheckStatus(String desc) { + this.desc = desc; + } + + public String getDesc() { + return desc; + } } diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckUtil.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckUtil.java index 38cc91dbd..095637d3c 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckUtil.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/health/HealthCheckUtil.java @@ -59,9 +59,7 @@ public static String getPercentFormat(double d,int IntegerDigits,int FractionDig nf.setMinimumFractionDigits(FractionDigits);// 小数点后保留几位 - String str = nf.format(d); - - return str; + return nf.format(d); } diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/model/Model.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/model/Model.java index cbbeded73..49131f4a4 100644 --- a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/model/Model.java +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/model/Model.java @@ -161,13 +161,14 @@ public int hashCode() { @Override public boolean equals(Object obj) { - if(obj!=null&&obj instanceof Model) { + if(obj instanceof Model) { Model model = (Model) obj; - if(this.namespace!=null&&this.namespace!=null) + if(this.namespace != null && this.tableName != null) { return this.namespace.equals(model.namespace) && this.tableName.equals(model.tableName); - else + } else { return false; - }else { + } + } else { return false; } } From f4141c5390ccafaec1efde735fca13537fd31661 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Mon, 14 Aug 2023 15:01:32 +0800 Subject: [PATCH 05/35] Using BCryptPasswordEncoder in spring security to encrypt and decrypt login passwords Signed-off-by: hecy7 --- fate-serving-admin/pom.xml | 5 +++++ .../ai/fate/serving/admin/controller/LoginController.java | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/fate-serving-admin/pom.xml b/fate-serving-admin/pom.xml index afaa2d95e..1863eee8e 100755 --- a/fate-serving-admin/pom.xml +++ b/fate-serving-admin/pom.xml @@ -51,6 +51,11 @@ ${fate.version} + + org.springframework.boot + spring-boot-starter-security + + @@ -68,7 +68,7 @@ generate-resources - install + install --force From 430c34b53c612103a8a345a21c99a333b421ee16 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Tue, 22 Aug 2023 17:47:05 +0800 Subject: [PATCH 19/35] Update ParallelBatchToSingleFeatureAdaptor Signed-off-by: hecy7 --- .../ParallelBatchToSingleFeatureAdaptor.java | 85 +++++++++++++------ 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java index 6df016328..3582021a0 100644 --- a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java +++ b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java @@ -19,30 +19,53 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.webank.ai.fate.serving.core.adaptor.SingleFeatureDataAdaptor; -import com.webank.ai.fate.serving.core.bean.BatchHostFeatureAdaptorResult; -import com.webank.ai.fate.serving.core.bean.BatchHostFederatedParams; -import com.webank.ai.fate.serving.core.bean.Context; -import com.webank.ai.fate.serving.core.bean.ReturnResult; +import com.webank.ai.fate.serving.core.bean.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * 许多host方并未提供批量查询接口,这个类将批量请求拆分成单笔请求发送,再合结果 */ public class ParallelBatchToSingleFeatureAdaptor extends AbstractBatchFeatureDataAdaptor { + private static final Logger logger = LoggerFactory.getLogger(HttpAdapter.class); + int timeout; SingleFeatureDataAdaptor singleFeatureDataAdaptor; - ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(null); + ListeningExecutorService listeningExecutorService; + + // 自定义Adapter初始化 + public ParallelBatchToSingleFeatureAdaptor(int core, int max, int timeout) { + initExecutor(core, max, timeout); + } + + // 默认Adapter初始化 + public ParallelBatchToSingleFeatureAdaptor() { + + // 默认线程池核心线程10 + int defaultCore = 10; + + // 默认线程池最大线程 100 + int defaultMax = 100; + + // 默认countDownLatch超时时间永远比grpc超时时间小 + timeout = MetaInfo.PROPERTY_GRPC_TIMEOUT.intValue() - 1; - public ParallelBatchToSingleFeatureAdaptor(int core, int max) { - new ThreadPoolExecutor(core, max, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy()); + initExecutor(defaultCore, defaultMax, timeout); + } + + + private void initExecutor(int core, int max, int timeout) { + this.timeout = timeout; + + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(core, max, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy()); + + listeningExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor); } @Override @@ -56,37 +79,47 @@ public BatchHostFeatureAdaptorResult getFeatures(Context context, List Date: Wed, 30 Aug 2023 11:49:36 +0800 Subject: [PATCH 20/35] Duplicate variable initialization Signed-off-by: hecy7 --- .../src/main/java/com/webank/ai/fate/serving/Bootstrap.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java index 7a2ca5b9d..2cafc14a7 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java @@ -105,7 +105,6 @@ public static void parseConfig() { MetaInfo.PROPERTY_MODEL_CACHE_PATH = StringUtils.isNotBlank(environment.getProperty(Dict.PROPERTY_MODEL_CACHE_PATH)) ? environment.getProperty(Dict.PROPERTY_MODEL_CACHE_PATH) : MetaInfo.PROPERTY_ROOT_PATH; MetaInfo.PROPERTY_ACL_ENABLE = Boolean.valueOf(environment.getProperty(Dict.PROPERTY_ACL_ENABLE, "false")); MetaInfo.PROPERTY_MODEL_SYNC = Boolean.valueOf(environment.getProperty(Dict.PROPERTY_MODEL_SYNC, "false")); - MetaInfo.PROPERTY_MODEL_SYNC = Boolean.valueOf(environment.getProperty(Dict.PROPERTY_MODEL_SYNC, "false")); MetaInfo.PROPERTY_GRPC_TIMEOUT = Integer.valueOf(environment.getProperty(Dict.PROPERTY_GRPC_TIMEOUT, "5000")); MetaInfo.PROPERTY_ACL_USERNAME = environment.getProperty(Dict.PROPERTY_ACL_USERNAME); MetaInfo.PROPERTY_ACL_PASSWORD = environment.getProperty(Dict.PROPERTY_ACL_PASSWORD); From 88a2e681eaf4a6eed828522453fc5ec3573d3086 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Mon, 4 Sep 2023 17:01:11 +0800 Subject: [PATCH 21/35] serving-server add model unbind rest interface Signed-off-by: hecy7 --- fate-serving-server/pom.xml | 10 +++ .../serving/controller/ServerController.java | 83 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java diff --git a/fate-serving-server/pom.xml b/fate-serving-server/pom.xml index c513a98f9..eca0bc6be 100644 --- a/fate-serving-server/pom.xml +++ b/fate-serving-server/pom.xml @@ -102,6 +102,16 @@ protobuf-java-format 1.2 + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + + diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java new file mode 100644 index 000000000..cfee3e0c7 --- /dev/null +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java @@ -0,0 +1,83 @@ +package com.webank.ai.fate.serving.controller; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.webank.ai.fate.api.mlmodel.manager.ModelServiceGrpc; +import com.webank.ai.fate.api.mlmodel.manager.ModelServiceProto; +import com.webank.ai.fate.serving.core.bean.GrpcConnectionPool; +import com.webank.ai.fate.serving.core.bean.MetaInfo; +import com.webank.ai.fate.serving.core.bean.RequestParamWrapper; +import com.webank.ai.fate.serving.core.bean.ReturnResult; +import com.webank.ai.fate.serving.core.exceptions.SysException; +import com.webank.ai.fate.serving.core.utils.NetUtils; +import io.grpc.ManagedChannel; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * @author hcy + */ +@RestController +public class ServerController { + + Logger logger = LoggerFactory.getLogger(ServerController.class); + + GrpcConnectionPool grpcConnectionPool = GrpcConnectionPool.getPool(); + + @RequestMapping(value = "/server/model/unbind", method = RequestMethod.POST) + @ResponseBody + public Callable unbind(@RequestBody RequestParamWrapper requestParams) throws Exception { + return () -> { + String host = requestParams.getHost(); + Integer port = requestParams.getPort(); + String tableName = requestParams.getTableName(); + String namespace = requestParams.getNamespace(); + List serviceIds = requestParams.getServiceIds(); + + Preconditions.checkArgument(StringUtils.isNotBlank(tableName), "parameter tableName is blank"); + Preconditions.checkArgument(StringUtils.isNotBlank(namespace), "parameter namespace is blank"); + Preconditions.checkArgument(serviceIds != null && serviceIds.size() != 0, "parameter serviceId is blank"); + + ReturnResult result = new ReturnResult(); + + logger.info("unbind model by tableName and namespace, host: {}, port: {}, tableName: {}, namespace: {}", host, port, tableName, namespace); + + ModelServiceGrpc.ModelServiceFutureStub futureStub = getModelServiceFutureStub(host, port); + + ModelServiceProto.UnbindRequest unbindRequest = ModelServiceProto.UnbindRequest.newBuilder() + .setTableName(tableName) + .setNamespace(namespace) + .addAllServiceIds(serviceIds) + .build(); + + ListenableFuture future = futureStub.unbind(unbindRequest); + + ModelServiceProto.UnbindResponse response = future.get(MetaInfo.PROPERTY_GRPC_TIMEOUT, TimeUnit.MILLISECONDS); + + logger.info("response: {}", response); + + result.setRetcode(response.getStatusCode()); + result.setRetmsg(response.getMessage()); + return result; + }; + } + + private ModelServiceGrpc.ModelServiceFutureStub getModelServiceFutureStub(String host, Integer port) { + Preconditions.checkArgument(StringUtils.isNotBlank(host), "parameter host is blank"); + Preconditions.checkArgument(port != null && port != 0, "parameter port was wrong"); + + if (!NetUtils.isValidAddress(host + ":" + port)) { + throw new SysException("invalid address"); + } + + ManagedChannel managedChannel = grpcConnectionPool.getManagedChannel(host, port); + return ModelServiceGrpc.newFutureStub(managedChannel); + } +} From a10c5da112d0f57f6b3722e592be25cebb2ae3fb Mon Sep 17 00:00:00 2001 From: hecy7 Date: Tue, 5 Sep 2023 12:28:02 +0800 Subject: [PATCH 22/35] model transfer interface added In serving-server Signed-off-by: hecy7 --- ...roller.java => ServerModelController.java} | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) rename fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/{ServerController.java => ServerModelController.java} (60%) diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerModelController.java similarity index 60% rename from fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java rename to fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerModelController.java index cfee3e0c7..1300ff33f 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerController.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerModelController.java @@ -14,7 +14,6 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import java.util.List; @@ -25,9 +24,9 @@ * @author hcy */ @RestController -public class ServerController { +public class ServerModelController { - Logger logger = LoggerFactory.getLogger(ServerController.class); + Logger logger = LoggerFactory.getLogger(ServerModelController.class); GrpcConnectionPool grpcConnectionPool = GrpcConnectionPool.getPool(); @@ -47,7 +46,7 @@ public Callable unbind(@RequestBody RequestParamWrapper requestPar ReturnResult result = new ReturnResult(); - logger.info("unbind model by tableName and namespace, host: {}, port: {}, tableName: {}, namespace: {}", host, port, tableName, namespace); + logger.debug("unbind model by tableName and namespace, host: {}, port: {}, tableName: {}, namespace: {}", host, port, tableName, namespace); ModelServiceGrpc.ModelServiceFutureStub futureStub = getModelServiceFutureStub(host, port); @@ -61,7 +60,42 @@ public Callable unbind(@RequestBody RequestParamWrapper requestPar ModelServiceProto.UnbindResponse response = future.get(MetaInfo.PROPERTY_GRPC_TIMEOUT, TimeUnit.MILLISECONDS); - logger.info("response: {}", response); + logger.debug("response: {}", response); + + result.setRetcode(response.getStatusCode()); + result.setRetmsg(response.getMessage()); + return result; + }; + } + + @RequestMapping(value = "/server/model/transfer", method = RequestMethod.POST) + @ResponseBody + public Callable transfer(@RequestBody RequestParamWrapper requestParams) { + return () -> { + String host = requestParams.getHost(); + Integer port = requestParams.getPort(); + String tableName = requestParams.getTableName(); + String namespace = requestParams.getNamespace(); + + String targetHost = requestParams.getTargetHost(); + Integer targetPort = requestParams.getTargetPort(); + + Preconditions.checkArgument(StringUtils.isNotBlank(tableName), "parameter tableName is blank"); + Preconditions.checkArgument(StringUtils.isNotBlank(namespace), "parameter namespace is blank"); + + ReturnResult result = new ReturnResult(); + + logger.debug("transfer model by tableName and namespace, host: {}, port: {}, tableName: {}, namespace: {}, targetHost: {}, targetPort: {}" + , host, port, tableName, namespace, targetHost, targetPort); + + ModelServiceGrpc.ModelServiceFutureStub futureStub = getModelServiceFutureStub(targetHost, targetPort); + ModelServiceProto.FetchModelRequest fetchModelRequest = ModelServiceProto.FetchModelRequest.newBuilder() + .setNamespace(namespace).setTableName(tableName).setSourceIp(host).setSourcePort(port).build(); + + ListenableFuture future = futureStub.fetchModel(fetchModelRequest); + ModelServiceProto.FetchModelResponse response = future.get(MetaInfo.PROPERTY_GRPC_TIMEOUT, TimeUnit.MILLISECONDS); + + logger.debug("response: {}", response); result.setRetcode(response.getStatusCode()); result.setRetmsg(response.getMessage()); From ac0d1d28b8bb05c495afa1542368582528cc302f Mon Sep 17 00:00:00 2001 From: hecy7 Date: Tue, 5 Sep 2023 14:44:51 +0800 Subject: [PATCH 23/35] Add dynamic adjustment log interface To Admin Serving Proxy Signed-off-by: hecy7 --- .../controller/DynamicLogController.java | 41 +++++++++++++++++++ .../controller/DynamicLogController.java | 41 +++++++++++++++++++ .../controller/DynamicLogController.java | 41 +++++++++++++++++++ 3 files changed, 123 insertions(+) create mode 100644 fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java create mode 100644 fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java create mode 100644 fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java diff --git a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java new file mode 100644 index 000000000..a873a3040 --- /dev/null +++ b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java @@ -0,0 +1,41 @@ +package com.webank.ai.fate.serving.admin.controller; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LoggerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +/** + * @author hcy + */ +@RequestMapping("/admin") +@RestController +public class DynamicLogController { + private static final Logger logger = LoggerFactory.getLogger(DynamicLogController.class); + + @GetMapping("/alterSysLogLevel/{level}") + public String alterSysLogLevel(@PathVariable String level){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger("ROOT").setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("admin alterSysLogLevel failed : " + ex); + return "failed"; + } + + } + + @GetMapping("/alterPkgLogLevel") + public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger(pkgName).setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("admin alterPkgLogLevel failed : " + ex); + return "failed"; + } + } +} diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java new file mode 100644 index 000000000..8325da238 --- /dev/null +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java @@ -0,0 +1,41 @@ +package com.webank.ai.fate.serving.proxy.controller; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LoggerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +/** + * @author hcy + */ +@RequestMapping("/proxy") +@RestController +public class DynamicLogController { + private static final Logger logger = LoggerFactory.getLogger(DynamicLogController.class); + + @GetMapping("/alterSysLogLevel/{level}") + public String alterSysLogLevel(@PathVariable String level){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger("ROOT").setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("proxy alterSysLogLevel failed : " + ex); + return "failed"; + } + + } + + @GetMapping("/alterPkgLogLevel") + public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger(pkgName).setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("proxy alterPkgLogLevel failed : " + ex); + return "failed"; + } + } +} diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java new file mode 100644 index 000000000..ca32c026a --- /dev/null +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java @@ -0,0 +1,41 @@ +package com.webank.ai.fate.serving.controller; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LoggerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +/** + * @author hcy + */ +@RequestMapping("/server") +@RestController +public class DynamicLogController { + private static final Logger logger = LoggerFactory.getLogger(DynamicLogController.class); + + @GetMapping("/alterSysLogLevel/{level}") + public String alterSysLogLevel(@PathVariable String level){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger("ROOT").setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("server alterSysLogLevel failed : " + ex); + return "failed"; + } + + } + + @GetMapping("/alterPkgLogLevel") + public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ + try { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + context.getLogger(pkgName).setLevel(Level.valueOf(level)); + return "ok"; + } catch (Exception ex) { + logger.error("server alterPkgLogLevel failed : " + ex); + return "failed"; + } + } +} From 68ff8f0a4ded042a5f2de4a2222656acae898bd1 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Tue, 5 Sep 2023 16:21:25 +0800 Subject: [PATCH 24/35] Add dynamic adjustment log interface To Admin Serving Proxy Signed-off-by: hecy7 --- .../fate/serving/admin/controller/DynamicLogController.java | 5 +++-- .../fate/serving/proxy/controller/DynamicLogController.java | 5 +++-- .../ai/fate/serving/controller/DynamicLogController.java | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java index a873a3040..3c65bb2f0 100644 --- a/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java +++ b/fate-serving-admin/src/main/java/com/webank/ai/fate/serving/admin/controller/DynamicLogController.java @@ -1,6 +1,7 @@ package com.webank.ai.fate.serving.admin.controller; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,7 @@ public class DynamicLogController { @GetMapping("/alterSysLogLevel/{level}") public String alterSysLogLevel(@PathVariable String level){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger("ROOT").setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { @@ -30,7 +31,7 @@ public String alterSysLogLevel(@PathVariable String level){ @GetMapping("/alterPkgLogLevel") public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger(pkgName).setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { diff --git a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java index 8325da238..738c9f41a 100644 --- a/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java +++ b/fate-serving-proxy/src/main/java/com/webank/ai/fate/serving/proxy/controller/DynamicLogController.java @@ -1,6 +1,7 @@ package com.webank.ai.fate.serving.proxy.controller; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,7 @@ public class DynamicLogController { @GetMapping("/alterSysLogLevel/{level}") public String alterSysLogLevel(@PathVariable String level){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger("ROOT").setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { @@ -30,7 +31,7 @@ public String alterSysLogLevel(@PathVariable String level){ @GetMapping("/alterPkgLogLevel") public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger(pkgName).setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java index ca32c026a..ae1171099 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/DynamicLogController.java @@ -1,6 +1,7 @@ package com.webank.ai.fate.serving.controller; import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LoggerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,7 @@ public class DynamicLogController { @GetMapping("/alterSysLogLevel/{level}") public String alterSysLogLevel(@PathVariable String level){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger("ROOT").setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { @@ -30,7 +31,7 @@ public String alterSysLogLevel(@PathVariable String level){ @GetMapping("/alterPkgLogLevel") public String alterPkgLogLevel(@RequestParam String level, @RequestParam String pkgName){ try { - LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + LoggerContext context = (LoggerContext) LogManager.getContext(false); context.getLogger(pkgName).setLevel(Level.valueOf(level)); return "ok"; } catch (Exception ex) { From 51e835786604557427d65301b17a72ade5e9a7a6 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Wed, 6 Sep 2023 18:01:25 +0800 Subject: [PATCH 25/35] add service weight controller Signed-off-by: hecy7 --- .../controller/ServerServiceController.java | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerServiceController.java diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerServiceController.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerServiceController.java new file mode 100644 index 000000000..84c0418c9 --- /dev/null +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/controller/ServerServiceController.java @@ -0,0 +1,90 @@ +package com.webank.ai.fate.serving.controller; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.webank.ai.fate.api.networking.common.CommonServiceGrpc; +import com.webank.ai.fate.api.networking.common.CommonServiceProto; +import com.webank.ai.fate.serving.core.bean.GrpcConnectionPool; +import com.webank.ai.fate.serving.core.bean.MetaInfo; +import com.webank.ai.fate.serving.core.bean.RequestParamWrapper; +import com.webank.ai.fate.serving.core.bean.ReturnResult; +import com.webank.ai.fate.serving.core.exceptions.SysException; +import com.webank.ai.fate.serving.core.utils.NetUtils; +import io.grpc.ManagedChannel; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.*; + +import java.util.concurrent.TimeUnit; + +/** + * @author hcy + */ +@RestController +public class ServerServiceController { + + Logger logger = LoggerFactory.getLogger(ServerServiceController.class); + + GrpcConnectionPool grpcConnectionPool = GrpcConnectionPool.getPool(); + + @RequestMapping(value = "/server/service/weight/update", method = RequestMethod.POST) + @ResponseBody + public ReturnResult updateService(@RequestBody RequestParamWrapper requestParams) throws Exception { + String host = requestParams.getHost(); + int port = requestParams.getPort(); + String url = requestParams.getUrl(); + String routerMode = requestParams.getRouterMode(); + Integer weight = requestParams.getWeight(); + Long version = requestParams.getVersion(); + + if (logger.isDebugEnabled()) { + logger.debug("try to update service"); + } + + Preconditions.checkArgument(StringUtils.isNotBlank(url), "parameter url is blank"); + + logger.info("update url: {}, routerMode: {}, weight: {}, version: {}", url, routerMode, weight, version); + + CommonServiceGrpc.CommonServiceFutureStub commonServiceFutureStub = getCommonServiceFutureStub(host, port); + CommonServiceProto.UpdateServiceRequest.Builder builder = CommonServiceProto.UpdateServiceRequest.newBuilder(); + + builder.setUrl(url); + if (StringUtils.isNotBlank(routerMode)) { + builder.setRouterMode(routerMode); + } + + if (weight != null) { + builder.setWeight(weight); + } else { + builder.setWeight(-1); + } + + if (version != null) { + builder.setVersion(version); + } else { + builder.setVersion(-1); + } + + ListenableFuture future = commonServiceFutureStub.updateService(builder.build()); + + CommonServiceProto.CommonResponse response = future.get(MetaInfo.PROPERTY_GRPC_TIMEOUT, TimeUnit.MILLISECONDS); + + ReturnResult result = new ReturnResult(); + result.setRetcode(response.getStatusCode()); + result.setRetmsg(response.getMessage()); + return result; + } + + private CommonServiceGrpc.CommonServiceFutureStub getCommonServiceFutureStub(String host, Integer port) { + Preconditions.checkArgument(StringUtils.isNotBlank(host), "parameter host is blank"); + Preconditions.checkArgument(port != null && port != 0, "parameter port was wrong"); + + if (!NetUtils.isValidAddress(host + ":" + port)) { + throw new SysException("invalid address"); + } + + ManagedChannel managedChannel = grpcConnectionPool.getManagedChannel(host, port); + return CommonServiceGrpc.newFutureStub(managedChannel); + } +} From 310d6a25a0e74742d1b303d6c42a2427f4f6f6fc Mon Sep 17 00:00:00 2001 From: hecy7 Date: Fri, 8 Sep 2023 15:00:59 +0800 Subject: [PATCH 26/35] =?UTF-8?q?=E6=96=B0=E5=A2=9EJVM=E4=B8=AD=E7=BA=BF?= =?UTF-8?q?=E7=A8=8BCPU=E5=8D=A0=E6=AF=94=E4=BF=A1=E6=81=AF=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../ai/fate/serving/common/bean/ThreadVO.java | 146 +++++++++++++ .../serving/common/utils/JVMCPUUtils.java | 39 ++++ .../serving/common/utils/ThreadSample.java | 200 ++++++++++++++++++ .../serving/common/utils/ThreadUtils.java | 50 +++++ 4 files changed, 435 insertions(+) create mode 100644 fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/bean/ThreadVO.java create mode 100644 fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/JVMCPUUtils.java create mode 100644 fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadSample.java create mode 100644 fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadUtils.java diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/bean/ThreadVO.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/bean/ThreadVO.java new file mode 100644 index 000000000..ac7029a4a --- /dev/null +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/bean/ThreadVO.java @@ -0,0 +1,146 @@ +package com.webank.ai.fate.serving.common.bean; + +import java.io.Serializable; +import java.util.Objects; + +/** + * @author hcy + */ +public class ThreadVO implements Serializable { + private static final long serialVersionUID = 0L; + + private long id; + private String name; + private String group; + private int priority; + private Thread.State state; + private double cpu; + private long deltaTime; + private long time; + private boolean interrupted; + private boolean daemon; + + public ThreadVO() { + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public Thread.State getState() { + return state; + } + + public void setState(Thread.State state) { + this.state = state; + } + + public double getCpu() { + return cpu; + } + + public void setCpu(double cpu) { + this.cpu = cpu; + } + + public long getDeltaTime() { + return deltaTime; + } + + public void setDeltaTime(long deltaTime) { + this.deltaTime = deltaTime; + } + + public long getTime() { + return time; + } + + public void setTime(long time) { + this.time = time; + } + + public boolean isInterrupted() { + return interrupted; + } + + public void setInterrupted(boolean interrupted) { + this.interrupted = interrupted; + } + + public boolean isDaemon() { + return daemon; + } + + public void setDaemon(boolean daemon) { + this.daemon = daemon; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ThreadVO threadVO = (ThreadVO) o; + + if (id != threadVO.id) { + return false; + } + return Objects.equals(name, threadVO.name); + } + + @Override + public int hashCode() { + int result = (int) (id ^ (id >>> 32)); + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "ThreadVO{" + + "id=" + id + + ", name='" + name + '\'' + + ", group='" + group + '\'' + + ", priority=" + priority + + ", state=" + state + + ", cpu=" + cpu + + ", deltaTime=" + deltaTime + + ", time=" + time + + ", interrupted=" + interrupted + + ", daemon=" + daemon + + '}'; + } +} diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/JVMCPUUtils.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/JVMCPUUtils.java new file mode 100644 index 000000000..4723dbba9 --- /dev/null +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/JVMCPUUtils.java @@ -0,0 +1,39 @@ +package com.webank.ai.fate.serving.common.utils; + +import com.webank.ai.fate.serving.common.bean.ThreadVO; + +import java.util.*; + +/** + * @author hcy + */ +public class JVMCPUUtils { + + private static Set states = null; + + static { + states = new HashSet<>(Thread.State.values().length); + for (Thread.State state : Thread.State.values()) { + states.add(state.name()); + } + } + + public static List getThreadsState() { + + List threads = ThreadUtils.getThreads(); + + Collection resultThreads = new ArrayList<>(); + for (ThreadVO thread : threads) { + if (thread.getState() != null && states.contains(thread.getState().name())) { + resultThreads.add(thread); + } + } + + + ThreadSampler threadSampler = new ThreadSampler(); + threadSampler.setIncludeInternalThreads(true); + threadSampler.sample(resultThreads); + threadSampler.pause(1000); + return threadSampler.sample(resultThreads); + } +} diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadSample.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadSample.java new file mode 100644 index 000000000..4c9797182 --- /dev/null +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadSample.java @@ -0,0 +1,200 @@ +package com.webank.ai.fate.serving.common.utils; + + +import com.webank.ai.fate.serving.common.bean.ThreadVO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.management.HotspotThreadMBean; +import sun.management.ManagementFactoryHelper; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author hcy + */ + +class ThreadSampler { + private static Logger logger = LoggerFactory.getLogger(ThreadSampler.class); + private static ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + private static HotspotThreadMBean hotspotThreadMBean; + private static boolean hotspotThreadMBeanEnable = true; + + private Map lastCpuTimes = new HashMap(); + + private long lastSampleTimeNanos; + private boolean includeInternalThreads = true; + + + public List sample(Collection originThreads) { + + List threads = new ArrayList(originThreads); + + // Sample CPU + if (lastCpuTimes.isEmpty()) { + lastSampleTimeNanos = System.nanoTime(); + for (ThreadVO thread : threads) { + if (thread.getId() > 0) { + long cpu = threadMXBean.getThreadCpuTime(thread.getId()); + lastCpuTimes.put(thread, cpu); + thread.setTime(cpu / 1000000); + } + } + + // add internal threads + Map internalThreadCpuTimes = getInternalThreadCpuTimes(); + if (internalThreadCpuTimes != null) { + for (Map.Entry entry : internalThreadCpuTimes.entrySet()) { + String key = entry.getKey(); + ThreadVO thread = createThreadVO(key); + thread.setTime(entry.getValue() / 1000000); + threads.add(thread); + lastCpuTimes.put(thread, entry.getValue()); + } + } + + //sort by time + Collections.sort(threads, new Comparator() { + @Override + public int compare(ThreadVO o1, ThreadVO o2) { + long l1 = o1.getTime(); + long l2 = o2.getTime(); + if (l1 < l2) { + return 1; + } else if (l1 > l2) { + return -1; + } else { + return 0; + } + } + }); + return threads; + } + + // Resample + long newSampleTimeNanos = System.nanoTime(); + Map newCpuTimes = new HashMap(threads.size()); + for (ThreadVO thread : threads) { + if (thread.getId() > 0) { + long cpu = threadMXBean.getThreadCpuTime(thread.getId()); + newCpuTimes.put(thread, cpu); + } + } + // internal threads + Map newInternalThreadCpuTimes = getInternalThreadCpuTimes(); + if (newInternalThreadCpuTimes != null) { + for (Map.Entry entry : newInternalThreadCpuTimes.entrySet()) { + ThreadVO threadVO = createThreadVO(entry.getKey()); + threads.add(threadVO); + newCpuTimes.put(threadVO, entry.getValue()); + } + } + + // Compute delta time + final Map deltas = new HashMap(threads.size()); + for (ThreadVO thread : newCpuTimes.keySet()) { + Long t = lastCpuTimes.get(thread); + if (t == null) { + t = 0L; + } + long time1 = t; + long time2 = newCpuTimes.get(thread); + if (time1 == -1) { + time1 = time2; + } else if (time2 == -1) { + time2 = time1; + } + long delta = time2 - time1; + deltas.put(thread, delta); + } + + long sampleIntervalNanos = newSampleTimeNanos - lastSampleTimeNanos; + + // Compute cpu usage + final HashMap cpuUsages = new HashMap(threads.size()); + for (ThreadVO thread : threads) { + double cpu = sampleIntervalNanos == 0 ? 0 : (Math.rint(deltas.get(thread) * 10000.0 / sampleIntervalNanos) / 100.0); + cpuUsages.put(thread, cpu); + } + + // Sort by CPU time : should be a rendering hint... + Collections.sort(threads, new Comparator() { + @Override + public int compare(ThreadVO o1, ThreadVO o2) { + long l1 = deltas.get(o1); + long l2 = deltas.get(o2); + if (l1 < l2) { + return 1; + } else if (l1 > l2) { + return -1; + } else { + return 0; + } + } + }); + + for (ThreadVO thread : threads) { + //nanos to mills + long timeMills = newCpuTimes.get(thread) / 1000000; + long deltaTime = deltas.get(thread) / 1000000; + double cpu = cpuUsages.get(thread); + + thread.setCpu(cpu); + thread.setTime(timeMills); + thread.setDeltaTime(deltaTime); + } + lastCpuTimes = newCpuTimes; + lastSampleTimeNanos = newSampleTimeNanos; + + return threads; + } + + private Map getInternalThreadCpuTimes() { + if (hotspotThreadMBeanEnable && includeInternalThreads) { + try { + if (hotspotThreadMBean == null) { + hotspotThreadMBean = ManagementFactoryHelper.getHotspotThreadMBean(); + } + return hotspotThreadMBean.getInternalThreadCpuTimes(); + } catch (Exception ex) { + logger.error("getInternalThreadCpuTimes failed Cause : " + ex); + hotspotThreadMBeanEnable = false; + } + } + return null; + } + + private ThreadVO createThreadVO(String name) { + ThreadVO threadVO = new ThreadVO(); + threadVO.setId(-1); + threadVO.setName(name); + threadVO.setPriority(-1); + threadVO.setDaemon(true); + threadVO.setInterrupted(false); + return threadVO; + } + + public void pause(long mills) { + try { + Thread.sleep(mills); + } catch (InterruptedException e) { + logger.error("pause failed Cause : " + e); + } + } + + public boolean isIncludeInternalThreads() { + return includeInternalThreads; + } + + public void setIncludeInternalThreads(boolean includeInternalThreads) { + this.includeInternalThreads = includeInternalThreads; + } +} + diff --git a/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadUtils.java b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadUtils.java new file mode 100644 index 000000000..d9e74c4ea --- /dev/null +++ b/fate-serving-common/src/main/java/com/webank/ai/fate/serving/common/utils/ThreadUtils.java @@ -0,0 +1,50 @@ +package com.webank.ai.fate.serving.common.utils; + +import com.webank.ai.fate.serving.common.bean.ThreadVO; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author hcy + */ +public class ThreadUtils { + + private static ThreadGroup getRoot() { + ThreadGroup group = Thread.currentThread().getThreadGroup(); + ThreadGroup parent; + while ((parent = group.getParent()) != null) { + group = parent; + } + return group; + } + + public static List getThreads() { + ThreadGroup root = getRoot(); + Thread[] threads = new Thread[root.activeCount()]; + while (root.enumerate(threads, true) == threads.length) { + threads = new Thread[threads.length * 2]; + } + List list = new ArrayList(threads.length); + for (Thread thread : threads) { + if (thread != null) { + ThreadVO threadVO = createThreadVO(thread); + list.add(threadVO); + } + } + return list; + } + + private static ThreadVO createThreadVO(Thread thread) { + ThreadGroup group = thread.getThreadGroup(); + ThreadVO threadVO = new ThreadVO(); + threadVO.setId(thread.getId()); + threadVO.setName(thread.getName()); + threadVO.setGroup(group == null ? "" : group.getName()); + threadVO.setPriority(thread.getPriority()); + threadVO.setState(thread.getState()); + threadVO.setInterrupted(thread.isInterrupted()); + threadVO.setDaemon(thread.isDaemon()); + return threadVO; + } +} From 743c0bcdb004c9bee2f1d4354d5231c92cc99f80 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Wed, 13 Sep 2023 10:18:02 +0800 Subject: [PATCH 27/35] logger content bugfix Signed-off-by: hecy7 --- .../java/com/webank/ai/fate/serving/model/ModelManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/model/ModelManager.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/model/ModelManager.java index e31589962..9d59217c5 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/model/ModelManager.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/model/ModelManager.java @@ -78,7 +78,7 @@ public synchronized ModelServiceProto.UnbindResponse unbind(Context context, Mod String modelKey = this.getNameSpaceKey(req.getTableName(), req.getNamespace()); if (!this.namespaceMap.containsKey(modelKey)) { logger.error("not found model info table name {} namespace {}, please check if the model is already loaded.", req.getTableName(), req.getNamespace()); - throw new ModelNullException(" found model info, please check if the model is already loaded."); + throw new ModelNullException("not found model info, please check if the model is already loaded."); } Model model = this.namespaceMap.get(modelKey); String tableNamekey = this.getNameSpaceKey(model.getTableName(), model.getNamespace()); From ddb43d61d4182a7a56f2eb3797309ef7feb79f34 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Fri, 15 Sep 2023 18:47:41 +0800 Subject: [PATCH 28/35] =?UTF-8?q?=E9=87=87=E7=94=A8logger=E5=86=99?= =?UTF-8?q?=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../ai/fate/register/zookeeper/ZookeeperRegistry.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fate-serving-register/src/main/java/com/webank/ai/fate/register/zookeeper/ZookeeperRegistry.java b/fate-serving-register/src/main/java/com/webank/ai/fate/register/zookeeper/ZookeeperRegistry.java index c0d9b73a0..2afd9f64f 100644 --- a/fate-serving-register/src/main/java/com/webank/ai/fate/register/zookeeper/ZookeeperRegistry.java +++ b/fate-serving-register/src/main/java/com/webank/ai/fate/register/zookeeper/ZookeeperRegistry.java @@ -162,14 +162,15 @@ public boolean tryUnregister(URL url) { boolean exists = client.checkExists(toUrlPath(url)); String urlPath = toUrlPath(url); if (exists) { - System.err.println("delete zk path "+urlPath); + if (logger.isDebugEnabled()) { + logger.debug("delete zk path " + urlPath); + } zkClient.delete(toUrlPath(url)); registedString.remove(url.getServiceInterface() + url.getEnvironment()); syncServiceCacheFile(); return true; - } - else{ - System.err.println(urlPath +"is not exist"); + } else { + logger.error(urlPath + " is not exist"); } } catch (Throwable e) { throw new RuntimeException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); From dcf024a77fd99576841f3991c0c1191ccfa785cd Mon Sep 17 00:00:00 2001 From: hecy7 Date: Thu, 21 Sep 2023 15:18:32 +0800 Subject: [PATCH 29/35] =?UTF-8?q?=E4=BC=98=E5=8C=96httpAdapter=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../serving/adaptor/dataaccess/HttpAdapter.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/HttpAdapter.java b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/HttpAdapter.java index 9fbc34dec..328da9a00 100644 --- a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/HttpAdapter.java +++ b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/HttpAdapter.java @@ -16,6 +16,7 @@ package com.webank.ai.fate.serving.adaptor.dataaccess; +import com.fasterxml.jackson.databind.ObjectMapper; import com.webank.ai.fate.serving.common.utils.HttpAdapterClientPool; import com.webank.ai.fate.serving.core.bean.*; import com.webank.ai.fate.serving.core.constant.StatusCode; @@ -30,6 +31,7 @@ public class HttpAdapter extends AbstractSingleFeatureDataAdaptor { private final static String HTTP_ADAPTER_URL = MetaInfo.PROPERTY_HTTP_ADAPTER_URL; + private static final ObjectMapper objectMapper = new ObjectMapper(); @Override public void init() { environment.getProperty("port"); @@ -44,13 +46,18 @@ public ReturnResult getData(Context context, Map featureIds) { responseResult = HttpAdapterClientPool.doPost(HTTP_ADAPTER_URL, featureIds); int responseCode = responseResult.getCode(); switch (responseCode) { - case HttpAdapterResponseCodeEnum.SUCCESS_CODE: - if (responseResult.getData() == null || responseResult.getData().size() == 0) { + case HttpAdapterResponseCodeEnum.COMMON_HTTP_SUCCESS_CODE: + Map responseResultData = responseResult.getData(); + if (responseResultData == null || responseResultData.size() == 0) { returnResult.setRetcode(StatusCode.FEATURE_DATA_ADAPTOR_ERROR); returnResult.setRetmsg("responseData is null "); + } else if (!responseResultData.get("code").equals(HttpAdapterResponseCodeEnum.SUCCESS_CODE)) { + returnResult.setRetcode(StatusCode.FEATURE_DATA_ADAPTOR_ERROR); + returnResult.setRetmsg("responseData is : " + objectMapper.writeValueAsString(responseResultData.get("data"))); } else { + ((Map)responseResultData.get("data")).remove("code"); returnResult.setRetcode(StatusCode.SUCCESS); - returnResult.setData(responseResult.getData()); + returnResult.setData(responseResultData); } break; From 5798a49c94ed668019b47155ad454cbd56a6ee1d Mon Sep 17 00:00:00 2001 From: hecy7 Date: Thu, 21 Sep 2023 15:36:25 +0800 Subject: [PATCH 30/35] =?UTF-8?q?=E6=9B=B4=E6=96=B0httpAdapter=E5=AF=B9?= =?UTF-8?q?=E5=BA=94md=E8=AF=B4=E6=98=8E=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- document/docs/service/adapter.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/document/docs/service/adapter.md b/document/docs/service/adapter.md index b50aab49c..7e9b52684 100644 --- a/document/docs/service/adapter.md +++ b/document/docs/service/adapter.md @@ -69,7 +69,7 @@ x0:1,x1:5,x2:13,x3:58,x4:95,x5:352,x6:418,x7:833,x8:888,x9:937,x10:32776 #### HttpAdapter 在serving-server.properties文件中配置属性feature.single.adaptor和http.adapter.url,feature.single.adaptor为继承AbstractSingleFeatureDataAdaptor -接口,url为调用获取数据接口地址。 +接口,url为调用获取数据接口地址。http.adapter.url中标明的用户接口,返回格式请定义为 {"code": 200, "data": xxx}标准格式即可,httpAdapter中会根据接口返回状态码是否为200判断用户数据拉取接口是否执行成功。 ```yaml feature.single.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.HttpAdapter http.adapter.url=http://127.0.0.1:9380/v1/http/adapter/getFeature From 0464061ad2cf34e3e7beb40ae39376fcd57cea92 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Wed, 27 Sep 2023 16:18:24 +0800 Subject: [PATCH 31/35] =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=8E=A8=E7=90=86?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- document/docs/service/adapter.md | 2 ++ .../webank/ai/fate/serving/core/bean/Dict.java | 1 + .../ai/fate/serving/core/bean/MetaInfo.java | 1 + .../ParallelBatchToSingleFeatureAdaptor.java | 18 ++++++++++++++++++ .../com/webank/ai/fate/serving/Bootstrap.java | 1 + .../main/resources/serving-server.properties | 1 + 6 files changed, 24 insertions(+) diff --git a/document/docs/service/adapter.md b/document/docs/service/adapter.md index 7e9b52684..988ba1895 100644 --- a/document/docs/service/adapter.md +++ b/document/docs/service/adapter.md @@ -40,8 +40,10 @@ Context为上下文信息,用于传递请求所需参数,featureIds用于传 #在host方的配置文件serving-server.properties中将其配置成自定义的类的全路径,如下所示 feature.single.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.CustomAdapter feature.batch.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.CustomBatchAdapter +feature.batch.single.adatpor=com.webank.ai.fate.serving.adaptor.dataaccess.CustomAdapter ``` 可以根据需要实现Adapter中的逻辑,并修改serving-server.properties中feature.single.adaptor或feature.batch.adaptor配置项为新增Adapter的全类名即可。可以参考源码中的MockAdaptor +注: feature.batch.single.adatpor与feature.batch.adatpor配套使用,feature.batch.single.adatpor可根据用户场景自行实现,fate-serving中目前支持httpAdaptor ## fate-serving-extension 为了更好的代码解耦合,代码中将自定义adapter分离到fate-serving-extension模块中。用户可在此模块中开发自定义的adapter。 diff --git a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/Dict.java b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/Dict.java index c25a5f51d..eca8a5439 100644 --- a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/Dict.java +++ b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/Dict.java @@ -117,6 +117,7 @@ public class Dict { public static final String PROPERTY_MODEL_SYNC = "model.synchronize"; public static final String PROPERTY_SERVING_MAX_POOL_SIZE = "serving.max.pool.size"; public static final String PROPERTY_FEATURE_BATCH_ADAPTOR = "feature.batch.adaptor"; + public static final String PROPERTY_FEATURE_BATCH_SINGLE_ADAPTOR = "feature.batch.single.adaptor"; public static final String PROPERTY_ACL_ENABLE = "acl.enable"; public static final String PROPERTY_ACL_USERNAME = "acl.username"; public static final String PROPERTY_ACL_PASSWORD = "acl.password"; diff --git a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/MetaInfo.java b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/MetaInfo.java index a6184201b..2d33c9730 100644 --- a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/MetaInfo.java +++ b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/MetaInfo.java @@ -38,6 +38,7 @@ public class MetaInfo { public static Boolean PROPERTY_USE_REGISTER; public static Boolean PROPERTY_USE_ZK_ROUTER; public static String PROPERTY_FEATURE_BATCH_ADAPTOR; + public static String PROPERTY_FETTURE_BATCH_SINGLE_ADAPTOR; public static Integer PROPERTY_BATCH_INFERENCE_MAX; public static String PROPERTY_FEATURE_SINGLE_ADAPTOR; public static Integer PROPERTY_SINGLE_INFERENCE_RPC_TIMEOUT; diff --git a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java index 3582021a0..11e9b476f 100644 --- a/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java +++ b/fate-serving-extension/src/main/java/com/webank/ai/fate/serving/adaptor/dataaccess/ParallelBatchToSingleFeatureAdaptor.java @@ -20,6 +20,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.webank.ai.fate.serving.core.adaptor.SingleFeatureDataAdaptor; import com.webank.ai.fate.serving.core.bean.*; +import com.webank.ai.fate.serving.core.utils.InferenceUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,22 @@ public ParallelBatchToSingleFeatureAdaptor() { private void initExecutor(int core, int max, int timeout) { + SingleFeatureDataAdaptor singleFeatureDataAdaptor = null; + String adaptorClass = MetaInfo.PROPERTY_FETTURE_BATCH_SINGLE_ADAPTOR; + if (StringUtils.isNotEmpty(adaptorClass)) { + logger.info("try to load single adaptor for ParallelBatchToSingleFeatureAdaptor {}", adaptorClass); + singleFeatureDataAdaptor = (SingleFeatureDataAdaptor) InferenceUtils.getClassByName(adaptorClass); + } + + if (singleFeatureDataAdaptor != null) { + String implementationClass = singleFeatureDataAdaptor.getClass().getName(); + logger.info("SingleFeatureDataAdaptor implementation class: " + implementationClass); + } else { + logger.warn("SingleFeatureDataAdaptor is null."); + } + + this.singleFeatureDataAdaptor = singleFeatureDataAdaptor; + this.timeout = timeout; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(core, max, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy()); diff --git a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java index 2cafc14a7..3da2d95dd 100644 --- a/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java +++ b/fate-serving-server/src/main/java/com/webank/ai/fate/serving/Bootstrap.java @@ -77,6 +77,7 @@ public static void parseConfig() { MetaInfo.PROPERTY_SERVING_POOL_ALIVE_TIME = environment.getProperty(Dict.PROPERTY_SERVING_POOL_ALIVE_TIME) != null ? Integer.valueOf(environment.getProperty(Dict.PROPERTY_SERVING_POOL_ALIVE_TIME)) : 1000; MetaInfo.PROPERTY_SERVING_POOL_QUEUE_SIZE = environment.getProperty(Dict.PROPERTY_SERVING_POOL_QUEUE_SIZE) != null ? Integer.valueOf(environment.getProperty(Dict.PROPERTY_SERVING_POOL_QUEUE_SIZE)) : 100; MetaInfo.PROPERTY_FEATURE_BATCH_ADAPTOR = environment.getProperty(Dict.PROPERTY_FEATURE_BATCH_ADAPTOR); + MetaInfo.PROPERTY_FETTURE_BATCH_SINGLE_ADAPTOR = environment.getProperty(Dict.PROPERTY_FEATURE_BATCH_SINGLE_ADAPTOR); MetaInfo.PROPERTY_BATCH_INFERENCE_MAX = environment.getProperty(Dict.PROPERTY_BATCH_INFERENCE_MAX) != null ? Integer.valueOf(environment.getProperty(Dict.PROPERTY_BATCH_INFERENCE_MAX)) : 300; MetaInfo.PROPERTY_REMOTE_MODEL_INFERENCE_RESULT_CACHE_SWITCH = environment.getProperty(Dict.PROPERTY_REMOTE_MODEL_INFERENCE_RESULT_CACHE_SWITCH) != null ? Boolean.valueOf(environment.getProperty(Dict.PROPERTY_REMOTE_MODEL_INFERENCE_RESULT_CACHE_SWITCH)) : Boolean.FALSE; MetaInfo.PROPERTY_SINGLE_INFERENCE_RPC_TIMEOUT = environment.getProperty(Dict.PROPERTY_SINGLE_INFERENCE_RPC_TIMEOUT) != null ? Integer.valueOf(environment.getProperty(Dict.PROPERTY_SINGLE_INFERENCE_RPC_TIMEOUT)) : 3000; diff --git a/fate-serving-server/src/main/resources/serving-server.properties b/fate-serving-server/src/main/resources/serving-server.properties index 8710309a4..e4c86932d 100644 --- a/fate-serving-server/src/main/resources/serving-server.properties +++ b/fate-serving-server/src/main/resources/serving-server.properties @@ -39,6 +39,7 @@ port=8000 # adapter feature.single.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.MockAdapter feature.batch.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.MockBatchAdapter +feature.batch.single.adaptor=com.webank.ai.fate.serving.adaptor.dataaccess.HttpAdapter http.adapter.url=http://127.0.0.1:9380/v1/http/adapter/getFeature # model transfer model.transfer.url=http://127.0.0.1:9380/v1/model/transfer From eff34bee8a75ce2d8abf5eb7a871f31679439d20 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Thu, 12 Oct 2023 16:42:23 +0800 Subject: [PATCH 32/35] =?UTF-8?q?=E7=BA=B5=E5=90=91=E6=97=A0=E6=8D=9F?= =?UTF-8?q?=E5=86=B3=E7=AD=96=E6=A0=91=E6=8E=A8=E7=90=86=E6=8A=A5=E9=94=99?= =?UTF-8?q?fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../ai/fate/serving/federatedml/model/HeteroSecureBoost.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/model/HeteroSecureBoost.java b/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/model/HeteroSecureBoost.java index 1487b96ed..035c7e53b 100644 --- a/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/model/HeteroSecureBoost.java +++ b/fate-serving-federatedml/src/main/java/com/webank/ai/fate/serving/federatedml/model/HeteroSecureBoost.java @@ -73,7 +73,7 @@ public int initModel(byte[] protoMeta, byte[] protoParam) { protected String getSite(int treeId, int treeNodeId) { String siteName = this.trees.get(treeId).getTree(treeNodeId).getSitename(); - if(siteName!=null&&":".indexOf(siteName)!=0){ + if(siteName != null && siteName.contains(":")){ return siteName.split(":")[1]; }else{ return siteName; From e80cedd787aace1d3c998be9c091735ae9564857 Mon Sep 17 00:00:00 2001 From: hecy7 Date: Fri, 20 Oct 2023 11:07:13 +0800 Subject: [PATCH 33/35] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=80=9A=E7=94=A8http?= =?UTF-8?q?=E5=93=8D=E5=BA=94=E6=88=90=E5=8A=9F=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../serving/core/bean/HttpAdapterResponseCodeEnum.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/HttpAdapterResponseCodeEnum.java b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/HttpAdapterResponseCodeEnum.java index 890b2086a..3923ed191 100644 --- a/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/HttpAdapterResponseCodeEnum.java +++ b/fate-serving-core/src/main/java/com/webank/ai/fate/serving/core/bean/HttpAdapterResponseCodeEnum.java @@ -1,12 +1,18 @@ package com.webank.ai.fate.serving.core.bean; +/** + * @author hcy + */ public class HttpAdapterResponseCodeEnum{ + // 通用http响应成功码 + public static final int COMMON_HTTP_SUCCESS_CODE = 0; + //正常 - public final static int SUCCESS_CODE = 200; + public static final int SUCCESS_CODE = 200; //查询无果 - public final static int ERROR_CODE = 404; + public static final int ERROR_CODE = 404; } From f4c4965d4285f3de6df24792c15cc7783673af0b Mon Sep 17 00:00:00 2001 From: forgive_dengkai Date: Fri, 20 Oct 2023 18:33:53 +0800 Subject: [PATCH 34/35] change version Signed-off-by: forgive_dengkai --- fate-serving-admin/bin/service.sh | 2 +- fate-serving-common/pom.xml | 2 +- fate-serving-proxy/bin/service.sh | 2 +- fate-serving-proxy/pom.xml | 2 +- fate-serving-server/bin/service.sh | 2 +- fate-serving-server/pom.xml | 2 +- pom.xml | 12 ++++++------ 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/fate-serving-admin/bin/service.sh b/fate-serving-admin/bin/service.sh index 71a244833..bc11abd1b 100644 --- a/fate-serving-admin/bin/service.sh +++ b/fate-serving-admin/bin/service.sh @@ -24,7 +24,7 @@ basepath=$(cd `dirname $0`;pwd) configpath=$(cd $basepath/conf;pwd) module=serving-admin main_class=com.webank.ai.fate.serving.admin.Bootstrap -module_version=2.1.6 +module_version=2.1.7 case "$1" in start) diff --git a/fate-serving-common/pom.xml b/fate-serving-common/pom.xml index cddc0cd1d..019e42eb1 100644 --- a/fate-serving-common/pom.xml +++ b/fate-serving-common/pom.xml @@ -57,7 +57,7 @@ commons-net commons-net - 3.8.0 + 3.9.0 com.github.oshi diff --git a/fate-serving-proxy/bin/service.sh b/fate-serving-proxy/bin/service.sh index d6cc0ccb8..8c3d65046 100644 --- a/fate-serving-proxy/bin/service.sh +++ b/fate-serving-proxy/bin/service.sh @@ -24,7 +24,7 @@ basepath=$(cd `dirname $0`;pwd) configpath=$(cd $basepath/conf;pwd) module=serving-proxy main_class=com.webank.ai.fate.serving.proxy.bootstrap.Bootstrap -module_version=2.1.6 +module_version=2.1.7 case "$1" in diff --git a/fate-serving-proxy/pom.xml b/fate-serving-proxy/pom.xml index f7b1c4586..de1957b92 100644 --- a/fate-serving-proxy/pom.xml +++ b/fate-serving-proxy/pom.xml @@ -88,7 +88,7 @@ commons-net commons-net - 3.8.0 + 3.9.0 diff --git a/fate-serving-server/bin/service.sh b/fate-serving-server/bin/service.sh index 56c68996c..c0c7225fb 100644 --- a/fate-serving-server/bin/service.sh +++ b/fate-serving-server/bin/service.sh @@ -24,7 +24,7 @@ basepath=$(cd `dirname $0`;pwd) configpath=$(cd $basepath/conf;pwd) module=serving-server main_class=com.webank.ai.fate.serving.Bootstrap -module_version=2.1.6 +module_version=2.1.7 case "$1" in diff --git a/fate-serving-server/pom.xml b/fate-serving-server/pom.xml index eca0bc6be..abfbe1b59 100644 --- a/fate-serving-server/pom.xml +++ b/fate-serving-server/pom.xml @@ -94,7 +94,7 @@ commons-net commons-net - 3.8.0 + 3.9.0 diff --git a/pom.xml b/pom.xml index 652c1a895..1893a3d97 100644 --- a/pom.xml +++ b/pom.xml @@ -30,14 +30,14 @@ fate-serving-register fate-serving-common fate-serving-proxy - fate-serving-admin - fate-serving-admin-ui + + fate-serving-extension fate-serving-sdk - 2.1.6 + 2.1.7 1.8 UTF-8 UTF-8 @@ -48,7 +48,7 @@ 0.6.1 1.6.1 2.7.0 - 2.13.3 + 2.15.3 2.9.0 2.17.1 true @@ -226,7 +226,7 @@ org.yaml snakeyaml - 1.26 + 1.32 compile @@ -251,7 +251,7 @@ commons-net commons-net - 3.8.0 + 3.9.0 From ff8ddc3d4346ece70d9676a433abbadb181d2520 Mon Sep 17 00:00:00 2001 From: tubaobao3 <13572814277@163.com> Date: Thu, 9 Nov 2023 15:31:52 +0800 Subject: [PATCH 35/35] =?UTF-8?q?serving-server=E6=9C=8D=E5=8A=A1http?= =?UTF-8?q?=E7=AB=AF=E5=8F=A3=E8=AE=BE=E7=BD=AE=E5=8F=8A=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hecy7 --- .../src/main/resources/serving-server.properties | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/fate-serving-server/src/main/resources/serving-server.properties b/fate-serving-server/src/main/resources/serving-server.properties index e4c86932d..1736ba86f 100644 --- a/fate-serving-server/src/main/resources/serving-server.properties +++ b/fate-serving-server/src/main/resources/serving-server.properties @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -port=8000 +#主机启动serving-server进程服务时,port端口定义为grpc端口 +port=8000 +#主机启动serving-server进程服务时,http端口在此定义;采用kubefate部署时,请注释关闭此选项,在k8s中serving-server对应svc资源文件上进行修改 +server.port=8185 #serviceRoleName=serving # cache #remoteModelInferenceResultCacheSwitch=false @@ -55,4 +58,4 @@ zk.url=localhost:2181,localhost:2182,localhost:2183 # LR algorithm config #lr.split.size=500 -#lr.use.parallel=false \ No newline at end of file +#lr.use.parallel=false