Merge branch 'main' into retrievers
jdconrad committed Feb 28, 2024
2 parents c6de697 + 0fb3a6e commit 4f8cd27
Showing 12 changed files with 93 additions and 87 deletions.
24 changes: 14 additions & 10 deletions distribution/archives/build.gradle
@@ -11,7 +11,7 @@ import java.nio.file.Path

apply plugin: 'elasticsearch.internal-distribution-archive-setup'

CopySpec archiveFiles(CopySpec modulesFiles, String distributionType, String platform, String architecture, boolean isTestDistro) {
CopySpec archiveFiles(String distributionType, String os, String architecture, boolean isTestDistro) {
return copySpec {
into("elasticsearch-${version}") {
into('lib') {
@@ -29,9 +29,9 @@
into('bin') {
with binFiles(distributionType, isTestDistro)
}
into("darwin".equals(platform) ? 'jdk.app' : 'jdk') {
into("darwin".equals(os) ? 'jdk.app' : 'jdk') {
if (isTestDistro == false) {
with jdkFiles(project, platform, architecture)
with jdkFiles(project, os, architecture)
}
}
into('') {
@@ -56,7 +56,11 @@

with noticeFile(isTestDistro)
into('modules') {
with modulesFiles
if (isTestDistro) {
with integTestModulesFiles
} else {
with modulesFiles(os, architecture)
}
}
}
}
@@ -65,42 +69,42 @@
distribution_archives {
integTestZip {
content {
archiveFiles(integTestModulesFiles, 'zip', null, 'x64', true)
archiveFiles('zip', null, null, true)
}
}

windowsZip {
archiveClassifier = 'windows-x86_64'
content {
archiveFiles(modulesFiles('windows-x86_64'), 'zip', 'windows', 'x64', false)
archiveFiles('zip', 'windows', 'x64', false)
}
}

darwinTar {
archiveClassifier = 'darwin-x86_64'
content {
archiveFiles(modulesFiles('darwin-x86_64'), 'tar', 'darwin', 'x64', false)
archiveFiles('tar', 'darwin', 'x64', false)
}
}

darwinAarch64Tar {
archiveClassifier = 'darwin-aarch64'
content {
archiveFiles(modulesFiles('darwin-aarch64'), 'tar', 'darwin', 'aarch64', false)
archiveFiles('tar', 'darwin', 'aarch64', false)
}
}

linuxAarch64Tar {
archiveClassifier = 'linux-aarch64'
content {
archiveFiles(modulesFiles('linux-aarch64'), 'tar', 'linux', 'aarch64', false)
archiveFiles('tar', 'linux', 'aarch64', false)
}
}

linuxTar {
archiveClassifier = 'linux-x86_64'
content {
archiveFiles(modulesFiles('linux-x86_64'), 'tar', 'linux', 'x64', false)
archiveFiles('tar', 'linux', 'x64', false)
}
}
}
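
Taken together, the hunks above change the archiveFiles contract: callers no longer pass a pre-resolved CopySpec of module files plus a combined platform string; they pass the distribution type, os, and architecture, and archiveFiles picks integTestModulesFiles or modulesFiles(os, architecture) itself. A condensed Gradle/Groovy sketch of the new shape (illustrative only, trimmed from the full build script above):

    // Module selection now lives inside archiveFiles rather than at each call site.
    CopySpec archiveFiles(String distributionType, String os, String architecture, boolean isTestDistro) {
        return copySpec {
            into('modules') {
                if (isTestDistro) {
                    with integTestModulesFiles              // integ-test distribution
                } else {
                    with modulesFiles(os, architecture)     // platform-specific modules
                }
            }
            // bin, lib, jdk, licenses, etc. as in the full file
        }
    }

    // Call sites shrink accordingly:
    archiveFiles('tar', 'linux', 'x64', false)   // was archiveFiles(modulesFiles('linux-x86_64'), 'tar', 'linux', 'x64', false)
    archiveFiles('zip', null, null, true)        // integTestZip; os and architecture are unused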
17 changes: 11 additions & 6 deletions distribution/build.gradle
@@ -332,10 +332,10 @@ configure(subprojects.findAll { ['archives', 'packages'].contains(it.name) }) {
}
}

modulesFiles = { platform ->
modulesFiles = { os, architecture ->
copySpec {
eachFile {
if (it.relativePath.segments[-2] == 'bin' || ((platform == 'darwin-x86_64' || platform == 'darwin-aarch64') && it.relativePath.segments[-2] == 'MacOS')) {
if (it.relativePath.segments[-2] == 'bin' || (os == 'darwin' && it.relativePath.segments[-2] == 'MacOS')) {
// bin files, wherever they are within modules (eg platform specific) should be executable
// and MacOS is an alternative to bin on macOS
it.mode = 0755
@@ -344,7 +344,12 @@ configure(subprojects.findAll { ['archives', 'packages'].contains(it.name) }) {
}
}
List excludePlatforms = ['linux-x86_64', 'linux-aarch64', 'windows-x86_64', 'darwin-x86_64', 'darwin-aarch64']
if (platform != null) {
if (os != null) {
String platform = os + '-' + architecture
if (architecture == 'x64') {
// ML platform dir uses the x86_64 nomenclature
platform = os + '-x86_64'
}
excludePlatforms.remove(excludePlatforms.indexOf(platform))
} else {
excludePlatforms = []
@@ -430,15 +435,15 @@ configure(subprojects.findAll { ['archives', 'packages'].contains(it.name) }) {
}
}

jdkFiles = { Project project, String platform, String architecture ->
jdkFiles = { Project project, String os, String architecture ->
return copySpec {
from project.jdks."bundled_${platform}_${architecture}"
from project.jdks."bundled_${os}_${architecture}"
exclude "demo/**"
/*
* The Contents/MacOS directory interferes with notarization, and is unused by our distribution, so we exclude
* it from the build.
*/
if ("darwin".equals(platform)) {
if ("darwin".equals(os)) {
exclude "Contents/MacOS"
}
eachFile { FileCopyDetails details ->
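
The companion change in distribution/build.gradle threads os and architecture through modulesFiles and jdkFiles instead of a pre-built platform string; the point that is easy to miss in the diff is the x64-to-x86_64 rename for the ML platform directory. A condensed, illustrative Groovy sketch of the mapping (not the complete closures):

    modulesFiles = { os, architecture ->
        copySpec {
            List excludePlatforms = ['linux-x86_64', 'linux-aarch64', 'windows-x86_64', 'darwin-x86_64', 'darwin-aarch64']
            if (os != null) {
                // ML platform dirs use the x86_64 nomenclature, not x64.
                String platform = (architecture == 'x64') ? (os + '-x86_64') : (os + '-' + architecture)
                excludePlatforms.remove(excludePlatforms.indexOf(platform))   // keep only the target platform dir
            } else {
                excludePlatforms = []   // no os given (integ-test zip): keep all platform dirs
            }
            // e.g. os = 'linux',  architecture = 'x64'     -> keeps linux-x86_64
            //      os = 'darwin', architecture = 'aarch64' -> keeps darwin-aarch64
        }
    }

    jdkFiles = { Project project, String os, String architecture ->
        // The bundled JDK is selected by os and architecture, e.g. jdks.bundled_linux_x64.
        copySpec { from project.jdks."bundled_${os}_${architecture}" }
    }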
2 changes: 1 addition & 1 deletion distribution/packages/build.gradle
@@ -143,7 +143,7 @@ def commonPackageConfig(String type, String architecture) {
with libFiles
}
into('modules') {
with modulesFiles('linux-' + ((architecture == 'x64') ? 'x86_64' : architecture))
with modulesFiles('linux', architecture)
}
into('jdk') {
with jdkFiles(project, 'linux', architecture)
@@ -9,14 +9,18 @@
package org.elasticsearch.cluster;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodePrevalidateShardPathResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportPrevalidateShardPathAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;

@@ -77,7 +81,31 @@ public void testCheckShards() throws Exception {
assertThat(resp2.getNodes().size(), equalTo(1));
assertThat(resp2.getNodes().get(0).getNode().getId(), equalTo(node2Id));
assertTrue("There should be no failures in the response", resp.failures().isEmpty());
assertTrue("The relocation source node should have removed the shard(s)", resp2.getNodes().get(0).getShardIds().isEmpty());
Set<ShardId> node2ShardIds = resp2.getNodes().get(0).getShardIds();
if (node2ShardIds.size() > 0) {
for (var node2Shard : clusterService().state()
.routingTable()
.allShards()
.filter(s -> s.getIndexName().equals(indexName))
.filter(s -> node2ShardIds.contains(s.shardId()))
.filter(s -> s.currentNodeId().equals(node2Id))
.toList()) {
var explanation = client().execute(
TransportClusterAllocationExplainAction.TYPE,
new ClusterAllocationExplainRequest().setIndex(node2Shard.getIndexName())
.setCurrentNode(node2Shard.currentNodeId())
.setShard(node2Shard.id())
.setPrimary(node2Shard.primary())
).get();
logger.info(
"Shard: {} is still located on relocation source node: {}. Allocation explanation: {}",
node2Shard.shardId(),
node2,
Strings.toString(ChunkedToXContent.wrapAsToXContent(explanation), false, true)
);
}
throw new AssertionError("The relocation source node should have removed the shard(s)");
}
} catch (AssertionError e) {
// Removal of shards which are no longer allocated to the node is attempted on every cluster state change in IndicesStore.
// If for whatever reason the removal is not triggered (e.g. not enough nodes reported that the shards are active) or it
@@ -9,11 +9,13 @@

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.FieldExtractorTestCase;
import org.junit.ClassRule;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105837")
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class FieldExtractorIT extends FieldExtractorTestCase {
@ClassRule
@@ -541,7 +541,6 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
});
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103108")
public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("Starting dedicated master node...");
@@ -75,7 +75,7 @@ class DatafeedJob {
private volatile long lastDataCheckTimeMs;
private volatile Tuple<String, Annotation> lastDataCheckAnnotationWithId;
private volatile Long lastEndTimeMs;
private AtomicBoolean running = new AtomicBoolean(true);
private final AtomicBoolean running = new AtomicBoolean(true);
private volatile boolean isIsolated;
private volatile boolean haveEverSeenData;
private volatile long consecutiveDelayedDataBuckets;
@@ -351,7 +351,7 @@ public boolean isRunning() {
return running.get();
}

private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
private void run(long start, long end, FlushJobAction.Request flushRequest) {
if (end <= start) {
return;
}
@@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorQueryContext;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorUtils;

import java.io.ByteArrayInputStream;
@@ -121,7 +122,7 @@ private InternalAggregations search() {
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequest = buildSearchRequest(buildBaseSearchSource());
assert searchRequest.request().allowPartialSearchResults() == false;
SearchResponse searchResponse = executeSearchRequest(searchRequest);
SearchResponse searchResponse = executeSearchRequest(client, context.queryContext, searchRequest);
try {
LOGGER.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
@@ -142,9 +143,13 @@ private void initAggregationProcessor(InternalAggregations aggs) throws IOException {
aggregationToJsonProcessor.process(aggs);
}

private SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
static SearchResponse executeSearchRequest(
Client client,
DataExtractorQueryContext context,
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder
) {
SearchResponse searchResponse = ClientHelper.executeWithHeaders(
context.queryContext.headers,
context.headers,
ClientHelper.ML_ORIGIN,
client,
searchRequestBuilder::get
@@ -216,7 +221,7 @@ public DataSummary getSummary() {
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder = buildSearchRequest(
DataExtractorUtils.getSearchSourceBuilderForSummary(context.queryContext)
);
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
SearchResponse searchResponse = executeSearchRequest(client, context.queryContext, searchRequestBuilder);
try {
LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
@@ -16,7 +16,6 @@
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigUtils;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
@@ -48,9 +47,6 @@ class CompositeAggregationDataExtractor implements DataExtractor {

private static final Logger LOGGER = LogManager.getLogger(CompositeAggregationDataExtractor.class);

private static final String EARLIEST_TIME = "earliest_time";
private static final String LATEST_TIME = "latest_time";

private volatile Map<String, Object> afterKey = null;
private final CompositeAggregationBuilder compositeAggregationBuilder;
private final Client client;
@@ -90,7 +86,7 @@ public boolean isCancelled() {

@Override
public void cancel() {
LOGGER.debug(() -> "[" + context.jobId + "] Data extractor received cancel request");
LOGGER.debug("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
}

@@ -113,7 +109,7 @@ public Result next() throws IOException {
SearchInterval searchInterval = new SearchInterval(context.queryContext.start, context.queryContext.end);
InternalAggregations aggs = search();
if (aggs == null) {
LOGGER.trace(() -> "[" + context.jobId + "] extraction finished");
LOGGER.trace("[{}] extraction finished", context.jobId);
hasNext = false;
afterKey = null;
return new Result(searchInterval, Optional.empty());
@@ -153,9 +149,9 @@ private InternalAggregations search() {
}
searchSourceBuilder.aggregation(compositeAggregationBuilder);
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequest = requestBuilder.build(searchSourceBuilder);
SearchResponse searchResponse = executeSearchRequest(searchRequest);
SearchResponse searchResponse = AbstractAggregationDataExtractor.executeSearchRequest(client, context.queryContext, searchRequest);
try {
LOGGER.trace(() -> "[" + context.jobId + "] Search composite response was obtained");
LOGGER.trace("[{}] Search composite response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
InternalAggregations aggregations = searchResponse.getAggregations();
if (aggregations == null) {
@@ -171,25 +167,6 @@
}
}

private SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
SearchResponse searchResponse = ClientHelper.executeWithHeaders(
context.queryContext.headers,
ClientHelper.ML_ORIGIN,
client,
searchRequestBuilder::get
);
boolean success = false;
try {
DataExtractorUtils.checkForSkippedClusters(searchResponse);
success = true;
} finally {
if (success == false) {
searchResponse.decRef();
}
}
return searchResponse;
}

private InputStream processAggs(InternalAggregations aggs) throws IOException {
AggregationToJsonProcessor aggregationToJsonProcessor = new AggregationToJsonProcessor(
context.queryContext.timeField,
@@ -262,7 +239,11 @@ public DataSummary getSummary() {
client,
context.queryContext
);
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
SearchResponse searchResponse = AbstractAggregationDataExtractor.executeSearchRequest(
client,
context.queryContext,
searchRequestBuilder
);
try {
LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
Expand Down