Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize dump all logic and optimize dump change interface #11658

Merged
merged 5 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ConfigInfoStateWrapper implements Serializable {

private long lastModified;

private String md5;

public long getId() {
return id;
}
Expand Down Expand Up @@ -86,11 +88,21 @@ public boolean equals(Object o) {
}
ConfigInfoStateWrapper that = (ConfigInfoStateWrapper) o;
return id == that.id && lastModified == that.lastModified && Objects.equals(dataId, that.dataId)
&& Objects.equals(group, that.group) && Objects.equals(tenant, that.tenant);
&& Objects.equals(group, that.group) && Objects.equals(tenant, that.tenant) && Objects.equals(md5,
that.md5);
}

@Override
public int hashCode() {
return Objects.hash(id, dataId, group, tenant, lastModified);
return Objects.hash(dataId, group, tenant);
}

public String getMd5() {
return md5;
}

public void setMd5(String md5) {
this.md5 = md5;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ConcurrentHashMap;

import static com.alibaba.nacos.config.server.constant.Constants.ENCODE_UTF8;
import static com.alibaba.nacos.config.server.constant.Constants.PERSIST_ENCODE;
import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;
import static com.alibaba.nacos.config.server.utils.LogUtil.DUMP_LOG;
import static com.alibaba.nacos.config.server.utils.LogUtil.FATAL_LOG;
Expand Down Expand Up @@ -72,6 +73,7 @@ public static int groupCount() {
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param md5 md5 of persist.
* @param lastModifiedTs lastModifiedTs.
* @param type file type.
* @return dumpChange success or not.
Expand Down Expand Up @@ -100,7 +102,7 @@ public static boolean dumpWithMd5(String dataId, String group, String tenant, St
boolean newLastModified = lastModifiedTs > ConfigCacheService.getLastModifiedTs(groupKey);

if (md5 == null) {
md5 = MD5Utils.md5Hex(content, ENCODE_UTF8);
md5 = MD5Utils.md5Hex(content, PERSIST_ENCODE);
}

//check md5 & update local disk cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoStateWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
Expand Down Expand Up @@ -81,9 +81,9 @@ public void run() {
long deleteCursorId = 0L;

while (true) {
List<ConfigInfoWrapper> configDeleted = historyConfigInfoPersistService.findDeletedConfig(startTime,
List<ConfigInfoStateWrapper> configDeleted = historyConfigInfoPersistService.findDeletedConfig(startTime,
deleteCursorId, pageSize);
for (ConfigInfo configInfo : configDeleted) {
for (ConfigInfoStateWrapper configInfo : configDeleted) {
if (configInfoPersistService.findConfigInfoState(configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant()) == null) {
ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(),
Expand All @@ -107,9 +107,9 @@ public void run() {
long changeCursorId = 0L;
while (true) {
LogUtil.DEFAULT_LOG.info("Check changed configs from time {},lastMaxId={}", startTime, changeCursorId);
List<ConfigInfoWrapper> changeConfigs = configInfoPersistService.findChangeConfig(startTime,
List<ConfigInfoStateWrapper> changeConfigs = configInfoPersistService.findChangeConfig(startTime,
changeCursorId, pageSize);
for (ConfigInfoWrapper cf : changeConfigs) {
for (ConfigInfoStateWrapper cf : changeConfigs) {
final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant());
//check md5 & localtimestamp update local disk cache.
boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void run() {
} finally {
ConfigExecutor.scheduleConfigChangeTask(this, PropertyUtil.getDumpChangeWorkerInterval(),
TimeUnit.MILLISECONDS);
LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} millseconds",
LogUtil.DEFAULT_LOG.info("Next dump change will scheduled after {} milliseconds",
PropertyUtil.getDumpChangeWorkerInterval());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Calendar;
Expand Down Expand Up @@ -263,7 +262,7 @@ protected void dumpOperate() throws NacosException {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());

try {
dumpConfigInfo(dumpAllProcessor);
dumpAllConfigInfoOnStartup(dumpAllProcessor);

// update Beta cache
LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
Expand Down Expand Up @@ -324,12 +323,12 @@ protected void dumpOperate() throws NacosException {

}

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
private void dumpAllConfigInfoOnStartup(DumpAllProcessor dumpAllProcessor) {

try {
LogUtil.DEFAULT_LOG.info("start clear all config-info.");
ConfigDiskServiceFactory.getInstance().clearAll();
dumpAllProcessor.process(new DumpAllTask());
dumpAllProcessor.process(new DumpAllTask(true));
} catch (Exception e) {
LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.config.server.constant.Constants;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.service.AggrWhitelist;
import com.alibaba.nacos.config.server.service.ClientIpWhiteList;
import com.alibaba.nacos.config.server.service.ConfigCacheService;
import com.alibaba.nacos.config.server.service.SwitchService;
import com.alibaba.nacos.config.server.service.dump.task.DumpAllTask;
import com.alibaba.nacos.config.server.service.repository.ConfigInfoPersistService;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.persistence.model.Page;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.config.server.constant.Constants.ENCODE_UTF8;
import static com.alibaba.nacos.config.server.utils.LogUtil.DEFAULT_LOG;

/**
Expand All @@ -46,16 +53,59 @@ public DumpAllProcessor(ConfigInfoPersistService configInfoPersistService) {

@Override
public boolean process(NacosTask task) {
if (!(task instanceof DumpAllTask)) {
DEFAULT_LOG.error("[all-dump-error] ,invalid task type,DumpAllProcessor should process DumpAllTask type.");
return false;
}
DumpAllTask dumpAllTask = (DumpAllTask) task;

long currentMaxId = configInfoPersistService.findConfigMaxId();
long lastMaxId = 0;
ThreadPoolExecutor executorService = null;
if (dumpAllTask.isStartUp()) {
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(PropertyUtil.getAllDumpPageSize() * 2),
r -> new Thread(r, "dump all executor"), new ThreadPoolExecutor.CallerRunsPolicy());
} else {
executorService = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
r -> new Thread(r, "dump all executor"), new ThreadPoolExecutor.CallerRunsPolicy());
}

DEFAULT_LOG.info("start dump all config-info...");

while (lastMaxId < currentMaxId) {
Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);

long start = System.currentTimeMillis();

Page<ConfigInfoWrapper> page = configInfoPersistService.findAllConfigInfoFragment(lastMaxId,
PropertyUtil.getAllDumpPageSize(), dumpAllTask.isStartUp());
long dbTimeStamp = System.currentTimeMillis();
if (page == null || page.getPageItems() == null || page.getPageItems().isEmpty()) {
break;
}

for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = Math.max(id, lastMaxId);
lastMaxId = Math.max(cf.getId(), lastMaxId);
//if not start up, page query will not return content, check md5 and lastModified first ,if changed ,get single content info to dump.
if (!dumpAllTask.isStartUp()) {
final String groupKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup(), cf.getTenant());
boolean newLastModified = cf.getLastModified() > ConfigCacheService.getLastModifiedTs(groupKey);
//check md5 & update local disk cache.
String localContentMd5 = ConfigCacheService.getContentMd5(groupKey);
boolean md5Update = !localContentMd5.equals(cf.getMd5());
if (newLastModified || md5Update) {
LogUtil.DUMP_LOG.info("[dump-all] find change config {}, {}, md5={}", groupKey,
cf.getLastModified(), cf.getMd5());
cf = configInfoPersistService.findConfigInfo(cf.getDataId(), cf.getGroup(), cf.getTenant());
} else {
continue;
}
}

if (cf == null) {
continue;
}
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
Expand All @@ -67,22 +117,50 @@ public boolean process(NacosTask task) {
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATA_ID)) {
SwitchService.load(cf.getContent());
}

ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(),
cf.getLastModified(), cf.getType(), cf.getEncryptedDataKey());

final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
md5);
final String dataId = cf.getDataId();
final String group = cf.getGroup();
final String tenant = cf.getTenant();
final long lastModified = cf.getLastModified();
final String type = cf.getType();
final String encryptedDataKey = cf.getEncryptedDataKey();

executorService.execute(() -> {
final String md5Utf8 = MD5Utils.md5Hex(content, ENCODE_UTF8);
boolean result = ConfigCacheService.dumpWithMd5(dataId, group, tenant, content, md5Utf8,
lastModified, type, encryptedDataKey);
if (result) {
LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={},md5UTF8={}",
GroupKey2.getKey(dataId, group), lastModified, content.length(), md5Utf8);
} else {
LogUtil.DUMP_LOG.info("[dump-all-error] {}", GroupKey2.getKey(dataId, group));
}

});

}
DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);

long diskStamp = System.currentTimeMillis();
DEFAULT_LOG.info("[all-dump] submit all task for {} / {}, dbTime={},diskTime={}", lastMaxId, currentMaxId,
(dbTimeStamp - start), (diskStamp - dbTimeStamp));
}

//wait all task are finished and then shutdown executor.
try {
int unfinishedTaskCount = 0;
while ((unfinishedTaskCount = executorService.getQueue().size() + executorService.getActiveCount()) > 0) {
DEFAULT_LOG.info("[all-dump] wait {} dump tasks to be finished", unfinishedTaskCount);
Thread.sleep(1000L);
}
executorService.shutdown();

} catch (Exception e) {
DEFAULT_LOG.error("[all-dump] wait dump tasks to be finished error", e);
}
DEFAULT_LOG.info("success to dump all config-info。");
return true;
}

static final int PAGE_SIZE = 1000;

final ConfigInfoPersistService configInfoPersistService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
*/
public class DumpAllTask extends AbstractDelayTask {

private boolean startUp;

public DumpAllTask() {
}

public DumpAllTask(boolean startUp) {
this.startUp = startUp;
}

public boolean isStartUp() {
return startUp;
}

@Override
public void merge(AbstractDelayTask task) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void updateConfigInfoAtomic(final ConfigInfo configInfo, final String srcIp, fin
* @return config max id
*/
long findConfigMaxId();

/**
* Query configuration information by primary key ID.
*
Expand Down Expand Up @@ -315,11 +315,12 @@ Page<ConfigInfo> findConfigInfo4Page(final int pageNo, final int pageSize, final
/**
* Query all config info.
*
* @param lastMaxId last max id
* @param pageSize page size
* @param lastMaxId last max id
* @param pageSize page size
* @param needContent need content or not.
* @return {@link Page} with {@link ConfigInfoWrapper} generation
*/
Page<ConfigInfoWrapper> findAllConfigInfoFragment(final long lastMaxId, final int pageSize);
Page<ConfigInfoWrapper> findAllConfigInfoFragment(final long lastMaxId, final int pageSize, boolean needContent);

/**
* Query config info.
Expand All @@ -334,7 +335,7 @@ Page<ConfigInfo> findConfigInfo4Page(final int pageNo, final int pageSize, final
*/
Page<ConfigInfo> findConfigInfoLike4Page(final int pageNo, final int pageSize, final String dataId,
final String group, final String tenant, final Map<String, Object> configAdvanceInfo);

/**
* Query change config.order by id asc.
*
Expand All @@ -343,7 +344,7 @@ Page<ConfigInfo> findConfigInfoLike4Page(final int pageNo, final int pageSize, f
* @param pageSize pageSize
* @return {@link ConfigInfoWrapper} list
*/
List<ConfigInfoWrapper> findChangeConfig(final Timestamp startTime, long lastMaxId, final int pageSize);
List<ConfigInfoStateWrapper> findChangeConfig(final Timestamp startTime, long lastMaxId, final int pageSize);

/**
* Query tag list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ public ConfigInfoStateWrapper mapRow(ResultSet rs, int rowNum) throws SQLExcepti
info.setGroup(rs.getString("group_id"));
info.setTenant(rs.getString("tenant_id"));
info.setLastModified(rs.getTimestamp("gmt_modified").getTime());

try {
info.setMd5(rs.getString("md5"));
} catch (SQLException e) {
// ignore
}
try {
info.setId(rs.getLong("id"));
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

import com.alibaba.nacos.config.server.model.ConfigHistoryInfo;
import com.alibaba.nacos.config.server.model.ConfigInfo;
import com.alibaba.nacos.config.server.model.ConfigInfoWrapper;
import com.alibaba.nacos.config.server.model.ConfigInfoStateWrapper;
import com.alibaba.nacos.persistence.model.Page;
import com.alibaba.nacos.persistence.repository.PaginationHelper;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

/**
* Database service, providing access to his_config_info in the database.
Expand All @@ -41,14 +40,6 @@ public interface HistoryConfigInfoPersistService {
*/
<E> PaginationHelper<E> createPaginationHelper();

/**
* Convert delete config.
*
* @param list origin data
* @return {@link ConfigInfo} list
*/
List<ConfigInfoWrapper> convertDeletedConfig(List<Map<String, Object>> list);

//------------------------------------------insert---------------------------------------------//

/**
Expand Down Expand Up @@ -81,9 +72,9 @@ void insertConfigHistoryAtomic(long id, ConfigInfo configInfo, String srcIp, Str
* @param startTime start time
* @param startId last max id
* @param size page size
* @return {@link ConfigInfo} list
* @return {@link ConfigInfoStateWrapper} list
*/
List<ConfigInfoWrapper> findDeletedConfig(final Timestamp startTime, final long startId, int size);
List<ConfigInfoStateWrapper> findDeletedConfig(final Timestamp startTime, final long startId, int size);

/**
* List configuration history change record.
Expand Down
Loading
Loading