From acac395d0b1d4f0b91430e3c2e2b0d6e636cfcee Mon Sep 17 00:00:00 2001 From: thorfour Date: Fri, 10 May 2024 09:08:26 -0500 Subject: [PATCH 1/3] Make concurrency configurable as physicalplan option --- query/physicalplan/physicalplan.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/query/physicalplan/physicalplan.go b/query/physicalplan/physicalplan.go index 787d39d16..e9a031451 100644 --- a/query/physicalplan/physicalplan.go +++ b/query/physicalplan/physicalplan.go @@ -17,8 +17,7 @@ import ( "github.com/polarsignals/frostdb/recovery" ) -// TODO: Make this smarter. -var concurrencyHardcoded = runtime.GOMAXPROCS(0) +var defaultConcurrency = runtime.GOMAXPROCS(0) type PhysicalPlan interface { Callback(ctx context.Context, r arrow.Record) error @@ -259,6 +258,13 @@ type execOptions struct { orderedAggregations bool overrideInput []PhysicalPlan readMode logicalplan.ReadMode + concurrency int +} + +func NewExecOptions() execOptions { + return execOptions{ + concurrency: defaultConcurrency, + } } type Option func(o *execOptions) @@ -283,6 +289,12 @@ func WithOverrideInput(input []PhysicalPlan) Option { } } +func WithConcurrency(concurrency int) Option { + return func(o *execOptions) { + o.concurrency = concurrency + } +} + func Build( ctx context.Context, pool memory.Allocator, @@ -294,7 +306,7 @@ func Build( _, span := tracer.Start(ctx, "PhysicalPlan/Build") defer span.End() - execOpts := execOptions{} + execOpts := NewExecOptions() for _, o := range options { o(&execOpts) } @@ -318,7 +330,7 @@ func Build( // Create noop operators since we don't know what to push the scan // results to. In a following node visit, these noops will have // SetNext called on them and push to the correct operator. - plans := make([]PhysicalPlan, concurrencyHardcoded) + plans := make([]PhysicalPlan, execOpts.concurrency) for i := range plans { plans[i] = &noopOperator{} } @@ -333,7 +345,7 @@ func Build( // Create noop operators since we don't know what to push the scan // results to. In a following node visit, these noops will have // SetNext called on them and push to the correct operator. - plans := make([]PhysicalPlan, concurrencyHardcoded) + plans := make([]PhysicalPlan, execOpts.concurrency) for i := range plans { plans[i] = &noopOperator{} } From 1d21b94895d1af08ceac30968b17ae48044afea5 Mon Sep 17 00:00:00 2001 From: thorfour Date: Fri, 10 May 2024 09:31:31 -0500 Subject: [PATCH 2/3] Support single concurrency The aggregators exepct to have a final aggregation. In the concurrency = 1 case we weren't correctly aggregating because no final aggregation was created. This removes the requirement for concurrency to be > 1 for a syncronizer and final aggregation to be created. Ideally in the future we would just schedule a single aggregation that handles both being a normal and final aggregation but this is a simple fix for an edge case that isn't important at this point. --- query/engine_test.go | 119 ++++++++++++++++------------- query/physicalplan/physicalplan.go | 13 ++-- 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/query/engine_test.go b/query/engine_test.go index 2f2a1dc34..ba1294f67 100644 --- a/query/engine_test.go +++ b/query/engine_test.go @@ -12,66 +12,83 @@ import ( "github.com/polarsignals/frostdb/dynparquet" schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1" "github.com/polarsignals/frostdb/query/logicalplan" + "github.com/polarsignals/frostdb/query/physicalplan" ) func TestUniqueAggregation(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer mem.AssertSize(t, 0) - - schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{ - Name: "test", - Columns: []*schemapb.Column{{ - Name: "example", - StorageLayout: &schemapb.StorageLayout{ - Type: schemapb.StorageLayout_TYPE_INT64, + tests := map[string]struct { + execOptions []physicalplan.Option + }{ + "no concurrency": { + execOptions: []physicalplan.Option{ + physicalplan.WithConcurrency(1), }, - }, { - Name: "timestamp", - StorageLayout: &schemapb.StorageLayout{ - Type: schemapb.StorageLayout_TYPE_INT64, - }, - }}, - }) - require.NoError(t, err) + }, + "default": { + execOptions: []physicalplan.Option{}, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) - rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{ - Name: "example", - Type: arrow.PrimitiveTypes.Int64, - }, { - Name: "timestamp", - Type: arrow.PrimitiveTypes.Int64, - }}, nil)) - defer rb.Release() + schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{ + Name: "test", + Columns: []*schemapb.Column{{ + Name: "example", + StorageLayout: &schemapb.StorageLayout{ + Type: schemapb.StorageLayout_TYPE_INT64, + }, + }, { + Name: "timestamp", + StorageLayout: &schemapb.StorageLayout{ + Type: schemapb.StorageLayout_TYPE_INT64, + }, + }}, + }) + require.NoError(t, err) - rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil) - rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil) + rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{ + Name: "example", + Type: arrow.PrimitiveTypes.Int64, + }, { + Name: "timestamp", + Type: arrow.PrimitiveTypes.Int64, + }}, nil)) + defer rb.Release() - r := rb.NewRecord() - defer r.Release() + rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil) + rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil) - ran := false - err = NewEngine(mem, &FakeTableProvider{ - Tables: map[string]logicalplan.TableReader{ - "test": &FakeTableReader{ - FrostdbSchema: schema, - Records: []arrow.Record{r}, - }, - }, - }).ScanTable("test"). - Aggregate( - []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))}, - []logicalplan.Expr{logicalplan.Col("timestamp")}, - ). - Execute(context.Background(), func(ctx context.Context, r arrow.Record) error { - require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values()) - require.True(t, r.Column(1).(*array.Int64).IsNull(0)) - require.True(t, r.Column(1).(*array.Int64).IsValid(1)) - require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1)) - ran = true - return nil + r := rb.NewRecord() + defer r.Release() + + ran := false + err = NewEngine(mem, &FakeTableProvider{ + Tables: map[string]logicalplan.TableReader{ + "test": &FakeTableReader{ + FrostdbSchema: schema, + Records: []arrow.Record{r}, + }, + }, + }, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test"). + Aggregate( + []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))}, + []logicalplan.Expr{logicalplan.Col("timestamp")}, + ). + Execute(context.Background(), func(ctx context.Context, r arrow.Record) error { + require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values()) + require.True(t, r.Column(1).(*array.Int64).IsNull(0)) + require.True(t, r.Column(1).(*array.Int64).IsValid(1)) + require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1)) + ran = true + return nil + }) + require.NoError(t, err) + require.True(t, ran) }) - require.NoError(t, err) - require.True(t, ran) + } } func TestAndAggregation(t *testing.T) { diff --git a/query/physicalplan/physicalplan.go b/query/physicalplan/physicalplan.go index e9a031451..2c540c2b3 100644 --- a/query/physicalplan/physicalplan.go +++ b/query/physicalplan/physicalplan.go @@ -447,13 +447,12 @@ func Build( ordered = false } var sync PhysicalPlan - if len(prev) > 1 { - // These aggregate operators need to be synchronized. - if ordered && len(plan.Aggregation.GroupExprs) > 0 { - sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs) - } else { - sync = Synchronize(len(prev)) - } + // These aggregate operators need to be synchronized. + // NOTE: that in the case of concurrency 1 we still add a syncronizer because the Aggregation operator expects a final aggregation to be performed. + if ordered && len(plan.Aggregation.GroupExprs) > 0 { + sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs) + } else { + sync = Synchronize(len(prev)) } seed := maphash.MakeSeed() for i := 0; i < len(prev); i++ { From 51869268377e45158ea15a78b3367b64576a0f4e Mon Sep 17 00:00:00 2001 From: thorfour Date: Fri, 10 May 2024 09:39:41 -0500 Subject: [PATCH 3/3] lint --- query/physicalplan/physicalplan.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/query/physicalplan/physicalplan.go b/query/physicalplan/physicalplan.go index 2c540c2b3..0b718dd37 100644 --- a/query/physicalplan/physicalplan.go +++ b/query/physicalplan/physicalplan.go @@ -254,29 +254,29 @@ func (p *noopOperator) Draw() *Diagram { return p.next.Draw() } -type execOptions struct { +type ExecOptions struct { orderedAggregations bool overrideInput []PhysicalPlan readMode logicalplan.ReadMode concurrency int } -func NewExecOptions() execOptions { - return execOptions{ +func NewExecOptions() ExecOptions { + return ExecOptions{ concurrency: defaultConcurrency, } } -type Option func(o *execOptions) +type Option func(o *ExecOptions) func WithReadMode(m logicalplan.ReadMode) Option { - return func(o *execOptions) { + return func(o *ExecOptions) { o.readMode = m } } func WithOrderedAggregations() Option { - return func(o *execOptions) { + return func(o *ExecOptions) { o.orderedAggregations = true } } @@ -284,13 +284,13 @@ func WithOrderedAggregations() Option { // WithOverrideInput can be used to provide an input stage on top of which the // Build function can build the physical plan. func WithOverrideInput(input []PhysicalPlan) Option { - return func(o *execOptions) { + return func(o *ExecOptions) { o.overrideInput = input } } func WithConcurrency(concurrency int) Option { - return func(o *execOptions) { + return func(o *ExecOptions) { o.concurrency = concurrency } } @@ -511,7 +511,7 @@ func Build( } func shouldPlanOrderedAggregate( - execOpts execOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation, + execOpts ExecOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation, ) (bool, error) { if !execOpts.orderedAggregations { // Ordered aggregations disabled.