Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RxJava3 adapter #3315

Merged
merged 1 commit into from
May 20, 2020
Merged

Added RxJava3 adapter #3315

merged 1 commit into from
May 20, 2020

Conversation

mickverm
Copy link

@mickverm mickverm commented Feb 24, 2020

Closes #3297

@provJoshMieczkowski
Copy link

@mickverm you have some failing tests

@mickverm
Copy link
Author

mickverm commented Feb 26, 2020

The following tests fail:

FlowableTest

  • responseRespectsBackPressure
  • bodyRespectsBackpressure
  • resultRespectsBackpressure

All with the same reason:

org.junit.ComparisonFailure: 
Expected :1
Actual   :0

I noticed these same tests from the adapter-rxjava2 succeed but not without throwing errors:

Feb 26, 2020 7:19:13 PM okhttp3.mockwebserver.MockWebServer$3 execute
INFO: MockWebServer[52498] starting to accept connections
java.lang.IllegalArgumentException: n > 0 required but it was 0
	at io.reactivex.internal.subscriptions.SubscriptionHelper.validate(SubscriptionHelper.java:80)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.request(FlowableOnBackpressureLatest.java:85)
	at retrofit2.adapter.rxjava2.RecordingSubscriber.onSubscribe(RecordingSubscriber.java:45)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onSubscribe(FlowableOnBackpressureLatest.java:59)
	at io.reactivex.internal.operators.flowable.FlowableFromObservable$SubscriberObserver.onSubscribe(FlowableFromObservable.java:60)
	at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onSubscribe(BodyObservable.java:46)
	at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:38)
	at io.reactivex.Observable.subscribe(Observable.java:10151)
	at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
	at io.reactivex.Observable.subscribe(Observable.java:10151)
	at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:31)
	at io.reactivex.Flowable.subscribe(Flowable.java:12172)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:31)
	at io.reactivex.Flowable.subscribe(Flowable.java:12172)
	at retrofit2.adapter.rxjava2.FlowableTest.bodyRespectsBackpressure(FlowableTest.java:84)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at retrofit2.adapter.rxjava2.RecordingSubscriber$Rule$1.evaluate(RecordingSubscriber.java:146)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Exception in thread "main" java.lang.IllegalArgumentException: n > 0 required but it was 0
	at io.reactivex.internal.subscriptions.SubscriptionHelper.validate(SubscriptionHelper.java:80)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.request(FlowableOnBackpressureLatest.java:85)
	at retrofit2.adapter.rxjava2.RecordingSubscriber.onSubscribe(RecordingSubscriber.java:45)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest$BackpressureLatestSubscriber.onSubscribe(FlowableOnBackpressureLatest.java:59)
	at io.reactivex.internal.operators.flowable.FlowableFromObservable$SubscriberObserver.onSubscribe(FlowableFromObservable.java:60)
	at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onSubscribe(BodyObservable.java:46)
	at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:38)
	at io.reactivex.Observable.subscribe(Observable.java:10151)
	at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
	at io.reactivex.Observable.subscribe(Observable.java:10151)
	at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:31)
	at io.reactivex.Flowable.subscribe(Flowable.java:12172)
	at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:31)
	at io.reactivex.Flowable.subscribe(Flowable.java:12172)
	at retrofit2.adapter.rxjava2.FlowableTest.bodyRespectsBackpressure(FlowableTest.java:84)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at retrofit2.adapter.rxjava2.RecordingSubscriber$Rule$1.evaluate(RecordingSubscriber.java:146)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Feb 26, 2020 7:19:13 PM okhttp3.mockwebserver.MockWebServer$4 processOneRequest
INFO: MockWebServer[52498] received request: GET / HTTP/1.1 and responded: HTTP/1.1 200 OK
Feb 26, 2020 7:19:13 PM okhttp3.mockwebserver.MockWebServer$3 acceptConnections
INFO: MockWebServer[52498] done accepting connections: Interrupted function call: accept failed

@provJoshMieczkowski
Copy link

provJoshMieczkowski commented Feb 26, 2020

I'm looking into atm and I've noticed that in CallExecuteObservable for the rxjava3 adapter, this guy if (disposable.isDisposed()) is returning true which is why the test are failing but if you look at the rxjava2 version, that is returning false and allowing the data to continue.

@provJoshMieczkowski
Copy link

@mickverm is removing the tests the right solution vs figuring out why CallExecuteObservable is being disposable as soon as we use it?

@provJoshMieczkowski
Copy link

provJoshMieczkowski commented Feb 26, 2020

Doing some more digging, I've noticed that in RecordingSubscriber we are calling subscription.request(initialRequest); with initialRequest = 0. That causes an error to be thrown at StrictSubscriber.request because 0 is not a valid value

In RxJava2, they are using FlowableOnBackpressureLatest which does check for that 0 but doesn't throw an error

@provJoshMieczkowski
Copy link

Ok, I have found the issue. In Flowable.subscribe if you do not pass in a FlowableSubscriber (the RecordingSubscriber was using just Subscriber), it will default to using a StrictSubscriber which caused the tests to fail.

@provJoshMieczkowski
Copy link

mickverm#1
@mickverm open a PR on your fork that fixes the tests 👍

@mickverm
Copy link
Author

mickverm#1
@mickverm open a PR on your fork that fixes the tests 👍

You're right, I've implemented your changes.

@provJoshMieczkowski
Copy link

Is anyone looking at this guy? Would be awesome if we can merge it in so we can upgrade RxJava :D

@adam-hurwitz
Copy link

adam-hurwitz commented May 9, 2020

In case anyone is having a hard time finding it, @akarnokd has created a library for the rxjava3-retrofit-adapter as there does not seem to be one yet via io.reactivex. See RxJavaRetrofitAdapter and this StackOverflow post for details.

implementation "com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0"

@JakeWharton JakeWharton force-pushed the rxjava3-adapter branch 2 times, most recently from 26b4f77 to 72a43c5 Compare May 20, 2020 15:29
@JakeWharton JakeWharton merged commit ab1d46d into square:master May 20, 2020
@JakeWharton
Copy link
Collaborator

Updated some docs and change the default behavior to async.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

First-party RxJava 3 adapter
4 participants