Skip to content

Commit

Permalink
nits and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jul 19, 2024
1 parent 9250579 commit 8946d85
Show file tree
Hide file tree
Showing 11 changed files with 417 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
*/
@ExperimentalApi
public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class);
private static final Logger logger = LogManager.getLogger(Composite99DocValuesReader.class);

private final DocValuesProducer delegate;
private IndexInput dataIn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Long toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return Long.MIN_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return Long.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ public interface ValueAggregator<A> {
/**
* Fetches a value that does not alter the result of aggregations
*/
long getIdempotentMetricValue();
long getIdentityMetricValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.time.DateUtils;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -42,7 +41,6 @@
import org.opensearch.index.mapper.NumberFieldMapper;

import java.io.IOException;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -269,35 +267,6 @@ public void build(
serializeStarTree(numSegmentStarTreeDocument);
}

private long getTimeStampVal(final String fieldName, final long val) {
long roundedDate = 0;
long ratio = 0;

switch (fieldName) {

case "@timestamp":
ratio = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "hour":
ratio = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "day":
ratio = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "month":
roundedDate = DateUtils.roundMonthOfYear(val);
return roundedDate;
case "year":
roundedDate = DateUtils.roundYear(val);
return roundedDate;
default:
return val;
}
}

private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
Expand Down Expand Up @@ -581,7 +550,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]);
} else {
metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue(
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdentityMetricValue()),
starTreeNumericType
);
}
Expand All @@ -605,7 +574,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
} else {
aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue(
aggregatedSegmentDocument.metrics[i],
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdentityMetricValue()),
starTreeNumericType
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mappe
* @param dataOut an IndexInput for star-tree data
* @param fieldProducerMap fetches iterators for the fields (dimensions and metrics)
* @param starTreeDocValuesConsumer a consumer to write star-tree doc values
* @throws IOException
* @throws IOException when an error occurs while building the star-trees
*/
public void build(
IndexOutput metaOut,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public class StarTreeDataWriter {
* @throws IOException if an I/O error occurs while writing the star-tree data
*/
public static long serializeStarTree(IndexOutput indexOutput, TreeNode rootNode, int numNodes) throws IOException {
int headerSizeInBytes = computeStarTreeDataHeaderByteSize();
long totalSizeInBytes = headerSizeInBytes + (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES;

logger.debug("Star tree size in bytes : {}", totalSizeInBytes);
long totalDataSizeInBytes = (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES;
if (logger.isDebugEnabled()) {
int headerSizeInBytes = computeStarTreeDataHeaderByteSize();
logger.debug("Star tree size in bytes : {}", headerSizeInBytes + totalDataSizeInBytes);
}

writeStarTreeHeader(indexOutput, numNodes);
writeStarTreeNodes(indexOutput, rootNode);
return totalSizeInBytes;
return totalDataSizeInBytes;
}

/**
Expand Down Expand Up @@ -95,7 +96,7 @@ private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) th
while (!queue.isEmpty()) {
TreeNode node = queue.remove();

if (node.children == null) {
if (node.children == null || node.children.isEmpty()) {
writeStarTreeNode(output, node, ALL, ALL);
} else {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.meta;

import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.Version;
import org.opensearch.index.compositeindex.CompositeIndexMetadata;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.NumericDimension;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils;
import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;

import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER;
import static org.opensearch.index.mapper.CompositeMappedFieldType.CompositeFieldType.STAR_TREE;

public class StarTreeMetaTests extends OpenSearchTestCase {

private IndexOutput metaOut;
private IndexInput metaIn;
private StarTreeField starTreeField;
private SegmentWriteState writeState;
private Directory directory;
private FieldInfo[] fieldsInfo;
private List<Dimension> dimensionsOrder;
private List<String> fields = List.of();
private List<Metric> metrics;
private List<MetricAggregatorInfo> metricAggregatorInfos = new ArrayList<>();
private int segmentDocumentCount;
private long dataFilePointer;
private long dataFileLength;

@Before
public void setup() throws IOException {
fields = List.of("field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10");
directory = newFSDirectory(createTempDir());
SegmentInfo segmentInfo = new SegmentInfo(
directory,
Version.LATEST,
Version.LUCENE_9_11_0,
"test_segment",
6,
false,
false,
new Lucene99Codec(),
new HashMap<>(),
UUID.randomUUID().toString().substring(0, 16).getBytes(StandardCharsets.UTF_8),
new HashMap<>(),
null
);

fieldsInfo = new FieldInfo[fields.size()];
for (int i = 0; i < fieldsInfo.length; i++) {
fieldsInfo[i] = new FieldInfo(
fields.get(i),
i,
false,
false,
true,
IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
DocValuesType.SORTED_NUMERIC,
-1,
Collections.emptyMap(),
0,
0,
0,
0,
VectorEncoding.FLOAT32,
VectorSimilarityFunction.EUCLIDEAN,
false,
false
);
}
FieldInfos fieldInfos = new FieldInfos(fieldsInfo);
writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random()));
}

public void test_starTreeMetadata() throws IOException {
dimensionsOrder = List.of(
new NumericDimension("field1"),
new NumericDimension("field3"),
new NumericDimension("field5"),
new NumericDimension("field8")
);
metrics = List.of(
new Metric("field2", List.of(MetricStat.SUM)),
new Metric("field4", List.of(MetricStat.SUM)),
new Metric("field6", List.of(MetricStat.COUNT)),
new Metric("field9", List.of(MetricStat.MIN)),
new Metric("field10", List.of(MetricStat.MAX))
);
int maxLeafDocs = randomInt();
StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(
maxLeafDocs,
new HashSet<>(),
StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP
);
starTreeField = new StarTreeField("star_tree", dimensionsOrder, metrics, starTreeFieldConfiguration);

for (Metric metric : metrics) {
for (MetricStat metricType : metric.getMetrics()) {
MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo(
metricType,
metric.getField(),
starTreeField.getName(),
IndexNumericFieldData.NumericType.DOUBLE
);
metricAggregatorInfos.add(metricAggregatorInfo);
}
}

dataFileLength = randomLong();
dataFilePointer = randomLong();
segmentDocumentCount = randomInt();
metaOut = directory.createOutput("star-tree-metadata", IOContext.DEFAULT);
StarTreeBuilderUtils.serializeStarTreeMetadata(
metaOut,
starTreeField,
writeState,
metricAggregatorInfos,
segmentDocumentCount,
dataFilePointer,
dataFileLength
);
metaOut.close();
metaIn = directory.openInput("star-tree-metadata", IOContext.READONCE);
assertEquals(MAGIC_MARKER, metaIn.readLong());

CompositeIndexMetadata compositeIndexMetadata = new CompositeIndexMetadata(metaIn);
assertEquals(starTreeField.getName(), compositeIndexMetadata.getCompositeFieldName());
assertEquals(STAR_TREE, compositeIndexMetadata.getCompositeFieldType());

StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata();
assertNotNull(starTreeMetadata);

for (int i = 0; i < dimensionsOrder.size(); i++) {
assertEquals(
writeState.fieldInfos.fieldInfo(dimensionsOrder.get(i).getField()).getFieldNumber(),
starTreeMetadata.getDimensionFieldNumbers().get(i),
0
);
}

for (int i = 0; i < metricAggregatorInfos.size(); i++) {
MetricEntry metricEntry = starTreeMetadata.getMetricEntries().get(i);
assertEquals(metricAggregatorInfos.get(i).getField(), metricEntry.getMetricName());
assertEquals(metricAggregatorInfos.get(i).getMetricStat(), metricEntry.getMetricStat());
}
assertEquals(segmentDocumentCount, starTreeMetadata.getSegmentAggregatedDocCount(), 0);
assertEquals(maxLeafDocs, starTreeMetadata.getMaxLeafDocs(), 0);
assertEquals(
starTreeFieldConfiguration.getSkipStarNodeCreationInDims().size(),
starTreeMetadata.getSkipStarNodeCreationInDims().size()
);
for (String skipStarNodeCreationInDims : starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims()) {
Integer skipStarNodeCreationInDimsFieldNumber = writeState.fieldInfos.fieldInfo(skipStarNodeCreationInDims).getFieldNumber();
assertTrue(starTreeMetadata.getSkipStarNodeCreationInDims().contains(skipStarNodeCreationInDimsFieldNumber));
}
assertEquals(starTreeFieldConfiguration.getBuildMode(), starTreeMetadata.getStarTreeBuildMode());
assertEquals(dataFileLength, starTreeMetadata.getDataLength());
assertEquals(dataFilePointer, starTreeMetadata.getDataStartFilePointer());

metaIn.close();

}

@Override
public void tearDown() throws Exception {
super.tearDown();
metaOut.close();
metaIn.close();
directory.close();
}

}
Loading

0 comments on commit 8946d85

Please sign in to comment.