From 13a073dca33def6c6975dbf2ab734911c3851993 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Fri, 2 Oct 2020 13:26:33 -0600
Subject: [PATCH] Do not block Translog add on file write (#62513)

Currently a TranslogWriter add operation is synchronized. The operation
adds the bytes to the file output stream's buffer and issues a write
system call whenever the buffer fills. Since the buffer is only 8KB, we
routinely block other add calls on system writes.

This commit modifies the add operation to simply place the operation in
an array list. The array list is flushed when a sync call occurs or
when 1MB is buffered.
---
 .../common/io/DiskIoBufferPool.java           |  60 ++++
 .../util/concurrent/ReleasableLock.java       |  13 +
 .../index/translog/Translog.java              |  14 +-
 .../index/translog/TranslogConfig.java        |   2 +-
 .../index/translog/TranslogWriter.java        | 272 ++++++++++++------
 .../translog/TranslogDeletionPolicyTests.java |   3 +-
 .../index/translog/TranslogTests.java         | 130 ++++++++-
 7 files changed, 380 insertions(+), 114 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java

diff --git a/server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java b/server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java
new file mode 100644
index 0000000000000..782a2fdde28a0
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/io/DiskIoBufferPool.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class DiskIoBufferPool {
+
+    public static final int BUFFER_SIZE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
+        System.getProperty("es.disk_io.direct.buffer.size", "64KB"), "es.disk_io.direct.buffer.size").getBytes());
+    public static final int HEAP_BUFFER_SIZE = 8 * 1024;
+
+    private static final ThreadLocal<ByteBuffer> ioBufferPool = ThreadLocal.withInitial(() -> {
+        if (isWriteOrFlushThread()) {
+            return ByteBuffer.allocateDirect(BUFFER_SIZE);
+        } else {
+            return ByteBuffer.allocate(HEAP_BUFFER_SIZE);
+        }
+    });
+
+    public static ByteBuffer getIoBuffer() {
+        ByteBuffer ioBuffer = ioBufferPool.get();
+        ioBuffer.clear();
+        return ioBuffer;
+    }
+
+    private static boolean isWriteOrFlushThread() {
+        String threadName = Thread.currentThread().getName();
+        for (String s : Arrays.asList(
+            "[" + ThreadPool.Names.WRITE + "]",
+            "[" + ThreadPool.Names.FLUSH + "]",
+            "[" + ThreadPool.Names.SYSTEM_WRITE + "]")) {
+            if (threadName.contains(s)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
index 2444080744912..7820b09b9ddff 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
@@ -58,6 +58,19 @@ public ReleasableLock acquire() throws EngineException {
         return this;
     }
 
+    /**
+     * Try acquiring lock, returning null if unable.
+     */
+    public ReleasableLock tryAcquire() {
+        boolean locked = lock.tryLock();
+        if (locked) {
+            assert addCurrentThread();
+            return this;
+        } else {
+            return null;
+        }
+    }
+
     /**
      * Try acquiring lock, returning null if unable to acquire lock within timeout.
     */
diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
index adb3e5b7aba8a..1552a5436e765 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -27,13 +27,13 @@
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.uid.Versions;
-import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -525,6 +525,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
      */
     public Location add(final Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
+        boolean successfullySerialized = false;
         try {
             final long start = out.position();
             out.skip(Integer.BYTES);
@@ -534,8 +535,9 @@ public Location add(final Operation operation) throws IOException {
             out.seek(start);
             out.writeInt(operationSize);
             out.seek(end);
-            final BytesReference bytes = out.bytes();
-            try (ReleasableLock ignored = readLock.acquire()) {
+            successfullySerialized = true;
+            try (ReleasableBytesReference bytes = new ReleasableBytesReference(out.bytes(), out);
+                 ReleasableLock ignored = readLock.acquire()) {
                 ensureOpen();
                 if (operation.primaryTerm() > current.getPrimaryTerm()) {
                     assert false :
@@ -553,7 +555,9 @@ public Location add(final Operation operation) throws IOException {
             closeOnTragicEvent(ex);
             throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
         } finally {
-            Releasables.close(out);
+            if (successfullySerialized == false) {
+                Releasables.close(out);
+            }
         }
     }
 
@@ -1889,7 +1893,7 @@ public static String createEmptyTranslog(final Path location,
         Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
         IOUtils.fsync(checkpointFile, false);
         final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory,
-            new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint,
+            TranslogConfig.DEFAULT_BUFFER_SIZE, minTranslogGeneration, initialGlobalCheckpoint,
             () -> {
                 throw new UnsupportedOperationException();
             },
             () -> {
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
index a8acf58683935..23c43c3d92318 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
@@ -34,7 +34,7 @@
  */
 public final class TranslogConfig {
 
-    public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
+    public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB);
 
     private final BigArrays bigArrays;
     private final IndexSettings indexSettings;
     private final ShardId shardId;
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index e91ae0504536c..fa09b7af7816e 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -22,32 +22,42 @@
 import com.carrotsearch.hppc.LongArrayList;
 import com.carrotsearch.hppc.procedures.LongProcedure;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.Assertions;
+import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Channels;
+import org.elasticsearch.common.io.DiskIoBufferPool;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 
-import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
 
 public class TranslogWriter extends BaseTranslogReader implements Closeable {
+
     private final ShardId shardId;
     private final FileChannel checkpointChannel;
     private final Path checkpointPath;
@@ -57,8 +67,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private volatile int operationCounter;
     /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
     private final TragicExceptionHolder tragedy;
-    /* A buffered outputstream what writes to the writers channel */
-    private final OutputStream outputStream;
     /* the total offset of this file including the bytes written to the file as well as into the buffer */
     private volatile long totalOffset;
@@ -72,10 +80,14 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private final LongConsumer persistedSequenceNumberConsumer;
 
     protected final AtomicBoolean closed = new AtomicBoolean(false);
-    // lock order synchronized(syncLock) -> synchronized(this)
+    // lock order try(Releasable lock = writeLock.acquire()) -> synchronized(this)
+    private final ReleasableLock writeLock = new ReleasableLock(new ReentrantLock());
+    // lock order synchronized(syncLock) -> try(Releasable lock = writeLock.acquire()) -> synchronized(this)
     private final Object syncLock = new Object();
 
-    private LongArrayList nonFsyncedSequenceNumbers;
+    private final int forceWriteThreshold;
+    private final ArrayList<Operation> bufferedOps = new ArrayList<>();
+    private long bufferedBytes = 0L;
 
     private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
@@ -96,11 +108,11 @@ private TranslogWriter(
         assert initialCheckpoint.offset == channel.position() :
             "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
                 + channel.position() + "]";
+        this.forceWriteThreshold = Math.toIntExact(bufferSize.getBytes());
         this.shardId = shardId;
         this.checkpointChannel = checkpointChannel;
         this.checkpointPath = checkpointPath;
         this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
-        this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
         this.lastSyncedCheckpoint = initialCheckpoint;
         this.totalOffset = initialCheckpoint.offset;
         assert initialCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo;
@@ -109,7 +121,6 @@ private TranslogWriter(
         this.maxSeqNo = initialCheckpoint.maxSeqNo;
         assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
-        this.nonFsyncedSequenceNumbers = new LongArrayList(64);
         this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
         this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
         this.tragedy = tragedy;
@@ -162,10 +173,6 @@ private synchronized void closeWithTragicEvent(final Exception ex) {
         }
     }
 
-    /**
-     * add the given bytes to the translog and return the location they were written at
-     */
-
     /**
      * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
      *
@@ -174,34 +181,35 @@ private synchronized void closeWithTragicEvent(final Exception ex) {
      * @return the location the bytes were written to
      * @throws IOException if writing to the translog resulted in an I/O exception
      */
-    public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
-        ensureOpen();
-        final long offset = totalOffset;
-        try {
-            data.writeTo(outputStream);
-        } catch (final Exception ex) {
-            closeWithTragicEvent(ex);
-            throw ex;
-        }
-        totalOffset += data.length();
+    public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException {
+        final Translog.Location location;
+        final long bytesBufferedAfterAdd;
+        synchronized (this) {
+            ensureOpen();
+            final long offset = totalOffset;
+            totalOffset += data.length();
+            bufferedBytes += data.length();
+            bufferedOps.add(new Operation(seqNo, data.retain()));
 
-        if (minSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-            assert operationCounter == 0;
-        }
-        if (maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-            assert operationCounter == 0;
-        }
+            assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
+            assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
 
-        minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
-        maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
+            minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
+            maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
 
-        nonFsyncedSequenceNumbers.add(seqNo);
+            operationCounter++;
 
-        operationCounter++;
+            assert assertNoSeqNumberConflict(seqNo, data);
 
-        assert assertNoSeqNumberConflict(seqNo, data);
+            location = new Translog.Location(generation, offset, data.length());
+            bytesBufferedAfterAdd = bufferedBytes;
+        }
+
+        if (bytesBufferedAfterAdd >= forceWriteThreshold) {
+            writeBufferedOps(Long.MAX_VALUE, bytesBufferedAfterAdd >= forceWriteThreshold * 4);
+        }
 
-        return new Translog.Location(generation, offset, data.length());
+        return location;
     }
 
     private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
@@ -211,9 +219,9 @@ private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReferenc
             final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
             if (previous.v1().equals(data) == false) {
                 Translog.Operation newOp = Translog.readOperation(
-                        new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
+                    new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
                 Translog.Operation prvOp = Translog.readOperation(
-                        new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
+                    new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
                 // TODO: We haven't had timestamp for Index operations in Lucene yet, we need to loosen this check without timestamp.
                 final boolean sameOp;
                 if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) {
@@ -250,7 +258,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
                 final Translog.Operation op;
                 try {
                     op = Translog.readOperation(
-                            new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
+                        new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
                 } catch (IOException ex) {
                     throw new RuntimeException(ex);
                 }
@@ -309,28 +317,36 @@ public long sizeInBytes() {
     public TranslogReader closeIntoReader() throws IOException {
         // make sure to acquire the sync lock first, to prevent dead locks with threads calling
         // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+        // the write lock is acquired first followed by synchronize(this).
         //
         // Note: While this is not strictly needed as this method is called while blocking all ops on the translog,
         // we do this to for correctness and preventing future issues.
         synchronized (syncLock) {
-            synchronized (this) {
-                try {
-                    sync(); // sync before we close..
-                } catch (final Exception ex) {
-                    closeWithTragicEvent(ex);
-                    throw ex;
-                }
-                if (closed.compareAndSet(false, true)) {
+            try (ReleasableLock toClose = writeLock.acquire()) {
+                synchronized (this) {
                     try {
-                        checkpointChannel.close();
+                        sync(); // sync before we close..
                     } catch (final Exception ex) {
                         closeWithTragicEvent(ex);
                         throw ex;
                     }
-                    return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
-                } else {
-                    throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
+                    // If we reached this point, all of the buffered ops should have been flushed successfully.
+                    assert bufferedOps.size() == 0;
+                    assert checkChannelPositionWhileHandlingException(totalOffset);
+                    assert totalOffset == lastSyncedCheckpoint.offset;
+                    if (closed.compareAndSet(false, true)) {
+                        try {
+                            checkpointChannel.close();
+                        } catch (final Exception ex) {
+                            closeWithTragicEvent(ex);
+                            throw ex;
+                        }
+                        return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
+                    } else {
+                        throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
                             tragedy.get());
+                    }
                 }
             }
         }
     }
@@ -341,15 +357,23 @@ public TranslogReader closeIntoReader() throws IOException {
     public TranslogSnapshot newSnapshot() {
         // make sure to acquire the sync lock first, to prevent dead locks with threads calling
         // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+        // the write lock is acquired first followed by synchronize(this).
         synchronized (syncLock) {
-            synchronized (this) {
-                ensureOpen();
-                try {
-                    sync();
-                } catch (IOException e) {
-                    throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+            try (ReleasableLock toClose = writeLock.acquire()) {
+                synchronized (this) {
+                    ensureOpen();
+                    try {
+                        sync();
+                    } catch (IOException e) {
+                        throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+                    }
+                    // If we reached this point, all of the buffered ops should have been flushed successfully.
+                    assert bufferedOps.size() == 0;
+                    assert checkChannelPositionWhileHandlingException(totalOffset);
+                    assert totalOffset == lastSyncedCheckpoint.offset;
+                    return super.newSnapshot();
                 }
-                return super.newSnapshot();
             }
         }
     }
@@ -371,13 +395,21 @@ final boolean syncUpTo(long offset) throws IOException {
                 // the lock we should check again since if this code is busy we might have fsynced enough already
                 final Checkpoint checkpointToSync;
                 final LongArrayList flushedSequenceNumbers;
-                synchronized (this) {
-                    ensureOpen();
-                    try {
-                        outputStream.flush();
+                final ArrayDeque<Operation> toWrite;
+                try (ReleasableLock toClose = writeLock.acquire()) {
+                    synchronized (this) {
+                        ensureOpen();
                         checkpointToSync = getCheckpoint();
-                        flushedSequenceNumbers = nonFsyncedSequenceNumbers;
-                        nonFsyncedSequenceNumbers = new LongArrayList(64);
+                        toWrite = pollOpsToWrite();
+                    }
+                    flushedSequenceNumbers = new LongArrayList(toWrite.size());
+                    for (Operation operation : toWrite) {
+                        flushedSequenceNumbers.add(operation.seqNo);
+                    }
+
+                    try {
+                        // Write ops will release operations.
+                        writeAndReleaseOps(toWrite);
                     } catch (final Exception ex) {
                         closeWithTragicEvent(ex);
                         throw ex;
@@ -403,19 +435,76 @@ final boolean syncUpTo(long offset) throws IOException {
         return false;
     }
 
+    private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
+        try (ReleasableLock locked = blockOnExistingWriter ? writeLock.acquire() : writeLock.tryAcquire()) {
+            try {
+                if (locked != null && offset > getWrittenOffset()) {
+                    writeAndReleaseOps(pollOpsToWrite());
+                }
+            } catch (IOException e) {
+                closeWithTragicEvent(e);
+                throw e;
+            }
+        }
+    }
+
+    private synchronized ArrayDeque<Operation> pollOpsToWrite() {
+        ensureOpen();
+        final ArrayDeque<Operation> operationsToWrite = new ArrayDeque<>(bufferedOps.size());
+        operationsToWrite.addAll(bufferedOps);
+        bufferedOps.clear();
+        bufferedBytes = 0;
+        return operationsToWrite;
+    }
+
+    private void writeAndReleaseOps(final ArrayDeque<Operation> operationsToWrite) throws IOException {
+        try {
+            assert writeLock.isHeldByCurrentThread();
+            ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
+
+            Operation operation;
+            while ((operation = operationsToWrite.pollFirst()) != null) {
+                try (Releasable toClose = operation) {
+                    BytesRefIterator iterator = operation.bytesReference.iterator();
+                    BytesRef current;
+                    while ((current = iterator.next()) != null) {
+                        int currentBytesConsumed = 0;
+                        while (currentBytesConsumed != current.length) {
+                            int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
+                            ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
+                            currentBytesConsumed += nBytesToWrite;
+                            if (ioBuffer.hasRemaining() == false) {
+                                ioBuffer.flip();
+                                writeToFile(ioBuffer);
+                                ioBuffer.clear();
+                            }
+                        }
+                    }
+                }
+            }
+            ioBuffer.flip();
+            writeToFile(ioBuffer);
+        } finally {
+            Releasables.close(operationsToWrite);
+        }
+    }
+
+    @SuppressForbidden(reason = "Channel#write")
+    private void writeToFile(ByteBuffer ioBuffer) throws IOException {
+        while (ioBuffer.remaining() > 0) {
+            channel.write(ioBuffer);
+        }
+    }
+
     @Override
     protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
         try {
             if (position + targetBuffer.remaining() > getWrittenOffset()) {
-                synchronized (this) {
-                    // we only flush here if it's really really needed - try to minimize the impact of the read operation
-                    // in some cases ie. a tragic event we might still be able to read the relevant value
-                    // which is not really important in production but some test can make most strict assumptions
-                    // if we don't fail in this call unless absolutely necessary.
-                    if (position + targetBuffer.remaining() > getWrittenOffset()) {
-                        outputStream.flush();
-                    }
-                }
+                // we only flush here if it's really really needed - try to minimize the impact of the read operation
+                // in some cases ie. a tragic event we might still be able to read the relevant value
+                // which is not really important in production but some test can make most strict assumptions
+                // if we don't fail in this call unless absolutely necessary.
+                writeBufferedOps(position + targetBuffer.remaining(), true);
             }
         } catch (final Exception ex) {
             closeWithTragicEvent(ex);
@@ -448,9 +537,21 @@ protected final void ensureOpen() {
         }
     }
 
+    private boolean checkChannelPositionWhileHandlingException(long expectedOffset) {
+        try {
+            return expectedOffset == channel.position();
+        } catch (IOException e) {
+            return true;
+        }
+    }
+
     @Override
     public final void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
+            synchronized (this) {
+                Releasables.closeWhileHandlingException(bufferedOps);
+                bufferedOps.clear();
+            }
             IOUtils.close(checkpointChannel, channel);
         }
     }
@@ -459,32 +560,19 @@ protected final boolean isClosed() {
         return closed.get();
     }
 
+    private static class Operation implements Releasable {
 
-    private final class BufferedChannelOutputStream extends BufferedOutputStream {
+        private final long seqNo;
+        private final ReleasableBytesReference bytesReference;
 
-        BufferedChannelOutputStream(OutputStream out, int size) throws IOException {
-            super(out, size);
+        private Operation(long seqNo, ReleasableBytesReference bytesReference) {
+            this.seqNo = seqNo;
+            this.bytesReference = bytesReference;
         }
 
         @Override
-        public synchronized void flush() throws IOException {
-            if (count > 0) {
-                try {
-                    ensureOpen();
-                    super.flush();
-                } catch (final Exception ex) {
-                    closeWithTragicEvent(ex);
-                    throw ex;
-                }
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            // the stream is intentionally not closed because
-            // closing it will close the FileChannel
-            throw new IllegalStateException("never close this stream");
+        public void close() {
+            Releasables.closeWhileHandlingException(bytesReference);
         }
     }
-
 }
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
index dc1a72b20120f..c6deff8d58b78 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java
@@ -22,6 +22,7 @@
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -96,7 +97,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter() thr
             for (int ops = randomIntBetween(0, 20); ops > 0; ops--) {
                 out.reset(bytes);
                 out.writeInt(ops);
-                writer.add(new BytesArray(bytes), ops);
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops);
             }
         }
         return new Tuple<>(readers, writer);
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 42c253f977a29..4090bbd0f645c 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -42,6 +42,7 @@
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -128,6 +129,7 @@
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.endsWith;
@@ -238,12 +240,10 @@ private TranslogConfig getTranslogConfig(final Path path) {
     }
 
     private TranslogConfig getTranslogConfig(final Path path, final Settings settings) {
-        final ByteSizeValue bufferSize;
-        if (randomBoolean()) {
-            bufferSize = TranslogConfig.DEFAULT_BUFFER_SIZE;
-        } else {
-            bufferSize = new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
-        }
+        final ByteSizeValue bufferSize = randomFrom(
+            TranslogConfig.DEFAULT_BUFFER_SIZE,
+            new ByteSizeValue(8, ByteSizeUnit.KB),
+            new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES));
 
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
@@ -1254,12 +1254,11 @@ public void testTranslogWriter() throws IOException {
         final Set<Long> persistedSeqNos = new HashSet<>();
         persistedSeqNoConsumer.set(persistedSeqNos::add);
         final int numOps = randomIntBetween(8, 128);
-        byte[] bytes = new byte[4];
-        ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
         final Set<Long> seenSeqNos = new HashSet<>();
         boolean opsHaveValidSequenceNumbers = randomBoolean();
         for (int i = 0; i < numOps; i++) {
-            out.reset(bytes);
+            byte[] bytes = new byte[4];
+            ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
             out.writeInt(i);
             long seqNo;
             do {
@@ -1269,7 +1268,7 @@ public void testTranslogWriter() throws IOException {
             if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                 seenSeqNos.add(seqNo);
             }
-            writer.add(new BytesArray(bytes), seqNo);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo);
         }
         assertThat(persistedSeqNos, empty());
         writer.sync();
@@ -1290,9 +1289,10 @@ public void testTranslogWriter() throws IOException {
         assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo));
         assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo));
 
-        out.reset(bytes);
+        byte[] bytes = new byte[4];
+        ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
         out.writeInt(2048);
-        writer.add(new BytesArray(bytes), randomNonNegativeLong());
+        writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
 
         if (reader instanceof TranslogReader) {
             ByteBuffer buffer = ByteBuffer.allocate(4);
@@ -1315,15 +1315,115 @@ public void testTranslogWriter() throws IOException {
         IOUtils.close(writer);
     }
 
+    public void testTranslogWriterDoesNotBlockAddsOnWrite() throws IOException, InterruptedException {
+        Path tempDir = createTempDir();
+        final TranslogConfig config = getTranslogConfig(tempDir);
+        final AtomicBoolean startBlocking = new AtomicBoolean(false);
+        final CountDownLatch writeStarted = new CountDownLatch(1);
+        final CountDownLatch blocker = new CountDownLatch(1);
+        final Set<Long> persistedSeqNos = new HashSet<>();
+
+        final ChannelFactory channelFactory = (file, openOption) -> {
+            FileChannel delegate = FileChannel.open(file, openOption);
+            boolean success = false;
+            try {
+                // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
+                final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
+
+                final FileChannel channel;
+                if (isCkpFile) {
+                    channel = delegate;
+                } else {
+                    channel = new FilterFileChannel(delegate) {
+
+                        @Override
+                        public int write(ByteBuffer src) throws IOException {
+                            if (startBlocking.get()) {
+                                if (writeStarted.getCount() > 0) {
+                                    writeStarted.countDown();
+                                }
+                                try {
+                                    blocker.await();
+                                } catch (InterruptedException e) {
+                                    // Ignore
+                                }
+                            }
+                            return super.write(src);
+                        }
+
+                        @Override
+                        public void force(boolean metaData) throws IOException {
+                            if (startBlocking.get()) {
+                                if (writeStarted.getCount() > 0) {
+                                    writeStarted.countDown();
+                                }
+                                try {
+                                    blocker.await();
+                                } catch (InterruptedException e) {
+                                    // Ignore
+                                }
+                            }
+                            super.force(metaData);
+                        }
+                    };
+                }
+                success = true;
+                return channel;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(delegate);
+                }
+            }
+        };
+        String translogUUID = Translog.createEmptyTranslog(
+            config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
+
+        try (Translog translog = new Translog(config, translogUUID, new TranslogDeletionPolicy(),
+            () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, persistedSeqNos::add) {
+            @Override
+            ChannelFactory getChannelFactory() {
+                return channelFactory;
+            }
+        }) {
+            try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
+                byte[] bytes = new byte[4];
+                ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
+                out.writeInt(1);
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
+                assertThat(persistedSeqNos, empty());
+                startBlocking.set(true);
+                Thread thread = new Thread(() -> {
+                    try {
+                        writer.sync();
+                    } catch (IOException e) {
+                        throw new AssertionError(e);
+                    }
+                });
+                thread.start();
+                writeStarted.await();
+
+                // Add will not block even though we are currently writing/syncing
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
+
+                blocker.countDown();
+                // Sync again so that both operations are written
+                writer.sync();
+
+                assertThat(persistedSeqNos, contains(1L, 2L));
+                thread.join();
+            }
+        }
+    }
+
     public void testCloseIntoReader() throws IOException {
         try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
             final int numOps = randomIntBetween(8, 128);
-            final byte[] bytes = new byte[4];
-            final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
             for (int i = 0; i < numOps; i++) {
+                final byte[] bytes = new byte[4];
+                final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
                 out.reset(bytes);
                 out.writeInt(i);
-                writer.add(new BytesArray(bytes), randomNonNegativeLong());
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
             }
             writer.sync();
             final Checkpoint writerCheckpoint = writer.getCheckpoint();
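
The buffering scheme the commit message describes can be illustrated in isolation. The sketch below is not the patched TranslogWriter (which additionally tracks sequence numbers, checkpoints, and ref-counted operation bytes), and every name in it (BufferedAppender, FORCE_WRITE_THRESHOLD, pollOps, writeAll) is hypothetical. It shows only the core idea: add() appends to an in-memory list under a brief monitor, while the write system call happens under a separate write lock, so concurrent adds never wait on disk I/O; sync() always drains the buffer before fsyncing.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Standalone sketch of the buffer-then-flush scheme; not Elasticsearch code.
class BufferedAppender {
    private static final int FORCE_WRITE_THRESHOLD = 1 << 20; // flush once 1MB is buffered

    private final FileChannel channel;
    private final ReentrantLock writeLock = new ReentrantLock();
    private final List<byte[]> bufferedOps = new ArrayList<>();
    private long bufferedBytes = 0;

    BufferedAppender(Path path) throws IOException {
        this.channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // Cheap: only appends to the in-memory buffer under the monitor; never issues a syscall
    // while holding it, so concurrent adds are not blocked on disk I/O.
    void add(byte[] op) throws IOException {
        final long bytesBufferedAfterAdd;
        synchronized (this) {
            bufferedOps.add(op);
            bufferedBytes += op.length;
            bytesBufferedAfterAdd = bufferedBytes;
        }
        if (bytesBufferedAfterAdd >= FORCE_WRITE_THRESHOLD) {
            // Opportunistic flush: if another thread is already writing, skip instead of queueing.
            if (writeLock.tryLock()) {
                try {
                    writeAll(pollOps());
                } finally {
                    writeLock.unlock();
                }
            }
        }
    }

    // sync() must make everything durable, so it always takes the write lock.
    void sync() throws IOException {
        writeLock.lock();
        try {
            writeAll(pollOps());
            channel.force(false);
        } finally {
            writeLock.unlock();
        }
    }

    // Atomically drain the buffer so ops are written exactly once, in order.
    private synchronized ArrayDeque<byte[]> pollOps() {
        ArrayDeque<byte[]> toWrite = new ArrayDeque<>(bufferedOps);
        bufferedOps.clear();
        bufferedBytes = 0;
        return toWrite;
    }

    private void writeAll(ArrayDeque<byte[]> ops) throws IOException {
        byte[] op;
        while ((op = ops.pollFirst()) != null) {
            ByteBuffer buffer = ByteBuffer.wrap(op);
            while (buffer.hasRemaining()) {
                channel.write(buffer); // write() may be partial; loop until drained
            }
        }
    }
}

Like the patch's writeLock.tryAcquire() path in writeBufferedOps, the threshold-triggered flush here uses tryLock: an adding thread that loses the race simply skips the flush rather than queueing behind another thread's system call. The patch additionally falls back to a blocking acquire once roughly four times the threshold is buffered, a backpressure detail this sketch omits.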