Skip to content

Commit

Permalink
Optimize downloader task executor (elastic#115355)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Oct 22, 2024
1 parent f9a1561 commit 71a1475
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
Expand Down Expand Up @@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}

static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
return true;
}

Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
.map(PipelineConfiguration::getId)
.collect(Collectors.toSet());

final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
if (checkReferencedPipelines.isEmpty()) {
return false;
}
Expand All @@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
}

/**
* Retrieve list of pipelines that have at least one geoip processor.
* Retrieve the set of pipeline ids that have at least one geoip processor.
* @param clusterState Cluster state.
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
* matching the param.
* @return A list of {@link PipelineConfiguration} matching criteria.
* @return A set of pipeline ids matching criteria.
*/
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
ClusterState clusterState,
boolean downloadDatabaseOnPipelineCreation
) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().filter(pipelineConfig -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).toList();
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
}
}
return Collections.unmodifiableSet(ids);
}

/**
Expand All @@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
return true;
}
}
}
return false;
}

/**
Expand Down Expand Up @@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
}

/**
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
* Check if a processor config has an on_failure clause containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
Expand All @@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
) {
return processor != null
&& processor.values()
.stream()
.anyMatch(
value -> value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)
);
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)) {
return true;
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -239,7 +240,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
processorType,
processorTag,
propertyName,
"property isn't a boolean, but of type [" + value.getClass().getName() + "]"
Strings.format("property isn't a boolean, but of type [%s]", value.getClass().getName())
);
}

Expand Down

0 comments on commit 71a1475

Please sign in to comment.