Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exclude physical plan compiler impl #1280

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Thank you to all who have contributed!
- **BREAKING** In the produced plan:
- The new plan is fully resolved and typed.
- Operators will be converted to function call.
- **EXPERIMENTAL** `EXCLUDE` addition to the physical plan
- This is currently marked as experimental until the RFC is approved https://github.com/partiql/partiql-lang/issues/27
- Evaluation of `EXCLUDE` in the `PlanCompiler`
- Changes the return type of `filter_distinct` to a list if input collection is list

### Deprecated
Expand Down
9 changes: 9 additions & 0 deletions partiql-ast/src/main/pig/partiql.ion
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,9 @@ may then be further optimized by selecting better implementations of each operat

// For every row of `source`, adds each specified `let_binding`.
(let source::bexpr bindings::(* let_binding 1))

// For every row of `source`, omits the values specified by `exclude_expr`s
(exclude_clause source::bexpr exprs::(* exclude_expr 1))
)
)

Expand Down Expand Up @@ -872,6 +875,7 @@ may then be further optimized by selecting better implementations of each operat
column_component
returning_mapping
assignment
exclude_op
)
)
)
Expand Down Expand Up @@ -934,6 +938,9 @@ may then be further optimized by selecting better implementations of each operat
)
)

(exclude exclude_expr)
(include (product exclude_expr root::int steps::(* exclude_step 1)))

