diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java index 4bc9cd9e55d14..6529375f6343c 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java @@ -7,27 +7,26 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; -import com.microsoft.rest.ServiceException; import com.microsoft.rest.ServiceResponse; -import com.microsoft.rest.ServiceResponseCallback; import com.microsoft.rest.ServiceResponseWithHeaders; -import okhttp3.ResponseBody; -import retrofit2.Call; -import retrofit2.Response; -import retrofit2.http.GET; -import retrofit2.http.Header; -import retrofit2.http.Url; import java.io.IOException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; +import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import okhttp3.ResponseBody; +import retrofit2.Response; +import retrofit2.http.GET; +import retrofit2.http.Header; +import retrofit2.http.Url; +import rx.Observable; +import rx.functions.Func1; + /** * An instance of this class defines a ServiceClient that handles polling and * retrying for long running operations when accessing Azure resources. @@ -58,11 +57,29 @@ public AzureClient(AzureServiceClient serviceClient) { this.serviceClientUserAgent = serviceClient.userAgent(); } + /** + * Gets the interval time between two long running operation polls. + * + * @return the time in milliseconds. + */ + public Integer getLongRunningOperationRetryTimeout() { + return longRunningOperationRetryTimeout; + } + + /** + * Sets the interval time between two long running operation polls. + * + * @param longRunningOperationRetryTimeout the time in milliseconds. + */ + public void withLongRunningOperationRetryTimeout(Integer longRunningOperationRetryTimeout) { + this.longRunningOperationRetryTimeout = longRunningOperationRetryTimeout; + } + /** * Handles an initial response from a PUT or PATCH operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param the return type of the caller * @param resourceType the type of the resource * @return the terminal response for the operation. @@ -70,62 +87,16 @@ public AzureClient(AzureServiceClient serviceClient) { * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponse getPutOrPatchResult(Response response, Type resourceType) throws CloudException, InterruptedException, IOException { - if (response == null) { - throw new CloudException("response is null."); - } - - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 201 && statusCode != 202) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); - } - throw exception; - } - - PollingState pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - String url = response.raw().request().url().toString(); - - // Check provisioning state - while (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - Thread.sleep(pollingState.getDelayInMilliseconds()); - - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeader(pollingState); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPut(pollingState); - } else { - updateStateFromGetResourceOperation(pollingState, url); - } - } - - if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { - updateStateFromGetResourceOperation(pollingState, url); - } - - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - throw new CloudException("Async operation failed"); - } - - return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + public ServiceResponse getPutOrPatchResult(Observable> observable, Type resourceType) throws CloudException, InterruptedException, IOException { + Observable> asyncObservable = getPutOrPatchResultAsync(observable, resourceType); + return asyncObservable.toBlocking().last(); } /** * Handles an initial response from a PUT or PATCH operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param resourceType the type of the resource * @param headerType the type of the response header * @param the return type of the caller @@ -135,12 +106,12 @@ public ServiceResponse getPutOrPatchResult(Response respons * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponseWithHeaders getPutOrPatchResultWithHeaders(Response response, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { - ServiceResponse bodyResponse = getPutOrPatchResult(response, resourceType); + public ServiceResponseWithHeaders getPutOrPatchResultWithHeaders(Observable> observable, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { + ServiceResponse bodyResponse = getPutOrPatchResult(observable, resourceType); return new ServiceResponseWithHeaders<>( - bodyResponse.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), - bodyResponse.getResponse() + bodyResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), + bodyResponse.getResponse() ); } @@ -149,62 +120,77 @@ public ServiceResponseWithHeaders getPutOrPatchResultWi * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial observable from the PUT or PATCH operation. * @param the return type of the caller. * @param resourceType the type of the resource. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. - * @return the task describing the asynchronous polling. + * @return the observable of which a subscription will lead to a final response. */ - public AsyncPollingTask getPutOrPatchResultAsync(Response response, Type resourceType, ServiceCall serviceCall, ServiceCallback callback) { - if (response == null) { - CloudException t = new CloudException("response is null."); - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - return null; - } + public Observable> getPutOrPatchResultAsync(Observable> observable, final Type resourceType) { + return observable + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 201, 202); + if (exception != null) { + return Observable.error(exception); + } - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 201 && statusCode != 202) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - try { - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); + try { + final PollingState pollingState = new PollingState<>(response, getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); + final String url = response.raw().request().url().toString(); + + // Task runner will take it from here + return Observable.just(pollingState) + // Emit a polling task intermittently + .repeatWhen(new Func1, Observable>() { + @Override + public Observable call(Observable observable) { + return observable.delay(pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); + } + }) + // Conditionally polls if it's not a terminal status + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { + return putOrPatchPollingDispatcher(pollingState, url); + } else { + return Observable.just(pollingState); + } + } + }) + // The above process continues until this filter passes + .filter(new Func1, Boolean>() { + @Override + public Boolean call(PollingState pollingState) { + return AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus()); + } + }) + .first() + // Possible extra get to receive the actual resource + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { + return updateStateFromGetResourceOperationAsync(pollingState, url); + } + if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { + return Observable.error(new CloudException("Async operation failed with provisioning state: " + pollingState.getStatus())); + } + return Observable.just(pollingState); + } + }) + .map(new Func1, ServiceResponse>() { + @Override + public ServiceResponse call(PollingState pollingState) { + return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + } + }); + } catch (IOException e) { + return Observable.error(e); + } } - } catch (Exception e) { /* ignore serialization errors on top of service errors */ } - if (callback != null) { - callback.failure(exception); - } - serviceCall.failure(exception); - return null; - } - - PollingState pollingState; - try { - pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - } catch (IOException e) { - if (callback != null) { - callback.failure(e); - } - serviceCall.failure(e); - return null; - } - String url = response.raw().request().url().toString(); - - // Task runner will take it from here - PutPatchPollingTask task = new PutPatchPollingTask<>(pollingState, url, serviceCall, callback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - return task; + }); } /** @@ -212,49 +198,36 @@ public AsyncPollingTask getPutOrPatchResultAsync(Response r * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the PUT or PATCH operation. + * @param observable the initial response from the PUT or PATCH operation. * @param the return type of the caller * @param the type of the response header * @param resourceType the type of the resource. * @param headerType the type of the response header - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPutOrPatchResultWithHeadersAsync(Response response, Type resourceType, final Class headerType, final ServiceCall serviceCall, final ServiceCallback callback) { - return this.getPutOrPatchResultAsync(response, resourceType, serviceCall, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - ServiceResponseWithHeaders clientResponse = new ServiceResponseWithHeaders<>( - result.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(result.getResponse().headers()), headerType), - result.getResponse() - ); - if (callback != null) { - callback.success(clientResponse); + public Observable> getPutOrPatchResultWithHeadersAsync(Observable> observable, Type resourceType, final Class headerType) { + Observable> bodyResponse = getPutOrPatchResultAsync(observable, resourceType); + return bodyResponse + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(ServiceResponse serviceResponse) { + try { + return Observable + .just(new ServiceResponseWithHeaders<>(serviceResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(serviceResponse.getResponse().headers()), headerType), + serviceResponse.getResponse())); + } catch (IOException e) { + return Observable.error(e); } - serviceCall.success(clientResponse); - } catch (IOException e) { - failure(e); } - } - }); + }); } /** * Handles an initial response from a POST or DELETE operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param the return type of the caller * @param resourceType the type of the resource * @return the terminal response for the operation. @@ -262,60 +235,16 @@ public void success(ServiceResponse result) { * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponse getPostOrDeleteResult(Response response, Type resourceType) throws CloudException, InterruptedException, IOException { - if (response == null) { - throw new CloudException("response is null."); - } - - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); - } - throw exception; - } - - PollingState pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - - // Check provisioning state - while (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - Thread.sleep(pollingState.getDelayInMilliseconds()); - - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeader(pollingState); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPostOrDelete(pollingState); - } else { - CloudException exception = new CloudException("No header in response"); - exception.setResponse(response); - throw exception; - } - } - - // Check if operation failed - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - throw new CloudException("Async operation failed"); - } - - return new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); + public ServiceResponse getPostOrDeleteResult(Observable> observable, Type resourceType) throws CloudException, InterruptedException, IOException { + Observable> asyncObservable = getPutOrPatchResultAsync(observable, resourceType); + return asyncObservable.toBlocking().last(); } /** * Handles an initial response from a POST or DELETE operation response by polling * the status of the operation until the long running operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param resourceType the type of the resource * @param headerType the type of the response header * @param the return type of the caller @@ -325,12 +254,12 @@ public ServiceResponse getPostOrDeleteResult(Response respo * @throws InterruptedException interrupted exception * @throws IOException thrown by deserialization */ - public ServiceResponseWithHeaders getPostOrDeleteResultWithHeaders(Response response, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { - ServiceResponse bodyResponse = getPostOrDeleteResult(response, resourceType); + public ServiceResponseWithHeaders getPostOrDeleteResultWithHeaders(Observable> observable, Type resourceType, Class headerType) throws CloudException, InterruptedException, IOException { + ServiceResponse bodyResponse = getPostOrDeleteResult(observable, resourceType); return new ServiceResponseWithHeaders<>( - bodyResponse.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), - bodyResponse.getResponse() + bodyResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(bodyResponse.getResponse().headers()), headerType), + bodyResponse.getResponse() ); } @@ -339,61 +268,64 @@ public ServiceResponseWithHeaders getPostOrDeleteResult * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial response from the POST or DELETE operation. * @param the return type of the caller. * @param resourceType the type of the resource. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPostOrDeleteResultAsync(Response response, Type resourceType, ServiceCall serviceCall, ServiceCallback callback) { - if (response == null) { - CloudException t = new CloudException("response is null."); - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - return null; - } + public Observable> getPostOrDeleteResultAsync(Observable> observable, final Type resourceType) { + return observable + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 202, 204); + if (exception != null) { + return Observable.error(exception); + } - int statusCode = response.code(); - ResponseBody responseBody; - if (response.isSuccessful()) { - responseBody = response.body(); - } else { - responseBody = response.errorBody(); - } - if (statusCode != 200 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - try { - if (responseBody != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class)); - responseBody.close(); + try { + final PollingState pollingState = new PollingState<>(response, getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); + return Observable.just(pollingState) + // Emit a polling task intermittently + .repeatWhen(new Func1, Observable>() { + @Override + public Observable call(Observable observable) { + return observable.delay(pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); + } + }) + // Conditionally polls if it's not a terminal status + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { + return postOrDeletePollingDispatcher(pollingState); + } + return Observable.just(pollingState); + } + }) + // The above process continues until this filter passes + .filter(new Func1, Boolean>() { + @Override + public Boolean call(PollingState pollingState) { + return AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus()); + } + }) + .first() + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(PollingState pollingState) { + if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { + return Observable.error(new CloudException("Async operation failed with provisioning state: " + pollingState.getStatus())); + } else { + return Observable.just(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); + } + } + }); + } catch (IOException e) { + return Observable.error(e); + } } - } catch (Exception e) { /* ignore serialization errors on top of service errors */ } - if (callback != null) { - callback.failure(exception); - } - serviceCall.failure(exception); - return null; - } - - PollingState pollingState; - try { - pollingState = new PollingState<>(response, this.getLongRunningOperationRetryTimeout(), resourceType, restClient().mapperAdapter()); - } catch (IOException e) { - if (callback != null) { - callback.failure(e); - } - serviceCall.failure(e); - return null; - } - - // Task runner will take it from here - PostDeletePollingTask task = new PostDeletePollingTask<>(pollingState, serviceCall, callback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - return task; + }); } /** @@ -401,42 +333,29 @@ public AsyncPollingTask getPostOrDeleteResultAsync(Response * the status of the operation asynchronously, calling the user provided callback * when the operation terminates. * - * @param response the initial response from the POST or DELETE operation. + * @param observable the initial observable from the POST or DELETE operation. * @param the return type of the caller * @param the type of the response header * @param resourceType the type of the resource. * @param headerType the type of the response header - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param callback the user callback to call when operation terminates. * @return the task describing the asynchronous polling. */ - public AsyncPollingTask getPostOrDeleteResultWithHeadersAsync(Response response, Type resourceType, final Class headerType, final ServiceCall serviceCall, final ServiceCallback callback) { - return this.getPostOrDeleteResultAsync(response, resourceType, serviceCall, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - ServiceResponseWithHeaders clientResponse = new ServiceResponseWithHeaders<>( - result.getBody(), - restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(result.getResponse().headers()), headerType), - result.getResponse() - ); - if (callback != null) { - callback.success(clientResponse); + public Observable> getPostOrDeleteResultWithHeadersAsync(Observable> observable, Type resourceType, final Class headerType) { + Observable> bodyResponse = getPostOrDeleteResultAsync(observable, resourceType); + return bodyResponse + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(ServiceResponse serviceResponse) { + try { + return Observable + .just(new ServiceResponseWithHeaders<>(serviceResponse.getBody(), + restClient().mapperAdapter().deserialize(restClient().mapperAdapter().serialize(serviceResponse.getResponse().headers()), headerType), + serviceResponse.getResponse())); + } catch (IOException e) { + return Observable.error(e); } - serviceCall.success(clientResponse); - } catch (IOException e) { - failure(e); } - } - }); + }); } /** @@ -445,52 +364,26 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException REST exception - * @throws IOException thrown by deserialization */ - private void updateStateFromLocationHeaderOnPut(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getLocationHeaderLink()); - int statusCode = response.code(); - if (statusCode == 202) { - pollingState.setResponse(response); - pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); - } else if (statusCode == 200 || statusCode == 201) { - pollingState.updateFromResponseOnPutPatch(response); - } - } - - /** - * Polls from the location header and updates the polling state with the - * polling response for a PUT operation. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromLocationHeaderOnPutAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getLocationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - int statusCode = result.getResponse().code(); + private Observable> updateStateFromLocationHeaderOnPutAsync(final PollingState pollingState) { + return pollAsync(pollingState.getLocationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + int statusCode = response.code(); if (statusCode == 202) { - pollingState.setResponse(result.getResponse()); + pollingState.setResponse(response); pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); } else if (statusCode == 200 || statusCode == 201) { - pollingState.updateFromResponseOnPutPatch(result.getResponse()); + try { + pollingState.updateFromResponseOnPutPatch(response); + } catch (CloudException | IOException e) { + return Observable.error(e); + } } - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); - } catch (Throwable t) { - failure(t); + return Observable.just(pollingState); } - } - }); + }); } /** @@ -499,67 +392,26 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization */ - private void updateStateFromLocationHeaderOnPostOrDelete(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getLocationHeaderLink()); - int statusCode = response.code(); - if (statusCode == 202) { - pollingState.setResponse(response); - pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); - } else if (statusCode == 200 || statusCode == 201 || statusCode == 204) { - pollingState.updateFromResponseOnDeletePost(response); - } - } - - /** - * Polls from the location header and updates the polling state with the - * polling response for a POST or DELETE operation. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromLocationHeaderOnPostOrDeleteAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getLocationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { - int statusCode = result.getResponse().code(); + private Observable> updateStateFromLocationHeaderOnPostOrDeleteAsync(final PollingState pollingState) { + return pollAsync(pollingState.getLocationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + int statusCode = response.code(); if (statusCode == 202) { - pollingState.setResponse(result.getResponse()); + pollingState.setResponse(response); pollingState.setStatus(AzureAsyncOperation.IN_PROGRESS_STATUS); } else if (statusCode == 200 || statusCode == 201 || statusCode == 204) { - pollingState.updateFromResponseOnDeletePost(result.getResponse()); + try { + pollingState.updateFromResponseOnDeletePost(response); + } catch (IOException e) { + return Observable.error(e); + } } - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); - } catch (Throwable t) { - failure(t); + return Observable.just(pollingState); } - } - }); - } - - /** - * Polls from the provided URL and updates the polling state with the - * polling response. - * - * @param pollingState the polling state for the current operation. - * @param url the url to poll from - * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization - */ - private void updateStateFromGetResourceOperation(PollingState pollingState, String url) throws CloudException, IOException { - Response response = poll(url); - pollingState.updateFromResponseOnPutPatch(response); + }); } /** @@ -568,39 +420,21 @@ private void updateStateFromGetResourceOperation(PollingState pollingStat * * @param pollingState the polling state for the current operation. * @param url the url to poll from - * @param serviceCall the future based service call - * @param callback the user callback to call when operation terminates. * @param the return type of the caller. - * @return the task describing the asynchronous polling. */ - private Call updateStateFromGetResourceOperationAsync(final PollingState pollingState, String url, final ServiceCall serviceCall, final ServiceCallback callback) { - return pollAsync(url, new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (callback != null) { - callback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } - - @Override - public void success(ServiceResponse result) { - try { - pollingState.updateFromResponseOnPutPatch(result.getResponse()); - ServiceResponse clientResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (callback != null) { - callback.success(clientResponse); - } - if (serviceCall != null) { - serviceCall.success(clientResponse); + private Observable> updateStateFromGetResourceOperationAsync(final PollingState pollingState, String url) { + return pollAsync(url) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { + try { + pollingState.updateFromResponseOnPutPatch(response); + return Observable.just(pollingState); + } catch (CloudException | IOException e) { + return Observable.error(e); } - } catch (Throwable t) { - failure(t); } - } - }); + }); } /** @@ -609,76 +443,34 @@ public void success(ServiceResponse result) { * * @param pollingState the polling state for the current operation. * @param the return type of the caller. - * @throws CloudException service exception - * @throws IOException thrown by deserialization */ - private void updateStateFromAzureAsyncOperationHeader(PollingState pollingState) throws CloudException, IOException { - Response response = poll(pollingState.getAzureAsyncOperationHeaderLink()); - - AzureAsyncOperation body = null; - if (response.body() != null) { - body = restClient().mapperAdapter().deserialize(response.body().string(), AzureAsyncOperation.class); - response.body().close(); - } - - if (body == null || body.getStatus() == null) { - CloudException exception = new CloudException("no body"); - exception.setResponse(response); - if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - throw exception; - } - - pollingState.setStatus(body.getStatus()); - pollingState.setResponse(response); - pollingState.setResource(null); - } - - /** - * Polls from the 'Azure-AsyncOperation' header and updates the polling - * state with the polling response. - * - * @param pollingState the polling state for the current operation. - * @param callback the user callback to call when operation terminates. - * @param the return type of the caller. - * @return the task describing the asynchronous polling. - */ - private Call updateStateFromAzureAsyncOperationHeaderAsync(final PollingState pollingState, final ServiceCallback callback) { - return pollAsync(pollingState.getAzureAsyncOperationHeaderLink(), new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - } - - @Override - public void success(ServiceResponse result) { - try { + private Observable> updateStateFromAzureAsyncOperationHeaderAsync(final PollingState pollingState) { + return pollAsync(pollingState.getAzureAsyncOperationHeaderLink()) + .flatMap(new Func1, Observable>>() { + @Override + public Observable> call(Response response) { AzureAsyncOperation body = null; - if (result.getBody() != null) { - body = restClient().mapperAdapter().deserialize(result.getBody().string(), AzureAsyncOperation.class); - result.getBody().close(); + if (response.body() != null) { + try { + body = restClient().mapperAdapter().deserialize(response.body().string(), AzureAsyncOperation.class); + response.body().close(); + } catch (IOException e) { + body = null; + } } + if (body == null || body.getStatus() == null) { - CloudException exception = new CloudException("no body"); - exception.setResponse(result.getResponse()); - if (result.getResponse().errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(result.getResponse().errorBody().string(), CloudError.class)); - result.getResponse().errorBody().close(); - } - failure(exception); - } else { - pollingState.setStatus(body.getStatus()); - pollingState.setResponse(result.getResponse()); - pollingState.setResource(null); - callback.success(new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse())); + CloudException exception = new CloudException("polling response does not contain a valid body: " + body); + exception.setResponse(response); + return Observable.error(exception); } - } catch (IOException ex) { - failure(ex); + + pollingState.setStatus(body.getStatus()); + pollingState.setResponse(response); + pollingState.setResource(null); + return Observable.just(pollingState); } - } - }); + }); } /** @@ -686,260 +478,93 @@ public void success(ServiceResponse result) { * * @param url the URL to poll from. * @return the raw response. - * @throws CloudException REST exception - * @throws IOException thrown by deserialization - */ - private Response poll(String url) throws CloudException, IOException { - URL endpoint; - endpoint = new URL(url); - int port = endpoint.getPort(); - if (port == -1) { - port = endpoint.getDefaultPort(); - } - AsyncService service = restClient().retrofit().create(AsyncService.class); - Response response = service.get(endpoint.getFile(), serviceClientUserAgent).execute(); - int statusCode = response.code(); - if (statusCode != 200 && statusCode != 201 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (response.body() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.body().string(), CloudError.class)); - response.body().close(); - } else if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - throw exception; - } - return response; - } - - /** - * Polls asynchronously from the URL provided. - * - * @param url the URL to poll from. - * @param callback the user callback to call when operation terminates. - * @return the {@link Call} object from Retrofit. */ - private Call pollAsync(String url, final ServiceCallback callback) { + private Observable> pollAsync(String url) { URL endpoint; try { endpoint = new URL(url); } catch (MalformedURLException e) { - callback.failure(e); - return null; + return Observable.error(e); } int port = endpoint.getPort(); if (port == -1) { port = endpoint.getDefaultPort(); } AsyncService service = restClient().retrofit().create(AsyncService.class); - Call call = service.get(endpoint.getFile(), serviceClientUserAgent); - call.enqueue(new ServiceResponseCallback(null, callback) { - @Override - public void onResponse(Call call, Response response) { - try { - int statusCode = response.code(); - if (statusCode != 200 && statusCode != 201 && statusCode != 202 && statusCode != 204) { - CloudException exception = new CloudException(statusCode + " is not a valid polling status code"); - exception.setResponse(response); - if (response.body() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.body().string(), CloudError.class)); - response.body().close(); - } else if (response.errorBody() != null) { - exception.setBody((CloudError) restClient().mapperAdapter().deserialize(response.errorBody().string(), CloudError.class)); - response.errorBody().close(); - } - callback.failure(exception); - return; - } - callback.success(new ServiceResponse<>(response.body(), response)); - } catch (IOException ex) { - callback.failure(ex); - } - } - }); - return call; - } - - /** - * Gets the interval time between two long running operation polls. - * - * @return the time in milliseconds. - */ - public Integer getLongRunningOperationRetryTimeout() { - return longRunningOperationRetryTimeout; - } - - /** - * Sets the interval time between two long running operation polls. - * - * @param longRunningOperationRetryTimeout the time in milliseconds. - */ - public void withLongRunningOperationRetryTimeout(Integer longRunningOperationRetryTimeout) { - this.longRunningOperationRetryTimeout = longRunningOperationRetryTimeout; - } - - /** - * The Retrofit service used for polling. - */ - private interface AsyncService { - @GET - Call get(@Url String url, @Header("User-Agent") String userAgent); - } - - /** - * The task runner that describes the state of an asynchronous long running - * operation. - * - * @param the return type of the caller. - */ - abstract class AsyncPollingTask implements Runnable { - /** The {@link Call} object from Retrofit. */ - protected ServiceCall serviceCall; - /** The polling state for the current operation. */ - protected PollingState pollingState; - /** The callback used for asynchronous polling. */ - protected ServiceCallback pollingCallback; - /** The client callback to call when polling finishes. */ - protected ServiceCallback clientCallback; - } - - /** - * The task runner that handles PUT or PATCH operations. - * - * @param the return type of the caller. - */ - class PutPatchPollingTask extends AsyncPollingTask { - /** The URL to poll from. */ - private String url; - - /** - * Creates an instance of Polling task for PUT or PATCH operations. - * - * @param pollingState the current polling state. - * @param url the URL to poll from. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param clientCallback the client callback to call when a terminal status is hit. - */ - PutPatchPollingTask(final PollingState pollingState, final String url, final ServiceCall serviceCall, final ServiceCallback clientCallback) { - this.serviceCall = serviceCall; - this.pollingState = pollingState; - this.url = url; - this.clientCallback = clientCallback; - this.pollingCallback = new ServiceCallback() { + return service.get(endpoint.getFile(), serviceClientUserAgent) + .flatMap(new Func1, Observable>>() { @Override - public void failure(Throwable t) { - if (clientCallback != null) { - clientCallback.failure(t); + public Observable> call(Response response) { + CloudException exception = createExceptionFromResponse(response, 200, 201, 202, 204); + if (exception != null) { + return Observable.error(exception); + } else { + return Observable.just(response); } - serviceCall.failure(t); } + }); + } - @Override - public void success(ServiceResponse result) { - PutPatchPollingTask task = new PutPatchPollingTask<>(pollingState, url, serviceCall, clientCallback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - } - }; + private CloudException createExceptionFromResponse(Response response, Integer... allowedStatusCodes) { + int statusCode = response.code(); + ResponseBody responseBody; + if (response.isSuccessful()) { + responseBody = response.body(); + } else { + responseBody = response.errorBody(); } - - @Override - public void run() { - // Check provisioning state - if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - this.serviceCall.newCall(updateStateFromAzureAsyncOperationHeaderAsync(pollingState, pollingCallback)); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - this.serviceCall.newCall(updateStateFromLocationHeaderOnPutAsync(pollingState, pollingCallback)); - } else { - this.serviceCall.newCall(updateStateFromGetResourceOperationAsync(pollingState, url, null, pollingCallback)); - } - } else { - if (AzureAsyncOperation.SUCCESS_STATUS.equals(pollingState.getStatus()) && pollingState.getResource() == null) { - this.serviceCall.newCall(updateStateFromGetResourceOperationAsync(pollingState, url, serviceCall, clientCallback)); - } else if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - ServiceException t = new ServiceException("Async operation failed"); - if (clientCallback != null) { - clientCallback.failure(t); - } - serviceCall.failure(t); + if (!Arrays.asList(allowedStatusCodes).contains(statusCode)) { + CloudException exception; + try { + CloudError errorBody = restClient().mapperAdapter().deserialize(responseBody.string(), CloudError.class); + if (errorBody != null) { + exception = new CloudException(errorBody.getMessage()); } else { - ServiceResponse clientResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (clientCallback != null) { - clientCallback.success(clientResponse); - } - serviceCall.success(clientResponse); + exception = new CloudException("Unknown error with status code " + statusCode); } + exception.setBody(errorBody); + exception.setResponse(response); + return exception; + } catch (Exception e) { + /* ignore serialization errors on top of service errors */ + return new CloudException("Unknown error with status code " + statusCode, e); } } + return null; } - /** - * The task runner that handles POST or DELETE operations. - * - * @param the return type of the caller. - */ - class PostDeletePollingTask extends AsyncPollingTask { - /** - * Creates an instance of Polling task for POST or DELETE operations. - * - * @param pollingState the current polling state. - * @param serviceCall the ServiceCall object tracking Retrofit calls. - * @param clientCallback the client callback to call when a terminal status is hit. - */ - PostDeletePollingTask(final PollingState pollingState, final ServiceCall serviceCall, final ServiceCallback clientCallback) { - this.serviceCall = serviceCall; - this.pollingState = pollingState; - this.clientCallback = clientCallback; - this.pollingCallback = new ServiceCallback() { - @Override - public void failure(Throwable t) { - if (clientCallback != null) { - clientCallback.failure(t); - } - serviceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - PostDeletePollingTask task = new PostDeletePollingTask<>(pollingState, serviceCall, clientCallback); - executor.schedule(task, pollingState.getDelayInMilliseconds(), TimeUnit.MILLISECONDS); - } - }; + private Observable> putOrPatchPollingDispatcher(PollingState pollingState, String url) { + if (pollingState.getAzureAsyncOperationHeaderLink() != null + && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { + return updateStateFromAzureAsyncOperationHeaderAsync(pollingState); + } else if (pollingState.getLocationHeaderLink() != null + && !pollingState.getLocationHeaderLink().isEmpty()) { + return updateStateFromLocationHeaderOnPutAsync(pollingState); + } else { + return updateStateFromGetResourceOperationAsync(pollingState, url); } + } - @Override - public void run() { - if (!AzureAsyncOperation.getTerminalStatuses().contains(pollingState.getStatus())) { - if (pollingState.getAzureAsyncOperationHeaderLink() != null - && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { - updateStateFromAzureAsyncOperationHeaderAsync(pollingState, pollingCallback); - } else if (pollingState.getLocationHeaderLink() != null - && !pollingState.getLocationHeaderLink().isEmpty()) { - updateStateFromLocationHeaderOnPostOrDeleteAsync(pollingState, pollingCallback); - } else { - ServiceException serviceException = new ServiceException("No async header in response"); - pollingCallback.failure(serviceException); - } - } else { - // Check if operation failed - if (AzureAsyncOperation.getFailedStatuses().contains(pollingState.getStatus())) { - ServiceException serviceException = new ServiceException("Async operation failed"); - if (clientCallback != null) { - clientCallback.failure(serviceException); - } - serviceCall.failure(serviceException); - } else { - ServiceResponse serviceResponse = new ServiceResponse<>(pollingState.getResource(), pollingState.getResponse()); - if (clientCallback != null) { - clientCallback.success(serviceResponse); - } - serviceCall.success(serviceResponse); - } - } + private Observable> postOrDeletePollingDispatcher(PollingState pollingState) { + if (pollingState.getAzureAsyncOperationHeaderLink() != null + && !pollingState.getAzureAsyncOperationHeaderLink().isEmpty()) { + return updateStateFromAzureAsyncOperationHeaderAsync(pollingState); + } else if (pollingState.getLocationHeaderLink() != null + && !pollingState.getLocationHeaderLink().isEmpty()) { + return updateStateFromLocationHeaderOnPostOrDeleteAsync(pollingState); + } else { + CloudException exception = new CloudException("Response does not contain an Azure-AsyncOperation or Location header."); + exception.setBody(pollingState.getError()); + exception.setResponse(pollingState.getResponse()); + return Observable.error(exception); } } -} + + /** + * The Retrofit service used for polling. + */ + private interface AsyncService { + @GET + Observable> get(@Url String url, @Header("User-Agent") String userAgent); + } +} \ No newline at end of file diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java new file mode 100644 index 0000000000000..6fb08726d1cb3 --- /dev/null +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureServiceCall.java @@ -0,0 +1,125 @@ +/** + * + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + * + */ + +package com.microsoft.azure; + +import com.microsoft.rest.ServiceCall; +import com.microsoft.rest.ServiceResponse; +import com.microsoft.rest.ServiceResponseWithHeaders; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; + +import java.util.List; + +/** + * An instance of this class provides access to the underlying REST call invocation. + * This class wraps around the Retrofit Call object and allows updates to it in the + * progress of a long running operation or a paging operation. + * + * @param the type of the returning object + */ +public final class AzureServiceCall extends ServiceCall { + private AzureServiceCall() { + } + + /** + * Creates a ServiceCall from a paging operation. + * + * @param first the observable to the first page + * @param next the observable to poll subsequent pages + * @param callback the client-side callback + * @param the element type + * @return the future based ServiceCall + */ + public static ServiceCall> create(Observable>> first, final Func1>>> next, final ListOperationCallback callback) { + final AzureServiceCall> serviceCall = new AzureServiceCall<>(); + final PagingSubscriber subscriber = new PagingSubscriber<>(serviceCall, next, callback); + serviceCall.setSubscription(first + .single() + .subscribe(subscriber)); + return serviceCall; + } + + /** + * Creates a ServiceCall from a paging operation that returns a header response. + * + * @param first the observable to the first page + * @param next the observable to poll subsequent pages + * @param callback the client-side callback + * @param the element type + * @param the header object type + * @return the future based ServiceCall + */ + public static ServiceCall> createWithHeaders(Observable, V>> first, final Func1, V>>> next, final ListOperationCallback callback) { + final AzureServiceCall> serviceCall = new AzureServiceCall<>(); + final PagingSubscriber subscriber = new PagingSubscriber<>(serviceCall, new Func1>>>() { + @Override + public Observable>> call(String s) { + return next.call(s) + .map(new Func1, V>, ServiceResponse>>() { + @Override + public ServiceResponse> call(ServiceResponseWithHeaders, V> pageVServiceResponseWithHeaders) { + return pageVServiceResponseWithHeaders; + } + }); + } + }, callback); + serviceCall.setSubscription(first + .single() + .subscribe(subscriber)); + return serviceCall; + } + + /** + * The subscriber that handles user callback and automatically subscribes to the next page. + * + * @param the element type + */ + private static class PagingSubscriber extends Subscriber>> { + private AzureServiceCall> serviceCall; + private Func1>>> next; + private ListOperationCallback callback; + private ServiceResponse> lastResponse; + + PagingSubscriber(final AzureServiceCall> serviceCall, final Func1>>> next, final ListOperationCallback callback) { + this.serviceCall = serviceCall; + this.next = next; + this.callback = callback; + } + + @Override + public void onCompleted() { + // do nothing + } + + @Override + public void onError(Throwable e) { + serviceCall.setException(e); + if (callback != null) { + callback.failure(e); + } + } + + @Override + public void onNext(ServiceResponse> serviceResponse) { + lastResponse = serviceResponse; + ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE; + if (callback != null) { + behavior = callback.progress(serviceResponse.getBody().getItems()); + if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) { + callback.success(); + } + } + if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) { + serviceCall.set(new ServiceResponse<>(lastResponse.getBody().getItems(), lastResponse.getResponse())); + } else { + serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this)); + } + } + } +} diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java b/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java index 6e0b2ea92a96b..63dc60f9f7c6b 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/ListOperationCallback.java @@ -8,6 +8,7 @@ package com.microsoft.azure; import com.microsoft.rest.ServiceCallback; +import com.microsoft.rest.ServiceResponse; import java.util.List; @@ -36,16 +37,14 @@ public ListOperationCallback() { /** * Override this method to handle progressive results. - * The user is responsible for returning a {@link PagingBahavior} Enum to indicate + * The user is responsible for returning a {@link PagingBehavior} Enum to indicate * whether the client should continue loading or stop. * * @param partial the list of resources from the current request. * @return CONTINUE if you want to go on loading, STOP otherwise. * */ - public PagingBahavior progress(List partial) { - return PagingBahavior.CONTINUE; - } + public abstract PagingBehavior progress(List partial); /** * Get the list result that stores the accumulated resources loaded from server. @@ -71,6 +70,16 @@ public void load(List result) { } } + @Override + public void success(ServiceResponse> result) { + success(); + } + + /** + * Override this method to handle successful REST call results. + */ + public abstract void success(); + /** * Get the number of loaded pages. * @@ -83,7 +92,7 @@ public int pageCount() { /** * An enum to indicate whether the client should continue loading or stop. */ - public enum PagingBahavior { + public enum PagingBehavior { /** * Indicates that the client should continue loading. */ diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java b/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java index a88a044f751e7..8db10337d9898 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/PagedList.java @@ -9,8 +9,6 @@ import com.microsoft.rest.RestException; -import javax.xml.bind.DataBindingException; -import javax.xml.ws.WebServiceException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -19,6 +17,8 @@ import java.util.ListIterator; import java.util.NoSuchElementException; +import javax.xml.bind.DataBindingException; + /** * Defines a list response from a paging operation. The pages are * lazy initialized when an instance of this class is iterated. @@ -47,7 +47,10 @@ public PagedList() { */ public PagedList(Page page) { this(); - items.addAll(page.getItems()); + List retrievedItems = page.getItems(); + if (retrievedItems != null && retrievedItems.size() != 0) { + items.addAll(retrievedItems); + } nextPageLink = page.getNextPageLink(); currentPage = page; } @@ -81,8 +84,6 @@ public void loadNextPage() { this.nextPageLink = nextPage.getNextPageLink(); this.items.addAll(nextPage.getItems()); this.currentPage = nextPage; - } catch (RestException e) { - throw new WebServiceException(e.toString(), e); } catch (IOException e) { throw new DataBindingException(e.getMessage(), e); } @@ -140,14 +141,17 @@ public boolean hasNext() { public E next() { if (!itemsListItr.hasNext()) { if (!hasNextPage()) { - throw new NoSuchElementException(); + throw new NoSuchElementException(); } else { int size = items.size(); loadNextPage(); itemsListItr = items.listIterator(size); } } - return itemsListItr.next(); + if (itemsListItr.hasNext()) { + return itemsListItr.next(); + } + return null; } @Override diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java b/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java index 50255ba600899..0a952b6d2334f 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/PollingState.java @@ -100,7 +100,7 @@ public void updateFromResponseOnPutPatch(Response response) throws } if (responseContent == null || responseContent.isEmpty()) { - CloudException exception = new CloudException("no body"); + CloudException exception = new CloudException("polling response does not contain a valid body"); exception.setResponse(response); throw exception; } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java b/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java index 0ace881f0ee51..8ff7d89c4c21a 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/RestClient.java @@ -20,6 +20,7 @@ import okhttp3.OkHttpClient; import okhttp3.logging.HttpLoggingInterceptor; import retrofit2.Retrofit; +import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory; import java.lang.reflect.Field; import java.net.CookieManager; @@ -168,6 +169,7 @@ public Builder(OkHttpClient.Builder httpClientBuilder, Retrofit.Builder retrofit // Set up OkHttp client this.httpClientBuilder = httpClientBuilder .cookieJar(new JavaNetCookieJar(cookieManager)) + .readTimeout(30, TimeUnit.SECONDS) .addInterceptor(userAgentInterceptor); this.retrofitBuilder = retrofitBuilder; this.buildable = new Buildable(); @@ -338,6 +340,7 @@ public RestClient build() { .baseUrl(baseUrl) .client(httpClient) .addConverterFactory(mapperAdapter.getConverterFactory()) + .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(), credentials, customHeadersInterceptor, diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java index 3ad514d93874a..ada803b38c3e8 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java @@ -7,8 +7,7 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; +import rx.Observable; /** * Represents a group of related tasks. @@ -48,22 +47,12 @@ public interface TaskGroup> { */ void prepare(); - /** - * Executes the tasks in the group. - *

- * the order of execution of tasks ensure that a task gets selected for execution only after - * the execution of all the tasks it depends on - * @throws Exception the exception - */ - void execute() throws Exception; - /** * Executes the tasks in the group asynchronously. * - * @param callback the callback to call on failure or success * @return the handle to the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + Observable executeAsync(); /** * Gets the result of execution of a task in the group. diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java index e53bfa0a01962..747f0273fa7d6 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java @@ -7,11 +7,11 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; -import com.microsoft.rest.ServiceResponse; +import rx.Observable; +import rx.functions.Func1; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayList; +import java.util.List; /** * The base implementation of TaskGroup interface. @@ -25,11 +25,6 @@ public abstract class TaskGroupBase> * Stores the tasks in this group and their dependency information. */ private DAGraph> dag; - /** - * ServiceCall instance that aggregate a set of ServiceCalls each associated with - * the task in this group. - */ - private ParallelServiceCall parallelServiceCall; /** * Creates TaskGroupBase. @@ -39,7 +34,6 @@ public abstract class TaskGroupBase> */ public TaskGroupBase(String rootTaskItemId, U rootTaskItem) { this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem)); - this.parallelServiceCall = new ParallelServiceCall(); } @Override @@ -65,117 +59,30 @@ public void prepare() { } @Override - public void execute() throws Exception { + public Observable executeAsync() { DAGNode nextNode = dag.getNext(); + final List> observables = new ArrayList<>(); while (nextNode != null) { - nextNode.data().execute(); - this.dag().reportedCompleted(nextNode); + final DAGNode thisNode = nextNode; + observables.add(nextNode.data().executeAsync() + .flatMap(new Func1>() { + @Override + public Observable call(T t) { + dag().reportedCompleted(thisNode); + if (dag().isRootNode(thisNode)) { + return Observable.just(t); + } else { + return executeAsync(); + } + } + })); nextNode = dag.getNext(); } - } - - @Override - public ServiceCall executeAsync(final ServiceCallback callback) { - executeReadyTasksAsync(callback); - return parallelServiceCall; + return Observable.merge(observables); } @Override public T taskResult(String taskId) { return dag.getNodeData(taskId).result(); } - - /** - * Executes all runnable tasks, a task is runnable when all the tasks its depends - * on are finished running. - * - * @param callback the callback - */ - private void executeReadyTasksAsync(final ServiceCallback callback) { - DAGNode nextNode = dag.getNext(); - while (nextNode != null) { - ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback)); - this.parallelServiceCall.addCall(serviceCall); - nextNode = dag.getNext(); - } - } - - /** - * This method create and return a callback for the runnable task stored in the given node. - * This callback wraps the given callback. - * - * @param taskNode the node containing runnable task - * @param callback the callback to wrap - * @return the task callback - */ - private ServiceCallback taskCallback(final DAGNode taskNode, final ServiceCallback callback) { - final TaskGroupBase self = this; - return new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - parallelServiceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - self.dag().reportedCompleted(taskNode); - if (self.dag().isRootNode(taskNode)) { - if (callback != null) { - callback.success(result); - } - parallelServiceCall.success(result); - } else { - self.executeReadyTasksAsync(callback); - } - } - }; - } - - /** - * Type represents a set of REST calls running possibly in parallel. - */ - private class ParallelServiceCall extends ServiceCall { - private ConcurrentLinkedQueue> serviceCalls; - - /** - * Creates a ParallelServiceCall. - */ - ParallelServiceCall() { - super(null); - this.serviceCalls = new ConcurrentLinkedQueue<>(); - } - - /** - * Cancels all the service calls currently executing. - */ - public void cancel() { - for (ServiceCall call : this.serviceCalls) { - call.cancel(true); - } - } - - /** - * @return true if the call has been canceled; false otherwise. - */ - public boolean isCancelled() { - for (ServiceCall call : this.serviceCalls) { - if (!call.isCancelled()) { - return false; - } - } - return true; - } - - /** - * Add a call to the list of parallel calls. - * - * @param call the call - */ - private void addCall(ServiceCall call) { - if (call != null) { - this.serviceCalls.add(call); - } - } - } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java index 1a4bd9a61bf40..dd24167c89681 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java @@ -7,8 +7,7 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; +import rx.Observable; /** * Type representing a task in a task group {@link TaskGroup}. @@ -21,22 +20,12 @@ public interface TaskItem { */ U result(); - /** - * Executes the task. - *

- * once executed the result will be available through result getter - * - * @throws Exception exception - */ - void execute() throws Exception; - /** * Executes the task asynchronously. *

* once executed the result will be available through result getter * - * @param callback callback to call on success or failure * @return the handle of the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + Observable executeAsync(); } diff --git a/client-runtime/build.gradle b/client-runtime/build.gradle index e530072694672..d64308d3b3f71 100644 --- a/client-runtime/build.gradle +++ b/client-runtime/build.gradle @@ -28,6 +28,8 @@ dependencies { compile 'com.squareup.okhttp3:logging-interceptor:3.3.1' compile 'com.squareup.okhttp3:okhttp-urlconnection:3.3.1' compile 'com.squareup.retrofit2:converter-jackson:2.0.2' + compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2' + compile 'io.reactivex:rxjava:1.1.8' compile 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.7.2' compile 'org.apache.commons:commons-lang3:3.4' testCompile 'junit:junit:4.12' diff --git a/client-runtime/pom.xml b/client-runtime/pom.xml index e63a421434b38..042de93fc929d 100644 --- a/client-runtime/pom.xml +++ b/client-runtime/pom.xml @@ -79,6 +79,14 @@ org.apache.commons commons-lang3 + + io.reactivex + rxjava + + + com.squareup.retrofit2 + adapter-rxjava + junit junit diff --git a/client-runtime/src/main/java/com/microsoft/rest/RestException.java b/client-runtime/src/main/java/com/microsoft/rest/RestException.java index cf5dff556aea0..a11ac3c516d2c 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/RestException.java +++ b/client-runtime/src/main/java/com/microsoft/rest/RestException.java @@ -10,7 +10,7 @@ /** * Exception thrown for an invalid response with custom error information. */ -public abstract class RestException extends Exception { +public abstract class RestException extends RuntimeException { /** * Initializes a new instance of the AutoRestException class. */ diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java index c0cb3b62ebddd..be18dd23a8064 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java +++ b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java @@ -8,8 +8,9 @@ package com.microsoft.rest; import com.google.common.util.concurrent.AbstractFuture; - -import retrofit2.Call; +import rx.Observable; +import rx.Subscription; +import rx.functions.Action1; /** * An instance of this class provides access to the underlying REST call invocation. @@ -22,54 +23,110 @@ public class ServiceCall extends AbstractFuture> { /** * The Retrofit method invocation. */ - private Call call; + private Subscription subscription; + + protected ServiceCall() { + } /** - * Creates an instance of ServiceCall. + * Creates a ServiceCall from an observable object. * - * @param call the Retrofit call to wrap around. + * @param observable the observable to create from + * @param the type of the response + * @return the created ServiceCall */ - public ServiceCall(Call call) { - this.call = call; + public static ServiceCall create(final Observable> observable) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Updates the current Retrofit call object. + * Creates a ServiceCall from an observable object and a callback. * - * @param call the new call object. + * @param observable the observable to create from + * @param callback the callback to call when events happen + * @param the type of the response + * @return the created ServiceCall */ - public void newCall(Call call) { - this.call = call; + public static ServiceCall create(final Observable> observable, final ServiceCallback callback) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + if (callback != null) { + callback.success(t); + } + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + if (callback != null) { + callback.failure(throwable); + } + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Gets the current Retrofit call object. + * Creates a ServiceCall from an observable and a callback for a header response. * - * @return the current call object. + * @param observable the observable of a REST call that returns JSON in a header + * @param callback the callback to call when events happen + * @param the type of the response body + * @param the type of the response header + * @return the created ServiceCall */ - public Call getCall() { - return call; + public static ServiceCall createWithHeaders(final Observable> observable, final ServiceCallback callback) { + final ServiceCall serviceCall = new ServiceCall<>(); + serviceCall.subscription = observable + .last() + .subscribe(new Action1>() { + @Override + public void call(ServiceResponse t) { + if (callback != null) { + callback.success(t); + } + serviceCall.set(t); + } + }, new Action1() { + @Override + public void call(Throwable throwable) { + if (callback != null) { + callback.failure(throwable); + } + serviceCall.setException(throwable); + } + }); + return serviceCall; } /** - * Cancel the Retrofit call if possible. Parameter - * 'mayInterruptIfRunning is ignored. - * - * @param mayInterruptIfRunning ignored + * @return the current Rx subscription associated with the ServiceCall. */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (isCancelled()) { - return false; - } else { - call.cancel(); - return true; - } + public Subscription getSubscription() { + return subscription; } - @Override - public boolean isCancelled() { - return call.isCanceled(); + protected void setSubscription(Subscription subscription) { + this.subscription = subscription; } /** @@ -83,14 +140,14 @@ public boolean success(ServiceResponse result) { return set(result); } - /** - * Invoke this method to report a failure, allowing - * {@link AbstractFuture#get()} to throw the exception. - * - * @param t the exception thrown. - * @return true if successfully reported; false otherwise. - */ - public boolean failure(Throwable t) { - return setException(t); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + subscription.unsubscribe(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return subscription.isUnsubscribed(); } } diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java index 5d6f0f850336c..dd279bc230120 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java +++ b/client-runtime/src/main/java/com/microsoft/rest/ServiceClient.java @@ -16,6 +16,7 @@ import okhttp3.JavaNetCookieJar; import okhttp3.OkHttpClient; import retrofit2.Retrofit; +import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory; /** * ServiceClient is the abstraction for accessing REST operations and their payload data types. @@ -65,6 +66,7 @@ protected ServiceClient(String baseUrl, OkHttpClient.Builder clientBuilder, Retr .baseUrl(baseUrl) .client(httpClient) .addConverterFactory(mapperAdapter.getConverterFactory()) + .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); } diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java deleted file mode 100644 index a808869960294..0000000000000 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseCallback.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - * - */ - -package com.microsoft.rest; - -import okhttp3.ResponseBody; -import retrofit2.Call; -import retrofit2.Callback; - -/** - * Inner callback used to merge both successful and failed responses into one - * callback for customized response handling in a response handling delegate. - * - * @param the response body type - */ -public abstract class ServiceResponseCallback implements Callback { - /** - * The client service call object. - */ - private ServiceCall serviceCall; - - /** - * The client callback. - */ - private ServiceCallback serviceCallback; - - /** - * Creates an instance of ServiceResponseCallback. - * - * @param serviceCall the client service call to call on a terminal state. - * @param serviceCallback the client callback to call on a terminal state. - */ - public ServiceResponseCallback(ServiceCall serviceCall, ServiceCallback serviceCallback) { - this.serviceCall = serviceCall; - this.serviceCallback = serviceCallback; - } - - @Override - public void onFailure(Call call, Throwable t) { - if (serviceCallback != null) { - serviceCallback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } -} diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java deleted file mode 100644 index 2a82667795b27..0000000000000 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceResponseEmptyCallback.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - * - */ - -package com.microsoft.rest; - -import retrofit2.Call; -import retrofit2.Callback; - -/** - * Inner callback used to merge both successful and failed responses into one - * callback for customized response handling in a response handling delegate. - * - * @param the response body type - */ -public abstract class ServiceResponseEmptyCallback implements Callback { - /** - * The client service call object. - */ - private ServiceCall serviceCall; - - /** - * The client callback. - */ - private ServiceCallback serviceCallback; - - /** - * Creates an instance of ServiceResponseCallback. - * - * @param serviceCall the client service call to call on a terminal state. - * @param serviceCallback the client callback to call on a terminal state. - */ - public ServiceResponseEmptyCallback(ServiceCall serviceCall, ServiceCallback serviceCallback) { - this.serviceCall = serviceCall; - this.serviceCallback = serviceCallback; - } - - @Override - public void onFailure(Call call, Throwable t) { - if (serviceCallback != null) { - serviceCallback.failure(t); - } - if (serviceCall != null) { - serviceCall.failure(t); - } - } -} diff --git a/pom.xml b/pom.xml index aa77260c5893f..5255bc70f9618 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,16 @@ adal 1.1.11 + + io.reactivex + rxjava + 1.1.8 + + + com.squareup.retrofit2 + adapter-rxjava + 2.0.2 + junit junit