Skip to content

Commit

Permalink
Support single concurrency
Browse files Browse the repository at this point in the history
The aggregators expect to have a final aggregation.
In the concurrency = 1 case we weren't correctly aggregating
because no final aggregation was created.

This removes the requirement for concurrency to be > 1 for a synchronizer
and final aggregation to be created.

Ideally in the future we would just schedule a single aggregation that
handles both being a normal and final aggregation but this is a simple
fix for an edge case that isn't important at this point.
  • Loading branch information
thorfour committed May 10, 2024
1 parent acac395 commit 1d21b94
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 58 deletions.
119 changes: 68 additions & 51 deletions query/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,66 +12,83 @@ import (
"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
)

func TestUniqueAggregation(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
tests := map[string]struct {
execOptions []physicalplan.Option
}{
"no concurrency": {
execOptions: []physicalplan.Option{
physicalplan.WithConcurrency(1),
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)
},
"default": {
execOptions: []physicalplan.Option{},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)

rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()
schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
Name: "test",
Columns: []*schemapb.Column{{
Name: "example",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}, {
Name: "timestamp",
StorageLayout: &schemapb.StorageLayout{
Type: schemapb.StorageLayout_TYPE_INT64,
},
}},
})
require.NoError(t, err)

rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)
rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
Name: "example",
Type: arrow.PrimitiveTypes.Int64,
}, {
Name: "timestamp",
Type: arrow.PrimitiveTypes.Int64,
}}, nil))
defer rb.Release()

r := rb.NewRecord()
defer r.Release()
rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)

ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
r := rb.NewRecord()
defer r.Release()

ran := false
err = NewEngine(mem, &FakeTableProvider{
Tables: map[string]logicalplan.TableReader{
"test": &FakeTableReader{
FrostdbSchema: schema,
Records: []arrow.Record{r},
},
},
}, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test").
Aggregate(
[]*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
[]logicalplan.Expr{logicalplan.Col("timestamp")},
).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
require.True(t, r.Column(1).(*array.Int64).IsNull(0))
require.True(t, r.Column(1).(*array.Int64).IsValid(1))
require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
ran = true
return nil
})
require.NoError(t, err)
require.True(t, ran)
})
require.NoError(t, err)
require.True(t, ran)
}
}

func TestAndAggregation(t *testing.T) {
Expand Down
13 changes: 6 additions & 7 deletions query/physicalplan/physicalplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,12 @@ func Build(
ordered = false
}
var sync PhysicalPlan
if len(prev) > 1 {
// These aggregate operators need to be synchronized.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
// These aggregate operators need to be synchronized.
// NOTE: in the case of concurrency 1 we still add a synchronizer because the Aggregation operator expects a final aggregation to be performed.
if ordered && len(plan.Aggregation.GroupExprs) > 0 {
sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
} else {
sync = Synchronize(len(prev))
}
seed := maphash.MakeSeed()
for i := 0; i < len(prev); i++ {
Expand Down

0 comments on commit 1d21b94

Please sign in to comment.