Skip to content

Commit

Permalink
optimize dump all logic and optimize dump change interface (alibaba#1…
Browse files Browse the repository at this point in the history
…1658)

* optimize config dump all,do not select content in page query,single query config on md5 and ts updated.

* 1.optimize config dump all,do not select content in page query,single query config on md5 and ts updated.
2. remove convertDeleteConfig and convertConfigConfig ,use standard RowMapper instead.

* remove unecessary code

* use context param instead of adding plugin method params

* bug and test fix
  • Loading branch information
shiyiyue1102 authored Jan 19, 2024
1 parent 9722ad8 commit 849393c
Show file tree
Hide file tree
Showing 29 changed files with 628 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ConfigInfoStateWrapper implements Serializable {

private long lastModified;

private String md5;

public long getId() {
return id;
}
Expand Down Expand Up @@ -86,11 +88,21 @@ public boolean equals(Object o) {
}
ConfigInfoStateWrapper that = (ConfigInfoStateWrapper) o;
return id == that.id && lastModified == that.lastModified && Objects.equals(dataId, that.dataId)
&& Objects.equals(group, that.group) && Objects.equals(tenant, that.tenant);
&& Objects.equals(group, that.group) && Objects.equals(tenant, that.tenant) && Objects.equals(md5,
that.md5);
}

@Override
public int hashCode() {
return Objects.hash(id, dataId, group, tenant, lastModified);
return Objects.hash(dataId, group, tenant);
}

public String getMd5() {
return md5;
}

