Skip to content

Commit

Permalink
Reduce heap-memory usage of ingest-geoip plugin (#28963)
Browse files Browse the repository at this point in the history
With this commit we reduce heap usage of the ingest-geoip plugin by
memory-mapping the database files. Previously, we have stored these
files gzip-compressed but this has resulted that data are loaded on the
heap.

Closes #28782
  • Loading branch information
danielmitterdorfer committed Mar 12, 2018
1 parent b8a9640 commit bf91962
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 60 deletions.
8 changes: 4 additions & 4 deletions docs/plugins/ingest-geoip.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ The ingest-geoip plugin ships by default with the GeoLite2 City, GeoLite2 Countr
under the CCA-ShareAlike 4.0 license. For more details see, http://dev.maxmind.com/geoip/geoip2/geolite2/

The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory,
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed
with gzip. The geoip config directory is located at `$ES_HOME/config/ingest-geoip` and holds the shipped databases too.
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be stored
uncompressed. The geoip config directory is located at `$ES_HOME/config/ingest-geoip` and holds the shipped databases too.

:plugin_name: ingest-geoip
include::install_remove.asciidoc[]
Expand All @@ -25,7 +25,7 @@ include::install_remove.asciidoc[]
| Name | Required | Default | Description
| `field` | yes | - | The field to get the ip address from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip plugin ships with the GeoLite2-City.mmdb.gz and GeoLite2-Country.mmdb.gz files.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip plugin ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files.
| `properties` | no | [`continent_name`, `country_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
|======
Expand Down Expand Up @@ -101,7 +101,7 @@ PUT _ingest/pipeline/geoip
"geoip" : {
"field" : "ip",
"target_field" : "geo",
"database_file" : "GeoLite2-Country.mmdb.gz"
"database_file" : "GeoLite2-Country.mmdb"
}
}
]
Expand Down
4 changes: 2 additions & 2 deletions plugins/ingest-geoip/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ dependencies {
compile("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
compile('com.maxmind.db:maxmind-db:1.2.2')

testCompile 'org.elasticsearch:geolite2-databases:20171206'
testCompile 'org.elasticsearch:geolite2-databases:20180303'
}

task copyDefaultGeoIp2DatabaseFiles(type: Copy) {
from { zipTree(configurations.testCompile.files.find { it.name.contains('geolite2-databases')}) }
into "${project.buildDir}/ingest-geoip"
include "*.mmdb.gz"
include "*.mmdb"
}

project.bundlePlugin.dependsOn(copyDefaultGeoIp2DatabaseFiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
Expand Down Expand Up @@ -68,8 +67,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;

GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties,
boolean ignoreMissing) throws IOException {
GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing) {
super(tag);
this.field = field;
this.targetField = targetField;
Expand Down Expand Up @@ -323,7 +321,7 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
Map<String, Object> config) throws Exception {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb.gz");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@

import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
public static final Setting<Long> CACHE_SIZE =
Expand Down Expand Up @@ -80,28 +80,38 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}

boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz");
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
Iterator<Path> iterator = databaseFiles.iterator();
while (iterator.hasNext()) {
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> {
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
return new DatabaseReader.Builder(inputStream).withCache(cache).build();
}
});
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache);
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
databaseReaders.put(databaseFileName, holder);
}
}
}
return Collections.unmodifiableMap(databaseReaders);
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
}

@Override
public void close() throws IOException {
if (databaseReaders != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public static void loadDatabaseReaders() throws IOException {
Path configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb"));

NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong()));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
Expand Down Expand Up @@ -111,7 +111,7 @@ public void testCountryBuildDefaults() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
String processorTag = randomAlphaOfLength(10);

GeoIpProcessor processor = factory.create(null, processorTag, config);
Expand All @@ -129,7 +129,7 @@ public void testAsnBuildDefaults() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
String processorTag = randomAlphaOfLength(10);

GeoIpProcessor processor = factory.create(null, processorTag, config);
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testBuildDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
GeoIpProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
Expand All @@ -170,7 +170,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
EnumSet<GeoIpProcessor.Property> asnOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_ASN_PROPERTIES);
asnOnlyProperties.remove(GeoIpProcessor.Property.IP);
String asnProperty = RandomPicks.randomFrom(Randomness.get(), asnOnlyProperties).toString();
Expand All @@ -184,7 +184,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
EnumSet<GeoIpProcessor.Property> cityOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_CITY_PROPERTIES);
cityOnlyProperties.remove(GeoIpProcessor.Property.IP);
String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString();
Expand All @@ -199,9 +199,9 @@ public void testBuildNonExistingDbFile() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "does-not-exist.mmdb.gz");
config.put("database_file", "does-not-exist.mmdb");
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb.gz] doesn't exist"));
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
}

public void testBuildFields() throws Exception {
Expand Down Expand Up @@ -249,12 +249,12 @@ public void testLazyLoading() throws Exception {
Path configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb"));

// Loading another database reader instances, because otherwise we can't test lazy loading as the
// database readers used at class level are reused between tests. (we want to keep that otherwise running this
Expand All @@ -268,15 +268,15 @@ public void testLazyLoading() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-City.mmdb.gz");
config.put("database_file", "GeoLite2-City.mmdb");
factory.create(null, "_tag", config);
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
factory.create(null, "_tag", config);
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
factory.create(null, "_tag", config);

for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
Expand Down
Loading

0 comments on commit bf91962

Please sign in to comment.