Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cumulative Cardinality agg (and Data Science plugin) #43661

Merged
merged 17 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distribution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ task run(type: RunTask) {
setting 'xpack.monitoring.enabled', 'true'
setting 'xpack.sql.enabled', 'true'
setting 'xpack.rollup.enabled', 'true'
setting 'xpack.data-science.enabled', 'true'
keystoreSetting 'bootstrap.password', 'password'
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-cumulative-cardinality-aggregation]]
=== Cumulative Cardinality Aggregation

A parent pipeline aggregation which calculates the Cumulative Cardinality in a parent histogram (or date_histogram)
aggregation. The specified metric must be a cardinality aggregation and the enclosing histogram
must have `min_doc_count` set to `0` (default for `histogram` aggregations).

The `cumulative_cardinality` agg is useful for finding "total new items", like the number of new visitors to your
website each day. A regular cardinality aggregation will tell you how many unique visitors came each day, but doesn't
differentiate between "new" or "repeat" visitors. The Cumulative Cardinality aggregation can be used to determine
how many of each day's unique visitors are "new".

==== Syntax

A `cumulative_cardinality` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"cumulative_cardinality": {
"buckets_path": "my_cardinality_agg"
}
}
--------------------------------------------------
// NOTCONSOLE

[[cumulative-cardinality-params]]
.`cumulative_cardinality` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the cardinality aggregation we wish to find the cumulative cardinality for (see <<buckets-path-syntax>> for more
details) |Required |
|`format` |format to apply to the output value of this aggregation |Optional |`null`
|===

The following snippet calculates the cumulative cardinality of the total monthly `sales`:
polyfractal marked this conversation as resolved.
Show resolved Hide resolved

[source,js]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"distinct_sale_types": {
"cardinality": {
"field": "type"
}
},
"total_new_types": {
"cumulative_cardinality": {
"buckets_path": "distinct_sale_types" <1>
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]

<1> `buckets_path` instructs this aggregation to use the output of the `distinct_sale_types` aggregation for the cumulative cardinality

And the following may be the response:

[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"distinct_sale_types": {
"value": 3
},
"total_new_types": {
"value": 3
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"distinct_sale_types": {
"value": 2
},
"total_new_types": {
"value": 3
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"distinct_sale_types": {
"value": 2
},
"total_new_types": {
"value": 3
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]


==== Incremental cumulative cardinality

The `cumulative_cardinality` agg will show you the total, distinct count since the beginning of the time period
being queried. Sometimes, however, it is useful to see the "incremental" count. Meaning, how many new users
are added each day, rather than the total cumulative count.

This can be accomplished by adding a `derivative` aggregation to our query:

[source,js]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"distinct_sale_types": {
"cardinality": {
"field": "type"
}
},
"total_new_types": {
"cumulative_cardinality": {
"buckets_path": "distinct_sale_types"
}
},
"incremental_new_types": {
"derivative": {
"buckets_path": "total_new_types"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]


And the following may be the response:

[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"distinct_sale_types": {
"value": 3
},
"total_new_types": {
"value": 3
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"distinct_sale_types": {
"value": 2
},
"total_new_types": {
"value": 3
},
"incremental_new_types": {
"value": 0.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"distinct_sale_types": {
"value": 2
},
"total_new_types": {
"value": 3
},
"incremental_new_types": {
"value": 0.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public long getValue() {
return counts == null ? 0 : counts.cardinality(0);
}

HyperLogLogPlusPlus getCounts() {
public HyperLogLogPlusPlus getCounts() {
return counts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class XPackLicenseState {
"Creating and Starting rollup jobs will no longer be allowed.",
"Stopping/Deleting existing jobs, RollupCaps API and RollupSearch continue to function."
});
messages.put(XPackField.DATA_SCIENCE, new String[] {
"Aggregations provided by Data Science plugin are no longer usable."
});
EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
}

Expand Down Expand Up @@ -719,6 +722,15 @@ public synchronized boolean isOdbcAllowed() {
return licensed && localStatus.active;
}

/**
* Rollup is always available as long as there is a valid license
polyfractal marked this conversation as resolved.
Show resolved Hide resolved
*
* @return true if the license is active
*/
public synchronized boolean isDataScienceAllowed() {
return status.active;
}

public synchronized boolean isTrialLicense() {
return status.mode == OperationMode.TRIAL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class XPackField {
public static final String VECTORS = "vectors";
/** Name constant for the voting-only-node feature. */
public static final String VOTING_ONLY = "voting_only";
/** Name constant for the data science plugin. */
public static final String DATA_SCIENCE = "data_science";

private XPackField() {}

Expand Down
26 changes: 26 additions & 0 deletions x-pack/plugin/data-science/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
evaluationDependsOn(xpackModule('core'))

apply plugin: 'elasticsearch.esplugin'
esplugin {
name 'x-pack-data-science'
description 'Elasticsearch Expanded Pack Plugin - Data Science'
classname 'org.elasticsearch.xpack.datascience.DataSciencePlugin'
extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-data-science'

compileJava.options.compilerArgs << "-Xlint:-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"


dependencies {
compileOnly project(":server")

compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
if (isEclipse) {
testCompile project(path: xpackModule('core-tests'), configuration: 'testArtifacts')
}
}

integTest.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;

public class DataScienceAggregationBuilders {

public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCaardinality(String name, String bucketsPath) {
return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datascience;

import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.datascience.cumulativecardinality.CumulativeCardinalityPipelineAggregator;

import java.util.List;

import static java.util.Collections.singletonList;

public class DataSciencePlugin extends Plugin implements SearchPlugin {

// volatile so all threads can see changes
protected static volatile boolean isDataScienceAllowed;

public DataSciencePlugin() {
registerLicenseListener();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't think all this stuff is needed, since you can't go from Basic+ to OSS without uninstalling/reinstalling (I think). So there's no need to watch and cache license changes for a Basic+ feature here.

Will verify and adjust accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's still needed, at least until #45022 is resolved.

}

/**
* Protected for test over-riding
*/
protected void registerLicenseListener() {
// Add a listener to the license state and cache it when there is a change.
// Aggs could be called in high numbers so we don't want them contending on
// the synchronized isFooAllowed() methods
XPackPlugin.getSharedLicenseState()
.addListener(() -> isDataScienceAllowed = XPackPlugin.getSharedLicenseState().isDataScienceAllowed());
}

public static boolean isIsDataScienceAllowed() {
return isDataScienceAllowed;
}

@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return singletonList(new PipelineAggregationSpec(
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
CumulativeCardinalityPipelineAggregator::new,
CumulativeCardinalityPipelineAggregationBuilder::parse));
}
}
Loading