// Replace statement.dml.target with statement.dml.uniqueId (the "resolved" corollary).
(with statement
(exclude dml)
Expand Down Expand Up @@ -1007,6 +1014,8 @@ may then be further optimized by selecting better implementations of each operat
// Notice that the physical window operator contains a list of window expression
// That is because, we want to combine the window functions that are operating on the same window to a single window operator
(window i::impl source:: bexpr window_specification:: over window_expression_list:: (* window_expression 1))

(exclude_clause i::impl source::bexpr exprs::(* exclude_expr 1))
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.partiql.lang.eval.builtins.storedprocedure.StoredProcedure
import org.partiql.lang.eval.internal.builtins.SCALAR_BUILTINS_DEFAULT
import org.partiql.lang.eval.internal.builtins.definitionalBuiltins
import org.partiql.lang.eval.physical.operators.AggregateOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.ExcludeRelationalOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.FilterRelationalOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.JoinRelationalOperatorFactoryDefault
import org.partiql.lang.eval.physical.operators.LetRelationalOperatorFactoryDefault
Expand Down Expand Up @@ -87,6 +88,7 @@ class PartiQLCompilerBuilder private constructor() {
// Notice here we will not propagate the optin requirement to the user
@OptIn(ExperimentalWindowFunctions::class)
WindowRelationalOperatorFactoryDefault,
ExcludeRelationalOperatorFactoryDefault,
)

@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import org.partiql.lang.ast.sourceLocation
import org.partiql.lang.eval.EvaluationException
import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.ExprValueType
import org.partiql.lang.eval.StructOrdering
import org.partiql.lang.eval.internal.ext.isUnknown
import org.partiql.lang.eval.name
import org.partiql.lang.eval.namedValue
import org.partiql.lang.eval.internal.ext.name
import org.partiql.lang.eval.internal.ext.namedValue
import org.partiql.lang.types.StaticTypeUtils
import org.partiql.types.AnyOfType
import org.partiql.types.AnyType
Expand Down Expand Up @@ -228,19 +227,19 @@ internal class AnyOfCastTable(
val children = source.asSequence().map { cast(it) }

when (targetType) {
ExprValueType.LIST -> ExprValue.newList(children)
ExprValueType.SEXP -> ExprValue.newSexp(children)
ExprValueType.BAG -> ExprValue.newBag(children)
ExprValueType.LIST -> ListExprValue(children)
ExprValueType.SEXP -> SexpExprValue(children)
ExprValueType.BAG -> BagExprValue(children)
ExprValueType.STRUCT -> {
if (source.type != ExprValueType.STRUCT) {
// Should not be possible
throw IllegalStateException("Cannot cast from non-struct to struct")
}
ExprValue.Companion.newStruct(
children.zip(source.asSequence()).map { (child, original) ->
StructExprValue(
sequence = children.zip(source.asSequence()).map { (child, original) ->
child.namedValue(original.name!!)
},
StructOrdering.UNORDERED
ordering = StructOrdering.UNORDERED
)
}
else -> throw IllegalStateException("Invalid collection target type: $targetType")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at:
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/

package org.partiql.lang.eval.internal

import com.amazon.ion.IonStruct
import com.amazon.ion.IonValue
import org.partiql.errors.ErrorCode
import org.partiql.errors.Property
import org.partiql.lang.eval.BindingCase
import org.partiql.lang.eval.BindingName
import org.partiql.lang.eval.Bindings
import org.partiql.lang.eval.EvaluationException
import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.internal.ext.namedValue
import org.partiql.lang.util.propertyValueMapOf
import org.partiql.lang.util.to

internal fun errAmbiguousBinding(bindingName: String, matchingNames: List<String>): Nothing {
err(
"Multiple matches were found for the specified identifier",
ErrorCode.EVALUATOR_AMBIGUOUS_BINDING,
propertyValueMapOf(
Property.BINDING_NAME to bindingName,
Property.BINDING_NAME_MATCHES to matchingNames.joinToString(", ")
),
internal = false
)
}

/**
* Custom implementation of [Bindings] that lazily computes case sensitive or insensitive hash tables which
* will speed up the lookup of bindings within structs.
*
* The key difference in behavior between this and other [Bindings] implementations is that it
* can throw an ambiguous binding [EvaluationException] even for case-sensitive lookups as it is
* entirely possible that fields with identical names can appear within [IonStruct]s.
*
* Important: this class is critical to performance for many queries. Change with caution.
*/
internal class IonStructBindings(private val myStruct: IonStruct) : Bindings<ExprValue> {

private val caseInsensitiveFieldMap by lazy {
HashMap<String, ArrayList<IonValue>>().apply {
for (field in myStruct) {
val entries = getOrPut(field.fieldName.lowercase()) { ArrayList(1) }
entries.add(field)
}
}
}

private val caseSensitiveFieldMap by lazy {
HashMap<String, ArrayList<IonValue>>().apply {
for (field in myStruct) {
val entries = getOrPut(field.fieldName) { ArrayList(1) }
entries.add(field)
}
}
}

private fun caseSensitiveLookup(fieldName: String): IonValue? =
caseSensitiveFieldMap[fieldName]?.let { entries -> handleMatches(entries, fieldName) }

private fun caseInsensitiveLookup(fieldName: String): IonValue? =
caseInsensitiveFieldMap[fieldName.lowercase()]?.let { entries -> handleMatches(entries, fieldName) }

private fun handleMatches(entries: List<IonValue>, fieldName: String): IonValue? =
when (entries.size) {
0 -> null
1 -> entries[0]
else ->
errAmbiguousBinding(fieldName, entries.map { it.fieldName })
}

override operator fun get(bindingName: BindingName): ExprValue? =
when (bindingName.bindingCase) {
BindingCase.SENSITIVE -> caseSensitiveLookup(bindingName.name)
BindingCase.INSENSITIVE -> caseInsensitiveLookup(bindingName.name)
}?.let {
ionValueToExprValue(it).namedValue(ExprValue.newString(it.fieldName))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.partiql.lang.eval.internal

import com.amazon.ion.IonBlob
import com.amazon.ion.IonBool
import com.amazon.ion.IonClob
import com.amazon.ion.IonDatagram
import com.amazon.ion.IonDecimal
import com.amazon.ion.IonFloat
import com.amazon.ion.IonInt
import com.amazon.ion.IonList
import com.amazon.ion.IonSexp
import com.amazon.ion.IonString
import com.amazon.ion.IonStruct
import com.amazon.ion.IonSymbol
import com.amazon.ion.IonTimestamp
import com.amazon.ion.IonValue
import org.partiql.lang.eval.BAG_ANNOTATION
import org.partiql.lang.eval.Bindings
import org.partiql.lang.eval.DATE_ANNOTATION
import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.GRAPH_ANNOTATION
import org.partiql.lang.eval.MISSING_ANNOTATION
import org.partiql.lang.eval.TIME_ANNOTATION
import org.partiql.lang.eval.internal.ext.namedValue
import org.partiql.lang.eval.time.Time
import org.partiql.lang.graph.ExternalGraphReader
import org.partiql.lang.util.bytesValue
import java.math.BigDecimal

/**
* Creates a new [ExprValue] instance from any Ion value.
*
* If possible, prefer the use of the other methods instead because they might return [ExprValue] instances
* that are better optimized for their specific data type (depending on implementation).
*/
internal fun ionValueToExprValue(value: IonValue): ExprValue {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ported from ExprValue.kt and renamed from ExprValue.of(value) to ionValueToExprValue(value)

return when {
value.isNullValue && value.hasTypeAnnotation(MISSING_ANNOTATION) -> ExprValue.missingValue // MISSING
value.isNullValue -> ExprValue.newNull(value.type) // NULL
value is IonBool -> ExprValue.newBoolean(value.booleanValue()) // BOOL
value is IonInt -> ExprValue.newInt(value.longValue()) // INT
value is IonFloat -> ExprValue.newFloat(value.doubleValue()) // FLOAT
value is IonDecimal -> ExprValue.newDecimal(value.decimalValue()) // DECIMAL
value is IonTimestamp && value.hasTypeAnnotation(DATE_ANNOTATION) -> { // DATE
val timestampValue = value.timestampValue()
ExprValue.newDate(timestampValue.year, timestampValue.month, timestampValue.day)
}
value is IonTimestamp -> ExprValue.newTimestamp(value.timestampValue()) // TIMESTAMP
value is IonStruct && value.hasTypeAnnotation(TIME_ANNOTATION) -> { // TIME
val hourValue = (value["hour"] as IonInt).intValue()
val minuteValue = (value["minute"] as IonInt).intValue()
val secondInDecimal = (value["second"] as IonDecimal).decimalValue()
val secondValue = secondInDecimal.toInt()
val nanoValue = secondInDecimal.remainder(BigDecimal.ONE).multiply(NANOS_PER_SECOND.toBigDecimal()).toInt()
val timeZoneHourValue = (value["timezone_hour"] as IonInt).intValue()
val timeZoneMinuteValue = (value["timezone_minute"] as IonInt).intValue()
ExprValue.newTime(
Time.of(
hourValue,
minuteValue,
secondValue,
nanoValue,
secondInDecimal.scale(),
timeZoneHourValue * 60 + timeZoneMinuteValue
)
)
}
value is IonStruct && value.hasTypeAnnotation(GRAPH_ANNOTATION) -> // GRAPH
ExprValue.newGraph(ExternalGraphReader.read(value))
value is IonSymbol -> ExprValue.newSymbol(value.stringValue()) // SYMBOL
value is IonString -> ExprValue.newString(value.stringValue()) // STRING
value is IonClob -> ExprValue.newClob(value.bytesValue()) // CLOB
value is IonBlob -> ExprValue.newBlob(value.bytesValue()) // BLOB
value is IonList && value.hasTypeAnnotation(BAG_ANNOTATION) -> BagExprValue(value.map { ionValueToExprValue(it) }) // BAG
value is IonList -> ListExprValue(value.map { ionValueToExprValue(it) }) // LIST
value is IonSexp -> SexpExprValue(value.map { ionValueToExprValue(it) }) // SEXP
value is IonStruct -> IonStructExprValue(value) // STRUCT
value is IonDatagram -> BagExprValue(value.map { ionValueToExprValue(it) }) // DATAGRAM represented as BAG ExprValue
else -> error("Unrecognized IonValue to transform to ExprValue: $value")
}
}

private class IonStructExprValue(
ionStruct: IonStruct
) : StructExprValue(
StructOrdering.UNORDERED,
ionStruct.asSequence().map { ionValueToExprValue(it).namedValue(ExprValue.newString(it.fieldName)) }
) {
override val bindings: Bindings<ExprValue> =
IonStructBindings(ionStruct)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.partiql.lang.eval.internal

import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.Named
import org.partiql.lang.eval.stringify
import org.partiql.lang.util.downcast

/**
* An [ExprValue] that also implements [Named].
*/
internal class NamedExprValue(override val name: ExprValue, val value: ExprValue) : ExprValue by value, Named {
override fun <T : Any?> asFacet(type: Class<T>?): T? = downcast(type) ?: value.asFacet(type)

override fun toString(): String = stringify()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.partiql.lang.eval.internal

import org.partiql.lang.eval.BaseExprValue
import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.ExprValueType
import org.partiql.lang.eval.OrdinalBindings
import org.partiql.lang.eval.internal.ext.namedValue

internal class ListExprValue(val values: Sequence<ExprValue>) : BaseExprValue() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These sequence ExprValues were ported from ExprValue.kt. They were internal to partiql-lang. Needed to case on the concrete Java class name in ExcludeRelationalOperatorFactory, so ported to this file in partiql-eval.

override val type = ExprValueType.LIST
override val ordinalBindings by lazy { OrdinalBindings.ofList(toList()) }
override fun iterator() = values.mapIndexed { i, v -> v.namedValue(ExprValue.newInt(i)) }.iterator()

constructor(values: List<ExprValue>) : this(values.asSequence())
}

internal class BagExprValue(val values: Sequence<ExprValue>) : BaseExprValue() {
override val type = ExprValueType.BAG
override val ordinalBindings = OrdinalBindings.EMPTY
override fun iterator() = values.iterator()

constructor(values: List<ExprValue>) : this(values.asSequence())
}

internal class SexpExprValue(val values: Sequence<ExprValue>) : BaseExprValue() {
override val type = ExprValueType.SEXP
override val ordinalBindings by lazy { OrdinalBindings.ofList(toList()) }
override fun iterator() = values.mapIndexed { i, v -> v.namedValue(ExprValue.newInt(i)) }.iterator()

constructor(values: List<ExprValue>) : this(values.asSequence())
}

/**
* Returns an [ExprValue] created from a sequence of [seq]. Requires [type] to be a sequence type
* (i.e. [ExprValueType.isSequence] == true).
*/
internal fun newSequenceExprValue(type: ExprValueType, seq: Sequence<ExprValue>): ExprValue {
return when (type) {
ExprValueType.LIST -> ListExprValue(seq)
ExprValueType.BAG -> BagExprValue(seq)
ExprValueType.SEXP -> SexpExprValue(seq)
else -> error("Sequence type required")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ internal enum class StructOrdering {
* Provides a [ExprValueType.STRUCT] implementation lazily backed by a sequence.
*/
internal open class StructExprValue(
private val ordering: StructOrdering,
internal val ordering: StructOrdering,
private val sequence: Sequence<ExprValue>
) : BaseExprValue() {
constructor(fields: List<ExprValue>, ordering: StructOrdering) : this(ordering, fields.asSequence())

override val type = ExprValueType.STRUCT

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.partiql.lang.eval.ExprValue
import org.partiql.lang.eval.ExprValueType
import org.partiql.lang.eval.NaturalExprValueComparators
import org.partiql.lang.eval.booleanValue
import org.partiql.lang.eval.internal.BagExprValue
import org.partiql.lang.eval.internal.ExprAggregator
import org.partiql.lang.eval.internal.errNoContext
import org.partiql.lang.eval.internal.ext.bigDecimalOf
Expand Down Expand Up @@ -173,7 +174,7 @@ internal class AccumulatorGroupAs(
exprValues.add(value)
}

override fun compute(): ExprValue = ExprValue.newBag(exprValues)
override fun compute(): ExprValue = BagExprValue(exprValues)
}

private fun comparisonAccumulator(comparator: NaturalExprValueComparators): (ExprValue?, ExprValue) -> ExprValue =
Expand Down
Loading
Loading