Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize downloader task executor #115355

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
Expand Down Expand Up @@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}

static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
return true;
}

Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
.map(PipelineConfiguration::getId)
.collect(Collectors.toSet());

final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
if (checkReferencedPipelines.isEmpty()) {
return false;
}
Expand All @@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
}

/**
* Retrieve list of pipelines that have at least one geoip processor.
* Retrieve the set of pipeline ids that have at least one geoip processor.
* @param clusterState Cluster state.
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
* matching the param.
* @return A list of {@link PipelineConfiguration} matching criteria.
* @return A set of pipeline ids matching criteria.
*/
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
ClusterState clusterState,
boolean downloadDatabaseOnPipelineCreation
) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().filter(pipelineConfig -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).toList();
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
}
}
return Collections.unmodifiableSet(ids);
}

/**
Expand All @@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
return true;
}
}
}
return false;
}

/**
Expand Down Expand Up @@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
}

/**
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
* Check if a processor config has an on_failure clause containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
Expand All @@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
) {
return processor != null
&& processor.values()
.stream()
.anyMatch(
value -> value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)
);
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)) {
return true;
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -239,7 +240,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
processorType,
processorTag,
propertyName,
"property isn't a boolean, but of type [" + value.getClass().getName() + "]"
Strings.format("property isn't a boolean, but of type [%s]", value.getClass().getName())
);
}

Expand Down