Skip to content

Commit

Permalink
Improvements to the CrashlyticsWorker (#6143)
Browse files Browse the repository at this point in the history
Added a method to submit a task followed by a continuation to the
worker. This is useful for making a continuation happen right after the
submitted task, even if other tasks were submitted to the worker in the
meantime. This will be useful for fetching settings from a network
executor, then continuing on the common worker to parse the settings
json.

Also added a method to race two tasks on the worker. This is useful for
things like waiting for an for explicit data collection enable call, or
automatic data collection being enabled.

Both methods have their behaviour fully documented in the javadoc and
tested in unit tests.
  • Loading branch information
mrober authored Aug 1, 2024
1 parent d0fea0e commit 8b832df
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertThrows;

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.concurrent.TestOnlyExecutors;
import java.io.IOException;
Expand Down Expand Up @@ -421,6 +422,258 @@ public void submitTaskWhenThreadPoolFull() {
assertThrows(TimeoutException.class, () -> Tasks.await(task, 30, TimeUnit.MILLISECONDS));
}

@Test
public void submitTaskThatReturnsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forResult(1337),
task -> Tasks.forResult(Integer.toString(task.getResult())));

assertThat(Tasks.await(result)).isEqualTo("1337");
}

@Test
public void submitTaskThatThrowsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forException(new IndexOutOfBoundsException("Sometimes we look too far.")),
task -> {
if (task.getException() != null) {
return Tasks.forResult("Task threw.");
}
return Tasks.forResult("I dunno how I got here?");
});

assertThat(Tasks.await(result)).isEqualTo("Task threw.");
}

@Test
public void submitTaskWithContinuationThatThrows() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
() -> Tasks.forResult(7), task -> Tasks.forException(new IOException()));

ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(result));

assertThat(thrown).hasCauseThat().isInstanceOf(IOException.class);

// Verify the worker still executes tasks after the continuation threw.
assertThat(Tasks.await(crashlyticsWorker.submit(() -> 42))).isEqualTo(42);
}

@Test
public void submitTaskThatCancelsWithContinuation() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(
Tasks::forCanceled,
task -> Tasks.forResult(task.isCanceled() ? "Task cancelled." : "What?"));

assertThat(Tasks.await(result)).isEqualTo("Task cancelled.");
}

@Test
public void submitTaskWithContinuationThatCancels() throws Exception {
Task<String> result =
crashlyticsWorker.submitTask(() -> Tasks.forResult(7), task -> Tasks.forCanceled());

assertThrows(CancellationException.class, () -> Tasks.await(result));

// Verify the worker still executes tasks after the continuation was cancelled.
assertThat(Tasks.await(crashlyticsWorker.submit(() -> "jk"))).isEqualTo("jk");
}

@Test
public void submitTaskWithContinuationExecutesInOrder() throws Exception {
// The integers added to the list represent the order they should be executed in.
List<Integer> list = new ArrayList<>();

// Start the chain which adds 1, then kicks off tasks to add 6 & 7 later, but adds 2 before
// executing the newly added tasks in the continuation.
crashlyticsWorker.submitTask(
() -> {
list.add(1);

// Sleep to give time for the tasks 3, 4, 5 to be submitted.
sleep(300);

// We added the 1 and will add 2 in the continuation. And 3, 4, 5 have been submitted.
crashlyticsWorker.submit(() -> list.add(6));
crashlyticsWorker.submit(() -> list.add(7));

return Tasks.forResult(1);
},
task -> {
// When the task 1 completes the next number to add is 2. Because all the other tasks are
// just submitted, not executed yet.
list.add(2);
return Tasks.forResult("a");
});

// Submit tasks to add 3, 4, 5 since we just added 1 and know a continuation will add the 2.
crashlyticsWorker.submit(() -> list.add(3));
crashlyticsWorker.submit(() -> list.add(4));
crashlyticsWorker.submit(() -> list.add(5));

crashlyticsWorker.await();

// Verify the list is complete and in order.
assertThat(list).isInOrder();
assertThat(list).hasSize(7);
}

@Test
public void raceReturnsFirstResult() throws Exception {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submit(
() -> {
sleep(200);
return "first";
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submit(
() -> {
sleep(400);
return "slow";
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("first");
}

@Test
public void raceReturnsFirstException() {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forException(new ArithmeticException());
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forException(new IllegalStateException());
});

Task<String> task = crashlyticsWorker.race(task1, task2);
ExecutionException thrown = assertThrows(ExecutionException.class, () -> Tasks.await(task));

// The first task throws an ArithmeticException.
assertThat(thrown).hasCauseThat().isInstanceOf(ArithmeticException.class);
}

@Test
public void raceFirstCancelsReturnsSecondResult() throws Exception {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forCanceled();
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forResult("I am slow but didn't cancel.");
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("I am slow but didn't cancel.");
}

@Test
public void raceBothCancel() {
// Create 2 tasks on different workers to race.
Task<String> task1 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(200);
return Tasks.forCanceled();
});
Task<String> task2 =
new CrashlyticsWorker(TestOnlyExecutors.background())
.submitTask(
() -> {
sleep(400);
return Tasks.forCanceled();
});

Task<String> task = crashlyticsWorker.race(task1, task2);

// Both cancelled, so cancel the race result.
assertThrows(CancellationException.class, () -> Tasks.await(task));
}

