[Fix-8544][API] The folder's size can't be updated when creating, updating or deleting a resource file in the folder. #9107

Merged 6 commits on Mar 23, 2022
@@ -33,7 +33,6 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -50,6 +49,7 @@
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.collections.CollectionUtils;
@@ -81,6 +81,7 @@
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Joiner;
import com.google.common.io.Files;

/**
@@ -221,6 +222,7 @@ public Result<Object> createResource(User loginUser,

try {
resourcesMapper.insert(resource);
updateParentResourceSize(resource, resource.getSize());
putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
Map<String, Object> resultMap = new HashMap<>();
@@ -244,6 +246,33 @@
return result;
}

/**
* update the size of each ancestor folder of the resource
*
* @param resource the current resource
* @param size the size delta to apply (positive on create, negative on delete, new size minus old size on update)
*/
private void updateParentResourceSize(Resource resource, long size) {
if (resource.getSize() > 0) {
String[] splits = resource.getFullName().split("/");
for (int i = 1; i < splits.length; i++) {
String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i));
if (StringUtils.isNotBlank(parentFullName)) {
List<Resource> resources = resourcesMapper.queryResource(parentFullName, resource.getType().ordinal());
if (CollectionUtils.isNotEmpty(resources)) {
Resource parentResource = resources.get(0);
if (parentResource.getSize() + size >= 0) {
parentResource.setSize(parentResource.getSize() + size);
} else {
parentResource.setSize(0L);
}
resourcesMapper.updateById(parentResource);
}
}
}
}
}
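For intuition, here is a minimal standalone sketch of the ancestor enumeration above, using the plain JDK instead of Guava's Joiner and a hypothetical path that is not taken from the PR:

import java.util.Arrays;

public class ParentPathDemo {
    public static void main(String[] args) {
        // "/dir1/dir2/file.txt".split("/") yields ["", "dir1", "dir2", "file.txt"];
        // joining each prefix of length i (1 <= i < length) rebuilds one ancestor,
        // and the empty first prefix is skipped, mirroring the isNotBlank check.
        String[] splits = "/dir1/dir2/file.txt".split("/");
        for (int i = 1; i < splits.length; i++) {
            String parent = String.join("/", Arrays.copyOfRange(splits, 0, i));
            if (!parent.isEmpty()) {
                System.out.println(parent); // prints "/dir1", then "/dir1/dir2"
            }
        }
    }
}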

/**
* check whether the resource exists
*
@@ -360,6 +389,7 @@ public Result<Object> updateResource(User loginUser,

// updateResource data
Date now = new Date();
long originFileSize = resource.getSize();

resource.setAlias(name);
resource.setFileName(name);
@@ -445,6 +475,8 @@
throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
}
}

updateParentResourceSize(resource, resource.getSize() - originFileSize);
return result;
}
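Note that on update it is the size delta, not the new absolute size, that is propagated: if a 100-byte file is replaced by a 40-byte upload, updateParentResourceSize receives -60 and every ancestor folder shrinks by 60 bytes, clamped at 0 by the else branch inside updateParentResourceSize.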

@@ -727,11 +759,15 @@ public Result<Object> delete(User loginUser, int resourceId) throws IOException
String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());

//delete data in database
resourcesMapper.selectBatchIds(Arrays.asList(needDeleteResourceIdArray)).forEach(item -> {
updateParentResourceSize(item, item.getSize() * -1);
});
resourcesMapper.deleteIds(needDeleteResourceIdArray);
resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);

//delete file on hdfs
HadoopUtils.getInstance().delete(hdfsFilename, true);

putMsg(result, Status.SUCCESS);

return result;
@@ -941,6 +977,7 @@ public Result<Object> onlineCreateResource(User loginUser, ResourceType type, St
Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);

resourcesMapper.insert(resource);
updateParentResourceSize(resource, resource.getSize());

putMsg(result, Status.SUCCESS);
Map<Object, Object> dataMap = new BeanMap(resource);
@@ -1035,10 +1072,13 @@ public Result<Object> updateResourceContent(int resourceId, String content) {
if (StringUtils.isEmpty(tenantCode)) {
return result;
}
long originFileSize = resource.getSize();
resource.setSize(content.getBytes().length);
resource.setUpdateTime(new Date());
resourcesMapper.updateById(resource);

updateParentResourceSize(resource, resource.getSize() - originFileSize);

result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
throw new ServiceException(result.getMsg());
@@ -77,6 +77,7 @@ public void initDolphinSchedulerSchema() {
logger.info("Start initializing the DolphinScheduler manager table structure");
upgradeDao.initSchema();
}

public void upgradeDolphinScheduler() throws IOException {
// Gets a list of all upgrades
List<String> schemaList = SchemaUtils.getAllSchemaList();
@@ -99,12 +100,13 @@ public void upgradeDolphinScheduler() throws IOException {
}
// The target version of the upgrade
String schemaVersion = "";
String currentVersion = version;
for (String schemaDir : schemaList) {
schemaVersion = schemaDir.split("_")[0];
if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table structure");
upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
@@ -115,6 +117,10 @@
version = schemaVersion;
}
}

if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) && SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceFileSize();
}
}
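This gate runs the folder-size backfill exactly once, when upgrading from a version below 2.0.6 to any newer release. A hedged sketch of the comparison semantics this assumes for SchemaUtils.isAGreatVersion follows — a hypothetical re-implementation for illustration, not the project's code:

public class VersionGateDemo {
    // Hypothetical stand-in for SchemaUtils.isAGreatVersion(v1, v2):
    // true when dotted version v1 is strictly newer than v2.
    static boolean isNewer(String v1, String v2) {
        String[] a = v1.split("\\.");
        String[] b = v2.split("\\.");
        for (int i = 0; i < Math.max(a.length, b.length); i++) {
            int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            int y = i < b.length ? Integer.parseInt(b[i]) : 0;
            if (x != y) {
                return x > y;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isNewer("2.0.6", "2.0.5")); // true  -> backfill runs
        System.out.println(isNewer("2.0.6", "2.0.6")); // false -> backfill skipped
    }
}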

// Assign the value of the version field in the version table to the version of the product
@@ -17,17 +17,25 @@

package org.apache.dolphinscheduler.tools.datasource.dao;

import java.sql.SQLException;
import java.util.Objects;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

import org.apache.directory.api.util.Strings;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;

/**
* resource dao
*/
@@ -66,4 +74,89 @@ Map<String, Integer> listAllResources(Connection conn) {
return resourceMap;
}

/**
* aggregate the file sizes per folder for all resources of the given type
*
* @param conn connection
* @param type resource type (ordinal of ResourceType)
* @return map whose key is a folder's full_name and whose value is the total size of the files beneath it
*/
private Map<String, Long> listAllResourcesByFileType(Connection conn, int type) {
Map<String, Long> resourceSizeMap = new HashMap<>();

String sql = String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type);
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();

while (rs.next()) {
String fullName = rs.getString("full_name");
Boolean isDirectory = rs.getBoolean("is_directory");
long fileSize = rs.getLong("size");

if (StringUtils.isNotBlank(fullName) && !isDirectory) {
String[] splits = fullName.split("/");
for (int i = 1; i < splits.length; i++) {
String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, splits.length - i));
if (Strings.isNotEmpty(parentFullName)) {
long size = resourceSizeMap.getOrDefault(parentFullName, 0L);
resourceSizeMap.put(parentFullName, size + fileSize);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
if (Objects.nonNull(pstmt)) {
try {
if (!pstmt.isClosed()) {
pstmt.close();
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
return resourceSizeMap;
}
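A small self-contained sketch of the aggregation this loop performs, fed by two in-memory rows instead of a ResultSet — the paths and sizes below are made up:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class FolderSizeDemo {
    public static void main(String[] args) {
        Map<String, Long> folderSize = new HashMap<>();
        Object[][] rows = {{"/a/b/x.jar", 10L}, {"/a/y.jar", 5L}}; // full_name, size
        for (Object[] row : rows) {
            String[] splits = ((String) row[0]).split("/");
            long fileSize = (Long) row[1];
            // every proper prefix of the file's path is an ancestor folder
            for (int i = 1; i < splits.length; i++) {
                String parent = String.join("/", Arrays.copyOfRange(splits, 0, splits.length - i));
                if (!parent.isEmpty()) {
                    folderSize.merge(parent, fileSize, Long::sum);
                }
            }
        }
        System.out.println(folderSize); // {/a=15, /a/b=10} (map order may vary)
    }
}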

/**
* update each folder's size for the given resource type
*
* @param conn connection
* @param type resource type (ordinal of ResourceType)
*/
public void updateResourceFolderSizeByFileType(Connection conn, int type) {
Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn, type);

String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true";
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
pstmt.setLong(1, entry.getValue());
pstmt.setInt(2, type);
pstmt.setString(3, entry.getKey());
pstmt.addBatch();
}
pstmt.executeBatch();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
if (Objects.nonNull(pstmt)) {
try {
if (!pstmt.isClosed()) {
pstmt.close();
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
ConnectionUtils.releaseResource(conn);
}
}

}
@@ -26,7 +26,6 @@
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -42,6 +41,7 @@
import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;

import org.apache.commons.collections.CollectionUtils;
@@ -172,6 +172,21 @@ public void upgradeDolphinSchedulerTo200(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
}

/**
* upgrade DolphinScheduler to 2.0.6: backfill the sizes of existing resource folders
*/
public void upgradeDolphinSchedulerResourceFileSize() {
ResourceDao resourceDao = new ResourceDao();
try {
// update the sizes of folders that contain FILE resources.
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
// update the sizes of folders that contain UDF resources.
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
} catch (Exception ex) {
logger.error("Failed to upgrade because the folder sizes of resource files could not be updated.", ex);
}
}
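The bare 0 and 1 here are ResourceType ordinals. Assuming the enum declares FILE before UDF (as in dolphinscheduler-spi) and that org.apache.dolphinscheduler.spi.enums.ResourceType is imported, a more self-documenting spelling of the same two calls would be:

// Equivalent to the calls above, with the magic numbers replaced by enum ordinals.
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), ResourceType.FILE.ordinal());
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), ResourceType.UDF.ordinal());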

/**
* updateProcessDefinitionJsonWorkerGroup
*/