Skip to content

Commit

Permalink
Use conditional GET request to download external mapping (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
matvey-mtn authored Mar 22, 2022
1 parent 64691a7 commit f1d208e
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.logz.sawmill.http;

import java.util.Map;

public class ExternalMappingResponse {
private final Long lastModified;
private final Map<String, Iterable<String>> mappings;

public ExternalMappingResponse(Long lastModified, Map<String, Iterable<String>> mappings) {
this.lastModified = lastModified;
this.mappings = mappings;
}

public boolean isModified() {
return mappings != null;
}

public Long getLastModified() {
return lastModified;
}

public Map<String, Iterable<String>> getMappings() {
return mappings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -18,14 +22,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.http.HttpHeaders;
import static com.google.common.base.Preconditions.checkState;

public class ExternalMappingsClient {

private static final Logger logger = LoggerFactory.getLogger(ExternalMappingsClient.class);

private final URL mappingSourceUrl;
private final int connectTimeout;
private final int readTimeout;
Expand All @@ -36,28 +37,30 @@ public ExternalMappingsClient(ExternalMappingSourceProcessor.Configuration confi
this.readTimeout = configuration.getExternalMappingReadTimeout();
}

public Map<String, Iterable<String>> loadMappings() {
Map<String, Iterable<String>> mappings = new HashMap<>();
public ExternalMappingResponse loadMappings(Long lastModified) throws IOException {
HttpURLConnection conn = (HttpURLConnection) mappingSourceUrl.openConnection();

try {
HttpURLConnection conn = (HttpURLConnection) mappingSourceUrl.openConnection();
conn.setConnectTimeout(connectTimeout);
conn.setReadTimeout(readTimeout);
setIfModifiedSinceHeader(conn, lastModified);
conn.setConnectTimeout(connectTimeout);
conn.setReadTimeout(readTimeout);

if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpRequestExecutionException("Couldn't load external mappings. Message: " + conn.getResponseMessage());
}
if (conn.getResponseCode() == HttpURLConnection.HTTP_NOT_MODIFIED) {
return new ExternalMappingResponse(conn.getLastModified(), null);
}

loadMappingsFromHttpConnection(conn, mappings);
} catch (IOException e) {
logger.error("Failed to get external mappings", e);
throw new HttpRequestExecutionException(e.getMessage(), e);
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpRequestExecutionException(
String.format("Couldn't load external mappings. Response status: %d Message: %s",
conn.getResponseCode(), conn.getResponseMessage())
);
}

return mappings;
Map<String, Iterable<String>> mappings = loadMappingsFromHttpConnection(conn);
return new ExternalMappingResponse(conn.getLastModified(), mappings);
}

private void loadMappingsFromHttpConnection(HttpURLConnection connection, Map<String, Iterable<String>> mappings) throws IOException {
private Map<String, Iterable<String>> loadMappingsFromHttpConnection(HttpURLConnection connection) throws IOException {
Map<String, Iterable<String>> mappings = new HashMap<>();
MappingSizeTracker mappingSizeTracker = new MappingSizeTracker();

try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
Expand All @@ -73,6 +76,8 @@ private void loadMappingsFromHttpConnection(HttpURLConnection connection, Map<St
mappings.merge(entry.getLeft(), entry.getRight(), Iterables::concat);
}
}

return mappings;
}

private void validateMappingSize(MappingSizeTracker mappingSizeTracker) {
Expand Down Expand Up @@ -107,6 +112,12 @@ private Pair<String, Iterable<String>> toKeyValuePair(String inputLine) {
return new ImmutablePair<>(key, values);
}

private void setIfModifiedSinceHeader(HttpURLConnection conn, Long lastModified) {
String lastModifiedValue = DateTimeFormatter.RFC_1123_DATE_TIME.format(
ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastModified), ZoneOffset.UTC));
conn.setRequestProperty(HttpHeaders.IF_MODIFIED_SINCE, lastModifiedValue);
}

private static class MappingSizeTracker {
private long linesCount = 0;
private long bytesCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.logz.sawmill.Processor;
import io.logz.sawmill.annotations.ProcessorProvider;
import io.logz.sawmill.exceptions.ProcessorConfigurationException;
import io.logz.sawmill.http.ExternalMappingResponse;
import io.logz.sawmill.http.ExternalMappingsClient;
import io.logz.sawmill.utilities.JsonUtils;
import java.net.MalformedURLException;
Expand All @@ -17,6 +18,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -40,6 +42,8 @@ public class ExternalMappingSourceProcessor implements Processor {
private final Supplier<Void> lazyInitSupplier;

private volatile Map<String, Iterable<String>> keyValueMappingsCache;

private final AtomicLong lastModifiedTime = new AtomicLong();
private volatile boolean refreshErrorOccurred = false;

public ExternalMappingSourceProcessor(Configuration configuration) throws MalformedURLException {
Expand All @@ -65,7 +69,14 @@ private Void lazyInit() {
@VisibleForTesting
protected void refreshExternalMapping() {
try {
keyValueMappingsCache = externalMappingsClient.loadMappings();
ExternalMappingResponse externalMappingResponse = externalMappingsClient.loadMappings(lastModifiedTime.get());
if (!externalMappingResponse.isModified()) {
logger.debug("External mapping didn't change since {}. Skipping refresh.", lastModifiedTime.get());
return;
}

this.keyValueMappingsCache = externalMappingResponse.getMappings();
this.lastModifiedTime.set(externalMappingResponse.getLastModified());
refreshErrorOccurred = false;
} catch (Exception e) {
logger.error("Cannot load external mapping for field {} due to an unexpected error", sourceField, e);
Expand Down Expand Up @@ -201,7 +212,8 @@ public ExternalMappingSourceProcessor create(Map<String, Object> config) {
}

public static final class Constants {
private Constants() {}
private Constants() {
}

public static final int MINIMUM_REFRESH_PERIOD_IN_MILLIS = 15_000;
public static final int DISABLE_MAPPING_REFRESH = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
import com.github.tomakehurst.wiremock.junit.WireMockClassRule;
import io.logz.sawmill.Doc;
import java.net.MalformedURLException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHeaders;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
Expand All @@ -27,7 +32,9 @@ public class ExternalMappingProcessorRefreshTest {

public static final String MAPPING_REFRESH_SCENARIO = "Books Mapping Refresh";
public static final String MAPPING_NOT_FOUND_AFTER_REFRESH_SCENARIO = "Books Mapping Not Found After Refresh";
public static final String MAPPING_NOT_MODIFIED_AFTER_REFRESH_SCENARIO = "Books Mapping Not Modified After Refresh";
public static final String UPDATED_BOOKS_MAPPING_STATE = "Updated Books Mapping";
public static final String BOOKS_MAPPING_NOT_MODIFIED_STATE = "Books Mapping Not Changed";
public static final String BOOKS_MAPPING_NOT_FOUND_STATE = "Books Mapping Not Found";

private static Integer port;
Expand Down Expand Up @@ -96,13 +103,48 @@ public void testProcessorRetainsPreviousMappingOnRefreshFailure() throws Malform
assertThat(targetField2).containsAll(Arrays.asList("Oliver Twist", "A Christmas Carol", "The Chimes"));
}

@Test
public void testProcessorRetainsPreviousMappingOnNotModifiedResponse() throws MalformedURLException, InterruptedException {
setUpBooksMappingNotModifiedScenario();

ExternalMappingSourceProcessor.Configuration config = new ExternalMappingSourceProcessor.Configuration(
SOURCE_FIELD_NAME, TARGET_FIELD_NAME,
"http://localhost:" + port + BOOKS_MAPPING, DISABLE_MAPPING_REFRESH
);
ExternalMappingSourceProcessor processor = new ExternalMappingSourceProcessor(config);

Doc doc1 = createDoc(SOURCE_FIELD_NAME, "Jack London");
/* invokes refreshExternalMapping() under the hood */
processor.process(doc1);

assertThat(doc1.hasField(TARGET_FIELD_NAME)).isTrue();
Iterable<String> targetField1 = doc1.getField(TARGET_FIELD_NAME);
assertThat(targetField1).containsAll(Arrays.asList("White Fang", "Martin Eden", "The Sea Wolf"));

/* second call to the refresh mapping endpoint return 304 */
processor.refreshExternalMapping();
Doc doc2 = createDoc(SOURCE_FIELD_NAME, "Jack London");
processor.process(doc2);

/* processor should retain previous mapping when GET external-mapping request return 304 status */
assertThat(doc2.hasField(TARGET_FIELD_NAME)).isTrue();
Iterable<String> targetField2 = doc2.getField(TARGET_FIELD_NAME);
assertThat(targetField2).containsAll(Arrays.asList("White Fang", "Martin Eden", "The Sea Wolf"));

/* error tags shouldn't be present in the document */
assertThat(doc2.hasField("tags")).isFalse();
}

private void setUpBooksMappingRefreshScenario() {
wireMockRule.stubFor(get(BOOKS_MAPPING).inScenario(MAPPING_REFRESH_SCENARIO)
.whenScenarioStateIs(STARTED)
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "text/plain; charset=utf-8")
.withHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=utf-8")
.withHeader(HttpHeaders.LAST_MODIFIED,
DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.ofInstant(Instant.now(), ZoneOffset.UTC))
)
.withBodyFile("books_mapping.properties")
)
.willSetStateTo(UPDATED_BOOKS_MAPPING_STATE)
Expand All @@ -113,7 +155,10 @@ private void setUpBooksMappingRefreshScenario() {
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "text/plain; charset=utf-8")
.withHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=utf-8")
.withHeader(HttpHeaders.LAST_MODIFIED,
DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.ofInstant(Instant.now(), ZoneOffset.UTC))
)
.withBodyFile("updated_books_mapping.properties")
)
);
Expand All @@ -125,7 +170,7 @@ private void setUpBooksMappingNotFoundAfterRefresh() {
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "text/plain; charset=utf-8")
.withHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=utf-8")
.withBodyFile("books_mapping.properties")
)
.willSetStateTo(BOOKS_MAPPING_NOT_FOUND_STATE)
Expand All @@ -140,4 +185,29 @@ private void setUpBooksMappingNotFoundAfterRefresh() {
)
);
}

private void setUpBooksMappingNotModifiedScenario() {
wireMockRule.stubFor(get(BOOKS_MAPPING).inScenario(MAPPING_NOT_MODIFIED_AFTER_REFRESH_SCENARIO)
.whenScenarioStateIs(STARTED)
.willReturn(
aResponse()
.withStatus(200)
.withHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=utf-8")
.withBodyFile("books_mapping.properties")
)
.willSetStateTo(BOOKS_MAPPING_NOT_MODIFIED_STATE)
);

wireMockRule.stubFor(get(BOOKS_MAPPING).inScenario(MAPPING_NOT_MODIFIED_AFTER_REFRESH_SCENARIO)
.whenScenarioStateIs(BOOKS_MAPPING_NOT_MODIFIED_STATE)
.willReturn(
aResponse()
.withStatus(304)
.withBody("Not Modified")
.withHeader(HttpHeaders.LAST_MODIFIED,
DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.ofInstant(Instant.now(), ZoneOffset.UTC))
)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Arrays;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down Expand Up @@ -221,7 +222,7 @@ private void setUpBooksMappingStub() {
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "text/plain; charset=utf-8")
.withHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=utf-8")
.withBodyFile("books_mapping.properties")
)
);
Expand Down

0 comments on commit f1d208e

Please sign in to comment.