Skip to content

Commit

Permalink
Fix #757 Fix Tidb create table issue
Browse files Browse the repository at this point in the history
  • Loading branch information
huafengw authored Jul 20, 2017
1 parent 3a25cbc commit 8cb0880
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -804,13 +804,10 @@ public synchronized void formatDataBase() throws MetaStoreException {
initializeDataBase();
}

public String aggregateSQLStatement(AccessCountTable destinationTable
public void aggregateTables(AccessCountTable destinationTable
, List<AccessCountTable> tablesToAggregate) throws MetaStoreException {
try {
return accessCountDao
.aggregateSQLStatement(destinationTable, tablesToAggregate);
} catch (EmptyResultDataAccessException e) {
return null;
accessCountDao.aggregateTables(destinationTable, tablesToAggregate);
} catch (Exception e) {
throw new MetaStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void delete(AccessCountTable table) {
jdbcTemplate.update(sql, table.getTableName());
}

public static String createTableSQL(String tableName) {
public static String createAccessCountTableSQL(String tableName) {
return String.format(
"CREATE TABLE %s (%s INTEGER NOT NULL, %s INTEGER NOT NULL)",
tableName, FILE_FIELD, ACCESSCOUNT_FIELD);
Expand All @@ -83,16 +83,21 @@ public List<AccessCountTable> getAllSortedTables() {
return jdbcTemplate.query(sql, new AccessCountRowMapper());
}

public String aggregateSQLStatement(AccessCountTable destinationTable
, List<AccessCountTable> tablesToAggregate) {
StringBuilder statement = new StringBuilder();
statement.append("CREATE TABLE " + destinationTable.getTableName() + " as ");
statement.append("SELECT " + AccessCountDao.FILE_FIELD + ", SUM(" +
AccessCountDao.ACCESSCOUNT_FIELD + ") as " +
AccessCountDao.ACCESSCOUNT_FIELD + " FROM (");
statement.append(getUnionStatement(tablesToAggregate));
statement.append(") tmp GROUP BY " + AccessCountDao.FILE_FIELD);
return statement.toString();
public void aggregateTables(
AccessCountTable destinationTable, List<AccessCountTable> tablesToAggregate) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String create = AccessCountDao.createAccessCountTableSQL(destinationTable.getTableName());
jdbcTemplate.execute(create);
String insert =
String.format(
"INSERT INTO %s SELECT %s, SUM(%s) AS %s FROM(%s) GROUP BY %s;",
destinationTable.getTableName(),
AccessCountDao.FILE_FIELD,
AccessCountDao.ACCESSCOUNT_FIELD,
AccessCountDao.ACCESSCOUNT_FIELD,
getUnionStatement(tablesToAggregate),
AccessCountDao.FILE_FIELD);
jdbcTemplate.execute(insert);
}

public Map<Long, Integer> getHotFiles(List<AccessCountTable> tables, int topNum)
Expand Down Expand Up @@ -139,9 +144,10 @@ public void createProportionTable(AccessCountTable dest, AccessCountTable source
double percentage =
((double) dest.getEndTime() - dest.getStartTime())
/ (source.getEndTime() - source.getStartTime());
jdbcTemplate.execute(AccessCountDao.createAccessCountTableSQL(dest.getTableName()));
String sql =
String.format(
"CREATE TABLE %s AS SELECT %s, ROUND(%s.%s * %s) AS %s FROM %s",
"INSERT INTO %s SELECT %s, ROUND(%s.%s * %s) AS %s FROM %s",
dest.getTableName(),
AccessCountDao.FILE_FIELD,
source.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public AccessCountTableAggregator(MetaStore metaStore) {
public void aggregate(AccessCountTable destinationTable,
List<AccessCountTable> tablesToAggregate) throws MetaStoreException {
if (tablesToAggregate.size() > 0) {
String aggregateSQ = metaStore.aggregateSQLStatement(destinationTable, tablesToAggregate);
metaStore.execute(aggregateSQ);
metaStore.aggregateTables(destinationTable, tablesToAggregate);
metaStore.insertAccessCountTable(destinationTable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void addAccessEvents(List<FileAccessEvent> eventList) {

private void createTable() {
AccessCountTable table = new AccessCountTable(currentWindow.start, currentWindow.end);
String createTable = AccessCountDao.createTableSQL(table.getTableName());
String createTable = AccessCountDao.createAccessCountTableSQL(table.getTableName());
try {
adapter.execute(createTable);
adapter.insertAccessCountTable(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public void testAccessCountTableManager() throws InterruptedException {

private void createTables(Connection connection) throws Exception {
Statement statement = connection.createStatement();
statement.execute(AccessCountDao.createTableSQL("expect1"));
statement.execute(AccessCountDao.createTableSQL("expect2"));
statement.execute(AccessCountDao.createTableSQL("expect3"));
statement.execute(AccessCountDao.createAccessCountTableSQL("expect1"));
statement.execute(AccessCountDao.createAccessCountTableSQL("expect2"));
statement.execute(AccessCountDao.createAccessCountTableSQL("expect3"));
statement.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ public class TestTableAggregator extends DBTest {

private void createTables(IDatabaseConnection connection) throws Exception {
Statement statement = connection.getConnection().createStatement();
statement.execute(AccessCountDao.createTableSQL("table1"));
statement.execute(AccessCountDao.createTableSQL("table2"));
statement.execute(AccessCountDao.createTableSQL("table3"));
statement.execute(AccessCountDao.createTableSQL("expect"));
statement.execute(AccessCountDao.createAccessCountTableSQL("table1"));
statement.execute(AccessCountDao.createAccessCountTableSQL("table2"));
statement.execute(AccessCountDao.createAccessCountTableSQL("table3"));
statement.execute(AccessCountDao.createAccessCountTableSQL("expect"));
statement.close();
}

Expand All @@ -58,11 +58,7 @@ public void testAggregate() throws Exception {
AccessCountTable table1 = new AccessCountTable("table1", 0L, 0L, false);
AccessCountTable table2 = new AccessCountTable("table2", 0L, 0L, false);
AccessCountTable table3 = new AccessCountTable("table3", 0L, 0L, false);
String aggregateStatement =
metaStore.aggregateSQLStatement(result, Lists.newArrayList(table1, table2, table3));
Statement statement = databaseTester.getConnection().getConnection().createStatement();
statement.execute(aggregateStatement);
statement.close();
metaStore.aggregateTables(result, Lists.newArrayList(table1, table2, table3));

ITable actual = databaseTester.getConnection().createTable(result.getTableName());
ITable expect = databaseTester.getDataSet().getTable("expect");
Expand Down

0 comments on commit 8cb0880

Please sign in to comment.