Trim translog for closed indices (#43156)
Today when an index is closed, all of its shards are force-flushed
but the translog files are left around. As explained in #42445,
we'd like to trim the translog for closed indices so that they
consume less disk space. This commit reuses the existing
AsyncTrimTranslogTask and re-enables it for closed indices.
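
To make the rescheduling mechanics concrete, here is a minimal, self-contained sketch using hypothetical stand-in classes (this is not the actual Elasticsearch BaseAsyncTask/IndexService code): a periodic task re-arms itself after each run only while mustReschedule() returns true, so ignoring the index's CLOSE state in that check is what keeps the trim task alive for closed indices.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the BaseAsyncTask rescheduling pattern, for illustration only.
class PeriodicTrimTaskSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Tracks the IndexService lifecycle, not the index CLOSE metadata state.
    private final AtomicBoolean serviceClosed = new AtomicBoolean(false);

    boolean mustReschedule() {
        // Mirrors the override added in this commit: keep running until the
        // IndexService itself is removed, even if the index has been closed.
        return serviceClosed.get() == false;
    }

    void runInternal() {
        // Stand-in for indexService.maybeTrimTranslog().
    }

    void schedule(long intervalMillis) {
        if (mustReschedule()) {
            scheduler.schedule(() -> {
                runInternal();
                schedule(intervalMillis);
            }, intervalMillis, TimeUnit.MILLISECONDS);
        } else {
            scheduler.shutdown();
        }
    }
}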

At the time the task is executed, we should have the guarantee
that nothing holds on to the translog files that are about to be removed.
Reusing the task also leaves a short period of time (10 minutes by
default) during which the translog files of a recently closed index
are still present on disk. This can help in cases where a closed
index is reopened shortly after being closed (in order to update an
index setting, for example).
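
As a hedged illustration of that reopen scenario, written in the style of the tests in this diff (the index name and the replica setting used here are hypothetical):

// Close the index, change a setting, and reopen it shortly afterwards. If this
// happens within the retention check interval (10 minutes by default), the
// translog files have not been trimmed yet and reopening stays cheap.
assertAcked(client().admin().indices().prepareClose("my-index"));
client().admin().indices().prepareUpdateSettings("my-index")
    .setSettings(Settings.builder().put("index.number_of_replicas", 1))
    .get();
assertAcked(client().admin().indices().prepareOpen("my-index"));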

Relates to #42445
tlrx authored Jun 28, 2019
1 parent 88c9ecb commit c900795
Showing 4 changed files with 152 additions and 5 deletions.
@@ -940,6 +940,11 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask {
                .getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
        }

        @Override
        protected boolean mustReschedule() {
            return indexService.closed.get() == false;
        }

        @Override
        protected void runInternal() {
            indexService.maybeTrimTranslog();
@@ -1031,8 +1036,8 @@ AsyncTranslogFSync getFsyncTask() { // for tests
        return fsyncTask;
    }

-   AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
-       return globalCheckpointTask;
+   AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
+       return trimTranslogTask;
    }

    /**
@@ -27,16 +27,24 @@
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * NoOpEngine is an engine implementation that does nothing but the bare minimum
 * required in order to have an engine. All attempts to do something (search,
- * index, get), throw {@link UnsupportedOperationException}.
+ * index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine
+ * allows trimming any existing translog files through the
+ * {@link #trimUnreferencedTranslogFiles()} method.
 */
public final class NoOpEngine extends ReadOnlyEngine {

@@ -116,4 +124,52 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
            return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
        }
    }

    /**
     * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
     * that retains nothing but the last translog generation from the safe commit.
     */
    @Override
    public void trimUnreferencedTranslogFiles() {
        final Store store = this.engineConfig.getStore();
        store.incRef();
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
            if (commits.size() == 1) {
                final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
                final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
                if (translogUuid == null) {
                    throw new IllegalStateException("commit doesn't contain translog unique id");
                }
                if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
                    throw new IllegalStateException("commit doesn't contain translog generation id");
                }
                final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
                final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
                final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

                if (minTranslogGeneration < lastCommitGeneration) {
                    // a translog deletion policy that retains nothing but the last translog generation from the safe commit
                    final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
                    translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
                    translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

                    try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
                            engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
                        translog.trimUnreferencedReaders();
                    }
                }
            }
        } catch (final Exception e) {
            try {
                failEngine("translog trimming failed", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new EngineException(shardId, "failed to trim translog", e);
        } finally {
            store.decRef();
        }
    }
}
@@ -42,12 +42,15 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine;
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -370,7 +373,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
            .build();
        IndexService indexService = createIndex("test", settings);
        ensureGreen("test");
-       assertTrue(indexService.getRefreshTask().mustReschedule());
+       assertTrue(indexService.getTrimTranslogTask().mustReschedule());
        client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
        client().admin().indices().prepareFlush("test").get();
        client().admin().indices().prepareUpdateSettings("test")
@@ -382,6 +385,48 @@
        assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
    }

    public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
        final String indexName = "test";
        IndexService indexService = createIndex(indexName, Settings.builder()
            .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms")
            .build());

        Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
        final Path translogPath = translog.getConfig().getTranslogPath();
        final String translogUuid = translog.getTranslogUUID();

        final int numDocs = scaledRandomIntBetween(10, 100);
        for (int i = 0; i < numDocs; i++) {
            client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
            if (randomBoolean()) {
                client().admin().indices().prepareFlush(indexName).get();
            }
        }
        assertThat(translog.totalOperations(), equalTo(numDocs));
        assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs));
        assertAcked(client().admin().indices().prepareClose(indexName));

        indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
        assertTrue(indexService.getTrimTranslogTask().mustReschedule());

        final long lastCommittedTranslogGeneration;
        try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) {
            Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
            lastCommittedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
        }
        assertBusy(() -> {
            long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid);
            assertThat(minTranslogGen, equalTo(lastCommittedTranslogGeneration));
        });

        assertAcked(client().admin().indices().prepareOpen(indexName));

        indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
        translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
        assertThat(translog.totalOperations(), equalTo(0));
        assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
    }

    public void testIllegalFsyncInterval() {
        Settings settings = Settings.builder()
            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
@@ -35,13 +35,15 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.equalTo;
@@ -83,7 +85,7 @@ public void testNoopAfterRegularEngine() throws IOException {
            tracker.updateLocalCheckpoint(allocationId.getId(), i);
        }

-       flushAndTrimTranslog(engine);
+       engine.flush(true, true);

        long localCheckpoint = engine.getPersistedLocalCheckpoint();
        long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
@@ -159,6 +161,45 @@ public void testNoOpEngineStats() throws Exception {
        }
    }

    public void testTrimUnreferencedTranslogFiles() throws Exception {
        final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
        ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
            null, true, ShardRoutingState.STARTED, allocationId);
        IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
        tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
        tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);

        final int numDocs = scaledRandomIntBetween(10, 3000);
        for (int i = 0; i < numDocs; i++) {
            engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
            if (rarely()) {
                engine.flush();
            }
            tracker.updateLocalCheckpoint(allocationId.getId(), i);
        }
        engine.flush(true, true);

        final String translogUuid = engine.getTranslog().getTranslogUUID();
        final long minFileGeneration = engine.getTranslog().getMinFileGeneration();
        final long currentFileGeneration = engine.getTranslog().currentFileGeneration();
        engine.close();

        final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
        final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath();

        final long lastCommittedTranslogGeneration;
        try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) {
            Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
            lastCommittedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
            assertThat(lastCommittedTranslogGeneration, equalTo(currentFileGeneration));
        }

        assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration));
        noOpEngine.trimUnreferencedTranslogFiles();
        assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommittedTranslogGeneration));
        noOpEngine.close();
    }

    private void flushAndTrimTranslog(final InternalEngine engine) {
        engine.flush(true, true);
        final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();
