Skip to content

Commit

Permalink
Added submitAsync to ClosingFuture.
Browse files Browse the repository at this point in the history
RELNOTES="Added `submitAsync(AsyncClosingCallable)` to `ClosingFuture`."

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=341474622
  • Loading branch information
wanyingd1996 authored and cgdecker committed Nov 10, 2020
1 parent 3455264 commit c5e2d8d
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.reflect.Reflection;
import com.google.common.truth.FailureStrategy;
import com.google.common.truth.StandardSubjectBuilder;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction;
import com.google.common.util.concurrent.ClosingFuture.ClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.ClosingFunction;
Expand Down Expand Up @@ -267,6 +268,76 @@ public Object call(DeferredCloser closer) throws Exception {
assertClosed(closeable1, closeable2);
}

public void testSubmitAsync() throws Exception {
ClosingFuture<TestCloseable> closingFuture =
ClosingFuture.submitAsync(
new AsyncClosingCallable<TestCloseable>() {
@Override
public ClosingFuture<TestCloseable> call(DeferredCloser closer) {
closer.eventuallyClose(closeable1, closingExecutor);
return ClosingFuture.submit(
new ClosingCallable<TestCloseable>() {
@Override
public TestCloseable call(DeferredCloser deferredCloser) throws Exception {
return closeable2;
}
},
directExecutor());
}
},
executor);
assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
waitUntilClosed(closingFuture);
assertClosed(closeable1);
assertStillOpen(closeable2);
}

public void testSubmitAsync_cancelledPipeline() throws Exception {
ClosingFuture<TestCloseable> closingFuture =
ClosingFuture.submitAsync(
waiter.waitFor(
new AsyncClosingCallable<TestCloseable>() {
@Override
public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception {
awaitUninterruptibly(futureCancelled);
closer.eventuallyClose(closeable1, closingExecutor);
closer.eventuallyClose(closeable2, closingExecutor);
return ClosingFuture.submit(
new ClosingCallable<TestCloseable>() {
@Override
public TestCloseable call(DeferredCloser deferredCloser)
throws Exception {
deferredCloser.eventuallyClose(closeable3, closingExecutor);
return closeable3;
}
},
directExecutor());
}
}),
executor);
waiter.awaitStarted();
cancelFinalStepAndWait(closingFuture);
waiter.awaitReturned();
assertClosed(closeable1, closeable2, closeable3);
}

public void testSubmitAsync_throws() throws Exception {
ClosingFuture<Object> closingFuture =
ClosingFuture.submitAsync(
new AsyncClosingCallable<Object>() {
@Override
public ClosingFuture<Object> call(DeferredCloser closer) throws Exception {
closer.eventuallyClose(closeable1, closingExecutor);
closer.eventuallyClose(closeable2, closingExecutor);
throw exception;
}
},
executor);
assertFinallyFailsWithException(closingFuture);
waitUntilClosed(closingFuture);
assertClosed(closeable1, closeable2);
}

public void testStatusFuture() throws Exception {
ClosingFuture<String> closingFuture =
ClosingFuture.submit(
Expand Down Expand Up @@ -1837,6 +1908,10 @@ <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) {
return waitFor(closingCallable, ClosingCallable.class);
}

<V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) {
return waitFor(asyncClosingCallable, AsyncClosingCallable.class);
}

<T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) {
return waitFor(closingFunction, ClosingFunction.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,23 @@ public interface ClosingCallable<V extends Object> {
V call(DeferredCloser closer) throws Exception;
}

/**
* An operation that computes a {@link ClosingFuture} of a result.
*
* @param <V> the type of the result
* @since NEXT
*/
public interface AsyncClosingCallable<V extends Object> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
* not before this method completes), even if this method throws or the pipeline is cancelled.
*/
ClosingFuture<V> call(DeferredCloser closer) throws Exception;
}

/**
* A function from an input to a result.
*
Expand Down Expand Up @@ -367,7 +384,17 @@ public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor
return new ClosingFuture<>(callable, executor);
}

// TODO(dpb, cpovirk): Do we need submitAsync?
/**
* Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
*
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
* execution
* @since NEXT
*/
public static <V> ClosingFuture<V> submitAsync(
AsyncClosingCallable<V> callable, Executor executor) {
return new ClosingFuture<>(callable, executor);
}

