Skip to content

Commit

Permalink
Merge remote-tracking branch 'dakrone/lockobtainfailed-replacement'
Browse files Browse the repository at this point in the history
  • Loading branch information
dakrone committed Aug 16, 2016
2 parents a61257e + 1de3388 commit 1825d80
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 38 deletions.
34 changes: 17 additions & 17 deletions core/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ private static String toString(Collection<String> items) {
* @param shardId the id of the shard to delete to delete
* @throws IOException if an IOException occurs
*/
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException {
public void deleteShardDirectorySafe(ShardId shardId, IndexSettings indexSettings) throws IOException, ShardLockObtainFailedException {
final Path[] paths = availableShardPaths(shardId);
logger.trace("deleting shard {} directory, paths: [{}]", shardId, paths);
try (ShardLock lock = shardLock(shardId)) {
Expand Down Expand Up @@ -462,7 +462,7 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
locks[i] = dirs[i].obtainLock(IndexWriter.WRITE_LOCK_NAME);
} catch (IOException ex) {
throw new LockObtainFailedException("unable to acquire " +
IndexWriter.WRITE_LOCK_NAME + " for " + p);
IndexWriter.WRITE_LOCK_NAME + " for " + p, ex);
}
}
} finally {
Expand Down Expand Up @@ -504,7 +504,7 @@ private boolean isShardLocked(ShardId id) {
try {
shardLock(id, 0).close();
return false;
} catch (IOException ex) {
} catch (ShardLockObtainFailedException ex) {
return true;
}
}
Expand All @@ -519,7 +519,8 @@ private boolean isShardLocked(ShardId id) {
* @param indexSettings settings for the index being deleted
* @throws IOException if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings) throws IOException {
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSettings indexSettings)
throws IOException, ShardLockObtainFailedException {
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
try {
deleteIndexDirectoryUnderLock(index, indexSettings);
Expand Down Expand Up @@ -549,14 +550,15 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin

/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
*
* @param index the index to lock shards for
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @return the {@link ShardLock} instances for this index.
* @throws IOException if an IOException occurs.
*/
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS) throws IOException {
public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long lockTimeoutMS)
throws IOException, ShardLockObtainFailedException {
final int numShards = settings.getNumberOfShards();
if (numShards <= 0) {
throw new IllegalArgumentException("settings must contain a non-null > 0 number of shards");
Expand Down Expand Up @@ -584,29 +586,27 @@ public List<ShardLock> lockAllForIndex(Index index, IndexSettings settings, long
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
* write operation on a shards data directory like deleting files, creating a new index writer
* or recover from a different shard instance into it. If the shard lock can not be acquired
* an {@link LockObtainFailedException} is thrown.
* a {@link ShardLockObtainFailedException} is thrown.
*
* Note: this method will return immediately if the lock can't be acquired.
*
* @param id the shard ID to lock
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
* @throws IOException if an IOException occurs.
*/
public ShardLock shardLock(ShardId id) throws IOException {
public ShardLock shardLock(ShardId id) throws ShardLockObtainFailedException {
return shardLock(id, 0);
}

/**
* Tries to lock the given shards ID. A shard lock is required to perform any kind of
* write operation on a shards data directory like deleting files, creating a new index writer
* or recover from a different shard instance into it. If the shard lock can not be acquired
* an {@link org.apache.lucene.store.LockObtainFailedException} is thrown
* a {@link ShardLockObtainFailedException} is thrown
* @param shardId the shard ID to lock
* @param lockTimeoutMS the lock timeout in milliseconds
* @return the shard lock. Call {@link ShardLock#close()} to release the lock
* @throws IOException if an IOException occurs.
*/
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws IOException {
public ShardLock shardLock(final ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException {
logger.trace("acquiring node shardlock on [{}], timeout [{}]", shardId, lockTimeoutMS);
final InternalShardLock shardLock;
final boolean acquired;
Expand Down Expand Up @@ -647,8 +647,7 @@ protected void closeInternal() {
*/
@FunctionalInterface
public interface ShardLocker {

ShardLock lock(ShardId shardId, long lockTimeoutMS) throws IOException;
ShardLock lock(ShardId shardId, long lockTimeoutMS) throws ShardLockObtainFailedException;
}

/**
Expand Down Expand Up @@ -703,14 +702,15 @@ private void decWaitCount() {
}
}

void acquire(long timeoutInMillis) throws LockObtainFailedException{
void acquire(long timeoutInMillis) throws ShardLockObtainFailedException {
try {
if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms");
throw new ShardLockObtainFailedException(shardId,
"obtaining shard lock timed out after " + timeoutInMillis + "ms");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
throw new ShardLockObtainFailedException(shardId, "thread interrupted while trying to obtain shard lock", e);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/elasticsearch/env/ShardLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final ShardId getShardId() {
}

@Override
public final void close() throws IOException {
public final void close() {
if (this.closed.compareAndSet(false, true)) {
closeInternal();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.env;

import org.elasticsearch.index.shard.ShardId;

/**
* Exception used when the in-memory lock for a shard cannot be obtained
*/
public class ShardLockObtainFailedException extends Exception {
private final ShardId shardId;

public ShardLockObtainFailedException(ShardId shardId, String message) {
super(message);
this.shardId = shardId;
}

public ShardLockObtainFailedException(ShardId shardId, String message, Throwable cause) {
super(message, cause);
this.shardId = shardId;
}

@Override
public String getMessage() {
StringBuffer sb = new StringBuffer();
sb.append(shardId.toString());
sb.append(": ");
sb.append(super.getMessage());
return sb.toString();
}
}
10 changes: 8 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
Expand Down Expand Up @@ -279,8 +280,9 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
boolean success = false;
Store store = null;
IndexShard indexShard = null;
final ShardLock lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
ShardLock lock = null;
try {
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
eventListener.beforeIndexShardCreated(shardId, indexSettings);
ShardPath path;
try {
Expand Down Expand Up @@ -349,9 +351,13 @@ public synchronized IndexShard createShard(ShardRouting routing) throws IOExcept
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
success = true;
return indexShard;
} catch (ShardLockObtainFailedException e) {
throw new IOException("failed to obtain in-memory shard lock", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(lock);
if (lock != null) {
IOUtils.closeWhileHandlingException(lock);
}
closeShard("initialization failed", shardId, indexShard, store, eventListener);
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
Expand Down Expand Up @@ -388,6 +389,8 @@ public static MetadataSnapshot readMetadataSnapshot(Path indexLocation, ShardId
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
logger.info("Failed to open / find files while reading metadata snapshot");
} catch (ShardLockObtainFailedException ex) {
logger.info("{}: failed to obtain shard lock", ex, shardId);
}
return MetadataSnapshot.EMPTY;
}
Expand Down Expand Up @@ -418,6 +421,9 @@ public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnviron
failIfCorrupted(dir, shardId);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
} catch (ShardLockObtainFailedException ex) {
logger.error("{} unable to acquire shard lock", ex, shardId);
throw new IOException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -676,7 +677,8 @@ public void deleteShardStore(String reason, ShardLock lock, IndexSettings indexS
* @param clusterState . This is required to access the indexes settings etc.
* @throws IOException if an IOException occurs
*/
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException {
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState)
throws IOException, ShardLockObtainFailedException {
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName());

final IndexSettings indexSettings = buildIndexSettings(metaData);
Expand Down Expand Up @@ -891,7 +893,8 @@ public int compareTo(PendingDelete o) {
* @param timeout the timeout used for processing pending deletes
*/
@Override
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout) throws IOException, InterruptedException {
public void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeout)
throws IOException, InterruptedException, ShardLockObtainFailedException {
logger.debug("{} processing pending deletes", index);
final long startTimeNS = System.nanoTime();
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexComponent;
Expand Down Expand Up @@ -835,6 +836,7 @@ default T getShardOrNull(ShardId shardId) {
return null;
}

void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue) throws IOException, InterruptedException;
void processPendingDeletes(Index index, IndexSettings indexSettings, TimeValue timeValue)
throws IOException, InterruptedException, ShardLockObtainFailedException;
}
}
21 changes: 8 additions & 13 deletions core/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.env;

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -129,7 +128,7 @@ public void testNodeLockMultipleEnvironment() throws IOException {
IOUtils.close(first, second);
}

public void testShardLock() throws IOException {
public void testShardLock() throws Exception {
final NodeEnvironment env = newNodeEnvironment();

Index index = new Index("foo", "fooUUID");
Expand All @@ -139,7 +138,7 @@ public void testShardLock() throws IOException {
try {
env.shardLock(new ShardId(index, 0));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// expected
}
for (Path path : env.indexPaths(index)) {
Expand All @@ -149,7 +148,7 @@ public void testShardLock() throws IOException {
try {
env.lockAllForIndex(index, idxSettings, randomIntBetween(0, 10));
fail("shard 0 is locked");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// expected
}

Expand All @@ -161,7 +160,7 @@ public void testShardLock() throws IOException {
try {
env.shardLock(new ShardId(index, 0));
fail("shard is locked");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// expected
}
IOUtils.close(locks);
Expand Down Expand Up @@ -213,13 +212,12 @@ public void testResolveIndexFolders() throws Exception {
env.close();
}

public void testDeleteSafe() throws IOException, InterruptedException {
public void testDeleteSafe() throws Exception {
final NodeEnvironment env = newNodeEnvironment();
final Index index = new Index("foo", "fooUUID");
ShardLock fooLock = env.shardLock(new ShardId(index, 0));
assertEquals(new ShardId(index, 0), fooLock.getShardId());


for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
Files.createDirectories(path.resolve("1"));
Expand All @@ -228,14 +226,13 @@ public void testDeleteSafe() throws IOException, InterruptedException {
try {
env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
fail("shard is locked");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// expected
}

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
assertTrue(Files.exists(path.resolve("1")));

}

env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings);
Expand All @@ -248,7 +245,7 @@ public void testDeleteSafe() throws IOException, InterruptedException {
try {
env.deleteIndexDirectorySafe(index, randomIntBetween(0, 10), idxSettings);
fail("shard is locked");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// expected
}
fooLock.close();
Expand Down Expand Up @@ -338,10 +335,8 @@ public void run() {
assertEquals(flipFlop[shard].incrementAndGet(), 1);
assertEquals(flipFlop[shard].decrementAndGet(), 0);
}
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {
// ok
} catch (IOException ex) {
fail(ex.toString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
import org.elasticsearch.gateway.MetaStateService;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testPendingTasks() throws Exception {
try {
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
} catch (LockObtainFailedException ex) {
} catch (ShardLockObtainFailedException ex) {

}
assertTrue(path.exists());
Expand Down
Loading

0 comments on commit 1825d80

Please sign in to comment.