Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support qualified columns in queries #55

Merged
merged 38 commits into from
Jun 22, 2021
Merged

Conversation

houqp
Copy link
Member

@houqp houqp commented Apr 25, 2021

This turned out to be a much larger/destructive PR than I initially expected. Would like to get some early feedback on the approach before I spend more time working on the clean up. So far I have been able to get all unit tests to pass.

TODO:

  • Address FIXMEs and TODOs
  • Check integration tests
  • Rebase to latest master

Which issue does this PR close?

closes #56
closes #57
closes #311

What changes are included in this PR?

Here is the main design change introduced in this PR:

  • Physical column expression now references columns by unique indices instead of names
  • Logical column expression now wraps around a newly defined Column struct that can represent both qualified and unqualified columns

Query execution flow change:

  1. When a TableScan plan has table_name set to Some(name), all of the fields in its schema will be created as fully qualified fields.
  2. Logical plan builder is responsible for normalizing all logical column expressions by adding qualifier based on schema wherever applicable.
  3. Logical plan optimizer operates on normalized column expressions.
  4. During physical planning, logical column expressions are resolved to physical column expressions with corresponding indices based on logical plan schemas. Notice a side effect of this is we don't look up column index during execution anymore. It is now done at planning time.
  5. During physical planning, physical schema (arrow schema) has all column qualifiers stripped.

Some other changes introduced in this PR to help make all tests pass:

  • avoid coalesce for hash repartition plan
  • added partitioned hash join tests to hash_join module
  • added support for join with alias (for self join) support table alias in join clause #547
  • added join_using method to logical plan builder
  • fixed cross join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
  • fixed join handling in projection push down in optimizer (schema fields not trimmed based on pushed down table scan projections)
  • produce unique join columns when using join constraint is being used
  • ser/de physical plans in ballista without going through physical planner
  • fixed couple other bugs here and there along the way, but couldn't remember :(

Output field name are not 100% conforming to https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md yet. The remaining differences are all minor formatting issues like converting function names to lower cases, so I decided to leave that to follow up PRs to reduce the diff.

Are there any user-facing changes?

breaking api changes:

  • Column expression now wraps Column struct instead of String
  • TableScan plan now takes table_name as Option instead of String
  • Various dataframe scan method now takes table name as Option instead of &str
  • Physical Column expression now requires index field
  • logical planer builder join method now takes left and right keys as Vec<impl Into<Column>> instead of &[&str]

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it a look and it looks great. 💯

If I understood correctly, there are two main changes:

  1. At the logical plane, a column is no longer refereed by String, but by the struct Column, that may contain an optional qualifier. I think that that makes a lot of sense, and I can see that we can do a lot of things with this, as you demonstrate in this PR.

  2. At the physical plane, a column is not longer refereed by String, but by a string and index.

The planner is responsible for mapping the logical names into the physical columns, mapping full qualifier -> (name, index) in the schema.

They make sense to me.


In the SQL API, does this change the resulting schema? I.e.

SELECT a FROM t1

results in an output column named t1.a or a?

In the DataFrame API, does this change the resulting schema? I.e.

df = ctx.table("temp")?;
df.select("a").collect().schema().fields()[0].name()

results in "temp.a" or "a"?

@Dandandan
Copy link
Contributor

This is really cool @houqp thanks for doing this!

It's looking good to me, I added some thoughts

/// relation/table name.
pub relation: Option<String>,
/// field/column name.
pub name: String,
Copy link
Member

@andygrove andygrove Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider supporting compound names here so that we can eventually support nested types? For example SELECT t1.customer.address.zip, ...

Perhaps Column could be an enum?

enum Column {
  Simple(Option<String>, String),
  Compound(Option<String>, Vec<String>)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we track this as a follow on ticket

@Dandandan
Copy link
Contributor

I think as part of this PR we can also have a look whether query 7/8 of the tpch benchmark are succeeding and add them to the regression test if they do :)

@houqp houqp marked this pull request as draft April 25, 2021 17:30
@houqp
Copy link
Member Author

houqp commented Apr 25, 2021

@jorgecarleitao your understanding on column changes in logical and physical planes are correct. I would add that in physical plane, the string column name is probably not needed. We are currently only using the index field for evaluation. I kept it mostly for debugging purpose. But given the column name info is also available in physical schema fields, I think it should be safe to only store index in physical column expressions.