/**
* Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
Expand Down Expand Up @@ -568,6 +595,32 @@ public String toString() {
this.future = task;
}

private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
checkNotNull(callable);
TrustedListenableFutureTask<V> task =
TrustedListenableFutureTask.create(
new AsyncCallable<V>() {
@Override
public ListenableFuture<V> call() throws Exception {
CloseableList newCloseables = new CloseableList();
try {
ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
closingFuture.becomeSubsumedInto(closeables);
return closingFuture.future;
} finally {
closeables.add(newCloseables, directExecutor());
}
}

@Override
public String toString() {
return callable.toString();
}
});
executor.execute(task);
this.future = task;
}

/**
* Returns a future that finishes when this step does. Calling {@code get()} on the returned
* future returns {@code null} if the step is successful or throws the same exception that would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.common.reflect.Reflection;
import com.google.common.truth.FailureStrategy;
import com.google.common.truth.StandardSubjectBuilder;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.AsyncClosingFunction;
import com.google.common.util.concurrent.ClosingFuture.ClosingCallable;
import com.google.common.util.concurrent.ClosingFuture.ClosingFunction;
Expand Down Expand Up @@ -267,6 +268,76 @@ public Object call(DeferredCloser closer) throws Exception {
assertClosed(closeable1, closeable2);
}

public void testSubmitAsync() throws Exception {
ClosingFuture<TestCloseable> closingFuture =
ClosingFuture.submitAsync(
new AsyncClosingCallable<TestCloseable>() {
@Override
public ClosingFuture<TestCloseable> call(DeferredCloser closer) {
closer.eventuallyClose(closeable1, closingExecutor);
return ClosingFuture.submit(
new ClosingCallable<TestCloseable>() {
@Override
public TestCloseable call(DeferredCloser deferredCloser) throws Exception {
return closeable2;
}
},
directExecutor());
}
},
executor);
assertThat(getFinalValue(closingFuture)).isSameInstanceAs(closeable2);
waitUntilClosed(closingFuture);
assertClosed(closeable1);
assertStillOpen(closeable2);
}

public void testSubmitAsync_cancelledPipeline() throws Exception {
ClosingFuture<TestCloseable> closingFuture =
ClosingFuture.submitAsync(
waiter.waitFor(
new AsyncClosingCallable<TestCloseable>() {
@Override
public ClosingFuture<TestCloseable> call(DeferredCloser closer) throws Exception {
awaitUninterruptibly(futureCancelled);
closer.eventuallyClose(closeable1, closingExecutor);
closer.eventuallyClose(closeable2, closingExecutor);
return ClosingFuture.submit(
new ClosingCallable<TestCloseable>() {
@Override
public TestCloseable call(DeferredCloser deferredCloser)
throws Exception {
deferredCloser.eventuallyClose(closeable3, closingExecutor);
return closeable3;
}
},
directExecutor());
}
}),
executor);
waiter.awaitStarted();
cancelFinalStepAndWait(closingFuture);
waiter.awaitReturned();
assertClosed(closeable1, closeable2, closeable3);
}

public void testSubmitAsync_throws() throws Exception {
ClosingFuture<Object> closingFuture =
ClosingFuture.submitAsync(
new AsyncClosingCallable<Object>() {
@Override
public ClosingFuture<Object> call(DeferredCloser closer) throws Exception {
closer.eventuallyClose(closeable1, closingExecutor);
closer.eventuallyClose(closeable2, closingExecutor);
throw exception;
}
},
executor);
assertFinallyFailsWithException(closingFuture);
waitUntilClosed(closingFuture);
assertClosed(closeable1, closeable2);
}

public void testAutoCloseable() throws Exception {
AutoCloseable autoCloseable = closeable1::close;
ClosingFuture<String> closingFuture =
Expand Down Expand Up @@ -1854,6 +1925,10 @@ <V> ClosingCallable<V> waitFor(ClosingCallable<V> closingCallable) {
return waitFor(closingCallable, ClosingCallable.class);
}

<V> AsyncClosingCallable<V> waitFor(AsyncClosingCallable<V> asyncClosingCallable) {
return waitFor(asyncClosingCallable, AsyncClosingCallable.class);
}

<T, U> ClosingFunction<T, U> waitFor(ClosingFunction<T, U> closingFunction) {
return waitFor(closingFunction, ClosingFunction.class);
}
Expand Down
56 changes: 55 additions & 1 deletion guava/src/com/google/common/util/concurrent/ClosingFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ public interface ClosingCallable<V extends @Nullable Object> {
V call(DeferredCloser closer) throws Exception;
}

/**
* An operation that computes a {@link ClosingFuture} of a result.
*
* @param <V> the type of the result
* @since NEXT
*/
@FunctionalInterface
public interface AsyncClosingCallable<V extends @Nullable Object> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
* closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
* not before this method completes), even if this method throws or the pipeline is cancelled.
*/
ClosingFuture<V> call(DeferredCloser closer) throws Exception;
}

/**
* A function from an input to a result.
*
Expand Down Expand Up @@ -366,7 +384,17 @@ public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor
return new ClosingFuture<>(callable, executor);
}

// TODO(dpb, cpovirk): Do we need submitAsync?
/**
* Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
*
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
* execution
* @since NEXT
*/
public static <V> ClosingFuture<V> submitAsync(
AsyncClosingCallable<V> callable, Executor executor) {
return new ClosingFuture<>(callable, executor);
}

/**
* Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
Expand Down Expand Up @@ -567,6 +595,32 @@ public String toString() {
this.future = task;
}

private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
checkNotNull(callable);
TrustedListenableFutureTask<V> task =
TrustedListenableFutureTask.create(
new AsyncCallable<V>() {
@Override
public ListenableFuture<V> call() throws Exception {
CloseableList newCloseables = new CloseableList();
try {
ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
closingFuture.becomeSubsumedInto(closeables);
return closingFuture.future;
} finally {
closeables.add(newCloseables, directExecutor());
}
}

@Override
public String toString() {
return callable.toString();
}
});
executor.execute(task);
this.future = task;
}

/**
* Returns a future that finishes when this step does. Calling {@code get()} on the returned
* future returns {@code null} if the step is successful or throws the same exception that would
Expand Down

0 comments on commit c5e2d8d

Please sign in to comment.