Skip to content

Commit

Permalink
2.x: Add Observable.rangeLong & Flowable.rangeLong (#4687)
Browse files Browse the repository at this point in the history
* 2.x: Add Observable.rangeLong & Flowable.rangeLong

* Clean up cast

* Adjust Long overflow checks

* Add test for rangeLong count 1

* Fix ObservableRangeLongTest.testRangeWithOverflow5
  • Loading branch information
vanniktech authored and akarnokd committed Oct 11, 2016
1 parent 5888b23 commit 7e89c1f
Show file tree
Hide file tree
Showing 6 changed files with 918 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3455,6 +3455,50 @@ public static Flowable<Integer> range(int start, int count) {
return RxJavaPlugins.onAssembly(new FlowableRange(start, count));
}

/**
* Returns a Flowable that emits a sequence of Longs within a specified range.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/range.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and signals values on-demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code rangeLong} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param start
* the value of the first Long in the sequence
* @param count
* the number of sequential Longs to generate
* @return a Flowable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} &minus; 1 exceeds
* {@code Long.MAX_VALUE}
* @see <a href="http://reactivex.io/documentation/operators/range.html">ReactiveX operators documentation: Range</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static Flowable<Long> rangeLong(long start, long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}

if (count == 0) {
return empty();
}

if (count == 1) {
return just(start);
}

long end = start + (count - 1);
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}

return RxJavaPlugins.onAssembly(new FlowableRangeLong(start, count));
}

/**
* Returns a Flowable that emits a Boolean value that indicates whether two Publisher sequences are the
* same by comparing the items emitted by each Publisher pairwise.
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2992,6 +2992,47 @@ public static Observable<Integer> range(final int start, final int count) {
return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
}

/**
* Returns an Observable that emits a sequence of Longs within a specified range.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/range.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code rangeLong} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param start
* the value of the first Long in the sequence
* @param count
* the number of sequential Longs to generate
* @return an Observable that emits a range of sequential Longs
* @throws IllegalArgumentException
* if {@code count} is less than zero, or if {@code start} + {@code count} &minus; 1 exceeds
* {@code Long.MAX_VALUE}
* @see <a href="http://reactivex.io/documentation/operators/range.html">ReactiveX operators documentation: Range</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable<Long> rangeLong(long start, long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}

if (count == 0) {
return empty();
}

if (count == 1) {
return just(start);
}

long end = start + (count - 1);
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}

return RxJavaPlugins.onAssembly(new ObservableRangeLong(start, count));
}

/**
* Returns an Observable that emits a Boolean value that indicates whether two ObservableSource sequences are the
* same by comparing the items emitted by each ObservableSource pairwise.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import io.reactivex.Flowable;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.subscriptions.BasicQueueSubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import org.reactivestreams.Subscriber;

/**
* Emits a range of long values.
*/
public final class FlowableRangeLong extends Flowable<Long> {
final long start;
final long end;

public FlowableRangeLong(long start, long count) {
this.start = start;
this.end = start + count;
}

@Override
public void subscribeActual(Subscriber<? super Long> s) {
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new RangeConditionalSubscription(
(ConditionalSubscriber<? super Long>)s, start, end));
} else {
s.onSubscribe(new RangeSubscription(s, start, end));
}
}

abstract static class BaseRangeSubscription extends BasicQueueSubscription<Long> {

private static final long serialVersionUID = -2252972430506210021L;

final long end;

long index;

volatile boolean cancelled;

BaseRangeSubscription(long index, long end) {
this.index = index;
this.end = end;
}

@Override
public final int requestFusion(int mode) {
return mode & SYNC;
}

@Override
public final Long poll() {
long i = index;
if (i == end) {
return null;
}
index = i + 1;
return i;
}

@Override
public final boolean isEmpty() {
return index == end;
}

@Override
public final void clear() {
index = end;
}

@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
if (BackpressureHelper.add(this, n) == 0L) {
if (n == Long.MAX_VALUE) {
fastPath();
} else {
slowPath(n);
}
}
}
}

@Override
public final void cancel() {
cancelled = true;
}


abstract void fastPath();

abstract void slowPath(long r);
}

static final class RangeSubscription extends BaseRangeSubscription {

private static final long serialVersionUID = 2587302975077663557L;

final Subscriber<? super Long> actual;

RangeSubscription(Subscriber<? super Long> actual, long index, long end) {
super(index, end);
this.actual = actual;
}

@Override
void fastPath() {
long f = end;
Subscriber<? super Long> a = actual;

for (long i = index; i != f; i++) {
if (cancelled) {
return;
}
a.onNext(i);
}
if (cancelled) {
return;
}
a.onComplete();
}

@Override
void slowPath(long r) {
long e = 0;
long f = end;
long i = index;
Subscriber<? super Long> a = actual;

for (;;) {

while (e != r && i != f) {
if (cancelled) {
return;
}

a.onNext(i);

e++;
i++;
}

if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}

r = get();
if (e == r) {
index = i;
r = addAndGet(-e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}

static final class RangeConditionalSubscription extends BaseRangeSubscription {


private static final long serialVersionUID = 2587302975077663557L;

final ConditionalSubscriber<? super Long> actual;

RangeConditionalSubscription(ConditionalSubscriber<? super Long> actual, long index, long end) {
super(index, end);
this.actual = actual;
}

@Override
void fastPath() {
long f = end;
ConditionalSubscriber<? super Long> a = actual;

for (long i = index; i != f; i++) {
if (cancelled) {
return;
}
a.tryOnNext(i);
}
if (cancelled) {
return;
}
a.onComplete();
}

@Override
void slowPath(long r) {
long e = 0;
long f = end;
long i = index;
ConditionalSubscriber<? super Long> a = actual;

for (;;) {

while (e != r && i != f) {
if (cancelled) {
return;
}

if (a.tryOnNext(i)) {
e++;
}

i++;
}

if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}

r = get();
if (e == r) {
index = i;
r = addAndGet(-e);
if (r == 0) {
return;
}
e = 0;
}
}
}
}
}
Loading

0 comments on commit 7e89c1f

Please sign in to comment.