Inconsistent behavior in HashJoin Projections #10978

Closed
adragomir opened this issue Jun 18, 2024 · 3 comments · Fixed by #13022
Labels
bug Something isn't working

Comments

@adragomir

Describe the bug

We ran into problems with projections inside HashJoin.

Each schema in the join (left / right) has:

  • a single struct column
  • and the join column (reference to a get_field inside the first column)

The projection is [0, 2]: the struct column from the left and the struct column from the right.

The join column is not included in the output. When the optimizer reverses the join order, the projection is rewritten as [2, 0]; however, there is no column with index 2 in the output, since the output contains only the two structs.
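A minimal sketch of the index arithmetic involved, assuming (as korowa explains later in the thread) that the embedded projection indexes the concatenation of the left and right input schemas. swap_projection is a hypothetical helper, not a DataFusion function: it remaps a projection from left ++ right to right ++ left, which is how [0, 2] becomes [2, 0]. Index 2 is still valid against the four-column concatenated schema, but a consumer that resolves it against the two-column projected output would go out of bounds, which is consistent with the failure described above.

```rust
/// Hypothetical helper (not part of DataFusion): remap projection indices
/// expressed against `left ++ right` so they refer to the same columns in
/// `right ++ left`, i.e. after the join inputs are swapped.
fn swap_projection(projection: &[usize], left_len: usize, right_len: usize) -> Vec<usize> {
    projection
        .iter()
        .map(|&i| {
            assert!(i < left_len + right_len, "index {i} out of range");
            if i < left_len {
                // Left columns now follow the right input's columns.
                i + right_len
            } else {
                // Right columns now come first.
                i - left_len
            }
        })
        .collect()
}

fn main() {
    // Each input has two columns: [struct, join_key].
    let (left_len, right_len) = (2, 2);
    // Original projection: left struct (0) and right struct (2).
    let original = vec![0, 2];
    let swapped = swap_projection(&original, left_len, right_len);
    println!("swapped projection: {:?}", swapped); // [2, 0]

    // The projected join output has only two columns, so index 2 is only
    // meaningful against the four-column concatenated input schema.
    let projected_output_len = original.len();
    let out_of_range: Vec<usize> = swapped
        .iter()
        .copied()
        .filter(|&i| i >= projected_output_len)
        .collect();
    println!("indices invalid against projected output: {:?}", out_of_range); // [2]
}
```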

To Reproduce

  • Create two schemas with a single struct column (key, value)
  • Join on the key
  • Request the two value fields

Expected behavior

The hash join optimization works, even when swapping the join order (and wrapping the join in a ProjectionExec).

Additional context

The doc comment on HashJoinExec::projection says "The projection indices of the columns in the output schema of join". However, when I tried taking a stab at it, it was unclear what exactly the indices passed in the projection refer to.
For now, I am fixing it surgically when swapping the order: I rewrite the projection to be relative to the output schema when wrapping the join with a ProjectionExec.
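The rewrite described above can be sketched as follows, under two assumptions: the join's embedded projection indexes the concatenated input schema, and the wrapping projection is first expressed in those same indices. relative_to_output is a hypothetical helper, not DataFusion API and not the code in hstack@a4ab67d; it maps each wanted column to its position in the join's projected output and returns None when the join has already projected that column away.

```rust
/// Hypothetical helper (not DataFusion API): given the join's embedded
/// projection (indices into the concatenated input schema) and the columns a
/// wrapping projection wants (also as concatenated-schema indices), return the
/// wanted columns as positions in the join's projected output.
/// Returns `None` if a wanted column was projected away by the join.
fn relative_to_output(embedded: &[usize], wanted: &[usize]) -> Option<Vec<usize>> {
    wanted
        .iter()
        .map(|w| embedded.iter().position(|e| e == w))
        .collect()
}

fn main() {
    // Swapped join: concatenated input schema is
    // [right.struct (0), right.key (1), left.struct (2), left.key (3)].
    let embedded = vec![2, 0]; // join output: [left.struct, right.struct]

    // A wrapper that wants [left.struct, right.struct] must use output positions.
    println!("{:?}", relative_to_output(&embedded, &[2, 0])); // Some([0, 1])

    // Asking for right.struct first yields the reversed positions.
    println!("{:?}", relative_to_output(&embedded, &[0, 2])); // Some([1, 0])

    // The join key was projected away, so it cannot be referenced.
    println!("{:?}", relative_to_output(&embedded, &[1])); // None
}
```

Collecting an iterator of Option values into Option<Vec<_>> short-circuits on the first missing column, so the wrapper is never built with a dangling index.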

adragomir added a commit to hstack/arrow-datafusion that referenced this issue Jun 18, 2024
@adragomir
Author

You can see a possible temporary fix here: hstack@a4ab67d

@my-vegetable-has-exploded
Contributor

It seems to be a bug related to my work; thanks for catching it. I will take a look later.

adragomir added a commit to hstack/arrow-datafusion that referenced this issue Jun 20, 2024
adragomir added a commit to hstack/arrow-datafusion that referenced this issue Jul 23, 2024
adragomir added a commit to hstack/arrow-datafusion that referenced this issue Jul 23, 2024
adragomir added a commit to hstack/arrow-datafusion that referenced this issue Aug 23, 2024
@korowa
Contributor

korowa commented Sep 8, 2024

@adragomir could you provide an example of the failure / inconsistency, via physical plan construction or a SQL statement?

From what I see, [2, 0] seems to be a valid embedded projection, since it is applied to the union of the left and right join input schemas (which is called the "output schema"; this part may be confusing, as the actual join output schema is different). As an example, the following test works as expected:

    #[tokio::test]
    async fn join_inner_with_projection() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let left = build_table(
            ("a1", &vec![1, 2, 3]),
            ("b1", &vec![4, 5, 5]),
            ("c1", &vec![7, 8, 9]),
        );
        let right = build_table(
            ("a2", &vec![10, 20, 30]),
            ("b2", &vec![4, 5, 6]),
            ("c2", &vec![70, 80, 90]),
        );

        let on = vec![(
            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema())?) as _,
        )];

        let join = HashJoinExec::try_new(
            left,
            right,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            false,
        )?
        // Indices refer to the concatenated input schema [a1, b1, c1, a2, b2, c2]: 3 = a2, 0 = a1
        .with_projection(Some(vec![3, 0]))?;

        let batches = common::collect(join.execute(0, task_ctx)?).await?;

        let expected = [
            "+----+----+",
            "| a2 | a1 |",
            "+----+----+",
            "| 10 | 1  |",
            "| 20 | 2  |",
            "| 20 | 3  |",
            "+----+----+",
        ];

        // Inner join output is expected to preserve both inputs' order
        assert_batches_eq!(expected, &batches);

        Ok(())
    }
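The same index convention can be checked without DataFusion. The std-only sketch below uses a hypothetical projected_column_names helper (not a DataFusion function) that applies an embedded projection to the concatenation of the left and right input column names; with the test's schemas, [3, 0] selects a2 and then a1, matching the expected batch header.

```rust
/// Hypothetical helper mirroring the convention described above: the embedded
/// projection indexes the concatenation of the left and right input schemas.
fn projected_column_names<'a>(left: &[&'a str], right: &[&'a str], projection: &[usize]) -> Vec<&'a str> {
    // Build the concatenated "output schema" the projection is applied to.
    let concatenated: Vec<&'a str> = left.iter().chain(right.iter()).copied().collect();
    projection.iter().map(|&i| concatenated[i]).collect()
}

fn main() {
    let left = ["a1", "b1", "c1"];
    let right = ["a2", "b2", "c2"];
    // Index 3 is the first right column, index 0 the first left column.
    println!("{:?}", projected_column_names(&left, &right, &[3, 0])); // ["a2", "a1"]
}
```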
