-
Notifications
You must be signed in to change notification settings - Fork 136
Reactive Streams : ValueSubscriber
johnmcclean-aol edited this page Nov 23, 2016
·
1 revision
ValueSubscriber is a ReactiveStreams subscriber that will subscribe to a ReactiveStreams Publisher and extract a single value e.g.
ValueSubscriber<Integer> anInt = ValueSubscriber.subscriber();
ReactiveSeq.of(1,2,3)
.publish(anInt);
Xor<Throwable,Integer> xor = anInt.toXor();
In this case our xor will be an Xor.primary instance and will hold the value of the first int through the Stream (1). If the Stream were to throw an exception during execution on the other hand e.g.
ValueSubscriber<Integer> anInt = ValueSubscriber.subscriber();
ReactiveSeq.of(1,2,3)
.map(this::throwCustomException)
.publish(anInt);
Xor<Throwable,Integer> xor = anInt.toXor();
In this case our Xor will be a secondary instance containing the exception. We could also make use of the ValueSubscriber#toTry / ValueSubscriber#toIor methods.
ValueSubscriber<Integer> anInt = ValueSubscriber.subscriber();
ReactiveSeq.of(1,2,3)
.publish(anInt);
Xor<Throwable,Integer> xor = anInt.toXor(); //1
Try<Throwable,Integer> myTry = anInt.toTry(); //2
oops - my bad