Skip to content

Commit

Permalink
Merge pull request #3525 from mapfish/backport/3524-to-3.31
Browse files Browse the repository at this point in the history
[Backport 3.31] Close resources to stop Apache lib from pending
  • Loading branch information
sebr72 authored Dec 20, 2024
2 parents 543dfeb + 25a4d09 commit 0042714
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
* This request factory will attempt to load resources using {@link
* org.mapfish.print.config.Configuration#loadFile(String)} and {@link
* org.mapfish.print.config.Configuration#isAccessible(String)} to load the resources if the http
* method is GET and will fallback to the normal/wrapped factory to make http requests.
* method is GET and will fall back to the normal/wrapped factory to make http requests.
*/
public final class ConfigFileResolvingHttpRequestFactory implements MfClientHttpRequestFactory {
private final Configuration config;
@Nonnull private final Map<String, String> mdcContext;
private final MfClientHttpRequestFactoryImpl httpRequestFactory;
private final List<RequestConfigurator> callbacks = new CopyOnWriteArrayList<>();

/** Maximum number of attempts to try to fetch the same http request in case it is failing. */
@Value("${httpRequest.fetchRetry.maxNumber}")
private int httpRequestMaxNumberFetchRetry;

Expand Down Expand Up @@ -54,7 +55,9 @@ public void register(@Nonnull final RequestConfigurator callback) {
}

@Override
public ClientHttpRequest createRequest(final URI uri, final HttpMethod httpMethod) {
@Nonnull
public ClientHttpRequest createRequest(
@Nonnull final URI uri, @Nonnull final HttpMethod httpMethod) {
return new ConfigFileResolvingRequest(this, uri, httpMethod);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,73 +141,83 @@ private ClientHttpResponse doFileRequest() throws IOException {
}

private ClientHttpResponse doHttpRequestWithRetry(final HttpHeaders headers) throws IOException {
AtomicInteger counter = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
do {
logFetchingURIResource(headers);
try {
ClientHttpResponse response = attemptToFetchResponse(headers, counter);
ClientHttpResponse response = attemptToFetchResponse(headers);
if (response != null) {
return response;
} else {
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching {}", this.getURI());
} else {
PrintException printException = new PrintException("Failed fetching " + getURI());
LOGGER.debug("Throwing exception since it cannot retry.", printException);
throw printException;
}
}
} catch (final IOException | RuntimeException e) {
boolean hasSlept = sleepIfPossible(e, counter);
if (!hasSlept) {
throw e;
}
} catch (final IOException e) {
handleIOException(e, counter);
}
} while (true);
}

private void logFetchingURIResource(final HttpHeaders headers) {
// Display headers, one by line <name>: <value>
LOGGER.debug(
"Fetching URI resource {}, headers:\n{}",
"Fetching {}, using headers:\n{}",
this.getURI(),
headers.entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(", ", entry.getValue()))
.collect(Collectors.joining("\n")));
}

private ClientHttpResponse attemptToFetchResponse(
final HttpHeaders headers, final AtomicInteger counter) throws IOException {
private ClientHttpResponse attemptToFetchResponse(final HttpHeaders headers) throws IOException {
ClientHttpRequest requestUsed =
this.request != null ? this.request : createRequestFromWrapped(headers);
LOGGER.debug("Executing http request: {}", requestUsed.getURI());
ClientHttpResponse response = executeCallbacksAndRequest(requestUsed);
if (response.getRawStatusCode() < 500) {
LOGGER.debug(
"Fetching success URI resource {}, status code {}",
getURI(),
response.getRawStatusCode());
return response;
}
LOGGER.debug(
"Fetching failed URI resource {}, error code {}", getURI(), response.getRawStatusCode());
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching URI resource {}", this.getURI());
} else {
throw new PrintException(
String.format(
"Fetching failed URI resource %s, error code %s",
getURI(), response.getRawStatusCode()));
final int minStatusCodeError = HttpStatus.INTERNAL_SERVER_ERROR.value();
int rawStatusCode = minStatusCodeError;
try {
rawStatusCode = response.getRawStatusCode();
if (rawStatusCode < minStatusCodeError) {
LOGGER.debug("Successfully fetched {}, with status code {}", getURI(), rawStatusCode);
return response;
}
LOGGER.debug("Failed fetching {}, with error code {}", getURI(), rawStatusCode);
return null;
} finally {
if (rawStatusCode >= minStatusCodeError) {
response.close();
}
}
return null;
}

private void handleIOException(final IOException e, final AtomicInteger counter)
throws IOException {

private boolean sleepIfPossible(final Exception e, final AtomicInteger counter) {
if (canRetry(counter)) {
sleepWithExceptionHandling();
LOGGER.debug("Retry fetching URI resource {}", this.getURI());
LOGGER.debug("Retry fetching {} following exception", this.getURI(), e);
return true;
} else {
LOGGER.debug("Fetching failed URI resource {}", getURI());
throw e;
LOGGER.debug(
"Reached maximum number of {} allowed requests attempts for {}",
getHttpRequestMaxNumberFetchRetry(),
getURI());
return false;
}
}

private void sleepWithExceptionHandling() {
int sleepMs = configFileResolvingHttpRequestFactory.getHttpRequestFetchRetryIntervalMillis();
LOGGER.debug("Sleeping {} ms before retrying.", sleepMs);
try {
TimeUnit.MILLISECONDS.sleep(
configFileResolvingHttpRequestFactory.getHttpRequestFetchRetryIntervalMillis());
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PrintException("Interrupted while sleeping", e);
Expand Down
22 changes: 13 additions & 9 deletions core/src/main/java/org/mapfish/print/http/HttpRequestFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,22 @@ private CachedClientHttpResponse(final ClientHttpResponse originalResponse) thro
this.headers = originalResponse.getHeaders();
this.status = originalResponse.getRawStatusCode();
this.statusText = originalResponse.getStatusText();
this.cachedFile =
this.cachedFile = createCachedFile(originalResponse.getBody());
}

private File createCachedFile(final InputStream originalBody) throws IOException {
File tempFile =
File.createTempFile("cacheduri", null, HttpRequestFetcher.this.temporaryDirectory);
LOGGER.debug("Caching URI resource to {}", this.cachedFile);
try (OutputStream os = Files.newOutputStream(this.cachedFile.toPath())) {
InputStream body = originalResponse.getBody();
LOGGER.debug("Caching URI resource to {}", tempFile);
try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
LOGGER.debug(
"Get from input stream {}, for response {}, body available: {}",
body.getClass(),
originalResponse.getClass(),
body.available());
IOUtils.copy(body, os);
"Get from input stream {}, body available: {}",
originalBody.getClass(),
originalBody.available());
IOUtils.copy(originalBody, os);
originalBody.close();
}
return tempFile;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +32,7 @@
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.mapfish.print.config.Configuration;
import org.mapfish.print.servlet.job.impl.ThreadPoolJobManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
Expand All @@ -52,30 +54,53 @@ public class MfClientHttpRequestFactoryImpl extends HttpComponentsClientHttpRequ
* @param maxConnTotal Maximum total connections.
* @param maxConnPerRoute Maximum connections per route.
*/
public MfClientHttpRequestFactoryImpl(final int maxConnTotal, final int maxConnPerRoute) {
super(createHttpClient(maxConnTotal, maxConnPerRoute));
public MfClientHttpRequestFactoryImpl(
final int maxConnTotal,
final int maxConnPerRoute,
final ThreadPoolJobManager threadPoolJobManager) {
super(createHttpClient(maxConnTotal, maxConnPerRoute, threadPoolJobManager));
}

@Nullable
static Configuration getCurrentConfiguration() {
return CURRENT_CONFIGURATION.get();
}

private static int getIntProperty(final String name) {
/**
* Return the number of milliseconds until the timeout Use the Automatic cancellation timeout if
* not defined.
*
* @param name timeout idemtifier
* @return the number of milliseconds until the timeout
*/
private static int getTimeoutValue(
final String name, final ThreadPoolJobManager threadPoolJobManager) {
final String value = System.getProperty(name);
if (value == null) {
return -1;
long millis = TimeUnit.SECONDS.toMillis(threadPoolJobManager.getTimeout());
if (millis > Integer.MAX_VALUE) {
LOGGER.warn(
"The value of {} is too large. The timeout will be set to the maximum value of {}",
name,
Integer.MAX_VALUE);
return Integer.MAX_VALUE;
} else {
return Integer.parseInt(Long.toString(millis));
}
}
return Integer.parseInt(value);
}

private static CloseableHttpClient createHttpClient(
final int maxConnTotal, final int maxConnPerRoute) {
final int maxConnTotal,
final int maxConnPerRoute,
final ThreadPoolJobManager threadPoolJobManager) {
final RequestConfig requestConfig =
RequestConfig.custom()
.setConnectionRequestTimeout(getIntProperty("http.connectionRequestTimeout"))
.setConnectTimeout(getIntProperty("http.connectTimeout"))
.setSocketTimeout(getIntProperty("http.socketTimeout"))
.setConnectionRequestTimeout(
getTimeoutValue("http.connectionRequestTimeout", threadPoolJobManager))
.setConnectTimeout(getTimeoutValue("http.connectTimeout", threadPoolJobManager))
.setSocketTimeout(getTimeoutValue("http.socketTimeout", threadPoolJobManager))
.build();

final HttpClientBuilder httpClientBuilder =
Expand All @@ -89,11 +114,19 @@ private static CloseableHttpClient createHttpClient(
.setMaxConnTotal(maxConnTotal)
.setMaxConnPerRoute(maxConnPerRoute)
.setUserAgent(UserAgentCreator.getUserAgent());
return httpClientBuilder.build();
CloseableHttpClient closeableHttpClient = httpClientBuilder.build();
LOGGER.debug(
"Created CloseableHttpClient using connectionRequestTimeout: {} connectTimeout: {}"
+ " socketTimeout: {}",
getTimeoutValue("http.connectionRequestTimeout", threadPoolJobManager),
getTimeoutValue("http.connectTimeout", threadPoolJobManager),
getTimeoutValue("http.socketTimeout", threadPoolJobManager));
return closeableHttpClient;
}

// allow extension only for testing
@Override
@Nonnull
public ConfigurableRequest createRequest(
@Nonnull final URI uri, @Nonnull final HttpMethod httpMethod) {
HttpRequestBase httpRequest = (HttpRequestBase) createHttpUriRequest(httpMethod, uri);
Expand Down Expand Up @@ -161,20 +194,24 @@ public HttpMethod getMethod() {
return HttpMethod.valueOf(this.request.getMethod());
}

@Nonnull
@Override
public String getMethodValue() {
return this.request.getMethod();
}

@Nonnull
public URI getURI() {
return this.request.getURI();
}

@Nonnull
@Override
protected OutputStream getBodyInternal(@Nonnull final HttpHeaders headers) {
return this.outputStream;
}

@Nonnull
@Override
protected Response executeInternal(@Nonnull final HttpHeaders headers) throws IOException {
CURRENT_CONFIGURATION.set(this.configuration);
Expand Down Expand Up @@ -207,7 +244,7 @@ protected Response executeInternal(@Nonnull final HttpHeaders headers) throws IO
}
}

static class Response extends AbstractClientHttpResponse {
public static class Response extends AbstractClientHttpResponse {
private static final Logger LOGGER = LoggerFactory.getLogger(Response.class);
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
private final HttpResponse response;
Expand All @@ -224,6 +261,7 @@ public int getRawStatusCode() {
return this.response.getStatusLine().getStatusCode();
}

@Nonnull
@Override
public String getStatusText() {
return this.response.getStatusLine().getReasonPhrase();
Expand All @@ -247,6 +285,7 @@ public void close() {
LOGGER.trace("Closed Http Response object: {}", this.id);
}

@Nonnull
@Override
public synchronized InputStream getBody() throws IOException {
if (this.inputStream == null) {
Expand All @@ -262,6 +301,7 @@ public synchronized InputStream getBody() throws IOException {
return this.inputStream;
}

@Nonnull
@Override
public HttpHeaders getHeaders() {
final HttpHeaders translatedHeaders = new HttpHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public final void setTimeout(final long timeout) {
this.timeout = timeout;
}

public final long getTimeout() {
return this.timeout;
}

public final void setAbandonedTimeout(final long abandonedTimeout) {
this.abandonedTimeout = abandonedTimeout;
}
Expand Down Expand Up @@ -452,10 +456,13 @@ private boolean updateRegistry() {
final SubmittedPrintJob printJob = submittedJobs.next();
if (!printJob.getReportFuture().isDone()
&& (isTimeoutExceeded(printJob) || isAbandoned(printJob))) {
LOGGER.info("Canceling job after timeout {}", printJob.getEntry().getReferenceId());
LOGGER.info(
"About to attempt timeout based automatic cancellation of job {}",
printJob.getEntry().getReferenceId());
if (!printJob.getReportFuture().cancel(true)) {
LOGGER.info(
"Could not cancel job after timeout {}", printJob.getEntry().getReferenceId());
"Automatic cancellation after timeout failed for job {}",
printJob.getEntry().getReferenceId());
}
// remove all canceled tasks from the work queue (otherwise the queue comparator
// might stumble on non-PrintJob entries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
<bean id="healthCheckRegistry" class="org.mapfish.print.metrics.HealthCheckingRegistry"/>
<bean id="httpClientFactory" class="org.mapfish.print.http.MfClientHttpRequestFactoryImpl">
<constructor-arg index="0" value="${maxConnectionsTotal}" />
<constructor-arg index="1" value="${maxConnectionsPerRoute}" />
<constructor-arg index="1" value="${maxConnectionsPerRoute}"/>
<constructor-arg index="2" ref="jobManager"/>
</bean>
<bean id="metricNameStrategy" class="org.mapfish.print.metrics.MetricsNameStrategyFactory" factory-method="hostAndMethod" />
<bean id="loggingMetricsConfigurator" class="org.mapfish.print.metrics.LoggingMetricsConfigurator" lazy-init="false"/>
Expand Down
Loading

0 comments on commit 0042714

Please sign in to comment.