Fix query concurrency #861

Merged: 3 commits, May 10, 2024
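The diff below touches two files: query/engine_test.go, where TestUniqueAggregation becomes a table-driven test that runs both with and without forced single-threaded execution, and query/physicalplan/physicalplan.go, where the hardcoded GOMAXPROCS-based fan-out is replaced by a concurrency field on ExecOptions, a NewExecOptions constructor that defaults it to runtime.GOMAXPROCS(0), and a new WithConcurrency option. The sketch below is a self-contained illustration of that option pattern; the names mirror the diff, but it is not the frostdb physicalplan package itself.

```go
// Simplified, self-contained sketch of the option pattern this PR introduces:
// concurrency moves from a hardcoded package variable into ExecOptions, with
// runtime.GOMAXPROCS(0) as the default. Illustration only, not frostdb code.
package main

import (
	"fmt"
	"runtime"
)

// ExecOptions carries execution settings used when building a physical plan.
type ExecOptions struct {
	concurrency int
}

// NewExecOptions returns options populated with the default concurrency.
func NewExecOptions() ExecOptions {
	return ExecOptions{concurrency: runtime.GOMAXPROCS(0)}
}

// Option mutates ExecOptions, matching the functional-options style in the diff.
type Option func(o *ExecOptions)

// WithConcurrency overrides the number of parallel plan branches.
func WithConcurrency(concurrency int) Option {
	return func(o *ExecOptions) { o.concurrency = concurrency }
}

// build shows how options are resolved before fanning out the plan.
func build(options ...Option) int {
	execOpts := NewExecOptions()
	for _, o := range options {
		o(&execOpts)
	}
	// The real Build allocates one noop operator per branch:
	//   plans := make([]PhysicalPlan, execOpts.concurrency)
	return execOpts.concurrency
}

func main() {
	fmt.Println("default branches:", build())
	fmt.Println("forced single branch:", build(WithConcurrency(1)))
}
```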
119 changes: 68 additions & 51 deletions query/engine_test.go
@@ -12,66 +12,83 @@ import (
"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
)

func TestUniqueAggregation(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
tests := map[string]struct {
execOptions []physicalplan.Option
}{
"no concurrency": {
execOptions: []physicalplan.Option{
physicalplan.WithConcurrency(1),
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)
},
"default": {
execOptions: []physicalplan.Option{},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()
schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)

rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)
rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()

r := rb.NewRecord()
defer r.Release()
rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)

ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
r := rb.NewRecord()
defer r.Release()

ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
})
require.NoError(t, err)
require.True(t, ran)
})
require.NoError(t, err)
require.True(t, ran)
}
}

func TestAndAggregation(t *testing.T) {
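The updated TestUniqueAggregation above is restructured into a table-driven test so the same aggregation runs once with concurrency forced to 1 and once with the default settings. A minimal sketch of that pattern, using a hypothetical stand-in option type rather than the real physicalplan.Option:

```go
// Minimal sketch of the table-driven pattern the updated test uses: the same
// body runs once per named case, each with its own option slice. The option
// type here is a stand-in; the real test passes []physicalplan.Option through
// WithPhysicalplanOptions.
package example

import "testing"

type option string // stand-in for physicalplan.Option

func TestConcurrencyMatrix(t *testing.T) {
	tests := map[string]struct {
		execOptions []option
	}{
		"no concurrency": {execOptions: []option{"concurrency=1"}},
		"default":        {execOptions: []option{}},
	}
	for name, test := range tests {
		test := test // capture range variable (pre-Go 1.22 loop semantics)
		t.Run(name, func(t *testing.T) {
			// The real test builds the engine with
			// WithPhysicalplanOptions(test.execOptions...) here and asserts
			// that the aggregation output is identical for every case.
			_ = test.execOptions
		})
	}
}
```

Keeping the assertions identical across cases is what gives the test its value: any divergence between the single-threaded and concurrent plans shows up as a subtest failure.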
47 changes: 29 additions & 18 deletions query/physicalplan/physicalplan.go
@@ -17,8 +17,7 @@ import (
"github.com/polarsignals/frostdb/recovery"
)

// TODO: Make this smarter.
var concurrencyHardcoded = runtime.GOMAXPROCS(0)
var defaultConcurrency = runtime.GOMAXPROCS(0)

type PhysicalPlan interface {
Callback(ctx context.Context, r arrow.Record) error
@@ -255,34 +254,47 @@ func (p *noopOperator) Draw() *Diagram {
return p.next.Draw()
}

type execOptions struct {
type ExecOptions struct {
orderedAggregations bool
overrideInput []PhysicalPlan
readMode logicalplan.ReadMode
concurrency int
}

type Option func(o *execOptions)
func NewExecOptions() ExecOptions {
return ExecOptions{
concurrency: defaultConcurrency,
}
}

type Option func(o *ExecOptions)

func WithReadMode(m logicalplan.ReadMode) Option {
return func(o *execOptions) {
return func(o *ExecOptions) {
o.readMode = m
}
}

func WithOrderedAggregations() Option {
return func(o *execOptions) {
return func(o *ExecOptions) {
o.orderedAggregations = true
}
}

// WithOverrideInput can be used to provide an input stage on top of which the
// Build function can build the physical plan.
func WithOverrideInput(input []PhysicalPlan) Option {
return func(o *execOptions) {
return func(o *ExecOptions) {
o.overrideInput = input
}
}

func WithConcurrency(concurrency int) Option {
return func(o *ExecOptions) {
o.concurrency = concurrency
}
}

func Build(
ctx context.Context,
pool memory.Allocator,
@@ -294,7 +306,7 @@ func Build(
_, span := tracer.Start(ctx, "PhysicalPlan/Build")
defer span.End()

execOpts := execOptions{}
execOpts := NewExecOptions()
for _, o := range options {
o(&execOpts)
}
@@ -318,7 +330,7 @@ func Build(
// Create noop operators since we don't know what to push the scan
// results to. In a following node visit, these noops will have
// SetNext called on them and push to the correct operator.
plans := make([]PhysicalPlan, concurrencyHardcoded)
plans := make([]PhysicalPlan, execOpts.concurrency)
for i := range plans {
plans[i] = &noopOperator{}
}
@@ -333,7 +345,7 @@ func Build(
// Create noop operators since we don't know what to push the scan
// results to. In a following node visit, these noops will have
// SetNext called on them and push to the correct operator.
plans := make([]PhysicalPlan, concurrencyHardcoded)
plans := make([]PhysicalPlan, execOpts.concurrency)
for i := range plans {
plans[i] = &noopOperator{}
}
@@ -435,13 +447,12 @@ func Build(
ordered = false
}
var sync PhysicalPlan
if len(prev) > 1 {
// These aggregate operators need to be synchronized.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
// These aggregate operators need to be synchronized.
// NOTE: even with concurrency 1 we still add a synchronizer, because the Aggregation operator expects a final aggregation to be performed.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
seed := maphash.MakeSeed()
for i := 0; i < len(prev); i++ {
@@ -500,7 +511,7 @@ func Build(
}

func shouldPlanOrderedAggregate(
execOpts execOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
execOpts ExecOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
) (bool, error) {
if !execOpts.orderedAggregations {
// Ordered aggregations disabled.
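The aggregation hunk above now always inserts a synchronizer in front of the final aggregation, even when there is only one upstream plan, since the Aggregation operator expects a final aggregation step. A rough, stdlib-only illustration of that fan-in idea (not the frostdb Synchronize implementation):

```go
// Rough illustration of the fan-in behind the synchronizer: N upstream
// branches (even N == 1) are merged into one stream so exactly one final
// aggregation stage always runs. Not the frostdb Synchronize code.
package main

import (
	"fmt"
	"sync"
)

// synchronize merges the output of every upstream branch into a single stream.
func synchronize(producers []<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(producers))
	for _, p := range producers {
		go func(p <-chan int) {
			defer wg.Done()
			for v := range p {
				out <- v
			}
		}(p)
	}
	go func() {
		wg.Wait()
		close(out) // final stage sees end-of-input only after every branch drains
	}()
	return out
}

func main() {
	// Even with a single branch (concurrency 1) the merge point still exists,
	// mirroring the note in the diff above.
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	total := 0
	for v := range synchronize([]<-chan int{ch}) {
		total += v
	}
	fmt.Println("final aggregate:", total) // 6
}
```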