Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(configurable) store configuration in db #45

Merged
merged 1 commit into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,18 @@
*/
package com.alibaba.rsqldb.dim.intelligence;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.CommonResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.cache.compress.impl.IntValueKV;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.dboperator.IDBDriver;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -171,29 +161,6 @@ public void doProcessAfterRefreshConfigurable(IConfigurableService configurableS
this.outputDataSource = configurableService.queryConfigurable(ISink.TYPE, datasourceName);
}

public void startIntelligence() {
boolean success = dbInit();
if (success) {
startIntelligenceInner();
} else {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
boolean success = false;
while (!success) {
success = dbInit();
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
startIntelligenceInner();
}
});
thread.start();
}
}

public void startIntelligenceInner() {
String sql = getSQL();
Expand Down Expand Up @@ -296,54 +263,6 @@ public void run() {
}
}

public boolean dbInit() {
try {
int successCode = 200;
String region = ComponentCreator.getProperties().getProperty(ConfigureFileKey.INTELLIGENCE_REGION);
String ak = ComponentCreator.getProperties().getProperty(
ConfigureFileKey.INTELLIGENCE_AK);
String sk = ComponentCreator.getProperties().getProperty(
ConfigureFileKey.INTELLIGENCE_SK);
String endpoint = ComponentCreator.getProperties().getProperty(
ConfigureFileKey.INTELLIGENCE_TIP_DB_ENDPOINT);
if (StringUtils.isNotBlank(region) && StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk) && StringUtils.isNotBlank(endpoint)) {
DefaultProfile profile = DefaultProfile.getProfile(region, ak, sk);
IAcsClient client = new DefaultAcsClient(profile);
CommonRequest request = new CommonRequest();
request.setDomain(endpoint);
request.setVersion("2016-03-16");
request.setAction("DescribeTiDataSource");
CommonResponse response = client.getCommonResponse(request);
int code = response.getHttpStatus();
if (successCode == code) {
String content = response.getData();
if (StringUtils.isNotBlank(content)) {
JSONObject obj = JSON.parseObject(content);
JSONObject dbInfo = obj.getJSONObject("dBInfo");
if (dbInfo != null) {
String dbUrl = "jdbc:mysql://" + dbInfo.getString("dbConnection") + ":" + dbInfo.getInteger(
"port") + "/" + dbInfo.getString("dBName");
String dbUserName = dbInfo.getString("userName");
String dbPassword = dbInfo.getString("passWord");
JDBCDriver dataSource = (JDBCDriver) this.outputDataSource;
dataSource.setUrl(dbUrl);
dataSource.setPassword(dbPassword);
dataSource.setUserName(dbUserName);
dataSource.setHasInit(false);
dataSource.init();
LOG.debug("succeed in getting db information from tip service!");
return true;
}
}
}
}
LOG.error("failed in getting db information from tip service!");
return false;
} catch (Exception e) {
LOG.error("failed in getting db information from tip service!", e);
return false;
}
}

/**
* 把存储0/1字符串的值,转化成bit
Expand Down
10 changes: 8 additions & 2 deletions rsqldb-disk/conf/rsqldb.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
NAMESRV_ADDR=127.0.0.1:9876
namespace=test

## checkpoint存储配置,可以是memory, DB 或者file, 除了checkpoint外, 任务序列化的内容也会被缓存在该存储
dipper.configurable.service.type=file
## 流计算经过任务解析器(rsqldb-server)解析后保存位置,可以是memory, DB 或者file
configurable.storage.type=file

#如果保存类型是DB,需要提供DB配置;
jdbc.url=
jdbc.username=
jdbc.password=


## 窗口配置
#group by 时窗口大小,单位s,sql中不配置或者这里不指定的话默认600s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.alibaba.rsqldb.parser.parser.result.IParseResult;
import com.alibaba.rsqldb.parser.util.ThreadLocalUtil;
import org.apache.calcite.sql.SqlNode;
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.utils.ContantsUtil;
Expand Down Expand Up @@ -63,6 +62,9 @@
* dimension table join builder specially for url, ip and domain
*/
public class SnapshotBuilder extends SelectSQLBuilder {
private static final String INTELLIGENCE_JDBC_URL = "intelligence.rds.jdbc.url";
private static final String INTELLIGENCE_JDBC_USERNAME = "intelligence.rds.jdbc.username";
private static final String INTELLIGENCE_JDBC_PASSWORD = "intelligence.rds.jdbc.password";

protected static Map<String, AbstractIntelligenceCache> INTELLIGENCE = new HashMap<>();
protected static Map<CreateSQLBuilder, AbstractDim> dims = new HashMap<>();
Expand Down Expand Up @@ -348,9 +350,9 @@ protected void buildIntelligence(long pollingTime, AbstractIntelligenceCache int
* 创建维表连接对象, 默认情报的数据连接是单独配置好的,不依赖sql中create语句
*/
JDBCDriver dbChannel = new JDBCDriver();
dbChannel.setUrl(ConfigureFileKey.INTELLIGENCE_JDBC_URL);
dbChannel.setPassword(ConfigureFileKey.INTELLIGENCE_JDBC_PASSWORD);
dbChannel.setUserName(ConfigureFileKey.INTELLIGENCE_JDBC_USERNAME);
dbChannel.setUrl(INTELLIGENCE_JDBC_URL);
dbChannel.setPassword(INTELLIGENCE_JDBC_PASSWORD);
dbChannel.setUserName(INTELLIGENCE_JDBC_USERNAME);
getPipelineBuilder().addConfigurables(dbChannel);

AbstractIntelligenceCache intelligence = ReflectUtil.forInstance(intelligenceCache.getClass());
Expand Down