From 9003cbb3ed389f73d68aab1ee26546324b807e7f Mon Sep 17 00:00:00 2001
From: Thomas Jungblut
Date: Tue, 23 Apr 2024 17:37:55 +0200
Subject: [PATCH] Add int64 support

fixes #330

Signed-off-by: Thomas Jungblut
---
Review notes (ignored by git am, not part of the commit message):
- functions/functions.go: the Int64 multiplication overload declared
  OutputType octosql.Int while returning an Int64 value; it now declares
  octosql.Int64 like the other Int64 arithmetic overloads.
- outputs/formats/json_format.go: the TypeIDInt64 case serialized
  value.Int, which NewInt64 leaves at zero; it now serializes value.Int64
  (via int(), exact on platforms where int is 64 bits wide).
- functions/functions.go: the comment on the first "int64" overload
  referred to the "int" function.
- NOTE(review): the CSV reader still selects its integer branch with
  octosql.Int.Is(...) but now produces Int64 values, and the Parquet
  reader now produces Int64 values for parquet.Int64 columns; confirm
  that the declared column types agree with the emitted values.

 aggregates/average.go               |  28 ++++++
 aggregates/max.go                   |   5 ++
 aggregates/min.go                   |   5 ++
 aggregates/sum.go                   |  30 +++++++
 datasources/csv/execution.go        |   2 +-
 datasources/parquet/reconstruct.go  |   2 +-
 functions/functions.go              | 131 ++++++++++++++++++++++++++++
 octosql/types.go                    |   4 +
 octosql/values.go                   |  25 ++++++
 outputs/formats/csv_format.go       |   2 +
 outputs/formats/json_format.go      |   2 +
 parser/parser.go                    |   2 +
 plugins/internal/plugins/plugins.go |   8 +-
 13 files changed, 242 insertions(+), 4 deletions(-)

diff --git a/aggregates/average.go b/aggregates/average.go
index c15f6d77..f95e903e 100644
--- a/aggregates/average.go
+++ b/aggregates/average.go
@@ -14,6 +14,11 @@ var AverageOverloads = []physical.AggregateDescriptor{
 		OutputType:   octosql.Int,
 		Prototype:    NewAverageIntPrototype(),
 	},
