From 6d9bc92d825943fa4d71cf77bab2b93c82a50578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BB=B4=E7=AB=A0?= Date: Wed, 21 Sep 2022 20:44:11 +0800 Subject: [PATCH] feature(configurable) store configurable in db --- .../AbstractIntelligenceCache.java | 81 ------------------- rsqldb-disk/conf/rsqldb.conf | 10 ++- .../parser/builder/SnapshotBuilder.java | 10 ++- 3 files changed, 14 insertions(+), 87 deletions(-) diff --git a/rsql-dim/src/main/java/com/alibaba/rsqldb/dim/intelligence/AbstractIntelligenceCache.java b/rsql-dim/src/main/java/com/alibaba/rsqldb/dim/intelligence/AbstractIntelligenceCache.java index 93f6afd..eb64f6d 100644 --- a/rsql-dim/src/main/java/com/alibaba/rsqldb/dim/intelligence/AbstractIntelligenceCache.java +++ b/rsql-dim/src/main/java/com/alibaba/rsqldb/dim/intelligence/AbstractIntelligenceCache.java @@ -16,28 +16,18 @@ */ package com.alibaba.rsqldb.dim.intelligence; -import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.aliyuncs.CommonRequest; -import com.aliyuncs.CommonResponse; -import com.aliyuncs.DefaultAcsClient; -import com.aliyuncs.IAcsClient; -import com.aliyuncs.profile.DefaultProfile; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV; import org.apache.rocketmq.streams.common.channel.sink.ISink; -import org.apache.rocketmq.streams.common.component.ComponentCreator; import org.apache.rocketmq.streams.common.configurable.BasedConfigurable; import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener; import org.apache.rocketmq.streams.common.configurable.IConfigurableService; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; -import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; import org.apache.rocketmq.streams.common.dboperator.IDBDriver; import org.apache.rocketmq.streams.common.utils.NumberUtils; import org.apache.rocketmq.streams.common.utils.SQLUtil; -import org.apache.rocketmq.streams.db.driver.JDBCDriver; import java.util.List; import java.util.Map; @@ -171,29 +161,6 @@ public void doProcessAfterRefreshConfigurable(IConfigurableService configurableS this.outputDataSource = configurableService.queryConfigurable(ISink.TYPE, datasourceName); } - public void startIntelligence() { - boolean success = dbInit(); - if (success) { - startIntelligenceInner(); - } else { - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - boolean success = false; - while (!success) { - success = dbInit(); - try { - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - startIntelligenceInner(); - } - }); - thread.start(); - } - } public void startIntelligenceInner() { String sql = getSQL(); @@ -296,54 +263,6 @@ public void run() { } } - public boolean dbInit() { - try { - int successCode = 200; - String region = ComponentCreator.getProperties().getProperty(ConfigureFileKey.INTELLIGENCE_REGION); - String ak = ComponentCreator.getProperties().getProperty( - ConfigureFileKey.INTELLIGENCE_AK); - String sk = ComponentCreator.getProperties().getProperty( - ConfigureFileKey.INTELLIGENCE_SK); - String endpoint = ComponentCreator.getProperties().getProperty( - ConfigureFileKey.INTELLIGENCE_TIP_DB_ENDPOINT); - if (StringUtils.isNotBlank(region) && StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk) && StringUtils.isNotBlank(endpoint)) { - DefaultProfile profile = DefaultProfile.getProfile(region, ak, sk); - IAcsClient client = new DefaultAcsClient(profile); - CommonRequest request = new CommonRequest(); - request.setDomain(endpoint); - request.setVersion("2016-03-16"); - request.setAction("DescribeTiDataSource"); - CommonResponse response = client.getCommonResponse(request); - int code = response.getHttpStatus(); - if (successCode == code) { - String content = response.getData(); - if (StringUtils.isNotBlank(content)) { - JSONObject obj = JSON.parseObject(content); - JSONObject dbInfo = obj.getJSONObject("dBInfo"); - if (dbInfo != null) { - String dbUrl = "jdbc:mysql://" + dbInfo.getString("dbConnection") + ":" + dbInfo.getInteger( - "port") + "/" + dbInfo.getString("dBName"); - String dbUserName = dbInfo.getString("userName"); - String dbPassword = dbInfo.getString("passWord"); - JDBCDriver dataSource = (JDBCDriver) this.outputDataSource; - dataSource.setUrl(dbUrl); - dataSource.setPassword(dbPassword); - dataSource.setUserName(dbUserName); - dataSource.setHasInit(false); - dataSource.init(); - LOG.debug("succeed in getting db information from tip service!"); - return true; - } - } - } - } - LOG.error("failed in getting db information from tip service!"); - return false; - } catch (Exception e) { - LOG.error("failed in getting db information from tip service!", e); - return false; - } - } /** * 把存储0/1字符串的值,转化成bit diff --git a/rsqldb-disk/conf/rsqldb.conf b/rsqldb-disk/conf/rsqldb.conf index 0f39c6b..bff3d34 100644 --- a/rsqldb-disk/conf/rsqldb.conf +++ b/rsqldb-disk/conf/rsqldb.conf @@ -1,8 +1,14 @@ NAMESRV_ADDR=127.0.0.1:9876 namespace=test -## checkpoint存储配置,可以是memory, DB 或者file, 除了checkpoint外, 任务序列化的内容也会被缓存在该存储 -dipper.configurable.service.type=file +## 流计算经过任务解析器(rsqldb-server)解析后保存位置,可以是memory, DB 或者file +configurable.storage.type=file + +#如果保存类型是DB,需要提供DB配置; +jdbc.url= +jdbc.username= +jdbc.password= + ## 窗口配置 #group by 时窗口大小,单位s,sql中不配置或者这里不指定的话默认600s diff --git a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/parser/builder/SnapshotBuilder.java b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/parser/builder/SnapshotBuilder.java index e15dd56..f7d22e6 100644 --- a/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/parser/builder/SnapshotBuilder.java +++ b/rsqldb-parser/src/main/java/com/alibaba/rsqldb/parser/parser/builder/SnapshotBuilder.java @@ -32,7 +32,6 @@ import com.alibaba.rsqldb.parser.parser.result.IParseResult; import com.alibaba.rsqldb.parser.util.ThreadLocalUtil; import org.apache.calcite.sql.SqlNode; -import org.apache.rocketmq.streams.common.configure.ConfigureFileKey; import org.apache.rocketmq.streams.common.metadata.MetaData; import org.apache.rocketmq.streams.common.metadata.MetaDataField; import org.apache.rocketmq.streams.common.utils.ContantsUtil; @@ -63,6 +62,9 @@ * dimension table join builder specially for url, ip and domain */ public class SnapshotBuilder extends SelectSQLBuilder { + private static final String INTELLIGENCE_JDBC_URL = "intelligence.rds.jdbc.url"; + private static final String INTELLIGENCE_JDBC_USERNAME = "intelligence.rds.jdbc.username"; + private static final String INTELLIGENCE_JDBC_PASSWORD = "intelligence.rds.jdbc.password"; protected static Map INTELLIGENCE = new HashMap<>(); protected static Map dims = new HashMap<>(); @@ -348,9 +350,9 @@ protected void buildIntelligence(long pollingTime, AbstractIntelligenceCache int * 创建维表连接对象, 默认情报的数据连接是单独配置好的,不依赖sql中create语句 */ JDBCDriver dbChannel = new JDBCDriver(); - dbChannel.setUrl(ConfigureFileKey.INTELLIGENCE_JDBC_URL); - dbChannel.setPassword(ConfigureFileKey.INTELLIGENCE_JDBC_PASSWORD); - dbChannel.setUserName(ConfigureFileKey.INTELLIGENCE_JDBC_USERNAME); + dbChannel.setUrl(INTELLIGENCE_JDBC_URL); + dbChannel.setPassword(INTELLIGENCE_JDBC_PASSWORD); + dbChannel.setUserName(INTELLIGENCE_JDBC_USERNAME); getPipelineBuilder().addConfigurables(dbChannel); AbstractIntelligenceCache intelligence = ReflectUtil.forInstance(intelligenceCache.getClass());