The answer to your schema change question is a little bit tricky, let me try to clarify the new behavior. In short, it changes the field names in logical plan schemas because we require all columns to be normalized when building the plan. For physical schemas, there should be no change for column names except when columns are wrapped with operators.

Using your SQL query as an example:

SELECT a FROM t1

The logical schema field will be normalized to t1.a. However, the final execution output will have a physical/arrow schema with a field a. The qualifier is stripped during physical planning at: https://github.com/houqp/arrow-datafusion/blob/8ecc215bb7fe44d8cf9dcb4b90df753f0c50afb7/datafusion/src/physical_plan/planner.rs#L483-L486

For DataFrame API, the behavior is the same since both SQL and Dataframe go through the same query builder interface:

df = ctx.table("temp")?;
df.select("a").collect().schema().fields()[0].name()

The above code will result in a. So far this is the same as what datafusion does today. The difference comes in when operators are involed, for example:

SELECT a, MAX(b) FROM t1

This will result in two unqualified fields a and MAX(t1.b). Here are some more related sample outputs in the tests:

Basically I made sure the behavior is consistent with MySQL, Postgresql and Spark.

@alamb
Copy link
Contributor

alamb commented Apr 26, 2021

FWIW I plan to look at this tomorrow more carefully

When a projection is pushed down to cross join inputs, fields from
resulting plan's schema need to be trimmed to only contain projected
fields.
Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @houqp for the explanation.

I think that it is worth reiterating the invariants about logical planning that I have been preserving and striving to preserve in DataFusion, which most of our tests validate. IMO breaking these invariants has a major impact to consumers.

I think that this draft is currently breaking many of them. So, if anything we need a design document describing exactly why we do not hold these invariants in DataFusion, and what invariants the consumers can expect from DataFusion.

Some of them are described in detail here, but let me try to summarize:

What we write is what we get

Examples:

  • SELECT a FROM t must result in the schema field name a
  • SELECT t.a FROM t must result in the schema field name t.a
  • SELECT COUNT(t.a) FROM t must result in the field name COUNT(t.a)
  • SELECT count(t.a) FROM t must result in the field name count(t.a)
  • SELECT count(a) FROM t must result in the field name count(a)

For DataFrames, this means:

df.select("a").collect() must result in the schema field name a

logical optimizations must preserve schemas

I.e. for all optimizations opt, schema(plan) == schema(opt(plan))

logical optimizations must preserve semantics

I.e. for all optimizations opt, collect(plan) == collect(opt(plan))

@alamb
Copy link
Contributor

alamb commented Apr 27, 2021

FYI I added #211 to track "making the invariants document easier to find / ensure everyone is aligned"

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am with @jorgecarleitao think changing the output from AVG(c2) to AVG(test.c2) is not really an improvement, and will require many changes in downstream consumers of datafusion ( it certainly would for IOx)

FWIW postgres doesn't do either DataFusion's current behavior nor the behavior proposed in this PR:

alamb=# select sum(value) from example;
 sum 
-----
   4
(1 row)

alamb=# select Sum(value) from example;
 sum 
-----
   4
(1 row)

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, thank you very much @houqp for taking this on 💯 .

I like the high level design changes ✔️ :

  • Physical column expression now references columns by unique indices instead of names
  • Logical column expression now wraps around a newly defined Column struct that can represent both qualified and unqualified columns

I did not carefully review this PR (it is too big!) and I think it would be easier to review if it could be broken down into smaller parts.

Maybe start by adding support for qualified column references ColumnRef (and erroring at physical plan time if there are duplicate names)?

I also saw some changes to UNION logic, which I don't fully understand, but perhaps could get their own tests / PR

