diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 398d182adf37..7d43c589729b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -33,7 +33,6 @@
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -50,6 +49,7 @@
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.collections.CollectionUtils;
@@ -81,6 +81,7 @@
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.base.Joiner;
 import com.google.common.io.Files;
 
 /**
@@ -221,6 +222,7 @@ public Result<Object> createResource(User loginUser,
         try {
             resourcesMapper.insert(resource);
+            updateParentResourceSize(resource, resource.getSize());
             putMsg(result, Status.SUCCESS);
             Map<Object, Object> dataMap = new BeanMap(resource);
             Map<String, Object> resultMap = new HashMap<>();
@@ -244,6 +246,33 @@ public Result<Object> createResource(User loginUser,
         return result;
     }
 
+    /**
+     * update the size of the resource's parent folders
+     *
+     * @param resource the current resource
+     * @param size the size delta to apply to each parent folder (negative when shrinking or deleting)
+     */
+    private void updateParentResourceSize(Resource resource, long size) {
+        if (resource.getSize() > 0) {
+            String[] splits = resource.getFullName().split("/");
+            for (int i = 1; i < splits.length; i++) {
+                String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i));
+                if (StringUtils.isNotBlank(parentFullName)) {
+                    List<Resource> resources = resourcesMapper.queryResource(parentFullName, resource.getType().ordinal());
+                    if (CollectionUtils.isNotEmpty(resources)) {
+                        Resource parentResource = resources.get(0);
+                        if (parentResource.getSize() + size >= 0) {
+                            parentResource.setSize(parentResource.getSize() + size);
+                        } else {
+                            parentResource.setSize(0L);
+                        }
+                        resourcesMapper.updateById(parentResource);
+                    }
+                }
+            }
+        }
+    }
+
     /**
      * check resource is exists
      *
@@ -360,6 +389,7 @@ public Result<Object> updateResource(User loginUser,
 
         // updateResource data
         Date now = new Date();
+        long originFileSize = resource.getSize();
 
         resource.setAlias(name);
         resource.setFileName(name);
@@ -445,6 +475,8 @@ public Result<Object> updateResource(User loginUser,
                 throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
             }
         }
+
+        updateParentResourceSize(resource, resource.getSize() - originFileSize);
         return result;
     }
 
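Every write path in ResourcesServiceImpl now funnels through updateParentResourceSize: createResource and onlineCreateResource pass the new file's full size, updateResource and updateResourceContent pass the delta against the previous size, and delete (next hunk) passes the negated size. The method walks every ancestor of the resource's full_name, looks each one up by full name, and applies the signed delta. A minimal standalone sketch of that prefix walk, not part of the patch (ParentPathWalk is a hypothetical illustration class):

    import com.google.common.base.Joiner;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ParentPathWalk {

        // For "/a/b/c.txt" this yields ["/a", "/a/b"]: every ancestor folder,
        // shortest first. The i = 1 prefix joins only the empty leading segment,
        // so the patch's StringUtils.isNotBlank() guard skips it, and the loop
        // bound (i < splits.length) means the resource itself is never visited.
        static List<String> parentFullNames(String fullName) {
            List<String> parents = new ArrayList<>();
            String[] splits = fullName.split("/");
            for (int i = 1; i < splits.length; i++) {
                String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i));
                if (!parentFullName.isEmpty()) {
                    parents.add(parentFullName);
                }
            }
            return parents;
        }

        public static void main(String[] args) {
            System.out.println(parentFullNames("/a/b/c.txt")); // [/a, /a/b]
        }
    }

The clamp in the patch guards against drift: if a parent's recorded size would go negative after applying the delta, it is reset to 0 rather than stored as a negative number.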
@@ -727,11 +759,15 @@ public Result<Object> delete(User loginUser, int resourceId) throws IOException
         String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName());
 
         //delete data in database
+        resourcesMapper.selectBatchIds(Arrays.asList(needDeleteResourceIdArray)).forEach(item -> {
+            updateParentResourceSize(item, item.getSize() * -1);
+        });
         resourcesMapper.deleteIds(needDeleteResourceIdArray);
         resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray);
 
         //delete file on hdfs
         HadoopUtils.getInstance().delete(hdfsFilename, true);
+
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -941,6 +977,7 @@ public Result<Object> onlineCreateResource(User loginUser, ResourceType type, St
         Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now);
 
         resourcesMapper.insert(resource);
+        updateParentResourceSize(resource, resource.getSize());
 
         putMsg(result, Status.SUCCESS);
         Map<Object, Object> dataMap = new BeanMap(resource);
@@ -1035,10 +1072,13 @@ public Result<Object> updateResourceContent(int resourceId, String content) {
         if (StringUtils.isEmpty(tenantCode)) {
             return result;
         }
+        long originFileSize = resource.getSize();
         resource.setSize(content.getBytes().length);
         resource.setUpdateTime(new Date());
         resourcesMapper.updateById(resource);
 
+        updateParentResourceSize(resource, resource.getSize() - originFileSize);
+
         result = uploadContentToHdfs(resource.getFullName(), tenantCode, content);
         if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             throw new ServiceException(result.getMsg());
diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
index 783022008617..a588fabcfb1f 100644
--- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
+++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java
@@ -77,6 +77,7 @@ public void initDolphinSchedulerSchema() {
         logger.info("Start initializing the DolphinScheduler manager table structure");
         upgradeDao.initSchema();
     }
+
     public void upgradeDolphinScheduler() throws IOException {
         // Gets a list of all upgrades
         List<String> schemaList = SchemaUtils.getAllSchemaList();
@@ -99,12 +100,13 @@ public void upgradeDolphinScheduler() throws IOException {
         }
         // The target version of the upgrade
         String schemaVersion = "";
+        String currentVersion = version;
        for (String schemaDir : schemaList) {
             schemaVersion = schemaDir.split("_")[0];
             if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
                 logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
                 logger.info("Begin upgrading DolphinScheduler's table structure");
-                upgradeDao.upgradeDolphinScheduler(schemaDir);
+                upgradeDao.upgradeDolphinScheduler(schemaDir);
                 if ("1.3.0".equals(schemaVersion)) {
                     upgradeDao.upgradeDolphinSchedulerWorkerGroup();
                 } else if ("1.3.2".equals(schemaVersion)) {
@@ -115,6 +117,10 @@ public void upgradeDolphinScheduler() throws IOException {
                 version = schemaVersion;
             }
         }
+
+        if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) && SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
+            upgradeDao.upgradeDolphinSchedulerResourceFileSize();
+        }
     }
 
     // Assign the value of the version field in the version table to the version of the product
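The one-shot backfill is gated on currentVersion, the schema version recorded before the upgrade loop starts mutating version: it fires only when the database began below 2.0.6 and the packaged software version is actually newer than what was installed. A hedged sketch of that gate, using a simplified stand-in for SchemaUtils.isAGreatVersion (assumed, from its use in the loop above, to mean "the first argument is a strictly greater version than the second"):

    import java.util.Arrays;

    public class UpgradeGate {

        // Simplified stand-in for SchemaUtils.isAGreatVersion: true when v1 > v2,
        // comparing dotted numeric components, so "2.0.6" > "2.0.5".
        static boolean isAGreatVersion(String v1, String v2) {
            int[] a = Arrays.stream(v1.split("\\.")).mapToInt(Integer::parseInt).toArray();
            int[] b = Arrays.stream(v2.split("\\.")).mapToInt(Integer::parseInt).toArray();
            for (int i = 0; i < Math.max(a.length, b.length); i++) {
                int x = i < a.length ? a[i] : 0;
                int y = i < b.length ? b[i] : 0;
                if (x != y) {
                    return x > y;
                }
            }
            return false;
        }

        // Mirrors the new condition in upgradeDolphinScheduler(): backfill only when
        // the pre-upgrade schema was below 2.0.6 and an upgrade actually happened.
        static boolean shouldBackfillFolderSizes(String currentVersion, String softVersion) {
            return isAGreatVersion("2.0.6", currentVersion)
                    && isAGreatVersion(softVersion, currentVersion);
        }

        public static void main(String[] args) {
            System.out.println(shouldBackfillFolderSizes("2.0.5", "2.0.6")); // true
            System.out.println(shouldBackfillFolderSizes("2.0.6", "2.0.7")); // false: sizes already maintained
        }
    }

Capturing currentVersion before the loop matters: the loop reassigns version to each schema it applies, so testing against the post-loop value would always see the newest schema and the backfill would never run.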
diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
index 4ddb1a1eaaa7..e4e8d130a086 100644
--- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
+++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java
@@ -17,17 +17,25 @@
 
 package org.apache.dolphinscheduler.tools.datasource.dao;
 
+import java.sql.SQLException;
+import java.util.Objects;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.directory.api.util.Strings;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+
 /**
  * resource dao
 */
@@ -66,4 +74,89 @@ Map<String, Integer> listAllResources(Connection conn) {
         return resourceMap;
     }
 
+    /**
+     * accumulate the size of each folder for resources of the given type
+     *
+     * @param conn connection
+     * @return map that key is full_name and value is the folder's size
+     */
+    private Map<String, Long> listAllResourcesByFileType(Connection conn, int type) {
+        Map<String, Long> resourceSizeMap = new HashMap<>();
+
+        String sql = String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type);
+        ResultSet rs = null;
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            rs = pstmt.executeQuery();
+
+            while (rs.next()) {
+                String fullName = rs.getString("full_name");
+                Boolean isDirectory = rs.getBoolean("is_directory");
+                long fileSize = rs.getLong("size");
+
+                if (StringUtils.isNotBlank(fullName) && !isDirectory) {
+                    String[] splits = fullName.split("/");
+                    for (int i = 1; i < splits.length; i++) {
+                        String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, splits.length - i));
+                        if (Strings.isNotEmpty(parentFullName)) {
+                            long size = resourceSizeMap.getOrDefault(parentFullName, 0L);
+                            resourceSizeMap.put(parentFullName, size + fileSize);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            if (Objects.nonNull(pstmt)) {
+                try {
+                    if (!pstmt.isClosed()) {
+                        pstmt.close();
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        }
+        return resourceSizeMap;
+    }
+
+    /**
+     * update each folder's size for the given resource type
+     *
+     * @param conn connection
+     */
+    public void updateResourceFolderSizeByFileType(Connection conn, int type) {
+        Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn, type);
+
+        String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true";
+        PreparedStatement pstmt = null;
+        try {
+            pstmt = conn.prepareStatement(sql);
+            for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
+                pstmt.setLong(1, entry.getValue());
+                pstmt.setInt(2, type);
+                pstmt.setString(3, entry.getKey());
+                pstmt.addBatch();
+            }
+            pstmt.executeBatch();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("sql: " + sql, e);
+        } finally {
+            if (Objects.nonNull(pstmt)) {
+                try {
+                    if (!pstmt.isClosed()) {
+                        pstmt.close();
+                    }
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            ConnectionUtils.releaseResource(conn);
+        }
+    }
+
 }
diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
index 06b8aab12c70..8d08dd5e4392 100644
--- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
+++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java
@@ -26,7 +26,6 @@
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -42,6 +41,7 @@
 import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
 import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -172,6 +172,21 @@ public void upgradeDolphinSchedulerTo200(String schemaDir) {
         upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
     }
 
+    /**
+     * upgrade DolphinScheduler to 2.0.6
+     */
+    public void upgradeDolphinSchedulerResourceFileSize() {
+        ResourceDao resourceDao = new ResourceDao();
+        try {
+            // update the size of the folders that hold FILE resources.
+            resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
+            // update the size of the folders that hold UDF resources.
+            resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
+        } catch (Exception ex) {
+            logger.error("Failed to upgrade because of failing to update the folder's size of resource files.", ex);
+        }
+    }
+
     /**
      * updateProcessDefinitionJsonWorkerGroup
     */
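End to end, the upgrade recomputes folder sizes from scratch rather than incrementally: listAllResourcesByFileType scans every non-directory row of one resource type (0 = FILE, 1 = UDF, matching the two calls above) and accumulates each file's size into all of its ancestor folders, then updateResourceFolderSizeByFileType writes the totals back in one JDBC batch. A minimal in-memory sketch of that aggregation (Row is a hypothetical stand-in for a t_ds_resources result-set row):

    import com.google.common.base.Joiner;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FolderSizeBackfill {

        // Hypothetical stand-in for one row of t_ds_resources.
        static final class Row {
            final String fullName;
            final long size;
            final boolean isDirectory;

            Row(String fullName, long size, boolean isDirectory) {
                this.fullName = fullName;
                this.size = size;
                this.isDirectory = isDirectory;
            }
        }

        // Same aggregation as listAllResourcesByFileType: each file's size is added
        // to every ancestor folder; directory rows contribute nothing themselves.
        static Map<String, Long> folderSizes(List<Row> rows) {
            Map<String, Long> sizes = new HashMap<>();
            for (Row row : rows) {
                if (row.isDirectory) {
                    continue;
                }
                String[] splits = row.fullName.split("/");
                // Drop i trailing segments to get each parent, as the DAO does.
                for (int i = 1; i < splits.length; i++) {
                    String parent = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, splits.length - i));
                    if (!parent.isEmpty()) {
                        sizes.merge(parent, row.size, Long::sum);
                    }
                }
            }
            return sizes;
        }

        public static void main(String[] args) {
            Map<String, Long> sizes = folderSizes(Arrays.asList(
                    new Row("/a/b/x.sql", 10, false),
                    new Row("/a/y.sql", 5, false),
                    new Row("/a/b", 0, true)));
            System.out.println(sizes); // {/a=15, /a/b=10} (map order may vary)
        }
    }

Using addBatch/executeBatch keeps the flush to a single batched round trip instead of one UPDATE per folder, and the WHERE clause's is_directory = true ensures only folder rows are rewritten.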