Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add BufferHandlePool #2220

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

interface BufferHandlePool {

PooledBuffer getBuffer();

void returnBuffer(PooledBuffer handle);

static BufferHandlePool simple(int capacity) {
return new SimpleBufferHandlePool(capacity);
}

static BufferHandlePool fixedPool(int bufferCount, int bufferCapacity) {
return FixedBufferHandlePool.of(bufferCount, bufferCapacity);
}

final class SimpleBufferHandlePool implements BufferHandlePool {
private final int capacity;

private SimpleBufferHandlePool(int capacity) {
this.capacity = capacity;
}

@Override
public PooledBuffer getBuffer() {
return PooledBuffer.of(BufferHandle.allocate(capacity));
}

@Override
public void returnBuffer(PooledBuffer handle) {
// noop
}
}

/**
* Specialized and simplified {@link java.util.concurrent.BlockingQueue}. We don't need the
* majority of methods/functionality just blocking put/get.
*
* <p>Inspired by the BoundedBuffer example from the class javadocs of {@link Condition} (java8)
*/
final class FixedBufferHandlePool implements BufferHandlePool {
@VisibleForTesting final HashSet<PooledBuffer> pool;
private final int poolMaxSize;

private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

@VisibleForTesting
FixedBufferHandlePool(HashSet<PooledBuffer> pool) {
checkArgument(!pool.isEmpty(), "provided pool bust not start empty");
this.pool = pool;
this.poolMaxSize = pool.size();

this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
}

@Override
public PooledBuffer getBuffer() {
try (AcquiredLock ignore = AcquiredLock.lock(this.lock)) {
while (pool.isEmpty()) {
notEmpty.awaitUninterruptibly();
}
return dequeue();
}
}

@Override
public void returnBuffer(PooledBuffer handle) {
checkNotNull(handle, "handle must be non null");
try (AcquiredLock ignore = AcquiredLock.lock(this.lock)) {
if (pool.contains(handle)) {
return;
}
while (poolMaxSize == pool.size()) {
notFull.awaitUninterruptibly();
}
enqueue(handle);
}
}

private void enqueue(PooledBuffer pooled) {
pooled.getBufferHandle().get().clear();
pool.add(pooled);
notEmpty.signal();
}

private PooledBuffer dequeue() {
Iterator<PooledBuffer> iterator = pool.iterator();
checkState(iterator.hasNext(), "attempt to acquire pooled buffer failed");
PooledBuffer poll = iterator.next();
iterator.remove();
notFull.signal();
return poll;
}

@VisibleForTesting
static FixedBufferHandlePool of(int bufferCount, int bufferCapacity) {
// explicitly collect to a HashSet
HashSet<PooledBuffer> buffers =
IntStream.range(0, bufferCount)
.mapToObj(i -> BufferHandle.allocate(bufferCapacity))
.map(PooledBuffer::of)
.collect(HashSet::new, Set::add, HashSet::addAll);
return new FixedBufferHandlePool(buffers);
}
}

final class PooledBuffer {
private final BufferHandle bufferHandle;

private PooledBuffer(BufferHandle bufferHandle) {
this.bufferHandle = bufferHandle;
}

BufferHandle getBufferHandle() {
return bufferHandle;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof PooledBuffer)) {
return false;
}
PooledBuffer that = (PooledBuffer) o;
return Objects.equals(System.identityHashCode(this), System.identityHashCode(that));
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@VisibleForTesting
static PooledBuffer of(BufferHandle bufferHandle) {
return new PooledBuffer(bufferHandle);
}
}

final class AcquiredLock implements AutoCloseable {
private final ReentrantLock lock;

private AcquiredLock(ReentrantLock lock) {
this.lock = lock;
lock.lock();
}

@Override
public void close() {
lock.unlock();
}

private static AcquiredLock lock(ReentrantLock lock) {
return new AcquiredLock(lock);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.cloud.storage.BufferHandlePool.FixedBufferHandlePool;
import com.google.cloud.storage.BufferHandlePool.PooledBuffer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public final class BufferHandlePoolTest {

private static ExecutorService exec;

@BeforeClass
public static void beforeClass() {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-bhmt-%d").build();
exec = Executors.newCachedThreadPool(threadFactory);
}

@AfterClass
public static void afterClass() {
if (exec != null) {
exec.shutdownNow();
}
}

@Test
public void fixedPool_doesNotAllowTheSameBufferToBeReturnedWhilePresent() {
BufferHandle b1 = BufferHandle.allocate(10);
BufferHandle b2 = BufferHandle.allocate(10);

PooledBuffer p1 = PooledBuffer.of(b1);
PooledBuffer p2 = PooledBuffer.of(b2);
HashSet<PooledBuffer> pooledBuffers = newHashSet(p1, p2);
FixedBufferHandlePool pool = new FixedBufferHandlePool(pooledBuffers);

PooledBuffer g1 = pool.getBuffer();
PooledBuffer g2 = pool.getBuffer();

pool.returnBuffer(g1);
pool.returnBuffer(g1);

assertThat(pool.pool).isEqualTo(newHashSet(g1));
}

@Test
public void fixedPool_getBuffer_blocksIfEmpty() {
FixedBufferHandlePool pool = FixedBufferHandlePool.of(1, 10);
PooledBuffer p1 = pool.getBuffer();

Future<PooledBuffer> f = exec.submit(pool::getBuffer);
assertThrows(TimeoutException.class, () -> f.get(10, TimeUnit.MILLISECONDS));
}

@Test
public void fixedPool_returnBuffer_blocksIfFull() {
FixedBufferHandlePool pool = FixedBufferHandlePool.of(1, 10);

PooledBuffer imposter = PooledBuffer.of(BufferHandle.allocate(5));
Future<Void> f =
exec.submit(
() -> {
pool.returnBuffer(imposter);
return null;
});
assertThrows(TimeoutException.class, () -> f.get(10, TimeUnit.MILLISECONDS));
}
}