Skip to content

Commit

Permalink
Generate Observable based clients in generic generator
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 17, 2016
1 parent ca25c1c commit 75a68c3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.Url;
import rx.Observable;

import java.io.IOException;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -156,14 +157,10 @@ public <T, THeader> ServiceResponseWithHeaders<T, THeader> getPutOrPatchResultWi
* @param callback the user callback to call when operation terminates.
* @return the task describing the asynchronous polling.
*/
public <T> AsyncPollingTask<T> getPutOrPatchResultAsync(Response<ResponseBody> response, Type resourceType, ServiceCall<T> serviceCall, ServiceCallback<T> callback) {
public <T> Observable<ServiceResponse<T>> getPutOrPatchResultAsync(Response<ResponseBody> response, Type resourceType, Observable<ServiceResponse<T>> observable) {
if (response == null) {
CloudException t = new CloudException("response is null.");
if (callback != null) {
callback.failure(t);
}
serviceCall.failure(t);
return null;
return Observable.error(t);
}

int statusCode = response.code();
Expand All @@ -182,29 +179,21 @@ public <T> AsyncPollingTask<T> getPutOrPatchResultAsync(Response<ResponseBody> r
responseBody.close();
}
} catch (Exception e) { /* ignore serialization errors on top of service errors */ }
if (callback != null) {
callback.failure(exception);
}
serviceCall.failure(exception);
return null;
return Observable.error(exception);
}

PollingState<T> pollingState;
try {
pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter());
} catch (IOException e) {
if (callback != null) {
callback.failure(e);
}
serviceCall.failure(e);
return null;
return Observable.error(e);
}
String url = response.raw().request().url().toString();

// Task runner will take it from here
PutPatchPollingTask<T> task = new PutPatchPollingTask<>(pollingState, url, serviceCall, callback);
executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS);
return task;
return Observable.fromCallable(task);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;

import java.lang.reflect.Field;
import java.net.CookieManager;
Expand Down Expand Up @@ -338,6 +339,7 @@ public RestClient build() {
.baseUrl(baseUrl)
.client(httpClient)
.addConverterFactory(mapperAdapter.getConverterFactory())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build(),
credentials,
customHeadersInterceptor,
Expand Down
2 changes: 2 additions & 0 deletions client-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ dependencies {
compile 'com.squareup.okhttp3:logging-interceptor:3.3.1'
compile 'com.squareup.okhttp3:okhttp-urlconnection:3.3.1'
compile 'com.squareup.retrofit2:converter-jackson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'
compile 'io.reactivex:rxjava:1.1.8'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.7.2'
compile 'org.apache.commons:commons-lang3:3.4'
testCompile 'junit:junit:4.12'
Expand Down
119 changes: 67 additions & 52 deletions client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

import com.google.common.util.concurrent.AbstractFuture;

import retrofit2.Call;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

/**
* An instance of this class provides access to the underlying REST call invocation.
Expand All @@ -22,33 +24,72 @@ public class ServiceCall<T> extends AbstractFuture<ServiceResponse<T>> {
/**
* The Retrofit method invocation.
*/
private Call<?> call;
private Subscription subscription;

/**
* Creates an instance of ServiceCall.
*
* @param call the Retrofit call to wrap around.
*/
public ServiceCall(Call<?> call) {
this.call = call;
private ServiceCall() {
}

/**
* Updates the current Retrofit call object.
*
* @param call the new call object.
*/
public void newCall(Call<?> call) {
this.call = call;
public static <T> ServiceCall<T> create(final Observable<ServiceResponse<T>> observable) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
serviceCall.setException(throwable);
}
});
return serviceCall;
}

/**
* Gets the current Retrofit call object.
*
* @return the current call object.
*/
public Call<?> getCall() {
return call;
public static <T> ServiceCall<T> create(final Observable<ServiceResponse<T>> observable, final ServiceCallback<T> callback) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
if (callback != null) {
callback.success(t);
}
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (callback != null) {
callback.failure(throwable);
}
serviceCall.setException(throwable);
}
});
return serviceCall;
}

public static <T, V> ServiceCall<T> createWithHeaders(final Observable<ServiceResponseWithHeaders<T, V>> observable, final ServiceCallback<T> callback) {
final ServiceCall<T> serviceCall = new ServiceCall<>();
serviceCall.subscription = observable
.subscribe(new Action1<ServiceResponse<T>>() {
@Override
public void call(ServiceResponse<T> t) {
if (callback != null) {
callback.success(t);
}
serviceCall.set(t);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (callback != null) {
callback.failure(throwable);
}
serviceCall.setException(throwable);
}
});
return serviceCall;
}

/**
Expand All @@ -59,38 +100,12 @@ public Call<?> getCall() {
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isCancelled()) {
return false;
} else {
call.cancel();
return true;
}
subscription.unsubscribe();
return super.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return call.isCanceled();
}

/**
* Invoke this method to report completed, allowing
* {@link AbstractFuture#get()} to be unblocked.
*
* @param result the service response returned.
* @return true if successfully reported; false otherwise.
*/
public boolean success(ServiceResponse<T> result) {
return set(result);
}

/**
* Invoke this method to report a failure, allowing
* {@link AbstractFuture#get()} to throw the exception.
*
* @param t the exception thrown.
* @return true if successfully reported; false otherwise.
*/
public boolean failure(Throwable t) {
return setException(t);
return subscription.isUnsubscribed();
}
}

0 comments on commit 75a68c3

Please sign in to comment.