Skip to content

Commit

Permalink
SyncPollingStrategy Compliment to PollingStrategy (#31923)
Browse files Browse the repository at this point in the history
SyncPollingStrategy Compliment to PollingStrategy
  • Loading branch information
alzimmermsft authored Nov 3, 2022
1 parent 4675956 commit 24a912a
Show file tree
Hide file tree
Showing 11 changed files with 1,036 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,30 @@

import com.azure.core.exception.AzureException;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.implementation.ImplUtils;
import com.azure.core.implementation.http.HttpHeadersHelper;
import com.azure.core.implementation.serializer.DefaultJsonSerializer;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.implementation.PollResult;
import com.azure.core.util.polling.implementation.PollingConstants;
import com.azure.core.util.polling.implementation.PollingUtils;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import com.fasterxml.jackson.annotation.JsonSetter;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Locale;
import java.util.Objects;

import static com.azure.core.util.polling.implementation.PollingUtils.getAbsolutePath;
Expand All @@ -43,14 +42,15 @@
*/
public class OperationResourcePollingStrategy<T, U> implements PollingStrategy<T, U> {
private static final ClientLogger LOGGER = new ClientLogger(OperationResourcePollingStrategy.class);
private static final String DEFAULT_OPERATION_LOCATION_HEADER = "Operation-Location";
private static final String DEFAULT_OPERATION_LOCATION_HEADER_LOWER_CASE = "operation-location";
private static final HttpHeaderName DEFAULT_OPERATION_LOCATION_HEADER
= HttpHeaderName.fromString("Operation-Location");
private static final TypeReference<PollResult> POLL_RESULT_TYPE_REFERENCE
= TypeReference.createInstance(PollResult.class);

private final HttpPipeline httpPipeline;
private final ObjectSerializer serializer;
private final String endpoint;
private final String operationLocationHeaderName;
private final String operationLocationHeaderNameLowerCase;
private final HttpHeaderName operationLocationHeaderName;
private final Context context;

/**
Expand All @@ -60,7 +60,7 @@ public class OperationResourcePollingStrategy<T, U> implements PollingStrategy<T
* @param httpPipeline an instance of {@link HttpPipeline} to send requests with
*/
public OperationResourcePollingStrategy(HttpPipeline httpPipeline) {
this(httpPipeline, new DefaultJsonSerializer(), DEFAULT_OPERATION_LOCATION_HEADER, Context.NONE);
this(httpPipeline, null, new DefaultJsonSerializer(), DEFAULT_OPERATION_LOCATION_HEADER, Context.NONE);
}

/**
Expand Down Expand Up @@ -97,23 +97,24 @@ public OperationResourcePollingStrategy(HttpPipeline httpPipeline, ObjectSeriali
*/
public OperationResourcePollingStrategy(HttpPipeline httpPipeline, String endpoint, ObjectSerializer serializer,
String operationLocationHeaderName, Context context) {
this(httpPipeline, endpoint, serializer,
operationLocationHeaderName == null ? null : HttpHeaderName.fromString(operationLocationHeaderName),
context);
}

private OperationResourcePollingStrategy(HttpPipeline httpPipeline, String endpoint,
ObjectSerializer serializer, HttpHeaderName operationLocationHeaderName, Context context) {
this.httpPipeline = Objects.requireNonNull(httpPipeline, "'httpPipeline' cannot be null");
this.endpoint = endpoint;
this.serializer = serializer != null ? serializer : new DefaultJsonSerializer();
if (operationLocationHeaderName != null) {
this.operationLocationHeaderName = operationLocationHeaderName;
this.operationLocationHeaderNameLowerCase = operationLocationHeaderName.toLowerCase(Locale.ROOT);
} else {
this.operationLocationHeaderName = DEFAULT_OPERATION_LOCATION_HEADER;
this.operationLocationHeaderNameLowerCase = DEFAULT_OPERATION_LOCATION_HEADER_LOWER_CASE;
}
this.operationLocationHeaderName = (operationLocationHeaderName == null)
? DEFAULT_OPERATION_LOCATION_HEADER : operationLocationHeaderName;
this.context = context == null ? Context.NONE : context;
}

@Override
public Mono<Boolean> canPoll(Response<?> initialResponse) {
HttpHeader operationLocationHeader = HttpHeadersHelper.getNoKeyFormatting(initialResponse.getHeaders(),
operationLocationHeaderNameLowerCase);
HttpHeader operationLocationHeader = initialResponse.getHeaders().get(operationLocationHeaderName);
if (operationLocationHeader != null) {
try {
new URL(getAbsolutePath(operationLocationHeader.getValue(), endpoint, LOGGER));
Expand All @@ -128,12 +129,10 @@ public Mono<Boolean> canPoll(Response<?> initialResponse) {
@Override
public Mono<PollResponse<T>> onInitialResponse(Response<?> response, PollingContext<T> pollingContext,
TypeReference<T> pollResponseType) {
HttpHeader operationLocationHeader = HttpHeadersHelper.getNoKeyFormatting(response.getHeaders(),
operationLocationHeaderNameLowerCase);
HttpHeader locationHeader = HttpHeadersHelper.getNoKeyFormatting(response.getHeaders(),
PollingConstants.LOCATION_LOWER_CASE);
HttpHeader operationLocationHeader = response.getHeaders().get(operationLocationHeaderName);
HttpHeader locationHeader = response.getHeaders().get(HttpHeaderName.LOCATION);
if (operationLocationHeader != null) {
pollingContext.setData(operationLocationHeaderName,
pollingContext.setData(operationLocationHeaderName.getCaseSensitiveName(),
getAbsolutePath(operationLocationHeader.getValue(), endpoint, LOGGER));
}
if (locationHeader != null) {
Expand Down Expand Up @@ -161,12 +160,12 @@ public Mono<PollResponse<T>> onInitialResponse(Response<?> response, PollingCont

@Override
public Mono<PollResponse<T>> poll(PollingContext<T> pollingContext, TypeReference<T> pollResponseType) {
HttpRequest request = new HttpRequest(HttpMethod.GET, pollingContext.getData(operationLocationHeaderName));
HttpRequest request = new HttpRequest(HttpMethod.GET, pollingContext.getData(operationLocationHeaderName
.getCaseSensitiveName()));
return FluxUtil.withContext(context1 -> httpPipeline.send(request,
CoreUtils.mergeContexts(context1, this.context))).flatMap(response -> response.getBodyAsByteArray()
.map(BinaryData::fromBytes)
.flatMap(binaryData -> PollingUtils.deserializeResponse(
binaryData, serializer, new TypeReference<PollResult>() { })
.flatMap(binaryData -> PollingUtils.deserializeResponse(binaryData, serializer, POLL_RESULT_TYPE_REFERENCE)
.map(pollResult -> {
final String resourceLocation = pollResult.getResourceLocation();
if (resourceLocation != null) {
Expand Down Expand Up @@ -217,77 +216,4 @@ public Mono<U> getResult(PollingContext<T> pollingContext, TypeReference<U> resu
.flatMap(binaryData -> PollingUtils.deserializeResponse(binaryData, serializer, resultType));
}
}

/**
* A simple structure representing the partial response received from an operation location URL, containing the
* information of the status of the long running operation.
*/
private static class PollResult {
private LongRunningOperationStatus status;
private String resourceLocation;

/**
* Gets the status of the long running operation.
* @return the status represented as a {@link LongRunningOperationStatus}
*/
public LongRunningOperationStatus getStatus() {
return status;
}

/**
* Sets the long running operation status in the format of a string returned by the service. This is called by
* the deserializer when a response is received.
*
* @param status the status of the long running operation
* @return the modified PollResult instance
*/
@JsonSetter
public PollResult setStatus(String status) {
if (PollingConstants.STATUS_NOT_STARTED.equalsIgnoreCase(status)) {
this.status = LongRunningOperationStatus.NOT_STARTED;
} else if (PollingConstants.STATUS_IN_PROGRESS.equalsIgnoreCase(status)
|| PollingConstants.STATUS_RUNNING.equalsIgnoreCase(status)) {
this.status = LongRunningOperationStatus.IN_PROGRESS;
} else if (PollingConstants.STATUS_SUCCEEDED.equalsIgnoreCase(status)) {
this.status = LongRunningOperationStatus.SUCCESSFULLY_COMPLETED;
} else if (PollingConstants.STATUS_FAILED.equalsIgnoreCase(status)) {
this.status = LongRunningOperationStatus.FAILED;
} else {
this.status = LongRunningOperationStatus.fromString(status, true);
}
return this;
}

/**
* Sets the long running operation status in the format of the {@link LongRunningOperationStatus} enum.
*
* @param status the status of the long running operation
* @return the modified PollResult instance
*/
public PollResult setStatus(LongRunningOperationStatus status) {
this.status = status;
return this;
}

/**
* Gets the resource location URL to get the final result. This is often available in the response when the
* long running operation has been successfully completed.
*
* @return the resource location URL to get he final result
*/
public String getResourceLocation() {
return resourceLocation;
}

/**
* Sets the resource location URL. this should only be called by the deserializer when a response is received.
*
* @param resourceLocation the resource location URL
* @return the modified PollResult instance
*/
public PollResult setResourceLocation(String resourceLocation) {
this.resourceLocation = resourceLocation;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util.polling;

import com.azure.core.http.rest.Response;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.TypeReference;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* A synchronous polling strategy that chains multiple synchronous polling strategies, finds the first strategy that can
* poll the current long-running operation, and polls with that strategy.
*
* @param <T> the type of the response type from a polling call, or BinaryData if raw response body should be kept
* @param <U> the type of the final result object to deserialize into, or BinaryData if raw response body should be
* kept
*/
public final class SyncChainedPollingStrategy<T, U> implements SyncPollingStrategy<T, U> {
private static final ClientLogger LOGGER = new ClientLogger(SyncChainedPollingStrategy.class);

private final List<SyncPollingStrategy<T, U>> pollingStrategies;
private SyncPollingStrategy<T, U> pollableStrategy = null;

/**
* Creates a synchronous chained polling strategy with a list of polling strategies.
*
* @param strategies the list of synchronous polling strategies
* @throws NullPointerException If {@code strategies} is null.
* @throws IllegalArgumentException If {@code strategies} is an empty list.
*/
public SyncChainedPollingStrategy(List<SyncPollingStrategy<T, U>> strategies) {
Objects.requireNonNull(strategies, "'strategies' cannot be null.");
if (strategies.isEmpty()) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'strategies' cannot be empty."));
}
this.pollingStrategies = Collections.unmodifiableList(strategies);
}

@Override
public boolean canPoll(Response<?> initialResponse) {
// Find the first strategy that can poll in series so that
// pollableStrategy is only set once
for (SyncPollingStrategy<T, U> strategy : pollingStrategies) {
if (strategy.canPoll(initialResponse)) {
this.pollableStrategy = strategy;
return true;
}
}

return false;
}

/**
* {@inheritDoc}
*
* @throws NullPointerException if {@link #canPoll(Response)} is not called prior to this, or if it returns false.
*/
@Override
public U getResult(PollingContext<T> context, TypeReference<U> resultType) {
return pollableStrategy.getResult(context, resultType);
}

/**
* {@inheritDoc}
*
* @throws NullPointerException if {@link #canPoll(Response)} is not called prior to this, or if it returns false.
*/
@Override
public PollResponse<T> onInitialResponse(Response<?> response, PollingContext<T> pollingContext,
TypeReference<T> pollResponseType) {
return pollableStrategy.onInitialResponse(response, pollingContext, pollResponseType);
}

/**
* {@inheritDoc}
*
* @throws NullPointerException if {@link #canPoll(Response)} is not called prior to this, or if it returns false.
*/
@Override
public PollResponse<T> poll(PollingContext<T> context, TypeReference<T> pollResponseType) {
return pollableStrategy.poll(context, pollResponseType);
}

/**
* {@inheritDoc}
*
* @throws NullPointerException if {@link #canPoll(Response)} is not called prior to this, or if it returns false.
*/
@Override
public T cancel(PollingContext<T> pollingContext, PollResponse<T> initialResponse) {
return pollableStrategy.cancel(pollingContext, initialResponse);
}
}
Loading

0 comments on commit 24a912a

Please sign in to comment.