Skip to content

Commit

Permalink
feat: output multiple partitions in MergeScanExec (#4227)
Browse files Browse the repository at this point in the history
* feat: output multiple partitions in MergeScanExec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix range manipulate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Jun 28, 2024
1 parent ef935a1 commit a7aa556
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 37 deletions.
8 changes: 7 additions & 1 deletion src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ impl ExecutionPlan for RangeManipulateExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
let exec_input = children[0].clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(self.output_schema.clone()),
exec_input.properties().partitioning.clone(),
exec_input.properties().execution_mode,
);
Ok(Arc::new(Self {
start: self.start,
end: self.end,
Expand All @@ -312,7 +318,7 @@ impl ExecutionPlan for RangeManipulateExec {
output_schema: self.output_schema.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
properties: self.properties.clone(),
properties,
}))
}

Expand Down
23 changes: 18 additions & 5 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub struct MergeScanExec {
/// Metrics from sub stages
sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
}

impl std::fmt::Debug for MergeScanExec {
Expand All @@ -154,11 +155,12 @@ impl MergeScanExec {
arrow_schema: &ArrowSchema,
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
target_partition: usize,
) -> Result<Self> {
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema_without_metadata.clone()),
Partitioning::UnknownPartitioning(1),
Partitioning::UnknownPartitioning(target_partition),
ExecutionMode::Bounded,
);
let schema_without_metadata =
Expand All @@ -174,10 +176,15 @@ impl MergeScanExec {
sub_stage_metrics: Arc::default(),
properties,
query_ctx,
target_partition,
})
}

pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
pub fn to_stream(
&self,
context: Arc<TaskContext>,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
Expand All @@ -189,6 +196,7 @@ impl MergeScanExec {
let current_schema = self.query_ctx.current_schema().to_string();
let timezone = self.query_ctx.timezone().to_string();
let extensions = self.query_ctx.extensions();
let target_partition = self.target_partition;

let sub_sgate_metrics_moved = self.sub_stage_metrics.clone();
let plan = self.plan.clone();
Expand All @@ -198,7 +206,12 @@ impl MergeScanExec {
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());

for region_id in regions {
for region_id in regions
.iter()
.skip(partition)
.step_by(target_partition)
.copied()
{
let request = QueryRequest {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
Expand Down Expand Up @@ -325,11 +338,11 @@ impl ExecutionPlan for MergeScanExec {

fn execute(
&self,
_partition: usize,
partition: usize,
context: Arc<TaskContext>,
) -> Result<DfSendableRecordBatchStream> {
Ok(Box::pin(DfRecordBatchStreamAdapter::new(
self.to_stream(context)?,
self.to_stream(context, partition)?,
)))
}

Expand Down
1 change: 1 addition & 0 deletions src/query/src/dist_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
&schema,
self.region_query_handler.clone(),
query_ctx,
session_state.config().target_partitions(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}
Expand Down
2 changes: 0 additions & 2 deletions tests/cases/distributed/explain/join_10_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,11 @@ limit 1;
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: vin@1 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
Expand Down
3 changes: 2 additions & 1 deletion tests/cases/distributed/explain/multi_partitions.result
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY hos
+-+-+
| logical_plan_| Sort: demo.host ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[false]_|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand Down
2 changes: 0 additions & 2 deletions tests/cases/distributed/explain/order_by.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand Down Expand Up @@ -68,7 +67,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand Down
2 changes: 0 additions & 2 deletions tests/cases/distributed/explain/subqueries.result
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ order by t.i desc;
|_|_CoalescePartitionsExec_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_FilterExec: i@0 IS NOT NULL_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_|
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/distributed/optimizer/filter_push_down.result
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ EXPLAIN SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHER
| physical_plan | CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[false as cond]_|
|_|_CrossJoinExec_|
|_|_CoalescePartitionsExec_|
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[]_|
|_|_MergeScanExec: REDACTED
|_|_|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ EXPLAIN SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY
| | AggregateExec: mode=Partial, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | UnionExec |
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | RepartitionExec: REDACTED
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+

Expand Down
12 changes: 8 additions & 4 deletions tests/cases/standalone/common/partition.result
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ INSERT INTO my_table VALUES

Affected Rows: 8

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
Expand All @@ -65,14 +66,15 @@ DELETE FROM my_table WHERE a < 150;

Affected Rows: 1

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
Expand All @@ -83,14 +85,15 @@ DELETE FROM my_table WHERE a < 2200 AND a > 1500;

Affected Rows: 2

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
Expand Down Expand Up @@ -148,15 +151,16 @@ INSERT INTO my_table VALUES

Affected Rows: 8

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
Expand Down
4 changes: 4 additions & 0 deletions tests/cases/standalone/common/partition.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ INSERT INTO my_table VALUES
(2200, 'g', 7),
(2400, 'h', 8);

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

DELETE FROM my_table WHERE a < 150;

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

DELETE FROM my_table WHERE a < 2200 AND a > 1500;

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

DELETE FROM my_table WHERE a < 2500;
Expand Down Expand Up @@ -66,6 +69,7 @@ INSERT INTO my_table VALUES
(2200, 'g', 7),
(2400, 'h', 8);

-- SQLNESS SORT_RESULT 3 1
SELECT * FROM my_table;

DROP TABLE my_table;
Expand Down
2 changes: 2 additions & 0 deletions tests/cases/standalone/common/range/nest.result
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| logical_plan_| RangeSelect: range_exprs=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts |
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts |
|_|_CoalescePartitionsExec_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
Expand All @@ -72,6 +73,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
Expand Down
12 changes: 8 additions & 4 deletions tests/cases/standalone/common/tql-explain-analyze/analyze.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ TQL ANALYZE (0, 10, '5s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
Expand Down Expand Up @@ -50,7 +51,8 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
Expand Down Expand Up @@ -78,7 +80,8 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
Expand Down Expand Up @@ -108,7 +111,8 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
| 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k"] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED
Expand Down
Loading

0 comments on commit a7aa556

Please sign in to comment.