@@ -141,7 +137,7 @@ pub enum LogicalPlan {
/// Produces rows from a table provider by reference or from the context
TableScan {
/// The name of the table
table_name: String,
table_name: Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand how a table scan (which results in a relation) can end up without a name. What is an example of when we need to use a None table_name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because all the non-table scan all eventually goes into TableScan, for example: https://github.com/apache/arrow-datafusion/blob/245f0b8a68c5763a236aef3e727f0502188d0bfa/datafusion/src/logical_plan/builder.rs#L110-L117. Currently we are using empty str '' to imply scan without table names, which makes the relation name match a little bit less readable downstream in the code base.

Copy link
Contributor

@alamb alamb Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I don't fully understand how we could be scanning relations that don't have names (like how do you refer to them in SQL?) As I recall, postgres doesn't use the term 'table name' and instead uses the word 'relation name' as the name can come from a table, or an alias, or subquery (and probably others I am not remembering now)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In SQL this would never happen. It's more of a thing in spark land where for simple queries, people usually just load a csv or parquet partition into a dataframe without any table registration. So for example, for dataframe users, they could start with a Context.read_csv call and only reference columns by unqualified names in subsequent transformations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- thank you for the explanation. In this case in DataFusion, we could perhaps register the data with either a temporary name (or the csv file name or something).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated code to assign ?table? as name for unnamed tables. TableScan name is not an Option anymore.

@jorgecarleitao
Copy link
Member

So, having done this type of exercises before in DataFusion, I tend to agree that these things are really difficult to divide in smaller PRs, as they touch delicate balances in the code.

I agree that the current way we handle names has reached its limits, and thus I am super happy to see this PR.

wrt to the invariants, I do think that we should be able to address them here. I am still trying to understand why we need to break invariants to enable qualified column names. This is the gist of my comment: are these changes by design, a theoretical impediment whereby we can't have both qualified names and the invariants, or derived from this still being work in progress?

@houqp
Copy link
Member Author

houqp commented Apr 27, 2021

Thanks @jorgecarleitao for the invariant design doc, I wasn't aware of it before. I will definitely go through it and write something up for this PR this weekend. I think we should spend some effort into adding that doc into this repo as well. This is the kind of reference I have wanted to have while working on the patch set.

My first attempt of this does preserve the invariant of SELECT t.a FROM t must result in the schema field name t.a, but I later found out this would cause issue with subsequent read from writes. Basically the reader will end up with something like t.t.a. So I changed the behavior to be consistent with Spark and MySQL.

For breaking changes in schema fields involving operators, I will do more research on my end. The current behavior is the one with the least amount of work for me. But I am leaning towards better compatibility with the current data eng ecosystem like spark, mysql and postgres.

I fully agree with the optimization invariants. I have been thinking about whether there is a better way to enforce them more formally other than unit tests.

@jorgecarleitao to your last question, all the invariants, other than the first one I mentioned in this comment, are pretty straight forward to maintain by tracking the user provided input in the column struct. I didn't do it because I wasn't aware of these invariants.

@alamb I am with you for the size of this PR as well :( That's why I decided to send it out early before finishing it because I noticed it just kept growing in size especially after every time I merge with master :P. I will try to see if I can restrict the change to just logical plans, but as @jorgecarleitao mentioned it might not be easy. I will do it if the overhead turned out to be small.

Anyway, putting code change aside, I think the most urgent thing to do is for me to write up a design doc to fully spec out the semantics so we are all on the same page on what needs to be implemented.

@houqp
Copy link
Member Author

houqp commented Apr 29, 2021

@jorgecarleitao looking more into the logical optimization invariants, I think we might want to relax it a little bit to account for some optimizations that may change column orders: https://github.com/apache/arrow-datafusion/blob/57eeb64659b9ca9c496a959f7716090fb32085b6/datafusion/src/optimizer/hash_build_probe_order.rs#L122-L133.

So basically something like this:

  • If projection plan is the root node, then we can guarantee strict schema invariants for logical optimization, i.e. we preserve the exact same schema field vector.
  • If root node is not a projection plan, we only guarantee same set of schema fields to be preserved, but not the order

From the user's point of view, it also makes sense since if I am executing a query like SELECT * FROM t, I am basically saying just give me all the columns in whatever order.

Technically, we could still enforce strict schema invariants for all plans by manually wrapping a projection plan when the outer plan is not a projection. But I think this adds unnecessary execution overhead for minor semantic gain.

Interesting in what others think about this.

@jorgecarleitao
Copy link
Member

@houqp , let me clarify the context of the invariants: back in August 2020, I started looking into improving the schema invariants, as DataFusion was not preserving any. Before committing to any work, I outlined the invariants that I though were relevant, shared it around, and worked to enforce them. Please by no means consider it to be like something that "the community" agreed; it was more like a guideline for me, which at the time some people read it through. I felt the need to have some design choices to guide my development at the time to arrive at a consistent state.

Wrt to your question, I think that you are right: taking the assumption column names are unique, it follows that the order of columns in a schema does not need to be preserved because it is expected that consumers access columns by name, not by index. So, I think that the == in schema(plan) == schema(opt(plan)) should represent the same metadata, set (in a mathematical sense) of fields. We may want to define this equality somewhere in DataFusion so that we can use it whenever we want to assert the invariant (since the compiler can't do it for us).

I also agree that we could also preserve column order: in the case of the join, we would need to make the side probe optimization to be a physical, not logical, optimization (and write the batches with the correct order at the end), and in * we would define a rule for column order.

I do not have an opinion in either direction; I just think that it is useful to be explicit about these so that our users can have a mental model on how column names behave and how they access them (can I rely on stable column names?, can I rely on stable indexes?, etc.).

@andygrove
Copy link
Member

Thanks, @houqp! I'm excited to see this work. I have had a few attempts in the past at implementing some of these changes myself and they always ended up being hugely disruptive. I look forward to seeing the design document and will comment more once that is up.

One more thing we should think about is working with schemaless data sources. This isn't something we have really talked about yet and I have a concern about moving from resolving columns by name to using indices because it might limit our ability to support this use case in the future.

@houqp
Copy link
Member Author

houqp commented Apr 29, 2021

@andygrove could you give some examples for schemaless data sources? I would like to incorporate that into the design doc.

@houqp
Copy link
Member Author

houqp commented Jun 20, 2021

looks like integration test is failing due to mismatch between logical schema and physical schema for window queries, i will dig into this.

@jimexist
Copy link
Member

jimexist commented Jun 20, 2021

looks like integration test is failing due to mismatch between logical schema and physical schema for window queries, i will dig into this.

thanks, it might be because of some edge cases that i did miss. i didn't find time to fix it but so far this query will fail:

> select max(c1) over () max_c1, max(c1) over (partition by c2) max_c1_by_c2 from test;
Plan("Schema contains duplicate unqualified field name 'MAX(c1)'")

@jimexist
Copy link
Member

created #592 to address this

@houqp
Copy link
Member Author

houqp commented Jun 20, 2021

after digging more into this, the bug seems to be related to the projection push down optimizer, we are breaking the Logical schema is invariant under logical optimization invariant for both window and aggregate plans. This issue has been in our code base for awhile, but only surfaced through the newly added window order by integration test.

@houqp
Copy link
Member Author

houqp commented Jun 20, 2021

@alamb, turns out the bug you demoed is not simple to fix and surfaced a problem in how join columns are handled in the current design. Based on what I have seen in MySQL and PostgreSQL, join columns deduplication should only be applied to join clauses with USING constraints. We are currently applying the deduplication for all join types other than semi/anti joins.

I have a fix in houqp@7636787, which touched a lot of files and is a 600+ LoC diff in itself.

So considering this bug also exits in the current master, I think it would be easier to merge the current reviewed PR as is. Then we can focus on my fix to discuss whether the change in join column handling logic is the right move or not. I am also not very happen with how my fix is implemented, so would love to get some ideas on alternative implementations as well.

@alamb
Copy link
Contributor

alamb commented Jun 21, 2021

So considering this bug also exits in the current master, I think it would be easier to merge the current reviewed PR as is. Then we can focus on my fix to discuss whether the change in join column handling logic is the right move or not. I am also not very happen with how my fix is implemented, so would love to get some ideas on alternative implementations as well.

I agree this is the best course of action.

Do you think this is ready to merge @houqp ?

@houqp
Copy link
Member Author

houqp commented Jun 21, 2021

@alamb yes, this is ready to merge.

@alamb
Copy link
Contributor

alamb commented Jun 22, 2021

🚀

@alamb alamb merged commit f2c01de into apache:master Jun 22, 2021
@Dandandan
Copy link
Contributor

Woohoo 🙌🙌🙌🙌 🎉 🎉🎉🎉

@houqp houqp deleted the qp_qualified branch June 22, 2021 18:13
@houqp
Copy link
Member Author

houqp commented Jun 22, 2021

Thank you all for the reviews! When I initially started working on this, I thought it would only take me two weekends, lol. I will send the join semantic change diff as a follow up PR tonight.

@alamb
Copy link
Contributor

alamb commented Jun 22, 2021

Thanks for the epic work @houqp -- this is really great to see

@jorgecarleitao
Copy link
Member

Thank you @houqp for this excellent PR, together with the .md docs. They set a really important foundation to the query's semantics 💯

@alamb
Copy link
Contributor

alamb commented Jun 22, 2021

I filed a ticket to track some follow on work: #601

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
8 participants