Skip to content

Commit

Permalink
Move caching of the size of a directory to StoreDirectory. (#30581)
Browse files Browse the repository at this point in the history
In spite of the existing caching, I have seen a number of nodes hot threads
where one thread had been spending all its cpu on computing the size of a
directory. I am proposing to move the computation of the size of the directory
to `StoreDirectory` in order to skip recomputing the size of the directory if
no changes have been made. This should help with users that have read-only
indices, which is very common for time-based indices.
  • Loading branch information
jpountz authored Jun 5, 2018
1 parent 984523d commit 03dcf22
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public T getOrRefresh() {
return cached;
}

/** Return the potentially stale cached entry. */
protected final T getNoRefresh() {
return cached;
}

/**
* Returns a new instance to cache
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.lucene.store.FilterIndexOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;

final class ByteSizeCachingDirectory extends FilterDirectory {

private static class SizeAndModCount {
final long size;
final long modCount;
final boolean pendingWrite;

SizeAndModCount(long length, long modCount, boolean pendingWrite) {
this.size = length;
this.modCount = modCount;
this.pendingWrite = pendingWrite;
}
}

private static long estimateSizeInBytes(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
try {
estimatedSize += directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
// calling Files.size, you can also sometimes hit AccessDeniedException
}
}
return estimatedSize;
}

private final SingleObjectCache<SizeAndModCount> size;
// Both these variables need to be accessed under `this` lock.
private long modCount = 0;
private long numOpenOutputs = 0;

ByteSizeCachingDirectory(Directory in, TimeValue refreshInterval) {
super(in);
size = new SingleObjectCache<SizeAndModCount>(refreshInterval, new SizeAndModCount(0L, -1L, true)) {
@Override
protected SizeAndModCount refresh() {
// It is ok for the size of the directory to be more recent than
// the mod count, we would just recompute the size of the
// directory on the next call as well. However the opposite
// would be bad as we would potentially have a stale cache
// entry for a long time. So we fetch the values of modCount and
// numOpenOutputs BEFORE computing the size of the directory.
final long modCount;
final boolean pendingWrite;
synchronized(ByteSizeCachingDirectory.this) {
modCount = ByteSizeCachingDirectory.this.modCount;
pendingWrite = ByteSizeCachingDirectory.this.numOpenOutputs != 0;
}
final long size;
try {
// Compute this OUTSIDE of the lock
size = estimateSizeInBytes(getDelegate());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new SizeAndModCount(size, modCount, pendingWrite);
}

@Override
protected boolean needsRefresh() {
if (super.needsRefresh() == false) {
// The size was computed recently, don't recompute
return false;
}
SizeAndModCount cached = getNoRefresh();
if (cached.pendingWrite) {
// The cached entry was generated while there were pending
// writes, so the size might be stale: recompute.
return true;
}
synchronized(ByteSizeCachingDirectory.this) {
// If there are pending writes or if new files have been
// written/deleted since last time: recompute
return numOpenOutputs != 0 || cached.modCount != modCount;
}
}
};
}

/** Return the cumulative size of all files in this directory. */
long estimateSizeInBytes() throws IOException {
try {
return size.getOrRefresh().size;
} catch (UncheckedIOException e) {
// we wrapped in the cache and unwrap here
throw e.getCause();
}
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
return wrapIndexOutput(super.createOutput(name, context));
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
return wrapIndexOutput(super.createTempOutput(prefix, suffix, context));
}

private IndexOutput wrapIndexOutput(IndexOutput out) {
synchronized (this) {
numOpenOutputs++;
}
return new FilterIndexOutput(out.toString(), out) {
@Override
public void writeBytes(byte[] b, int length) throws IOException {
// Don't write to atomicXXX here since it might be called in
// tight loops and memory barriers are costly
super.writeBytes(b, length);
}

@Override
public void writeByte(byte b) throws IOException {
// Don't write to atomicXXX here since it might be called in
// tight loops and memory barriers are costly
super.writeByte(b);
}

@Override
public void close() throws IOException {
// Close might cause some data to be flushed from in-memory buffers, so
// increment the modification counter too.
try {
super.close();
} finally {
synchronized (this) {
numOpenOutputs--;
modCount++;
}
}
}
};
}

@Override
public void deleteFile(String name) throws IOException {
try {
super.deleteFile(name);
} finally {
synchronized (this) {
modCount++;
}
}
}

}
54 changes: 12 additions & 42 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -67,7 +66,6 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
Expand All @@ -91,7 +89,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -146,7 +143,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
private final SingleObjectCache<StoreStats> statsCache;

private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
Expand All @@ -164,12 +160,13 @@ public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService dire
OnClose onClose) throws IOException {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;
Directory dir = directoryService.newDirectory();
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
this.statsCache = new StoreStatsCache(refreshInterval, directory);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", settings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;

assert onClose != null;
assert shardLock != null;
Expand Down Expand Up @@ -377,7 +374,7 @@ public void exorciseIndex(CheckIndex.Status status) throws IOException {

public StoreStats stats() throws IOException {
ensureOpen();
return statsCache.getOrRefresh();
return new StoreStats(directory.estimateSize());
}

/**
Expand Down Expand Up @@ -731,11 +728,16 @@ static final class StoreDirectory extends FilterDirectory {

private final Logger deletesLogger;

StoreDirectory(Directory delegateDirectory, Logger deletesLogger) {
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
super(delegateDirectory);
this.deletesLogger = deletesLogger;
}

/** Estimate the cumulative size of all files in this directory in bytes. */
long estimateSize() throws IOException {
return ((ByteSizeCachingDirectory) getDelegate()).estimateSizeInBytes();
}

@Override
public void close() {
assert false : "Nobody should close this directory except of the Store itself";
Expand Down Expand Up @@ -1428,38 +1430,6 @@ public void accept(ShardLock Lock) {
};
}

private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
private final Directory directory;

StoreStatsCache(TimeValue refreshInterval, Directory directory) throws IOException {
super(refreshInterval, new StoreStats(estimateSize(directory)));
this.directory = directory;
}

@Override
protected StoreStats refresh() {
try {
return new StoreStats(estimateSize(directory));
} catch (IOException ex) {
throw new ElasticsearchException("failed to refresh store stats", ex);
}
}

private static long estimateSize(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
try {
estimatedSize += directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
// calling Files.size, you can also sometimes hit AccessDeniedException
}
}
return estimatedSize;
}
}

/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
Expand Down
Loading

0 comments on commit 03dcf22

Please sign in to comment.