Skip to content

Commit

Permalink
doc values file format
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jul 10, 2024
1 parent 99f008d commit 781a2d4
Show file tree
Hide file tree
Showing 16 changed files with 392 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.apache.lucene.codecs.lucene90;

import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentWriteState;

import java.io.IOException;

/**
 * A {@link DocValuesConsumer} for the Star Tree index structure that forwards every call to an
 * internal {@link Lucene90DocValuesConsumer}.
 * <p>
 * Each {@code add*Field} method (numeric, binary, sorted, sorted numeric and sorted set) hands the
 * field and its producer to the matching method of the wrapped consumer, and {@link #close()}
 * closes the wrapped consumer.
 *
 * @opensearch.experimental
 */
public class Composite99DocValuesConsumer extends DocValuesConsumer {

    // Wrapped Lucene consumer that performs the actual writes; created once in the constructor.
    Lucene90DocValuesConsumer lucene90DocValuesConsumer;

    /**
     * Creates the consumer and the underlying {@link Lucene90DocValuesConsumer}.
     *
     * @param state         segment write state passed through to the wrapped consumer
     * @param dataCodec     codec name passed through for the data file
     * @param dataExtension file extension passed through for the data file
     * @param metaCodec     codec name passed through for the metadata file
     * @param metaExtension file extension passed through for the metadata file
     * @throws IOException if the wrapped consumer cannot be created
     */
    public Composite99DocValuesConsumer(
        SegmentWriteState state,
        String dataCodec,
        String dataExtension,
        String metaCodec,
        String metaExtension
    ) throws IOException {
        lucene90DocValuesConsumer = new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension);
    }

    /**
     * Closes the wrapped consumer.
     *
     * @throws IOException if closing the wrapped consumer fails
     */
    @Override
    public void close() throws IOException {
        lucene90DocValuesConsumer.close();
    }

    /**
     * Forwards a numeric doc values field to the wrapped consumer.
     *
     * @param fieldInfo         field being written
     * @param docValuesProducer producer supplying the field's values
     * @throws IOException if the wrapped consumer fails to write the field
     */
    @Override
    public void addNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException {
        lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer);
    }

    /**
     * Forwards a binary doc values field to the wrapped consumer.
     *
     * @param fieldInfo         field being written
     * @param docValuesProducer producer supplying the field's values
     * @throws IOException if the wrapped consumer fails to write the field
     */
    @Override
    public void addBinaryField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException {
        // Must delegate to addBinaryField; routing binary fields through addNumericField
        // would ask the producer for numeric values of a binary field.
        lucene90DocValuesConsumer.addBinaryField(fieldInfo, docValuesProducer);
    }

    /**
     * Forwards a sorted doc values field to the wrapped consumer.
     *
     * @param fieldInfo         field being written
     * @param docValuesProducer producer supplying the field's values
     * @throws IOException if the wrapped consumer fails to write the field
     */
    @Override
    public void addSortedField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException {
        lucene90DocValuesConsumer.addSortedField(fieldInfo, docValuesProducer);
    }

    /**
     * Forwards a sorted numeric doc values field to the wrapped consumer.
     *
     * @param fieldInfo         field being written
     * @param docValuesProducer producer supplying the field's values
     * @throws IOException if the wrapped consumer fails to write the field
     */
    @Override
    public void addSortedNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException {
        lucene90DocValuesConsumer.addSortedNumericField(fieldInfo, docValuesProducer);
    }

    /**
     * Forwards a sorted set doc values field to the wrapped consumer.
     *
     * @param fieldInfo         field being written
     * @param docValuesProducer producer supplying the field's values
     * @throws IOException if the wrapped consumer fails to write the field
     */
    @Override
    public void addSortedSetField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException {
        lucene90DocValuesConsumer.addSortedSetField(fieldInfo, docValuesProducer);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeConstants;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues;

/**
* This class is a custom abstraction of the {@link DocValuesProducer} for the Star Tree index structure.
* It is responsible for providing access to various types of document values (numeric, binary, sorted, sorted numeric,
Expand All @@ -41,7 +41,7 @@ public class StarTree99DocValuesProducer extends DocValuesProducer {

Lucene90DocValuesProducer lucene90DocValuesProducer;
private final List<FieldInfo> dimensions;
private final List<String> metrics;
private final List<MetricEntry> metrics;
private final FieldInfos fieldInfos;

public StarTree99DocValuesProducer(
Expand All @@ -55,15 +55,10 @@ public StarTree99DocValuesProducer(
String compositeFieldName
) throws IOException {
this.dimensions = dimensions;
this.metrics = new ArrayList<>();
for (MetricEntry metricEntry : metricEntries) {
this.metrics.add(
MetricAggregatorInfo.toFieldName(compositeFieldName, metricEntry.getMetricName(), metricEntry.getMetricStat().getTypeName())
);
}
this.metrics = metricEntries;

// populates the dummy list of field infos to fetch doc id set iterators for respective fields.
this.fieldInfos = new FieldInfos(getFieldInfoList());
this.fieldInfos = new FieldInfos(getFieldInfoList(compositeFieldName));
SegmentReadState segmentReadState = new SegmentReadState(state.directory, state.segmentInfo, fieldInfos, state.context);
lucene90DocValuesProducer = new Lucene90DocValuesProducer(segmentReadState, dataCodec, dataExtension, metaCodec, metaExtension);
}
Expand Down Expand Up @@ -108,35 +103,36 @@ public void close() throws IOException {
this.lucene90DocValuesProducer.close();
}

private FieldInfo[] getFieldInfoList() {
private FieldInfo[] getFieldInfoList(String compositeFieldName) {
FieldInfo[] fieldInfoList = new FieldInfo[this.dimensions.size() + metrics.size()];
// field number is not really used. We depend on unique field names to get the desired iterator
int fieldNumber = 0;

for (FieldInfo dimension : this.dimensions) {
fieldInfoList[fieldNumber] = new FieldInfo(
dimension.getName() + StarTreeConstants.DIMENSION_SUFFIX,
fullFieldNameForStarTreeDimensionsDocValues(compositeFieldName, dimension.getName()),
fieldNumber,
false,
dimension.omitsNorms(),
dimension.hasPayloads(),
dimension.getIndexOptions(),
dimension.getDocValuesType(),
false,
true,
IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
DocValuesType.SORTED_NUMERIC,
-1,
dimension.attributes(),
dimension.getPointDimensionCount(),
dimension.getPointIndexDimensionCount(),
dimension.getPointNumBytes(),
dimension.getVectorDimension(),
dimension.getVectorEncoding(),
dimension.getVectorSimilarityFunction(),
Collections.emptyMap(),
0,
0,
0,
0,
VectorEncoding.FLOAT32,
VectorSimilarityFunction.EUCLIDEAN,
false,
dimension.isParentField()
false
);
fieldNumber++;
}
for (String metric : metrics) {
for (MetricEntry metric : metrics) {
fieldInfoList[fieldNumber] = new FieldInfo(
metric + StarTreeConstants.METRIC_SUFFIX,
fullFieldNameForStarTreeMetricsDocValues(compositeFieldName, metric.getMetricName(), metric.getMetricStat().getTypeName()),
fieldNumber,
false,
false,
Expand Down
148 changes: 134 additions & 14 deletions server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -32,12 +35,17 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues;

/**
* Builder for star tree. Defines the algorithm to construct star-tree
Expand Down Expand Up @@ -160,15 +168,9 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
for (MetricStat metricType : metric.getMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricType != MetricStat.COUNT) {
// Need not initialize the metric reader with relevant doc id set iterator for COUNT metric type
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
} else {
metricReader = new SequentialDocValuesIterator();
}

metricReaders.add(metricReader);
}
}
Expand All @@ -178,10 +180,16 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
/**
* Builds the star tree from the original segment documents
*
* @param fieldProducerMap contain s the docValues producer to get docValues associated with each field
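* @param fieldProducerMap contains the docValues producer to get docValues associated with each field
* @param fieldNumberAcrossStarTrees counter from which the next field number is drawn for each star-tree doc values field
* @param starTreeDocValuesConsumer consumer to which the star-tree doc values fields are added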
* @throws IOException when we are unable to build star-tree
*/
public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOException {
public void build(
Map<String, DocValuesProducer> fieldProducerMap,
AtomicInteger fieldNumberAcrossStarTrees,
DocValuesConsumer starTreeDocValuesConsumer
) throws IOException {
long startTime = System.currentTimeMillis();
logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName());

Expand All @@ -201,17 +209,23 @@ public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOExce
metricReaders
);
logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
build(starTreeDocumentIterator);
build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer);
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
}

/**
* Builds the star tree using sorted and aggregated star-tree Documents
*
* @param starTreeDocumentIterator contains the sorted and aggregated documents
* @param starTreeDocumentIterator contains the sorted and aggregated documents
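* @param fieldNumberAcrossStarTrees counter from which the next field number is drawn for each star-tree doc values field
* @param starTreeDocValuesConsumer consumer to which the star-tree doc values fields are added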
* @throws IOException when we are unable to build star-tree
*/
public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IOException {
public void build(
Iterator<StarTreeDocument> starTreeDocumentIterator,
AtomicInteger fieldNumberAcrossStarTrees,
DocValuesConsumer starTreeDocValuesConsumer
) throws IOException {
int numSegmentStarTreeDocument = totalSegmentDocs;

while (starTreeDocumentIterator.hasNext()) {
Expand All @@ -238,12 +252,11 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode;
logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument);

// TODO: When StarTree Codec is ready
// Create doc values indices in disk
createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees);

// serialize star-tree
serializeStarTree(numSegmentStarTreeDocument);

// Write star tree metadata for off heap implementation
}

private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException {
Expand All @@ -263,6 +276,113 @@ private void serializeStarTree(int numSegmentStarTreeDocument) throws IOExceptio
);
}

/**
 * Writes the dimensions and metrics of every star-tree document as sorted numeric doc values.
 * <p>
 * One {@link FieldInfo} and one {@link SortedNumericDocValuesWriter} is created per dimension and per
 * metric aggregator. All star-tree documents in {@code [0, numStarTreeDocs)} are then fed into the
 * writers, and finally each writer is handed to the consumer through
 * {@code addStarTreeDocValueFields}.
 *
 * @param docValuesConsumer          consumer that receives one sorted numeric field per dimension and metric
 * @param fieldNumberAcrossStarTrees counter incremented once for every field created here; its value
 *                                   before the increment becomes the field number
 * @throws IOException if a star-tree document cannot be read or the consumer fails to write a field
 */
private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, AtomicInteger fieldNumberAcrossStarTrees)
    throws IOException {
    final int dimensionCount = starTreeField.getDimensionsOrder().size();
    final int metricCount = metricAggregatorInfos.size();

    List<SortedNumericDocValuesWriter> dimensionWriters = new ArrayList<>(dimensionCount);
    List<SortedNumericDocValuesWriter> metricWriters = new ArrayList<>(metricCount);
    FieldInfo[] dimensionFieldInfoList = new FieldInfo[dimensionCount];
    FieldInfo[] metricFieldInfoList = new FieldInfo[metricCount];

    // Dimension fields are numbered first, in dimension order.
    for (int i = 0; i < dimensionCount; i++) {
        String fieldName = fullFieldNameForStarTreeDimensionsDocValues(
            starTreeField.getName(),
            starTreeField.getDimensionsOrder().get(i).getField()
        );
        FieldInfo fi = newStarTreeDocValuesFieldInfo(fieldName, fieldNumberAcrossStarTrees.getAndIncrement());
        dimensionFieldInfoList[i] = fi;
        dimensionWriters.add(new SortedNumericDocValuesWriter(fi, Counter.newCounter()));
    }

    // Metric fields follow, one per metric aggregator.
    for (int i = 0; i < metricCount; i++) {
        String fieldName = fullFieldNameForStarTreeMetricsDocValues(
            starTreeField.getName(),
            metricAggregatorInfos.get(i).getField(),
            metricAggregatorInfos.get(i).getMetricStat().getTypeName()
        );
        FieldInfo fi = newStarTreeDocValuesFieldInfo(fieldName, fieldNumberAcrossStarTrees.getAndIncrement());
        metricFieldInfoList[i] = fi;
        metricWriters.add(new SortedNumericDocValuesWriter(fi, Counter.newCounter()));
    }

    for (int docId = 0; docId < numStarTreeDocs; docId++) {
        StarTreeDocument starTreeDocument = getStarTreeDocument(docId);

        // A null dimension is skipped, so the document simply has no value for that field.
        for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
            Long val = starTreeDocument.dimensions[i];
            if (val != null) {
                dimensionWriters.get(i).addValue(docId, val);
            }
        }

        for (int i = 0; i < starTreeDocument.metrics.length; i++) {
            try {
                switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) {
                    case LONG:
                        metricWriters.get(i).addValue(docId, (Long) starTreeDocument.metrics[i]);
                        break;
                    case DOUBLE:
                        // Doubles are stored as sortable longs so they fit the sorted numeric format.
                        metricWriters.get(i).addValue(docId, NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i]));
                        break;
                    default:
                        throw new IllegalStateException("Unknown metric doc value type");
                }
            } catch (IllegalArgumentException e) {
                // NOTE(review): the message says "exiting" but processing continues with the next
                // metric; behavior is kept as-is, the exception is now attached so the cause is not lost.
                logger.info("could not parse the value, exiting creation of star tree", e);
            }
        }
    }

    addStarTreeDocValueFields(docValuesConsumer, dimensionWriters, dimensionFieldInfoList, dimensionCount);
    addStarTreeDocValueFields(docValuesConsumer, metricWriters, metricFieldInfoList, metricCount);
}

/**
 * Builds the {@link FieldInfo} used for a star-tree doc values field: a
 * {@link DocValuesType#SORTED_NUMERIC} field with the given name and number, empty attributes and
 * the fixed remaining settings shared by every star-tree dimension and metric field.
 *
 * @param fieldName   full doc values field name
 * @param fieldNumber field number to assign
 * @return the newly created field info
 */
private static FieldInfo newStarTreeDocValuesFieldInfo(String fieldName, int fieldNumber) {
    return new FieldInfo(
        fieldName,
        fieldNumber,
        false,
        false,
        true,
        IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
        DocValuesType.SORTED_NUMERIC,
        -1,
        Collections.emptyMap(),
        0,
        0,
        0,
        0,
        VectorEncoding.FLOAT32,
        VectorSimilarityFunction.EUCLIDEAN,
        false,
        false
    );
}

/**
 * Registers the first {@code fieldCount} fields with the consumer as sorted numeric doc values.
 * <p>
 * For position {@code p}, the field {@code fieldInfoList[p]} is added with a producer whose
 * {@code getSortedNumeric} returns the doc values of {@code docValuesWriters.get(p)}.
 *
 * @param docValuesConsumer consumer that receives the fields
 * @param docValuesWriters  writers holding the values, index-aligned with {@code fieldInfoList}
 * @param fieldInfoList     field infos describing the fields to add
 * @param fieldCount        number of leading entries to add
 * @throws IOException if the consumer fails to add a field
 */
private void addStarTreeDocValueFields(
    DocValuesConsumer docValuesConsumer,
    List<SortedNumericDocValuesWriter> docValuesWriters,
    FieldInfo[] fieldInfoList,
    int fieldCount
) throws IOException {
    int position = 0;
    while (position < fieldCount) {
        // Anonymous classes can only capture effectively final locals, so pin the index per iteration.
        final int writerIndex = position;
        DocValuesProducer singleFieldProducer = new EmptyDocValuesProducer() {
            @Override
            public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
                // Looked up on demand each time the consumer asks for the values.
                return docValuesWriters.get(writerIndex).getDocValues();
            }
        };
        docValuesConsumer.addSortedNumericField(fieldInfoList[position], singleFieldProducer);
        position++;
    }
}

/**
* Adds a document to the star-tree.
*
Expand Down
Loading

0 comments on commit 781a2d4

Please sign in to comment.