From 71a1475b13de1c96c00310cad101fb3296d3d01e Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 22 Oct 2024 16:05:01 -0400 Subject: [PATCH] Optimize downloader task executor (#115355) --- .../geoip/GeoIpDownloaderTaskExecutor.java | 67 +++++++++++-------- .../ingest/ConfigurationUtils.java | 3 +- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 0e5248924c327..39683bcb7ff24 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -43,13 +43,14 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; @@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) { } static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) { - if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) { + if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) { return true; } - Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream() - .map(PipelineConfiguration::getId) - .collect(Collectors.toSet()); - + final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false); if
(checkReferencedPipelines.isEmpty()) { return false; } @@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) { } /** - * Retrieve list of pipelines that have at least one geoip processor. + * Retrieve the set of pipeline ids that have at least one geoip processor. * @param clusterState Cluster state. * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation * matching the param. - * @return A list of {@link PipelineConfiguration} matching criteria. + * @return A set of pipeline ids matching criteria. */ @SuppressWarnings("unchecked") - private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor( - ClusterState clusterState, - boolean downloadDatabaseOnPipelineCreation - ) { - List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState); - return pipelineDefinitions.stream().filter(pipelineConfig -> { - List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY); - return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation); - }).toList(); + private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) { + List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState); + Set<String> ids = new HashSet<>(); + // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph + for (PipelineConfiguration configuration : configurations) { + List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY); + if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) { + ids.add(configuration.getId()); + } + } + return Collections.unmodifiableSet(ids); } /** @@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces * @return true if a geoip processor is found in the processor list.
*/ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) { - return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation)); + if (processors != null) { + // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph + for (Map<String, Object> processor : processors) { + if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) { + return true; + } + } + } + return false; } /** @@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor } /** - * Check if a processor config is has an on_failure clause containing at least a geoip processor. + * Check if a processor config has an on_failure clause containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @return true if a geoip processor is found in the processor list.
@@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor( Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation ) { - return processor != null - && processor.values() - .stream() - .anyMatch( - value -> value instanceof Map - && hasAtLeastOneGeoipProcessor( - ((Map<String, List<Map<String, Object>>>) value).get("on_failure"), - downloadDatabaseOnPipelineCreation - ) - ); + // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph + for (Object value : processor.values()) { + if (value instanceof Map + && hasAtLeastOneGeoipProcessor( + ((Map<String, List<Map<String, Object>>>) value).get("on_failure"), + downloadDatabaseOnPipelineCreation + )) { + return true; + } + } + return false; } /** diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java index 5059272aa2e23..97a68d9807688 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java @@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -239,7 +240,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St processorType, processorTag, propertyName, - "property isn't a boolean, but of type [" + value.getClass().getName() + "]" + Strings.format("property isn't a boolean, but of type [%s]", value.getClass().getName()) ); }