@Test
public void raceTasksOnSameWorker() throws Exception {
// Create 2 tasks on this worker to race.
Task<String> task1 =
crashlyticsWorker.submit(
() -> {
sleep(200);
return "first";
});
Task<String> task2 =
crashlyticsWorker.submit(
() -> {
sleep(300);
return "second";
});

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

// The first task is submitted to this worker first, so will always be first.
assertThat(result).isEqualTo("first");
}

@Test
public void raceTaskOneOnSameWorkerAnotherNeverCompletes() throws Exception {
// Create a task on this worker, and another that never completes, to race.
Task<String> task1 = crashlyticsWorker.submit(() -> "first");
Task<String> task2 = new TaskCompletionSource<String>().getTask();

Task<String> task = crashlyticsWorker.race(task1, task2);
String result = Tasks.await(task);

assertThat(result).isEqualTo("first");
}

@Test
public void raceTaskOneOnSameWorkerAnotherOtherThatCompletesFirst() throws Exception {
// Add a decoy task to the worker to take up some time.
crashlyticsWorker.submitTask(
() -> {
sleep(200);
return Tasks.forResult(null);
});

// Create a task on this worker, and another, to race.
Task<String> task1 = crashlyticsWorker.submit(() -> "same worker");
TaskCompletionSource<String> task2 = new TaskCompletionSource<>();
task2.trySetResult("other");

Task<String> task = crashlyticsWorker.race(task1, task2.getTask());
String result = Tasks.await(task);

// The other tasks completes first because the first task is queued up later on the worker.
assertThat(result).isEqualTo("other");
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package com.google.firebase.crashlytics.internal;

import androidx.annotation.VisibleForTesting;
import com.google.android.gms.tasks.CancellationTokenSource;
import com.google.android.gms.tasks.Continuation;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Helper for executing tasks sequentially on the given executor service.
Expand Down Expand Up @@ -57,9 +62,12 @@ public ExecutorService getExecutor() {
/**
* Submits a <code>Callable</code> task for asynchronous execution on the executor.
*
* <p>A blocking callable will block an underlying thread.
*
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the
* callable, or throws an <code>ExecutionException</code> if the callable throws an exception.
*/
@CanIgnoreReturnValue
public <T> Task<T> submit(Callable<T> callable) {
synchronized (tailLock) {
// Do not propagate a cancellation.
Expand All @@ -76,9 +84,12 @@ public <T> Task<T> submit(Callable<T> callable) {
/**
* Submits a <code>Runnable</code> task for asynchronous execution on the executor.
*
* <p>A blocking runnable will block an underlying thread.
*
* <p>Returns a <code>Task</code> which will be resolved with null upon successful completion of
* the runnable, or throws an <code>ExecutionException</code> if the runnable throws an exception.
*/
@CanIgnoreReturnValue
public Task<Void> submit(Runnable runnable) {
synchronized (tailLock) {
// Do not propagate a cancellation.
Expand Down Expand Up @@ -108,6 +119,7 @@ public Task<Void> submit(Runnable runnable) {
* returned by the callable, throws an <code>ExecutionException</code> if the callable throws an
* exception, or throws a <code>CancellationException</code> if the task is cancelled.
*/
@CanIgnoreReturnValue
public <T> Task<T> submitTask(Callable<Task<T>> callable) {
synchronized (tailLock) {
// Chain the new callable task onto the queue's tail, regardless of cancellation.
Expand All @@ -117,6 +129,60 @@ public <T> Task<T> submitTask(Callable<Task<T>> callable) {
}
}

/**
* Submits a <code>Callable</code> <code>Task</code> followed by a <code>Continuation</code> for
* asynchronous execution on the executor.
*
* <p>This is useful for submitting a task that must be immediately followed by another task,
* regardless of more tasks being submitted in parallel. For example, settings.
*
* <p>Returns a <code>Task</code> which will be resolved upon successful completion of the Task
* returned by the callable and continued by the continuation, throws an <code>ExecutionException
* </code> if either task throws an exception, or throws a <code>CancellationException</code> if
* either task is cancelled.
*/
@CanIgnoreReturnValue
public <T, R> Task<R> submitTask(
Callable<Task<T>> callable, Continuation<T, Task<R>> continuation) {
synchronized (tailLock) {
// Chain the new callable task and continuation onto the queue's tail.
Task<R> result =
tail.continueWithTask(executor, task -> callable.call())
.continueWithTask(executor, continuation);
tail = result;
return result;
}
}

/**
* Returns a task that is resolved when either of the given tasks is resolved.
*
* <p>When both tasks are cancelled, the returned task will be cancelled.
*/
public <T> Task<T> race(Task<T> task1, Task<T> task2) {
CancellationTokenSource cancellation = new CancellationTokenSource();
TaskCompletionSource<T> result = new TaskCompletionSource<>(cancellation.getToken());

AtomicBoolean otherTaskCancelled = new AtomicBoolean(false);

Continuation<T, Task<Void>> continuation =
task -> {
if (task.isSuccessful()) {
result.trySetResult(task.getResult());
} else if (task.getException() != null) {
result.trySetException(task.getException());
} else if (otherTaskCancelled.getAndSet(true)) {
cancellation.cancel();
}
return Tasks.forResult(null);
};

task1.continueWithTask(executor, continuation);
task2.continueWithTask(executor, continuation);

return result.getTask();
}

/**
* Blocks until all current pending tasks have completed, up to 30 seconds. Useful for testing.
*
Expand Down

0 comments on commit 8b832df

Please sign in to comment.