public void setMd5(String md5) {
this.md5 = md5;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.alibaba.nacos.config.server.constant.Constants.ENCODE_UTF8;
import static com.alibaba.nacos.config.server.constant.Constants.PERSIST_ENCODE;
import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
import static com.alibaba.nacos.config.server.utils.LogUtil.DUMP_LOG;
import static com.alibaba.nacos.config.server.utils.LogUtil.FATAL_LOG;
Expand Down Expand Up @@ -72,6 +73,7 @@ public static int groupCount() {
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param md5 md5 of persist.
* @param lastModifiedTs lastModifiedTs.
* @param type file type.
* @return dumpChange success or not.
Expand Down Expand Up @@ -100,7 +102,7 @@ public static boolean dumpWithMd5(String dataId, String group, String tenant, St
boolean newLastModified = lastModifiedTs > ConfigCacheService.getLastModifiedTs(groupKey);

if (md5 == null) {
md5 = MD5Utils.md5Hex(content, ENCODE_UTF8);
md5 = MD5Utils.md5Hex(content, PERSIST_ENCODE);
}

//check md5 & update local disk cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoStateWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
Expand Down Expand Up @@ -81,9 +81,9 @@ public void run() {
long deleteCursorId = 0L;

while (true) {
List<ConfigInfoWrapper> configDeleted = historyConfigInfoPersistService.findDeletedConfig(startTime,
List<ConfigInfoStateWrapper> configDeleted = historyConfigInfoPersistService.findDeletedConfig(startTime,
deleteCursorId, pageSize);
for (ConfigInfo configInfo : configDeleted) {
for (ConfigInfoStateWrapper configInfo : configDeleted) {
if (configInfoPersistService.findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant()) == null) {
ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(),
Expand All @@ -107,9 +107,9 @@ public void run() {
long changeCursorId = 0L;
while (true) {
LogUtil.DEFAULT_LOG.info("Check changed configs from time {},lastMaxId={}", startTime, changeCursorId);
List<ConfigInfoWrapper> changeConfigs = configInfoPersistService.findChangeConfig(startTime,
List<ConfigInfoStateWrapper> changeConfigs = configInfoPersistService.findChangeConfig(startTime,
changeCursorId, pageSize);
for (ConfigInfoWrapper cf : changeConfigs) {
for (ConfigInfoStateWrapper cf : changeConfigs) {
final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant());
//check md5 & localtimestamp update local disk cache.
boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void run() {
} finally {
ConfigExecutor.scheduleConfigChangeTask(this, PropertyUtil.getDumpChangeWorkerInterval(),
TimeUnit.MILLISECONDS);
LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} millseconds",
LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} milliseconds",
PropertyUtil.getDumpChangeWorkerInterval());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Calendar;
Expand Down Expand Up @@ -263,7 +262,7 @@ protected void dumpOperate() throws NacosException {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());

try {
dumpConfigInfo(dumpAllProcessor);
dumpAllConfigInfoOnStartup(dumpAllProcessor);

// update Beta cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
Expand Down Expand Up @@ -324,12 +323,12 @@ protected void dumpOperate() throws NacosException {

}

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
private void dumpAllConfigInfoOnStartup(DumpAllProcessor dumpAllProcessor) {

try {
LogUtil.DEFAULT_LOG.info("start clear all config-info.");
ConfigDiskServiceFactory.getInstance().clearAll();
dumpAllProcessor.process(new DumpAllTask());
dumpAllProcessor.process(new DumpAllTask(true));
} catch (Exception e) {
LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.AggrWhitelist;
import com.alibaba.nacos.config.server.service.ClientIpWhiteList;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.SwitchService;
import com.alibaba.nacos.config.server.service.dump.task.DumpAllTask;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.persistence.model.Page;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.config.server.constant.Constants.ENCODE_UTF8;
import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;

/**
Expand All @@ -46,16 +53,59 @@ public DumpAllProcessor(ConfigInfoPersistService configInfoPersistService) {

@Override
public boolean process(NacosTask task) {
if (!(task instanceof DumpAllTask)) {
DEFAULT_LOG.error("[all-dump-error] ,invalid task type,DumpAllProcessor should process DumpAllTask type.");
return false;
}
DumpAllTask dumpAllTask = (DumpAllTask) task;

long currentMaxId = configInfoPersistService.findConfigMaxId();
long lastMaxId = 0;
ThreadPoolExecutor executorService = null;
if (dumpAllTask.isStartUp()) {
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(PropertyUtil.getAllDumpPageSize() * 2),
r -> new Thread(r, "dump all executor"), new ThreadPoolExecutor.CallerRunsPolicy());
} else {
executorService = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
r -> new Thread(r, "dump all executor"), new ThreadPoolExecutor.CallerRunsPolicy());
}

DEFAULT_LOG.info("start dump all config-info...");

while (lastMaxId < currentMaxId) {
Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);

long start = System.currentTimeMillis();

Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId,
PropertyUtil.getAllDumpPageSize(), dumpAllTask.isStartUp());
long dbTimeStamp = System.currentTimeMillis();
if (page == null || page.getPageItems() == null || page.getPageItems().isEmpty()) {
break;
}

for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = Math.max(id, lastMaxId);
lastMaxId = Math.max(cf.getId(), lastMaxId);
//if not start up, page query will not return content, check md5 and lastModified first ,if changed ,get single content info to dump.
if (!dumpAllTask.isStartUp()) {
final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant());
boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey);
//check md5 & update local disk cache.
String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
boolean md5Update = !localContentMd5.equals(cf.getMd5());
if (newLastModified || md5Update) {
LogUtil.DUMP_LOG.info("[dump-all] find change config {}, {}, md5={}", groupKey,
cf.getLastModified(), cf.getMd5());
cf = configInfoPersistService.findConfigInfo(cf.getDataId(), cf.getGroup(), cf.getTenant());
} else {
continue;
}
}

if (cf == null) {
continue;
}
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
Expand All @@ -67,22 +117,50 @@ public boolean process(NacosTask task) {
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATA_ID)) {
SwitchService.load(cf.getContent());
}

ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getType(), cf.getEncryptedDataKey());

final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
md5);
final String dataId = cf.getDataId();
final String group = cf.getGroup();
final String tenant = cf.getTenant();
final long lastModified = cf.getLastModified();
final String type = cf.getType();
final String encryptedDataKey = cf.getEncryptedDataKey();

executorService.execute(() -> {
final String md5Utf8 = MD5Utils.md5Hex(content, ENCODE_UTF8);
boolean result = ConfigCacheService.dumpWithMd5(dataId, group, tenant, content, md5Utf8,
lastModified, type, encryptedDataKey);
if (result) {
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={},md5UTF8={}",
GroupKey2.getKey(dataId, group), lastModified, content.length(), md5Utf8);
} else {
LogUtil.DUMP_LOG.info("[dump-all-error] {}", GroupKey2.getKey(dataId, group));
}

});

}
DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);

long diskStamp = System.currentTimeMillis();
DEFAULT_LOG.info("[all-dump] submit all task for {} / {}, dbTime={},diskTime={}", lastMaxId, currentMaxId,
(dbTimeStamp - start), (diskStamp - dbTimeStamp));
}

//wait all task are finished and then shutdown executor.
try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[all-dump] wait {} dump tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}
executorService.shutdown();

} catch (Exception e) {
DEFAULT_LOG.error("[all-dump] wait dump tasks to be finished error", e);
}
DEFAULT_LOG.info("success to dump all config-info。");
return true;
}

static final int PAGE_SIZE = 1000;

final ConfigInfoPersistService configInfoPersistService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
*/
public class DumpAllTask extends AbstractDelayTask {

private boolean startUp;

public DumpAllTask() {
}

public DumpAllTask(boolean startUp) {
this.startUp = startUp;
}

public boolean isStartUp() {
return startUp;
}

@Override
public void merge(AbstractDelayTask task) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void updateConfigInfoAtomic(final ConfigInfo configInfo, final String srcIp, fin
* @return config max id
*/
long findConfigMaxId();

/**
* Query configuration information by primary key ID.
*
Expand Down Expand Up @@ -315,11 +315,12 @@ Page<ConfigInfo> findConfigInfo4Page(final int pageNo, final int pageSize, final
/**
* Query all config info.
*
* @param lastMaxId last max id
* @param pageSize page size
* @param lastMaxId last max id
* @param pageSize page size
* @param needContent need content or not.
* @return {@link Page} with {@link ConfigInfoWrapper} generation
*/
Page<ConfigInfoWrapper> findAllConfigInfoFragment(final long lastMaxId, final int pageSize);
Page<ConfigInfoWrapper> findAllConfigInfoFragment(final long lastMaxId, final int pageSize, boolean needContent);

/**
* Query config info.
Expand All @@ -334,7 +335,7 @@ Page<ConfigInfo> findConfigInfo4Page(final int pageNo, final int pageSize, final
*/
Page<ConfigInfo> findConfigInfoLike4Page(final int pageNo, final int pageSize, final String dataId,
final String group, final String tenant, final Map<String, Object> configAdvanceInfo);

/**
* Query change config.order by id asc.
*
Expand All @@ -343,7 +344,7 @@ Page<ConfigInfo> findConfigInfoLike4Page(final int pageNo, final int pageSize, f
* @param pageSize pageSize
* @return {@link ConfigInfoWrapper} list
*/
List<ConfigInfoWrapper> findChangeConfig(final Timestamp startTime, long lastMaxId, final int pageSize);
List<ConfigInfoStateWrapper> findChangeConfig(final Timestamp startTime, long lastMaxId, final int pageSize);

/**
* Query tag list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ public ConfigInfoStateWrapper mapRow(ResultSet rs, int rowNum) throws SQLExcepti
info.setGroup(rs.getString("group_id"));
info.setTenant(rs.getString("tenant_id"));
info.setLastModified(rs.getTimestamp("gmt_modified").getTime());

try {
info.setMd5(rs.getString("md5"));
} catch (SQLException e) {
// ignore
}
try {
info.setId(rs.getLong("id"));
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

import com.alibaba.nacos.config.server.model.ConfigHistoryInfo;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoStateWrapper;
import com.alibaba.nacos.persistence.model.Page;
import com.alibaba.nacos.persistence.repository.PaginationHelper;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

/**
* Database service, providing access to his_config_info in the database.
Expand All @@ -41,14 +40,6 @@ public interface HistoryConfigInfoPersistService {
*/
<E> PaginationHelper<E> createPaginationHelper();

/**
* Convert delete config.
*
* @param list origin data
* @return {@link ConfigInfo} list
*/
List<ConfigInfoWrapper> convertDeletedConfig(List<Map<String, Object>> list);

//------------------------------------------insert---------------------------------------------//

/**
Expand Down Expand Up @@ -81,9 +72,9 @@ void insertConfigHistoryAtomic(long id, ConfigInfo configInfo, String srcIp, Str
* @param startTime start time
* @param startId last max id
* @param size page size
* @return {@link ConfigInfo} list
* @return {@link ConfigInfoStateWrapper} list
*/
List<ConfigInfoWrapper> findDeletedConfig(final Timestamp startTime, final long startId, int size);
List<ConfigInfoStateWrapper> findDeletedConfig(final Timestamp startTime, final long startId, int size);

/**
* List configuration history change record.
Expand Down
Loading

0 comments on commit 849393c

Please sign in to comment.