+	{
+		ArgumentType: octosql.Int64,
+		OutputType:   octosql.Int64,
+		Prototype:    NewAverageInt64Prototype(),
+	},
 	{
 		ArgumentType: octosql.Float,
 		OutputType:   octosql.Float,
@@ -49,6 +54,29 @@ func (c *AverageInt) Trigger() octosql.Value {
 	return octosql.NewInt(c.sum.Trigger().Int / c.count.Trigger().Int)
 }
 
+type AverageInt64 struct {
+	sum   SumInt64
+	count Count
+}
+
+func NewAverageInt64Prototype() func() nodes.Aggregate {
+	return func() nodes.Aggregate {
+		return &AverageInt64{
+			sum:   SumInt64{},
+			count: Count{},
+		}
+	}
+}
+
+func (c *AverageInt64) Add(retraction bool, value octosql.Value) bool {
+	c.sum.Add(retraction, value)
+	return c.count.Add(retraction, value)
+}
+
+func (c *AverageInt64) Trigger() octosql.Value {
+	return octosql.NewInt64(c.sum.Trigger().Int64 / int64(c.count.Trigger().Int))
+}
+
 type AverageFloat struct {
 	sum   SumFloat
 	count Count
diff --git a/aggregates/max.go b/aggregates/max.go
index dc54a422..935807a1 100644
--- a/aggregates/max.go
+++ b/aggregates/max.go
@@ -17,6 +17,11 @@ var MaxOverloads = []physical.AggregateDescriptor{
 		OutputType:   octosql.Int,
 		Prototype:    NewMaxPrototype(),
 	},
+	{
+		ArgumentType: octosql.Int64,
+		OutputType:   octosql.Int64,
+		Prototype:    NewMaxPrototype(),
+	},
 	{
 		ArgumentType: octosql.Float,
 		OutputType:   octosql.Float,
diff --git a/aggregates/min.go b/aggregates/min.go
index 3adbffc9..5cdf2d96 100644
--- a/aggregates/min.go
+++ b/aggregates/min.go
@@ -17,6 +17,11 @@ var MinOverloads = []physical.AggregateDescriptor{
 		OutputType:   octosql.Int,
 		Prototype:    NewMinPrototype(),
 	},
+	{
+		ArgumentType: octosql.Int64,
+		OutputType:   octosql.Int64,
+		Prototype:    NewMinPrototype(),
+	},
 	{
 		ArgumentType: octosql.Float,
 		OutputType:   octosql.Float,
diff --git a/aggregates/sum.go b/aggregates/sum.go
index 87dff0ba..a3c1817e 100644
--- a/aggregates/sum.go
+++ b/aggregates/sum.go
@@ -14,6 +14,11 @@ var SumOverloads = []physical.AggregateDescriptor{
 		OutputType:   octosql.Int,
 		Prototype:    NewSumIntPrototype(),
 	},
+	{
+		ArgumentType: octosql.Int64,
+		OutputType:   octosql.Int64,
+		Prototype:    NewSumInt64Prototype(),
+	},
 	{
 		ArgumentType: octosql.Float,
 		OutputType:   octosql.Float,
@@ -51,6 +56,31 @@ func (c *SumInt) Trigger() octosql.Value {
 	return octosql.NewInt(c.sum)
 }
 
+type SumInt64 struct {
+	sum int64
+}
+
+func NewSumInt64Prototype() func() nodes.Aggregate {
+	return func() nodes.Aggregate {
+		return &SumInt64{
+			sum: 0,
+		}
+	}
+}
+
+func (c *SumInt64) Add(retraction bool, value octosql.Value) bool {
+	if !retraction {
+		c.sum += value.Int64
+	} else {
+		c.sum -= value.Int64
+	}
+	return c.sum == 0
+}
+
+func (c *SumInt64) Trigger() octosql.Value {
+	return octosql.NewInt64(c.sum)
+}
+
 type SumFloat struct {
 	sum float64
 }
diff --git a/datasources/csv/execution.go b/datasources/csv/execution.go
index c609a700..58116260 100644
--- a/datasources/csv/execution.go
+++ b/datasources/csv/execution.go
@@ -71,7 +71,7 @@ func (d *DatasourceExecuting) Run(ctx ExecutionContext, produce ProduceFn, metaS
 			if octosql.Int.Is(d.fields[i].Type) == octosql.TypeRelationIs {
 				integer, err := fastfloat.ParseInt64(str)
 				if err == nil {
-					values[i] = octosql.NewInt(int(integer))
+					values[i] = octosql.NewInt64(integer)
 					continue
 				}
 			}
diff --git a/datasources/parquet/reconstruct.go b/datasources/parquet/reconstruct.go
index 41148902..c93b8acc 100644
--- a/datasources/parquet/reconstruct.go
+++ b/datasources/parquet/reconstruct.go
@@ -295,7 +295,7 @@ func assignValue(dst *octosql.Value, src parquet.Value) error {
 	case parquet.Int32:
 		*dst = octosql.NewInt(int(src.Int32()))
 	case parquet.Int64:
-		*dst = octosql.NewInt(int(src.Int64()))
+		*dst = octosql.NewInt64(src.Int64())
 	case parquet.Int96:
 		*dst = octosql.NewString(src.Int96().String())
 	case parquet.Float:
diff --git a/functions/functions.go b/functions/functions.go
index fa70d728..8c711f2a 100644
--- a/functions/functions.go
+++ b/functions/functions.go
@@ -163,6 +163,14 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewInt(values[0].Int + values[1].Int), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64, octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(values[0].Int64 + values[1].Int64), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Float, octosql.Float},
 					OutputType:    octosql.Float,
@@ -223,6 +231,22 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewInt(-values[0].Int), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64, octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(values[0].Int64 - values[1].Int64), nil
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(-values[0].Int64), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Float, octosql.Float},
 					OutputType:    octosql.Float,
@@ -275,6 +299,14 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewInt(values[0].Int * values[1].Int), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64, octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(values[0].Int64 * values[1].Int64), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Float, octosql.Float},
 					OutputType:    octosql.Float,
@@ -327,6 +359,14 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewInt(values[0].Int / values[1].Int), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64, octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(values[0].Int64 / values[1].Int64), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Float, octosql.Float},
 					OutputType:    octosql.Float,
@@ -368,6 +408,17 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewInt(values[0].Int * -1), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						if values[0].Int64 > 0 {
+							return values[0], nil
+						}
+						return octosql.NewInt64(values[0].Int64 * -1), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Float},
 					OutputType:    octosql.Float,
@@ -910,6 +961,14 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return values[0], nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64},
+					OutputType:    octosql.Int,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt(int(values[0].Int64)), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.Boolean},
 					OutputType:    octosql.Int,
@@ -953,6 +1012,70 @@ func FunctionMap() map[string]physical.FunctionDetails {
 				},
 			},
 		},
