fix: various fixes for #252 (write and DDL relations) and #284 (relation references) #288

Closed
wants to merge 1 commit into from

jvanstraten
Contributor

  • add new relation types to rel_type to make them usable
  • add constraints to prevent cyclic relation references
  • document how relation references work in the relation basics section
  • s/tuples/records/g for naming consistency
  • move ReferenceRel out of the AggregateFunction message scope

BREAKING CHANGE: various messages and semantics that were not yet reachable from the
Plan message were changed

@jvanstraten
Contributor Author

To address problems that @westonpace and I found in #252 and #284 after they were already merged.

@curino, I was pretty confused by some of the proto comments relating to WriteRel. I wrote FIXME lines where applicable. Could you please elaborate in review threads or something? I'll mark this as ready for review when I'm clear on what's going on and have removed those lines.

Contributor

@jacques-n jacques-n left a comment

Thanks for picking up these cleanups @jvanstraten.

@@ -333,7 +333,7 @@ doing `ReferenceRel(0) JOIN D`. This allows to avoid the redundancy of `A JOIN B

| Property | Description | Required |
|-----------------------------|--------------------------------------------------------------------------------| --------------------------- |
-| Referred Rel | A zero-indexed positional reference to a `Rel` defined within the same `Plan`. | Required |
+| Referred Rel | A zero-indexed positional reference to a `Rel` defined within the same `Plan`. The index must be less than the index of the relation tree that the reference appears in; put differently, you can only refer to trees that have already been declared. This avoids cyclic dependencies and forward references. | Required |
Contributor

I'm not convinced we need the constraint of no forward references. I'm fine with it, just not convinced it is necessary. If you feel strongly, sounds good.

Contributor

Seems like a fine sufficient condition, not necessary, but loop-checks are maybe trickier?

Contributor Author

It's indeed not a necessary constraint to ensure there are no loops. But the point of Substrait is to be machine-readable and machine-writable, right? So here's an algorithmic argument.

On the producer side, a valid algorithm to convert any DAG to a list of relations that satisfies this constraint is pretty trivial; emit all nodes without in-edges, follow their out-edges to the inputs of other nodes to replace those in-edges with relation references using the indices that were just emitted (removing the edges), repeat until all nodes have been emitted. With the right data structure for the DAG this is O(n). In fact, the majority of the algorithms I've managed to come up with for this give you a correct ordering (or its reverse) naturally. The only one I've come up with that doesn't is to first generate relation indices by a random walk over the node list, which, in fairness, might be a natural way to do it if you're storing references to all nodes in a list with no particular order vs. relying on edge references (that would be a directed graph data structure though, not a guaranteed-acyclic graph data structure).
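The producer-side procedure can be sketched in Python roughly as follows. This is only an illustration under assumptions: the `inputs` dictionary (node name to the names of the nodes it reads from) and the function name `emit_in_reference_order` are hypothetical and not part of Substrait.

```python
from collections import deque


def emit_in_reference_order(inputs):
    """Emit DAG nodes so that every reference points to an earlier index.

    `inputs` maps each node name to the list of node names it reads from
    (a hypothetical DAG encoding; every node must appear as a key).
    Returns (order, refs), where refs[i] holds the indices relation i refers to.
    """
    # Number of distinct inputs that have not been emitted yet, per node.
    pending = {node: len(set(deps)) for node, deps in inputs.items()}
    consumers = {node: [] for node in inputs}
    for node, deps in inputs.items():
        for dep in set(deps):
            consumers[dep].append(node)

    # Start with the nodes that have no in-edges.
    ready = deque(node for node, count in pending.items() if count == 0)
    index_of = {}
    order = []
    while ready:
        node = ready.popleft()
        index_of[node] = len(order)
        order.append(node)
        # "Remove" the out-edges; a consumer becomes ready once all inputs exist.
        for consumer in consumers[node]:
            pending[consumer] -= 1
            if pending[consumer] == 0:
                ready.append(consumer)

    if len(order) != len(inputs):
        raise ValueError("input graph contains a cycle")
    refs = [[index_of[dep] for dep in inputs[node]] for node in order]
    return order, refs


order, refs = emit_in_reference_order(
    {"root": ["ab", "d"], "ab": ["a", "b"], "a": [], "b": [], "d": []}
)
```

Each node and edge is visited once, so the runtime is linear in the size of the graph.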

On the consumer side, the algorithm with this constraint is equally simple: starting at the front of the list, emit nodes and keep an index-to-node-reference map as you do, replacing relation references with edges by resolving the indices using the map. You're now guaranteed to have a DAG. Error-checking happens naturally because the index-to-node resolution will fail if the constraint is not satisfied. O(n) complexity.
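A minimal Python sketch of that consumer-side pass, assuming a hypothetical plan encoding in which each relation is a `(name, referenced_indices)` pair; the dictionary-based node representation is likewise only for illustration.

```python
def resolve_references(relations):
    """Turn positional relation references into direct node references.

    `relations[i]` is a (name, referenced_indices) pair, a hypothetical
    stand-in for a relation tree containing ReferenceRel nodes.
    """
    nodes = []  # index -> resolved node, filled front to back
    for index, (name, refs) in enumerate(relations):
        resolved_inputs = []
        for ref in refs:
            # Only relations that were already declared can be resolved.
            if not 0 <= ref < index:
                raise ValueError(
                    f"relation {index} refers to relation {ref}, "
                    "which has not been declared yet"
                )
            resolved_inputs.append(nodes[ref])
        nodes.append({"name": name, "inputs": resolved_inputs})
    return nodes


nodes = resolve_references([("a", []), ("b", []), ("ab", [0, 1]), ("root", [2, 2])])
```

Because a node can only point at nodes that already exist, the result cannot contain a cycle, and the bounds check doubles as validation.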

If the consumer cannot rely on this constraint, the algorithm I would probably use is to loop over the list to ensure each node is created, creating dependent nodes earlier as needed using recursion. Problem is, this gives you an arbitrary directed graph, which means you have to go out of your way to write validation to detect cycles. And the easiest way to do that is... to see if it's possible to create a node ordering that satisfies exactly the constraint I'm proposing (or its inverse), by actually running a constructive algorithm for such an ordering and seeing if it will fail. It'd be pretty tempting for a consumer to just not check, though... and now you have guaranteed memory leaks if you were using reference counting for the node edges and probably a deadlock, stack overflow, or infinite runtime when actually executing the plan. This might never even make it to a test case, in which case it might surface as a DoS exploit ten years later instead.
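To illustrate the point about validation: without the ordering constraint, a cycle check amounts to trying to construct a valid ordering anyway. The Python sketch below assumes a hypothetical encoding where `refs[i]` lists the indices that relation `i` refers to, in any direction; it is written for clarity rather than linear runtime.

```python
def find_valid_ordering(refs):
    """Return an ordering in which every relation follows all relations it
    refers to, or None if no such ordering exists (the references are cyclic).

    `refs[i]` lists the indices relation i refers to (hypothetical encoding).
    """
    remaining = {i: set(deps) for i, deps in enumerate(refs)}
    emitted = set()
    ordering = []
    while remaining:
        # Relations whose references have all been emitted can go next.
        ready = [i for i, deps in remaining.items() if deps <= emitted]
        if not ready:
            return None  # everything left depends on something unresolved
        for i in ready:
            ordering.append(i)
            emitted.add(i)
            del remaining[i]
    return ordering


forward_only = find_valid_ordering([[1], [2], []])
cyclic = find_valid_ordering([[1], [0]])
```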

So, to be honest, the only arguments I see for not having the constraint are to let humans write nodes in any order they want or to make it easier for people to write subtly broken consumers.

WRITE_OP_INSERT = 1;
-// The removal of tuples from a table
+// The removal of records from a table
+// FIXME: how are the records to be deleted identified? It's stated that schema(input) must match the table schema, so is it equality? Does that mean that it's impossible to differentiate between different records with the same contents, i.e. it's not possible to match positionally?
Contributor

@curino and I had a long conversation about this offline. We should have done a better job of capturing that discussion here. Yeah, I agree this is a problem. In some circumstances you can imagine from input that there is an implicit rowId column that comes from the input to the delete. However, there are also scenarios where you might be using a different pattern and those should be illegal. @curino wasn't as sold on the problem so we punted on it until someone was actually trying to implement delete.

Contributor Author

@jvanstraten jvanstraten Aug 23, 2022

SQL delete is based on a condition, which makes sense to me, so given this structure, I would expect a delete operator in Substrait to take a schema with a single boolean as its relation input... though on second thought, that doesn't really make sense either, because that boolean could be derived from a different table entirely. It's also not obvious at all how you'd make use of indices in that case.

Okay, I guess I see the problem. I imagine @curino had a structure like read(table X) -> filter(condition) -> write(delete from table X) in mind for deletions? The problem with that is if I would write read(table Y) -> filter(condition) -> write(delete from table X) it would be legal by these rules if table X and Y happen to have the same schema, but equal schema doesn't mean equal table. Likewise I could stick a projection in there to make the schema match and it'd still be nonsense.

I feel like the problem is that we're trying to push a square peg in a round hole here. Why are we trying to make these fundamentally different operators satisfy normal relation semantics, and the exact same semantics across a number of them to boot? I didn't want to butt into this discussion at the time because I didn't feel qualified, but maybe I should have...

IMO, update and delete should be a different operator entirely, specifically one without an input. But if we would want to stick to a single relation type for modifications, I would define it like this, based on my limited knowledge of SQL:

message DdlRel {
  RelCommon common = 1;

  // Table to operate on.
  oneof read_type {
    LocalFiles local_files = 2;
    NamedTable named_table = 3;
    ExtensionTable extension_table = 4;
  }

  // Current table schema. If not specified, the table does not currently
  // exist.
  NamedStruct current_schema = 5;

  // Optional row deletion expression. Operates on the table schema directly.
  // Only legal for preexisting tables, i.e. current_schema is specified. Must
  // yield a non-nullable boolean if specified; if true, a record is deleted,
  // if false, it is retained. If not specified, all records are retained
  // (same as literal false). Set to literal true to model a SQL truncate
  // operation.
  Expression delete = 6;

  // Optional filter expression for update operations. Operates on the table
  // schema directly. Only legal for preexisting tables, i.e. current_schema is
  // specified. If delete is also specified, this is evaluated only on the rows
  // that were retained. Must yield a non-nullable boolean if specified; if
  // true, a record is selected for updating, if false, it is not selected. If
  // not specified, all records are updated (same as literal true).
  Expression filter = 7;
  
  // Specifies the new value for each column. If specified, there must be
  // exactly as many Column messages as there are columns in the (altered)
  // schema. Must be specified when the schema is altered. Only legal for
  // preexisting tables, i.e. current_schema is specified.
  repeated Column update = 8;

  message Column {
    // Optional expression to use to update the column. The return type must
    // match the column type as per the (altered) schema exactly. If not
    // specified, the column is not updated. All columns must be specified if
    // the schema is being altered and delete is not set to literal true. The
    // expression operates on the original table schema, and is only evaluated
    // for records where the filter expression (if specified) returned true
    // and the delete expression (if specified) returned false.
    Expression expression = 1;
  }
  
  // Optional relation to produce records that are to be inserted into the
  // table. The schema returned by the relation must match the (altered) schema
  // of the table exactly. For ordered storage, the rows are expected to be
  // appended to the end of preexisting records.
  Rel insert = 9;

  // If specified for a preexisting table, the table schema is altered to the
  // given schema. Cannot be used in conjunction with filter; i.e. it's illegal
  // to specify both. Must be specified if there is no preexisting table, i.e.
  // current_schema is not specified.
  NamedStruct new_schema = 10;

  // Extension for additional schema information, such as index specifications.
  substrait.extensions.AdvancedExtension new_schema_extension = 11;
  
  // SQL operations vs. fields populated:
  //
  // | Operation   | current_schema | delete | filter | update | insert | new_schema |
  // |-------------|----------------|--------|--------|--------|--------|------------|
  // | CREATE      | -              | -      | -      | -      | ?      | specified  |
  // | INSERT INTO | specified      | ?      | ?      | ?      | VALUES | -          |
  // | UPDATE      | specified      | ?      | WHERE  | SET    | ?      | -          |
  // | DELETE FROM | specified      | WHERE  | ?      | ?      | ?      | -          |
  // | ALTER       | specified      | ?      | -      | *      | ?      | specified  |
  // | TRUNCATE    | specified      | true   | -      | -      | ?      | ?          |
  //
  // - : should not be specified/illegal
  // ? : optional, but not possible to specify using SQL
  // * : new columns would be set to literal null for a normal ALTER, but any
  //     initial value or projection can be used
  
  // What to return. Required.
  oneof return_type {
    // Return an empty schema with zero rows (as close to nothing as
    // we can get in Substrait's relation tree system).
    google.protobuf.Empty nothing = 12;

    // Return the affected rows (deleted or updated) as they were before the
    // update. Inserted rows are not returned.
    google.protobuf.Empty pre_image = 13;

    // Return the updated rows as they were after the update. Deleted rows are
    // not returned.
    google.protobuf.Empty post_image = 14;

    // Return the field-wise concatenation of the pre-image and post-image,
    // with all top-level fields aka column types made nullable. The pre-image
    // part of a record will be nullified for inserted rows, the post-image
    // part of a record will be nullified for deleted rows. Only affected
    // rows (deleted, updated, or inserted) are returned.
    google.protobuf.Empty affected_pre_and_post = 15;
    
    // Like affected_pre_and_post, but also returns rows that were not
    // affected.
    google.protobuf.Empty all_pre_and_post = 16;
  }
}

But I understand if it's a bit late for that.

ETA: DELETE TABLE is missing from this, but I don't see that in the current definition either to be fair.

Contributor

@jvanstraten thanks for looking into this. I think the challenge @jacques-n and I were faced with is that SQL supports more complex statements where you can write a nearly arbitrary query that affects a DELETE or UPDATE statement. A couple of examples:

UPDATE wine w
SET stock = stock - (
                     SELECT SUM (quantity)
                     FROM order
                     WHERE date = CURRENT_DATE AND order.wine_name = w.name
                    )

DELETE table-name1
  FROM table-name1 
  JOIN table-name2 ON column-name3 = column-name4
 WHERE condition

This made us prefer reusing the Rel structure, vs building out a separate dedicated scheme. You are right that you could construct a nonsensical plan, but this is true in general (hence your awesome validator work). The intuition is what you describe, where the input selects the tuples we want to affect and either describes their after-image for updates, or the tuples to be deleted. SQL being declarative, there is no way to say "delete the first occurrence of these identical tuples"; as a consequence folks are used to workarounds such as primary keys or rowIds (per @jacques-n's comment) in the less common scenario where separate handling of cloned tuples is required. Also, I believe the Expression approach above might suffer from the same limitation, right?

I am not attached to this implementation, and happy to brainstorm/support variations.

I think we should also soon build out a "golden dataset" of queries we aim to support so that we can iterate on variations and compare them against that (or at least convince ourselves we can cover all cases that matter). Folks at MS are starting to use this in practical settings, and we will stumble on any detailed issues and come back to address them, but if you have ideas/time to work on this, super happy to collaborate.

Contributor

Also for the pre/post image I liked the structure we came up with with @jacques-n as it was rather minimal and allowed us to stitch together plans that do any sort of pre/after image with very few additions/fields.

I think besides better comments we should soon have a bunch of example SQL->Plan conversions listed in the docs to explain what we mean (I have been hesitating so as not to overly SQL-ify our website).

Contributor Author

Okay, after a few hours of typing (most of which I've now edited out), I think I've narrowed down where my misunderstanding comes from. I now agree that, with some further specification, WriteRel could work, but with the following limitations:

  • It will only ever work on key-value-based storage. Anything that identifies rows implicitly by some index or pointer or whatever (columnar formats, row-oriented formats, stream-based systems, etc.) can never work with this, because the identifying information is by definition not part of the data.
  • An update operation can never modify key columns. I don't know if this is also a limitation of SQL update statements, but if so, IMO this would be a pretty big limitation to just copy-paste from SQL without further thought.
  • Records with equivalent data cannot be told apart, though I suppose that's a limitation of key-value-based storage in the first place.

These feel like pretty hefty limitations, considering that the rest of Substrait works just fine for non-key-value-based systems, and considering that Substrait currently doesn't even acknowledge the concept of key columns in the first place. However, I'm not the one who has experience implementing query engines, so I'll yield if you guys think these limitations are perfectly acceptable.

That being said, my proposal does not suffer from these limitations. The key difference is that an expression is defined to be evaluated for each record individually, so any implicit information identifying the record can stay implicit in the execution context of that expression. A consumer could even define a function extension that allows a user to query this implicit information for use in the expression; for example, you could define a function that returns the index of the record in the current iteration, if that is meaningful information for that particular system. It would be, for instance, for a read from a columnar format, which has implicit ordering (even if Substrait does not in general require a meaningful ordering from a read).
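As a rough Python illustration of the per-record evaluation idea: the `context` dictionary and its `row_index` entry are hypothetical stand-ins for implicit information that a consumer might expose through a function extension; none of this is an existing Substrait API.

```python
def apply_delete(records, delete_expr):
    """Keep the records for which the per-record delete expression is false.

    `delete_expr(record, context)` stands in for an expression evaluated once
    per record; `context` carries implicit, system-specific information
    (here only a hypothetical row index).
    """
    kept = []
    for row_index, record in enumerate(records):
        context = {"row_index": row_index}
        if not delete_expr(record, context):
            kept.append(record)
    return kept


records = [("merlot", 3), ("merlot", 3), ("syrah", 5)]
# Delete only the first of two identical records; matching on record
# contents alone could not tell these two apart.
remaining = apply_delete(records, lambda record, ctx: ctx["row_index"] == 0)
```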

Also, I'm confident that I can convert everything that the current WriteRel supports to my proposal. You mention deletions and updates as pain points, so here goes. Delete:

delete = in_predicate_subquery(
  needles = (field references to key columns)
  haystack = (
    project(
      emit only key columns,
      <relation input goes here>
    )
  )
)

That's pretty much just making explicit what a key-value-based delete operator actually does. I imagine a key-value-based system would special-case that syntax.
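In plain Python, the semantics of that delete expression would look roughly like this. It is a sketch under assumptions: records are dictionaries, and `key_columns` is a hypothetical list of key column names, since Substrait itself has no notion of key columns.

```python
def key_of(record, key_columns):
    """Extract the key of a record as a hashable tuple."""
    return tuple(record[column] for column in key_columns)


def delete_matching_keys(table, key_columns, relation_input):
    """Delete every table record whose key occurs in the relation input."""
    # haystack: project(emit only key columns, <relation input>)
    haystack = {key_of(record, key_columns) for record in relation_input}
    # delete = in_predicate_subquery(needles = key columns, haystack)
    return [r for r in table if key_of(r, key_columns) not in haystack]


wine = [
    {"name": "merlot", "stock": 3},
    {"name": "syrah", "stock": 5},
    {"name": "riesling", "stock": 0},
]
to_delete = [{"name": "riesling", "stock": 0}]
after_delete = delete_matching_keys(wine, ["name"], to_delete)
```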

Update is uglier:

update[i] = coalesce(
  scalar_subquery( // I'm assuming this yields null if there is no input
    fetch( // just to avoid nonsense if the relation yields equivalent rows
      count = 1,
      project(
        emit only column i,
        filter(
          condition = (key columns in inner query match key columns in outer query),
          <relation input goes here>
        )
      )
    )
  ),
  field(i)
)

but only because I'm lacking a proper struct-building expression (and this is not the first time I'm running into this by far; other things it would make redundant are the repeated expressions in aggregate grouping sets, the 2D version of or_list, and struct literals; you could do the same for list and map too and avoid lots of literal shenanigans in fact). It would be much more natural to have only one update expression and require it to yield a struct that matches the schema, rather than requiring individual expressions. That would also solve the problem of differentiating between "don't update the record" (would be a null struct) and "replace a field in the record with null" (would be a struct containing null). The whole thing in Substrait where the schema of a row is defined using the same abstraction as a struct is awesome; I don't see why we're not using it more. But I digress. Assuming that expression would exist, it would become:

update = coalesce(
  scalar_subquery(
    fetch(
      count = 1,
      project(
        expression[0] = struct(field(0) ... field(n-1)),
        emit only above expression,
        filter(
          condition = (key columns in inner query match key columns in outer query),
          <relation input goes here>
        )
      )
    )
  ),
  struct(field(0) ... field(n-1))
)
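The struct-valued form above corresponds roughly to the following Python sketch, again assuming dictionary records and a hypothetical `key_columns` list; `dict.setdefault` plays the role of `fetch(count = 1)` and `dict.get` with a default plays the role of `coalesce`.

```python
def update_matching_keys(table, key_columns, relation_input):
    """Replace each table record by the first input record with the same key."""

    def key_of(record):
        return tuple(record[column] for column in key_columns)

    # fetch(count = 1): only the first input record per key is considered.
    first_match = {}
    for record in relation_input:
        first_match.setdefault(key_of(record), record)

    # coalesce(scalar_subquery(...), current record): keep records without a match.
    return [first_match.get(key_of(record), record) for record in table]


wine = [{"name": "merlot", "stock": 3}, {"name": "syrah", "stock": 5}]
new_images = [{"name": "syrah", "stock": 4}, {"name": "syrah", "stock": 1}]
after_update = update_matching_keys(wine, ["name"], new_images)
```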

Even better would be if the field reference would be generalized to be able to return the entire record as a struct, rather than requiring at least one index operation. Then the struct(field(0) ... field(n-1)) subexpressions would just become that generalized reference type. I suppose technically you could already do it with a mask expression, not sure if it would be considered legal to apply that to RootReference and OuterReference though.

The particular update query you listed can be represented much more naturally, though,

update[column index of stock] = subtract(field(column index of stock), scalar_subquery(...))

but if SQL supports IMO arcane syntax like the delete statement for updates as well, I imagine it would be harder to generate them like this from SQL in general. That being said, I don't think we should be considering a SQL-to-Substrait producer as the golden standard for how things should be represented in Substrait anyway. There are more producers being worked on than just Isthmus; if Substrait intends to be a generalized interchange format for queries, I don't see why it should receive special treatment. Substrait is not supposed to just be a SQL syntax tree, right?

Contributor

Thanks @jvanstraten. I see your points. Some could be dealt with by constraining which Rel can be in the body of an input, but covering all corner cases might be tricky (and leaving them unspecified is sketchy ;-)).

Let's maybe chat about it in the next community meeting (briefly) and schedule some time for me/you/@jacques-n and anyone else interested to sit down together and close on this; the async long-message mode is not very efficient for driving convergence.

@jcamachor offline let's consider the UPDATE = INSERT + DELETE and see how hard it would be for a consumer to reconstruct the UPDATE semantics. Having that clear in our heads before meeting with @jvanstraten could help the conversation.

Contributor Author

Okay, sorting this out in a call (at least to converge on the basics) works for me.

Contributor Author

Hijacking this for another thing I only just put one and one together for while writing this: I don't see how WriteRel can ever consistently work for write-only files, because it relies heavily on associative table and read-modify-write semantics. That means it's not the complementary operation of ReadRel. That's, honestly, terrible and confusing. Can we just rename it to UpdateRel or something and reserve WriteRel for a pure sink operation? AFAICT a pure WriteRel would replace CTAS, though...?

Contributor

@jvanstraten and @jacques-n we should find time to talk!

Contributor Author

@curino It took me this long just to figure out how to put this, but unfortunately some things unrelated to this have happened recently that have made me very wary of continuing to contribute to Substrait. I'm finding it exceedingly and increasingly difficult to formulate my arguments and opinions in ways that are not considered to be hostile or disrespectful, in the end (to no avail) spending more than a day on a single comment to not be dishonest about how frustrated I am while also not stepping on anyone's toes. Additionally, my way of working doesn't seem to mesh well with open-source development. As such I don't think it would help anyone for me to continue contributing, and I have very little desire left to do so myself. That being said, I did enjoy working with you, so I'm sorry things turned out this way :(

I suppose I'll leave this PR open because of all the relevant discussion in here and because one of the things offense was taken to was me closing my own PRs before being completed, but I do not intend to continue working on this. It should probably be converted to issues insofar as that is deemed necessary and then closed when deemed appropriate. Either way I'll leave the branches up on my fork for future reference.

@CLAassistant

CLAassistant commented Oct 6, 2022

CLA assistant check
All committers have signed the CLA.


EpsilonPrime pushed a commit that referenced this pull request Dec 11, 2023
The `ReferenceRel`, `WriteRel`, and `DdlRel` were defined in `algebra.proto` but not part of `message Rel` which meant they were unusable. This PR adds those back. It is inspired by #288 but more targeted in scope. One change from that original PR which I also kept was replacing the word `tuple` with `record` in the documentation for consistency.

This is not to imply that `ReferenceRel`, `WriteRel`, or `DdlRel` are "complete" or "stable" in any way. I feel these relations are still quite ill defined. However, my hope is that by making them usable we can inspire further change to them.

BREAKING CHANGE: The enum `WriteRel::OutputMode` had an option change from `OUTPUT_MODE_MODIFIED_TUPLES` to `OUTPUT_MODE_MODIFIED_RECORDS`
BREAKING CHANGE: The message `AggregateFunction.ReferenceRel` has moved to `ReferenceRel`.
@jacques-n
Contributor

Closing due to lack of progress in a year and another ticket solving the catalyst issue.

@jacques-n jacques-n closed this Jul 16, 2024