Commit 492af92: fix incorrect context usage
leiysky committed May 24, 2022 (parent: 5648b5b)

Showing 3 changed files with 44 additions and 27 deletions.
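The change threads the Arc<QueryContext> that build_pipeline receives through every build_* helper instead of reaching for the builder's own self.ctx. This matters for hash joins: the build side runs in a child pipeline under a context derived via QueryContext::create_from, so operators built from self.ctx could end up bound to the wrong context. Below is a minimal sketch of the pattern with stand-in types (Ctx, Plan, and Pipeline are simplifications for illustration, not Databend's API):

use std::sync::Arc;

struct Ctx {
    id: u64,
}

impl Ctx {
    // Stand-in for QueryContext::create_from: derive a child context
    // from its parent.
    fn create_from(parent: Arc<Ctx>) -> Arc<Ctx> {
        Arc::new(Ctx { id: parent.id + 1 })
    }
}

enum Plan {
    Scan,
    Project(Box<Plan>),
    HashJoin { probe: Box<Plan>, build: Box<Plan> },
}

#[derive(Default)]
struct Pipeline {
    steps: Vec<String>,
}

fn build_pipeline(ctx: Arc<Ctx>, plan: &Plan, pipeline: &mut Pipeline) {
    match plan {
        Plan::Scan => pipeline.steps.push(format!("scan(ctx {})", ctx.id)),
        Plan::Project(child) => {
            // `ctx` is used twice: clone the handle for the recursive call,
            // then move it into the operator itself.
            build_pipeline(ctx.clone(), child, pipeline);
            pipeline.steps.push(format!("project(ctx {})", ctx.id));
        }
        Plan::HashJoin { probe, build } => {
            build_pipeline(ctx.clone(), probe, pipeline);
            // The build side gets its own child pipeline and a derived
            // context, never the probe side's handle.
            let mut child_pipeline = Pipeline::default();
            build_pipeline(Ctx::create_from(ctx.clone()), build, &mut child_pipeline);
            pipeline.steps.push(format!("hash_join(ctx {})", ctx.id));
        }
    }
}

fn main() {
    let plan = Plan::HashJoin {
        probe: Box::new(Plan::Project(Box::new(Plan::Scan))),
        build: Box::new(Plan::Scan),
    };
    let mut pipeline = Pipeline::default();
    build_pipeline(Arc::new(Ctx { id: 0 }), &plan, &mut pipeline);
    // Prints ["scan(ctx 0)", "project(ctx 0)", "hash_join(ctx 0)"]; the
    // build-side scan ran in child_pipeline under the derived ctx 1.
    println!("{:?}", pipeline.steps);
}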
64 changes: 37 additions & 27 deletions query/src/sql/exec/mod.rs
@@ -161,38 +161,39 @@ impl PipelineBuilder {
 
         match plan {
             RelOperator::PhysicalScan(physical_scan) => {
-                self.build_physical_scan(physical_scan, pipeline)
+                self.build_physical_scan(context, physical_scan, pipeline)
             }
             RelOperator::Project(project) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_project(project, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_project(context, project, input_schema, pipeline)
             }
             RelOperator::EvalScalar(eval_scalar) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_eval_scalar(eval_scalar, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_eval_scalar(context, eval_scalar, input_schema, pipeline)
             }
             RelOperator::Filter(filter) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_filter(filter, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_filter(context, filter, input_schema, pipeline)
             }
             RelOperator::Aggregate(aggregate) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_aggregate(aggregate, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_aggregate(context, aggregate, input_schema, pipeline)
             }
             RelOperator::PhysicalHashJoin(hash_join) => {
                 let probe_schema =
                     self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
                 let mut child_pipeline = NewPipeline::create();
                 let build_schema = self.build_pipeline(
-                    QueryContext::create_from(context),
+                    QueryContext::create_from(context.clone()),
                     &expression.children()[1],
                     &mut child_pipeline,
                 )?;
                 self.build_hash_join(
+                    context,
                     hash_join,
                     build_schema,
                     probe_schema,
@@ -202,20 +203,21 @@ impl PipelineBuilder {
             }
             RelOperator::Sort(sort_plan) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_order_by(sort_plan, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_order_by(context, sort_plan, input_schema, pipeline)
             }
             RelOperator::Limit(limit_plan) => {
                 let input_schema =
-                    self.build_pipeline(context, &expression.children()[0], pipeline)?;
-                self.build_limit(limit_plan, input_schema, pipeline)
+                    self.build_pipeline(context.clone(), &expression.children()[0], pipeline)?;
+                self.build_limit(context, limit_plan, input_schema, pipeline)
             }
             _ => Err(ErrorCode::LogicalError("Invalid physical plan")),
         }
     }
 
     fn build_project(
         &mut self,
+        ctx: Arc<QueryContext>,
         project: &Project,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
@@ -234,14 +236,16 @@ impl PipelineBuilder {
                 input_schema.clone(),
                 output_schema.clone(),
                 expressions.clone(),
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
         Ok(output_schema)
     }
 
     fn build_eval_scalar(
         &mut self,
+        ctx: Arc<QueryContext>,
         eval_scalar: &EvalScalar,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
@@ -262,7 +266,7 @@ impl PipelineBuilder {
                 input_schema.clone(),
                 output_schema.clone(),
                 expressions.clone(),
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -271,6 +275,7 @@ impl PipelineBuilder {
 
     fn build_filter(
         &mut self,
+        ctx: Arc<QueryContext>,
         filter: &FilterPlan,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
@@ -301,23 +306,24 @@ impl PipelineBuilder {
                 pred.clone(),
                 transform_input_port,
                 transform_output_port,
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
         Ok(output_schema)
     }
 
     fn build_physical_scan(
         &mut self,
+        ctx: Arc<QueryContext>,
         scan: &PhysicalScan,
         pipeline: &mut NewPipeline,
     ) -> Result<DataSchemaRef> {
         let table_entry = self.metadata.read().table(scan.table_index).clone();
         let plan = table_entry.source;
 
-        let table = self.ctx.build_table_from_source_plan(&plan)?;
-        self.ctx.try_set_partitions(plan.parts.clone())?;
-        table.read2(self.ctx.clone(), &plan, pipeline)?;
+        let table = ctx.build_table_from_source_plan(&plan)?;
+        ctx.try_set_partitions(plan.parts.clone())?;
+        table.read2(ctx.clone(), &plan, pipeline)?;
         let columns: Vec<IndexType> = scan.columns.iter().cloned().collect();
         let projections: Vec<Expression> = columns
             .iter()
@@ -340,7 +346,7 @@ impl PipelineBuilder {
                 input_schema.clone(),
                 output_schema.clone(),
                 projections.clone(),
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -349,6 +355,7 @@ impl PipelineBuilder {
 
     fn build_aggregate(
         &mut self,
+        ctx: Arc<QueryContext>,
         aggregate: &AggregatePlan,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
@@ -422,7 +429,7 @@ impl PipelineBuilder {
                     transform_output_port,
                     &partial_aggr_params,
                 )?,
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -443,7 +450,7 @@ impl PipelineBuilder {
                     transform_output_port,
                     &final_aggr_params,
                 )?,
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -456,7 +463,7 @@ impl PipelineBuilder {
                 final_schema.clone(),
                 output_schema.clone(),
                 rename_expressions.clone(),
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -465,6 +472,7 @@ impl PipelineBuilder {
 
     fn build_hash_join(
         &mut self,
+        ctx: Arc<QueryContext>,
         hash_join: &PhysicalHashJoin,
         build_schema: DataSchemaRef,
         probe_schema: DataSchemaRef,
@@ -491,7 +499,7 @@ impl PipelineBuilder {
             probe_expressions,
             build_schema,
             probe_schema,
-            self.ctx.clone(),
+            ctx.clone(),
         )?);
 
         // Build side
@@ -500,7 +508,7 @@ impl PipelineBuilder {
         // Probe side
         pipeline.add_transform(|input, output| {
             Ok(TransformHashJoinProbe::create(
-                self.ctx.clone(),
+                ctx.clone(),
                 input,
                 output,
                 hash_join_state.clone(),
@@ -536,6 +544,7 @@ impl PipelineBuilder {
 
     fn build_order_by(
         &mut self,
+        ctx: Arc<QueryContext>,
         sort_plan: &SortPlan,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
@@ -564,7 +573,7 @@ impl PipelineBuilder {
                 input_schema.clone(),
                 output_schema.clone(),
                 expressions.clone(),
-                self.ctx.clone(),
+                ctx.clone(),
             )
         })?;
 
@@ -617,6 +626,7 @@ impl PipelineBuilder {
 
     fn build_limit(
         &mut self,
+        _ctx: Arc<QueryContext>,
         limit_plan: &LimitPlan,
         input_schema: DataSchemaRef,
         pipeline: &mut NewPipeline,
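A note on the repeated context.clone(): Arc<QueryContext> is a reference-counted handle, so cloning only bumps a counter rather than copying the context, and the final use at each call site can move the handle outright. build_limit takes _ctx, the leading underscore marking it deliberately unused while keeping all builder signatures uniform. A standalone sketch of this ownership pattern (plain Arc<String> here, nothing Databend-specific):

use std::sync::Arc;

fn consume(ctx: Arc<String>) {
    // Takes ownership of one handle; the underlying data lives on
    // as long as any other handle exists.
    println!("consumed: {}", ctx);
}

fn main() {
    let ctx = Arc::new(String::from("query-context"));
    let kept = ctx.clone(); // cheap: increments a refcount, no deep copy
    consume(ctx); // last use of this handle, so move instead of cloning
    assert_eq!(*kept, "query-context"); // data still alive via `kept`
    println!("handles remaining: {}", Arc::strong_count(&kept)); // prints 1
}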
5 changes: 5 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result

@@ -121,6 +121,11 @@ NULL 2 3
 1 2
 2 3
 1000
+0
+1
+2
+3
+4
 ====ORDER_BY====
 2 0
 2 0
2 changes: 2 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql

@@ -125,6 +125,8 @@ select * from t2 inner join t on t.a = t2.c + 1;
 select * from t2 inner join t on t.a = t2.c + 1 and t.a - 1 = t2.c;
 select count(*) from numbers(1000) as t inner join numbers(1000) as t1 on t.number = t1.number;
 
+select t.number from numbers(10000) as t inner join numbers(1000) as t1 on t.number % 1000 = t1.number order by number limit 5;
+
 -- order by
 select '====ORDER_BY====';
 SELECT number%3 as c1, number%2 as c2 FROM numbers_mt (10) order by c1 desc, c2 asc;
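The new regression query presumably covers the hash-join path whose context handling the Rust change fixes: it joins numbers(10000) against numbers(1000) on t.number % 1000 = t1.number, so every t.number from 0 through 9999 matches exactly one build-side row, and ordering by number with limit 5 yields the 0 through 4 added to the .result file above.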
