Skip to content

Commit

Permalink
Solve #1719, Add top N hottest files support based on access
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou committed May 2, 2018
1 parent c4ddbf0 commit 6e51540
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private String unfoldFunctionCalls(String sql) {
String paraName = m.groupCount() == 2 ? m.group(2) : null;
List<Object> params = tr.getParameter(paraName);
String value = callFunction(funcName, params);
ret = ret.replace(rep, value);
ret = ret.replace(rep, value == null ? "" : value);
}
return ret;
}
Expand All @@ -112,7 +112,9 @@ public List<String> executeFileRuleQuery() {
if (index == tr.getRetSqlIndex()) {
ret = adapter.executeFilesPathQuery(sql);
} else {
adapter.execute(sql);
if (sql != null && sql.length() > 3) {
adapter.execute(sql);
}
}
index++;
} catch (MetaStoreException e) {
Expand Down Expand Up @@ -143,6 +145,23 @@ public String callFunction(String funcName, List<Object> parameters) {
}
}

public String genVirtualAccessCountTableMaxValue(List<Object> parameters) {
List<Object> paraList = (List<Object>) parameters.get(0);
String table = (String) parameters.get(1);
String var = (String) parameters.get(2);
Long num = (Long) paraList.get(1);
String sql0 = "SELECT min(count) FROM ( SELECT * FROM " + table
+ " ORDER BY count LIMIT " + num + ") AS " + table + "_TMP;";
Long count = null;
try {
count = adapter.queryForLong(sql0);
} catch (MetaStoreException e) {
LOG.error("Get maximum access count from table '" + table + "' error.", e);
}
ctx.setProperty(var, count == null ? 0L : count);
return null;
}

public String genVirtualAccessCountTable(List<Object> parameters) {
List<Object> paraList = (List<Object>) parameters.get(0);
String newTable = (String) parameters.get(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.smartdata.metastore.dao.FileDiffDao;
import org.smartdata.metastore.dao.FileInfoDao;
import org.smartdata.metastore.dao.FileStateDao;
import org.smartdata.metastore.dao.GeneralDao;
import org.smartdata.metastore.dao.GlobalConfigDao;
import org.smartdata.metastore.dao.GroupsDao;
import org.smartdata.metastore.dao.MetaStoreHelper;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class MetaStore implements CopyMetaService, CmdletMetaService, BackupMeta
private ClusterInfoDao clusterInfoDao;
private SystemInfoDao systemInfoDao;
private FileStateDao fileStateDao;
private GeneralDao generalDao;

public MetaStore(DBPool pool) throws MetaStoreException {
this.pool = pool;
Expand All @@ -145,6 +147,7 @@ public MetaStore(DBPool pool) throws MetaStoreException {
clusterInfoDao = new ClusterInfoDao(pool.getDataSource());
systemInfoDao = new SystemInfoDao(pool.getDataSource());
fileStateDao = new FileStateDao(pool.getDataSource());
generalDao = new GeneralDao(pool.getDataSource());
}

public Connection getConnection() throws MetaStoreException {
Expand All @@ -168,6 +171,14 @@ private void closeConnection(Connection conn) throws MetaStoreException {
}
}

public Long queryForLong(String sql) throws MetaStoreException {
try {
return generalDao.queryForLong(sql);
} catch (Exception e) {
throw new MetaStoreException(e);
}
}

public synchronized void addUser(String userName) throws MetaStoreException {
try {
userDao.addUser(userName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.metastore.dao;

import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

public class GeneralDao {
private DataSource dataSource;

public GeneralDao(DataSource dataSource) {
this.dataSource = dataSource;
}

public Long queryForLong(String sql) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
Long x = jdbcTemplate.queryForObject(sql, Long.class);
return x;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class FileObject extends SmartObject {
new Property("accessCount", ValueType.LONG,
Arrays.asList(ValueType.TIMEINTVAL),
"VIRTUAL_ACCESS_COUNT_TABLE", "", false, "count"));
PROPERTIES.put("accessCountMax",
new Property("accessCountMax", ValueType.LONG,
Arrays.asList(ValueType.TIMEINTVAL, ValueType.LONG),
"VIRTUAL_ACCESS_COUNT_TABLE", "", false, "count"));
PROPERTIES.put("length",
new Property("length", ValueType.LONG,
null, "file", "length", false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public String formatParameters() {
public String instId() {
return property.instId(values);
}

public String instId(int s, int e) {
return property.instId(values.subList(s, e));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,27 @@ public NodeTransResult doGenerateSql(TreeNode root, String tableName) throws IOE
return new NodeTransResult(virTab, realParas.formatParameters());
}

if (p.getPropertyName().equals("accessCountMax")) {
String rid = "";
if (transCtx != null) {
rid = transCtx.getRuleId() + "_";
}
String virTab = "VIR_ACC_CNT_TAB_" + rid + "accessCount_"
+ realParas.getValues().get(0).toString();
if (!tempTableNames.contains(virTab)) {
tempTableNames.add(virTab);
sqlStatements.add("DROP TABLE IF EXISTS " + virTab + ";");
sqlStatements.add("$@genVirtualAccessCountTable(" + virTab + ")");
dynamicParameters.put(virTab, Arrays.asList(realParas.getValues(), virTab));
}
String mStr = virTab + "_max_" + realParas.getValues().get(1).toString();
String mStrValue = mStr + "_value";
sqlStatements.add("$@genVirtualAccessCountTableMaxValue(" + mStr + ")");
dynamicParameters.put(mStr, Arrays.asList(realParas.getValues(), virTab, mStrValue));
procAcc = true;
return new NodeTransResult(null, "$" + mStrValue);
}

return new NodeTransResult(p.getTableName(), realParas.formatParameters());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class TestSmartRuleStringParser {
public void testRuleTranslate() throws Exception {
List<String> rules = new LinkedList<>();
rules.add("file : path matches \"/test/*\" | sync -dest \"hdfs://remotecluster:port/somedir\"");
rules.add("file : accessCount(10min) > accessCountMax(10min, 10) | sleep -ms 0");

for (String rule : rules) {
parseRule(rule);
}
Expand Down

0 comments on commit 6e51540

Please sign in to comment.