Skip to content

Commit

Permalink
[ML] Avoid assertions on empty Optional in DF usage test (elastic#40043)
Browse files Browse the repository at this point in the history
Refactor the usage class to make testing simpler
  • Loading branch information
davidkyle authored Mar 15, 2019
1 parent 758fb15 commit 4c803e5
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformStateAndStats> {

public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
return new DataFrameTransformStateAndStats(id,
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats());
}

public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats() {
return new DataFrameTransformStateAndStats(randomAlphaOfLengthBetween(1, 10),
DataFrameTransformStateTests.randomDataFrameTransformState(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.indexing.IndexerState;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -78,31 +80,38 @@ public void usage(ActionListener<XPackFeatureSet.Usage> listener) {

GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap(
transforms -> {
Set<String> transformIds = transforms.getTransformConfigurations()
.stream()
.map(DataFrameTransformConfig::getId)
.collect(Collectors.toSet());
GetDataFrameTransformsStatsAction.Request transformStatsRequest =
new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
transformStatsRequest,
ActionListener.wrap(transformStatsResponse -> {
Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();

transformStatsResponse.getTransformsStateAndStats().forEach(singleResult -> {
transformIds.remove(singleResult.getId());
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
// If there is no state returned, assumed stopped
transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));

listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats));
}, listener::onFailure));
},
listener::onFailure
transforms -> {
GetDataFrameTransformsStatsAction.Request transformStatsRequest =
new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest,
ActionListener.wrap(transformStatsResponse -> {
listener.onResponse(createUsage(available(), enabled(), transforms.getTransformConfigurations(),
transformStatsResponse.getTransformsStateAndStats()));
}, listener::onFailure));
},
listener::onFailure
));
}

static DataFrameFeatureSetUsage createUsage(boolean available, boolean enabled,
List<DataFrameTransformConfig> transforms,
List<DataFrameTransformStateAndStats> transformsStateAndStats) {

Set<String> transformIds = transforms.stream()
.map(DataFrameTransformConfig::getId)
.collect(Collectors.toSet());

Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();

transformsStateAndStats.forEach(singleResult -> {
transformIds.remove(singleResult.getId());
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
// If there is no state returned, assumed stopped
transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));

return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,14 +18,12 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.junit.Before;

Expand All @@ -39,9 +36,6 @@

import static java.lang.Math.toIntExact;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -73,21 +67,20 @@ public void testEnabledDefault() {
assertTrue(featureSet.enabled());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40022")
public void testUsage() throws InterruptedException, ExecutionException, IOException {
Client client = mock(Client.class);
when(licenseState.isDataFrameAllowed()).thenReturn(true);

DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY, client, licenseState);

public void testUsage() throws IOException {
List<DataFrameTransformStateAndStats> transformsStateAndStats = new ArrayList<>();
for (int i = 0; i < randomIntBetween(0, 10); ++i) {
transformsStateAndStats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
int count = randomIntBetween(0, 10);
int uniqueId = 0;
for (int i = 0; i < count; ++i) {
transformsStateAndStats.add(
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats("df-" + Integer.toString(uniqueId++)));
}

count = randomIntBetween(0, 10);
List<DataFrameTransformConfig> transformConfigWithoutTasks = new ArrayList<>();
for (int i = 0; i < randomIntBetween(0, 10); ++i) {
transformConfigWithoutTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
for (int i = 0; i < count; ++i) {
transformConfigWithoutTasks.add(
DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++)));
}

List<DataFrameTransformConfig> transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size());
Expand All @@ -98,35 +91,17 @@ public void testUsage() throws InterruptedException, ExecutionException, IOExcep
allConfigs.addAll(transformConfigWithoutTasks);
allConfigs.addAll(transformConfigWithTasks);

GetDataFrameTransformsStatsAction.Response mockResponse = new GetDataFrameTransformsStatsAction.Response(transformsStateAndStats);
GetDataFrameTransformsAction.Response mockTransformsResponse = new GetDataFrameTransformsAction.Response(allConfigs);

doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(mockResponse);
return Void.TYPE;
}).when(client).execute(same(GetDataFrameTransformsStatsAction.INSTANCE), any(), any());

doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetDataFrameTransformsAction.Response> listener =
(ActionListener<GetDataFrameTransformsAction.Response>) invocationOnMock.getArguments()[2];
listener.onResponse(mockTransformsResponse);
return Void.TYPE;
}).when(client).execute(same(GetDataFrameTransformsAction.INSTANCE), any(), any());

PlainActionFuture<Usage> future = new PlainActionFuture<>();
featureSet.usage(future);
XPackFeatureSet.Usage usage = future.get();
boolean enabled = randomBoolean();
boolean available = randomBoolean();
DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, enabled, allConfigs, transformsStateAndStats);

assertTrue(usage.enabled());
assertEquals(enabled, usage.enabled());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
usage.toXContent(builder, ToXContent.EMPTY_PARAMS);

XContentParser parser = createParser(builder);
Map<String, Object> usageAsMap = parser.map();
assertTrue((boolean) XContentMapValues.extractValue("available", usageAsMap));
assertEquals(available, (boolean) XContentMapValues.extractValue("available", usageAsMap));

if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) {
// no transforms, no stats
Expand All @@ -142,12 +117,16 @@ public void testUsage() throws InterruptedException, ExecutionException, IOExcep
transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum));
stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));

DataFrameIndexerTransformStats combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats())
.reduce((l, r) -> l.merge(r)).get();
// use default constructed stats object for assertions if transformsStateAndStats is empty
DataFrameIndexerTransformStats combinedStats = new DataFrameIndexerTransformStats();
if (transformsStateAndStats.isEmpty() == false) {
combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats()).reduce((l, r) -> l.merge(r)).get();
}

assertEquals(toIntExact(combinedStats.getIndexFailures()),
XContentMapValues.extractValue("stats.index_failures", usageAsMap));
assertEquals(toIntExact(combinedStats.getIndexTotal()), XContentMapValues.extractValue("stats.index_total", usageAsMap));
assertEquals(toIntExact(combinedStats.getIndexTotal()),
XContentMapValues.extractValue("stats.index_total", usageAsMap));
assertEquals(toIntExact(combinedStats.getSearchTime()),
XContentMapValues.extractValue("stats.search_time_in_ms", usageAsMap));
assertEquals(toIntExact(combinedStats.getNumDocuments()),
Expand Down

0 comments on commit 4c803e5

Please sign in to comment.