Skip to content

Commit

Permalink
Added RxJava3 adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Michiel Vermeersch authored and JakeWharton committed May 20, 2020
1 parent 1078fd1 commit f68ceb4
Show file tree
Hide file tree
Showing 41 changed files with 4,870 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ buildscript {
'androidxTestRunner': 'androidx.test:runner:1.1.0',
'rxjava': 'io.reactivex:rxjava:1.3.8',
'rxjava2': 'io.reactivex.rxjava2:rxjava:2.0.0',
'rxjava3': 'io.reactivex.rxjava3:rxjava:3.0.0',
'reactiveStreams': 'org.reactivestreams:reactive-streams:1.0.3',
'scalaLibrary': 'org.scala-lang:scala-library:2.13.1',
'gson': 'com.google.code.gson:gson:2.8.5',
Expand Down
65 changes: 65 additions & 0 deletions retrofit-adapters/rxjava3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
RxJava3 Adapter
==============

An `Adapter` for adapting [RxJava 3.x][1] types.

Available types:

* `Observable<T>`, `Observable<Response<T>>`, and `Observable<Result<T>>` where `T` is the body type.
* `Flowable<T>`, `Flowable<Response<T>>` and `Flowable<Result<T>>` where `T` is the body type.
* `Single<T>`, `Single<Response<T>>`, and `Single<Result<T>>` where `T` is the body type.
* `Maybe<T>`, `Maybe<Response<T>>`, and `Maybe<Result<T>>` where `T` is the body type.
* `Completable` where response bodies are discarded.


Usage
-----

Add `RxJava3CallAdapterFactory` as a `Call` adapter when building your `Retrofit` instance:
```java
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://example.com/")
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.build();
```

Your service methods can now use any of the above types as their return type.
```java
interface MyService {
@GET("/user")
Observable<User> getUser();
}
```

By default, `create()` will produce reactive types which execute their HTTP requests asynchronously
on a background thread. There are two other ways to control the threading on which a request
occurs:

* Use `createSynchronous()` and call `subscribeOn` on the returned reactive type with a `Scheduler`
of your choice.
* Use `createWithScheduler(Scheduler)` to supply a default subscription `Scheduler`.

Download
--------

Download [the latest JAR][2] or grab via [Maven][3]:
```xml
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>adapter-rxjava3</artifactId>
<version>latest.version</version>
</dependency>
```
or [Gradle][3]:
```groovy
implementation 'com.squareup.retrofit2:adapter-rxjava3:latest.version'
```

Snapshots of the development version are available in [Sonatype's `snapshots` repository][snap].



[1]: https://github.com/ReactiveX/RxJava/tree/3.x
[2]: https://search.maven.org/remote_content?g=com.squareup.retrofit2&a=adapter-rxjava3&v=LATEST
[3]: http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.squareup.retrofit2%22%20a%3A%22adapter-rxjava3%22
[snap]: https://oss.sonatype.org/content/repositories/snapshots/
20 changes: 20 additions & 0 deletions retrofit-adapters/rxjava3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apply plugin: 'java-library'
apply plugin: 'com.vanniktech.maven.publish'

dependencies {
api project(':retrofit')
api deps.rxjava3
api deps.reactiveStreams
compileOnly deps.findBugsAnnotations

testImplementation deps.junit
testImplementation deps.assertj
testImplementation deps.guava
testImplementation deps.mockwebserver
}

jar {
manifest {
attributes 'Automatic-Module-Name': 'retrofit2.adapter.rxjava3'
}
}
3 changes: 3 additions & 0 deletions retrofit-adapters/rxjava3/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
POM_ARTIFACT_ID=adapter-rxjava3
POM_NAME=Adapter: RxJava 3
POM_DESCRIPTION=A Retrofit CallAdapter for RxJava 3's stream types.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2020 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava3;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.CompositeException;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import retrofit2.Response;

final class BodyObservable<T> extends Observable<T> {
private final Observable<Response<T>> upstream;

BodyObservable(Observable<Response<T>> upstream) {
this.upstream = upstream;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
upstream.subscribe(new BodyObserver<>(observer));
}

private static class BodyObserver<R> implements Observer<Response<R>> {
private final Observer<? super R> observer;
private boolean terminated;

BodyObserver(Observer<? super R> observer) {
this.observer = observer;
}

@Override
public void onSubscribe(Disposable disposable) {
observer.onSubscribe(disposable);
}

@Override
public void onNext(Response<R> response) {
if (response.isSuccessful()) {
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}

@Override
public void onComplete() {
if (!terminated) {
observer.onComplete();
}
}

@Override
public void onError(Throwable throwable) {
if (!terminated) {
observer.onError(throwable);
} else {
// This should never happen! onNext handles and forwards errors automatically.
Throwable broken =
new AssertionError(
"This should never happen! Report as a bug with the full stacktrace.");
//noinspection UnnecessaryInitCause Two-arg AssertionError constructor is 1.7+ only.
broken.initCause(throwable);
RxJavaPlugins.onError(broken);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2020 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava3;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.CompositeException;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;

final class CallEnqueueObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;

CallEnqueueObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, observer);
observer.onSubscribe(callback);
if (!callback.isDisposed()) {
call.enqueue(callback);
}
}

private static final class CallCallback<T> implements Disposable, Callback<T> {
private final Call<?> call;
private final Observer<? super Response<T>> observer;
private volatile boolean disposed;
boolean terminated = false;

CallCallback(Call<?> call, Observer<? super Response<T>> observer) {
this.call = call;
this.observer = observer;
}

@Override
public void onResponse(Call<T> call, Response<T> response) {
if (disposed) return;

try {
observer.onNext(response);

if (!disposed) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposed) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}

@Override
public void onFailure(Call<T> call, Throwable t) {
if (call.isCanceled()) return;

try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}

@Override
public void dispose() {
disposed = true;
call.cancel();
}

@Override
public boolean isDisposed() {
return disposed;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (C) 2020 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava3;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.CompositeException;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import retrofit2.Call;
import retrofit2.Response;

final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;

CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallDisposable disposable = new CallDisposable(call);
observer.onSubscribe(disposable);
if (disposable.isDisposed()) {
return;
}

boolean terminated = false;
try {
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
observer.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposable.isDisposed()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}

private static final class CallDisposable implements Disposable {
private final Call<?> call;
private volatile boolean disposed;

CallDisposable(Call<?> call) {
this.call = call;
}

@Override
public void dispose() {
disposed = true;
call.cancel();
}

@Override
public boolean isDisposed() {
return disposed;
}
}
}
Loading

0 comments on commit f68ceb4

Please sign in to comment.