Skip to content

Commit

Permalink
Part-2: Add Combine and Segment Level Operators for Time Series (#13999)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana authored Sep 19, 2024
1 parent 4a7a1cd commit 9c44ef7
Show file tree
Hide file tree
Showing 21 changed files with 977 additions and 15 deletions.
4 changes: 4 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-spi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-spi</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.request.context;

import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;


public class TimeSeriesContext {
private final String _engine;
private final String _timeColumn;
private final TimeUnit _timeUnit;
private final TimeBuckets _timeBuckets;
private final Long _offsetSeconds;
private final ExpressionContext _valueExpression;
private final AggInfo _aggInfo;

public TimeSeriesContext(String engine, String timeColumn, TimeUnit timeUnit, TimeBuckets timeBuckets,
Long offsetSeconds, ExpressionContext valueExpression, AggInfo aggInfo) {
_engine = engine;
_timeColumn = timeColumn;
_timeUnit = timeUnit;
_timeBuckets = timeBuckets;
_offsetSeconds = offsetSeconds;
_valueExpression = valueExpression;
_aggInfo = aggInfo;
}

public String getEngine() {
return _engine;
}

public String getTimeColumn() {
return _timeColumn;
}

public TimeUnit getTimeUnit() {
return _timeUnit;
}

public TimeBuckets getTimeBuckets() {
return _timeBuckets;
}

public Long getOffsetSeconds() {
return _offsetSeconds;
}

public ExpressionContext getValueExpression() {
return _valueExpression;
}

public AggInfo getAggInfo() {
return _aggInfo;
}
}
4 changes: 4 additions & 0 deletions pinot-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analysis-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-timeseries-spi</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pinot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.blocks.results;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;


public class TimeSeriesResultsBlock extends BaseResultsBlock {
private final TimeSeriesBlock _seriesBlock;

public TimeSeriesResultsBlock(TimeSeriesBlock seriesBlock) {
_seriesBlock = seriesBlock;
}

@Override
public int getNumRows() {
// TODO: Unused right now.
return 0;
}

@Nullable
@Override
public QueryContext getQueryContext() {
// TODO: Unused right now.
return null;
}

@Nullable
@Override
public DataSchema getDataSchema() {
// TODO: Unused right now.
return null;
}

@Nullable
@Override
public List<Object[]> getRows() {
return null;
}

@Override
public DataTable getDataTable()
throws IOException {
return null;
}

public TimeSeriesBlock getTimeSeriesBlock() {
return _seriesBlock;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.combine;

import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;


public class TimeSeriesCombineOperator extends BaseSingleBlockCombineOperator<TimeSeriesResultsBlock> {
private static final String EXPLAIN_NAME = "TIME_SERIES_COMBINE";

public TimeSeriesCombineOperator(ResultsBlockMerger<TimeSeriesResultsBlock> resultsBlockMerger,
List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
super(resultsBlockMerger, operators, queryContext, executorService);
}

@Nullable
@Override
public String toExplainString() {
return EXPLAIN_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.combine.merger;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
import org.apache.pinot.tsdb.spi.series.TimeSeries;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;


public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeSeriesResultsBlock> {
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final AggInfo _aggInfo;

public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) {
_seriesBuilderFactory = seriesBuilderFactory;
_aggInfo = aggInfo;
}

@Override
public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesResultsBlock blockToMerge) {
TimeSeriesBlock currentTimeSeriesBlock = mergedBlock.getTimeSeriesBlock();
TimeSeriesBlock seriesBlockToMerge = blockToMerge.getTimeSeriesBlock();
for (var entry : seriesBlockToMerge.getSeriesMap().entrySet()) {
long seriesHash = entry.getKey();
List<TimeSeries> currentTimeSeriesList = currentTimeSeriesBlock.getSeriesMap().get(seriesHash);
TimeSeries currentTimeSeries = null;
if (currentTimeSeriesList != null && !currentTimeSeriesList.isEmpty()) {
currentTimeSeries = currentTimeSeriesList.get(0);
}
TimeSeries newTimeSeriesToMerge = entry.getValue().get(0);
if (currentTimeSeries == null) {
List<TimeSeries> newTimeSeriesList = new ArrayList<>();
newTimeSeriesList.add(newTimeSeriesToMerge);
currentTimeSeriesBlock.getSeriesMap().put(seriesHash, newTimeSeriesList);
} else {
BaseTimeSeriesBuilder mergedTimeSeriesBuilder = _seriesBuilderFactory.newTimeSeriesBuilder(
_aggInfo, currentTimeSeries.getId(), currentTimeSeries.getTimeBuckets(), currentTimeSeries.getTagNames(),
currentTimeSeries.getTagValues());
mergedTimeSeriesBuilder.mergeAlignedSeries(newTimeSeriesToMerge);
currentTimeSeriesBlock.getSeriesMap().put(seriesHash, ImmutableList.of(mergedTimeSeriesBuilder.build()));
}
}
}
}
Loading

0 comments on commit 9c44ef7

Please sign in to comment.