+		"int64": {
+			Description: "Converts the argument to an int64.",
+			Descriptors: []physical.FunctionDescriptor{
+				{
+					// This case will catch any types which may be int at the start of non-exact matching.
+					// So the int64 function can be used as a type cast.
+					ArgumentTypes: []octosql.Type{octosql.Int},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(int64(values[0].Int)), nil
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return values[0], nil
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Boolean},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						if values[0].Boolean {
+							return octosql.NewInt64(1), nil
+						} else {
+							return octosql.NewInt64(0), nil
+						}
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Float},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(int64(values[0].Float)), nil
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.String},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						n, err := strconv.ParseInt(values[0].Str, 10, 64)
+						if err != nil {
+							log.Printf("couldn't parse string '%s' as int64: %s", values[0].Str, err)
+							return octosql.NewNull(), nil
+						}
+						return octosql.NewInt64(n), nil
+					},
+				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Duration},
+					OutputType:    octosql.Int64,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewInt64(int64(values[0].Duration)), nil
+					},
+				},
+			},
+		},
 		"float": {
 			Description: "Converts the argument to an float.",
 			Descriptors: []physical.FunctionDescriptor{
@@ -974,6 +1097,14 @@ func FunctionMap() map[string]physical.FunctionDetails {
 						return octosql.NewFloat(float64(values[0].Int)), nil
 					},
 				},
+				{
+					ArgumentTypes: []octosql.Type{octosql.Int64},
+					OutputType:    octosql.Float,
+					Strict:        true,
+					Function: func(values []octosql.Value) (octosql.Value, error) {
+						return octosql.NewFloat(float64(values[0].Int64)), nil
+					},
+				},
 				{
 					ArgumentTypes: []octosql.Type{octosql.String},
 					OutputType:    octosql.Float,
diff --git a/octosql/types.go b/octosql/types.go
index b21df9eb..9bbe2b1b 100644
--- a/octosql/types.go
+++ b/octosql/types.go
@@ -21,6 +21,7 @@ const (
 	TypeIDTuple
 	TypeIDUnion
 	TypeIDAny // TODO: Remove this type?
+	TypeIDInt64
 )
 
 func (t TypeID) String() string {
@@ -29,6 +30,8 @@ func (t TypeID) String() string {
 		return "Null"
 	case TypeIDInt:
 		return "Int"
+	case TypeIDInt64:
+		return "Int64"
 	case TypeIDFloat:
 		return "Float"
 	case TypeIDBoolean:
@@ -228,6 +231,7 @@ func (t Type) String() string {
 var (
 	Null     = Type{TypeID: TypeIDNull}
 	Int      = Type{TypeID: TypeIDInt}
+	Int64    = Type{TypeID: TypeIDInt64}
 	Float    = Type{TypeID: TypeIDFloat}
 	Boolean  = Type{TypeID: TypeIDBoolean}
 	String   = Type{TypeID: TypeIDString}
diff --git a/octosql/values.go b/octosql/values.go
index f6b35d46..2dc5ec28 100644
--- a/octosql/values.go
+++ b/octosql/values.go
@@ -15,6 +15,7 @@ var ZeroValue = Value{}
 type Value struct {
 	TypeID   TypeID
 	Int      int
+	Int64    int64
 	Float    float64
 	Boolean  bool
 	Str      string
@@ -38,6 +39,13 @@ func NewInt(value int) Value {
 	}
 }
 
+func NewInt64(value int64) Value {
+	return Value{
+		TypeID: TypeIDInt64,
+		Int64:  value,
+	}
+}
+
 func NewFloat(value float64) Value {
 	return Value{
 		TypeID: TypeIDFloat,
@@ -118,6 +126,15 @@ func (value Value) Compare(other Value) int {
 			return 0
 		}
 
+	case TypeIDInt64:
+		if value.Int64 < other.Int64 {
+			return -1
+		} else if value.Int64 > other.Int64 {
+			return 1
+		} else {
+			return 0
+		}
+
 	case TypeIDFloat:
 		if value.Float < other.Float {
 			return -1
@@ -259,6 +276,9 @@ func (value Value) hash(hash uint64) uint64 {
 	case TypeIDInt:
 		hash = fnv1a.AddUint64(hash, uint64(value.Int))
 
+	case TypeIDInt64:
+		hash = fnv1a.AddUint64(hash, uint64(value.Int64))
+
 	case TypeIDFloat:
 		hash = fnv1a.AddUint64(hash, math.Float64bits(value.Float))
 
@@ -368,6 +388,9 @@ func (value Value) append(builder *strings.Builder) {
 	case TypeIDInt:
 		builder.WriteString(fmt.Sprint(value.Int))
 
+	case TypeIDInt64:
+		builder.WriteString(fmt.Sprint(value.Int64))
+
 	case TypeIDFloat:
 		builder.WriteString(fmt.Sprint(value.Float))
 
@@ -429,6 +452,8 @@ func (value Value) ToRawGoValue(t Type) interface{} {
 		return nil
 	case TypeIDInt:
 		return value.Int
+	case TypeIDInt64:
+		return value.Int64
 	case TypeIDFloat:
 		return value.Float
 	case TypeIDBoolean:
diff --git a/outputs/formats/csv_format.go b/outputs/formats/csv_format.go
index 147b34e2..2ecfc959 100644
--- a/outputs/formats/csv_format.go
+++ b/outputs/formats/csv_format.go
@@ -51,6 +51,8 @@ func FormatCSVValue(builder *strings.Builder, value octosql.Value) {
 	case octosql.TypeIDNull:
 	case octosql.TypeIDInt:
 		builder.WriteString(strconv.FormatInt(int64(value.Int), 10))
+	case octosql.TypeIDInt64:
+		builder.WriteString(strconv.FormatInt(value.Int64, 10))
 	case octosql.TypeIDFloat:
 		builder.WriteString(strconv.FormatFloat(value.Float, 'f', -1, 64))
 	case octosql.TypeIDBoolean:
diff --git a/outputs/formats/json_format.go b/outputs/formats/json_format.go
index e74fc202..fca2fb89 100644
--- a/outputs/formats/json_format.go
+++ b/outputs/formats/json_format.go
@@ -61,6 +61,8 @@ func ValueToJson(arena *fastjson.Arena, t octosql.Type, value octosql.Value) *fa
 		return arena.NewNull()
 	case octosql.TypeIDInt:
 		return arena.NewNumberInt(value.Int)
+	case octosql.TypeIDInt64:
+		return arena.NewNumberInt(int(value.Int64))
 	case octosql.TypeIDFloat:
 		return arena.NewNumberFloat64(value.Float)
 	case octosql.TypeIDBoolean:
diff --git a/parser/parser.go b/parser/parser.go
index 1f6aa9a8..7bf212e0 100644
--- a/parser/parser.go
+++ b/parser/parser.go
@@ -766,6 +766,8 @@ func ParseType(t sqlparser.ConvertType) (octosql.TypeID, error) {
 		return octosql.TypeIDNull, nil
 	case "int":
 		return octosql.TypeIDInt, nil
+	case "int64":
+		return octosql.TypeIDInt64, nil
 	case "float":
 		return octosql.TypeIDFloat, nil
 	case "boolean":
diff --git a/plugins/internal/plugins/plugins.go b/plugins/internal/plugins/plugins.go
index d5e81935..2cb31fb9 100644
--- a/plugins/internal/plugins/plugins.go
+++ b/plugins/internal/plugins/plugins.go
@@ -61,6 +61,8 @@ func NativeValueToProto(value octosql.Value) *Value {
 	case octosql.TypeIDNull:
 	case octosql.TypeIDInt:
 		out.Int = int64(value.Int)
+	case octosql.TypeIDInt64:
+		out.Int = value.Int64
 	case octosql.TypeIDFloat:
 		out.Float = value.Float
 	case octosql.TypeIDBoolean:
@@ -104,6 +106,8 @@ func (x *Value) ToNativeValue() octosql.Value {
 	case octosql.TypeIDNull:
 	case octosql.TypeIDInt:
 		out.Int = int(x.Int)
+	case octosql.TypeIDInt64:
+		out.Int64 = x.Int
 	case octosql.TypeIDFloat:
 		out.Float = x.Float
 	case octosql.TypeIDBoolean:
@@ -173,7 +177,7 @@ func NativeTypeToProto(t octosql.Type) *Type {
 		TypeId: int32(t.TypeID),
 	}
 	switch t.TypeID {
-	case octosql.TypeIDNull, octosql.TypeIDInt, octosql.TypeIDFloat, octosql.TypeIDBoolean, octosql.TypeIDString, octosql.TypeIDTime, octosql.TypeIDDuration, octosql.TypeIDAny:
+	case octosql.TypeIDNull, octosql.TypeIDInt, octosql.TypeIDInt64, octosql.TypeIDFloat, octosql.TypeIDBoolean, octosql.TypeIDString, octosql.TypeIDTime, octosql.TypeIDDuration, octosql.TypeIDAny:
 	case octosql.TypeIDList:
 		if t.List.Element != nil {
 			out.List = NativeTypeToProto(*t.List.Element)
@@ -210,7 +214,7 @@ func (x *Type) ToNativeType() octosql.Type {
 		TypeID: octosql.TypeID(x.TypeId),
 	}
 	switch octosql.TypeID(x.TypeId) {
-	case octosql.TypeIDNull, octosql.TypeIDInt, octosql.TypeIDFloat, octosql.TypeIDBoolean, octosql.TypeIDString, octosql.TypeIDTime, octosql.TypeIDDuration, octosql.TypeIDAny:
+	case octosql.TypeIDNull, octosql.TypeIDInt, octosql.TypeIDInt64, octosql.TypeIDFloat, octosql.TypeIDBoolean, octosql.TypeIDString, octosql.TypeIDTime, octosql.TypeIDDuration, octosql.TypeIDAny:
 	case octosql.TypeIDList:
 		if x.List != nil {
 			t := x.List.ToNativeType()