
Aggregate queries produce different results between runs #658

Closed
andygrove opened this issue Jul 3, 2021 · 7 comments
Labels
bug Something isn't working


@andygrove
Member

andygrove commented Jul 3, 2021

Describe the bug
I ran TPC-H query 12 several times with DataFusion and got different results each time:

+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623097          | 934685         |
| SHIP       | 622962          | 934510         |
+------------+-----------------+----------------+
Query 12 iteration 0 took 42932.5 ms

+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623099          | 934685         |
| SHIP       | 622961          | 934513         |
+------------+-----------------+----------------+
Query 12 iteration 0 took 41807.4 ms

+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623099          | 934691         |
| SHIP       | 622961          | 934514         |
+------------+-----------------+----------------+
Query 12 iteration 1 took 41652.9 ms

+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623097          | 934686         |
| SHIP       | 622962          | 934510         |
+------------+-----------------+----------------+
Query 12 iteration 2 took 41660.9 ms

I see the same behavior with Ballista.

Query 12 iteration 0 took 32341.6 ms
+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623094          | 934692         |
| SHIP       | 622961          | 934512         |
+------------+-----------------+----------------+

Query 12 iteration 1 took 29347.7 ms
+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623093          | 934697         |
| SHIP       | 622963          | 934515         |
+------------+-----------------+----------------+

To Reproduce

../target/release/tpch benchmark datafusion --path /mnt/tpch/parquet-sf100-partitioned/ --format parquet --iterations 3 --query 12 --debug

Expected behavior
Results should be the same on each run.
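The expected behavior can be turned into an automated check: run the query several times and compare every result with the first. The following is a minimal Rust sketch, assuming the query can be wrapped in a closure that returns its rows as sortable values. `check_deterministic` is a hypothetical helper, not part of the tpch benchmark tool or DataFusion.

```rust
/// Hypothetical helper: runs `run_query` `iterations` times (at least once) and
/// returns Err describing the first iteration whose rows differ from iteration 0.
/// The closure stands in for executing TPC-H query 12.
fn check_deterministic<T, F>(iterations: usize, mut run_query: F) -> Result<Vec<T>, String>
where
    T: Ord + Clone + std::fmt::Debug,
    F: FnMut() -> Vec<T>,
{
    let mut baseline = run_query();
    // Sort so that row order (not guaranteed in general without ORDER BY) is ignored.
    baseline.sort();
    for i in 1..iterations {
        let mut rows = run_query();
        rows.sort();
        if rows != baseline {
            return Err(format!(
                "iteration {} differs: expected {:?}, got {:?}",
                i, baseline, rows
            ));
        }
    }
    Ok(baseline)
}
```

Given the first two DataFusion runs above, this check would fail at iteration 1, where MAIL's `high_line_count` changes from 623097 to 623099.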

Additional context

Physical plan:

SortExec: [l_shipmode@0 ASC]
  MergeExec
    ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count]
      HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
        CoalesceBatchesExec: target_batch_size=4096
          RepartitionExec: partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2)
            HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
              CoalesceBatchesExec: target_batch_size=4096
                HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 2)
                      CoalesceBatchesExec: target_batch_size=4096
                        FilterExec: l_shipmode@4 IN ([Literal { value: Utf8("MAIL") }, Literal { value: Utf8("SHIP") }]) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131
                          ParquetExec: batch_size=8192, limit=None, partitions=[/mnt/tpch/parquet-sf100-partitioned//lineitem/part-00004-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00002-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00003-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00005-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00000-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00006-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00001-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//lineitem/part-00007-bfa6ef87-037f-477e-b18b-d9344b7d72a1-c000.snappy.parquet]
                  CoalesceBatchesExec: target_batch_size=4096
                    RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 0 }], 2)
                      ParquetExec: batch_size=8192, limit=None, partitions=[/mnt/tpch/parquet-sf100-partitioned//orders/part-00001-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00006-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00000-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00005-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00004-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00003-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00007-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet, /mnt/tpch/parquet-sf100-partitioned//orders/part-00002-9530d415-a7b1-4dd0-8f7b-7be0e6f0f37a-c000.snappy.parquet]
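The plan aggregates in two phases. A `mode=Partial` HashAggregateExec computes sums per input partition, RepartitionExec hash-partitions those partial results on `l_shipmode`, and a `mode=FinalPartitioned` HashAggregateExec merges them. The sketch below models that flow with plain Rust collections, including the two CASE expressions that produce `high_line_count` and `low_line_count`. It illustrates the technique only; it is not DataFusion's implementation, and the `Row` type is a hypothetical simplification of the joined rows.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical simplified joined row: (l_shipmode, o_orderpriority).
type Row = (&'static str, &'static str);

/// Both CASE expressions at once: (1, 0) for '1-URGENT'/'2-HIGH', else (0, 1).
fn case_counts(priority: &str) -> (i64, i64) {
    if priority == "1-URGENT" || priority == "2-HIGH" { (1, 0) } else { (0, 1) }
}

/// Phase 1 (mode=Partial): aggregate each input partition independently.
fn partial_agg(partition: &[Row]) -> HashMap<&'static str, (i64, i64)> {
    let mut acc = HashMap::new();
    for &(mode, prio) in partition {
        let (hi, lo) = case_counts(prio);
        let e = acc.entry(mode).or_insert((0, 0));
        e.0 += hi;
        e.1 += lo;
    }
    acc
}

/// Repartition step: route a group key to one of `n` output partitions by hash.
fn hash_partition(key: &str, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Phase 2 (mode=FinalPartitioned): merge partial states per output partition.
/// `n_out` must be greater than zero.
fn two_phase_agg(inputs: &[Vec<Row>], n_out: usize) -> Vec<(&'static str, i64, i64)> {
    let mut finals: Vec<HashMap<&'static str, (i64, i64)>> = vec![HashMap::new(); n_out];
    for part in inputs {
        for (mode, (hi, lo)) in partial_agg(part) {
            let e = finals[hash_partition(mode, n_out)].entry(mode).or_insert((0, 0));
            e.0 += hi;
            e.1 += lo;
        }
    }
    // Merge + sort: combine the output partitions and order by l_shipmode.
    let mut out: Vec<_> = finals
        .into_iter()
        .flatten()
        .map(|(m, (h, l))| (m, h, l))
        .collect();
    out.sort();
    out
}
```

Because each group key lands in exactly one final partition, the merged result is the same for any number of input or output partitions; a correct plan should never make these sums depend on scheduling.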

@andygrove andygrove added the bug Something isn't working label Jul 3, 2021
@andygrove
Member Author

@alamb @jorgecarleitao @Dandandan FYI, I am going to take a look at this one tomorrow, but let me know if you have any educated guesses about the root cause. I think I will start by adding more metrics to see the number of output rows from each operator, and see whether that provides any clues.
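Per-operator output row counts can be collected by wrapping an operator's output in a counter. A minimal sketch, assuming batches are plain `Vec<T>` and the stream is a synchronous iterator. `CountingStream` and `with_row_count` are hypothetical names, not DataFusion's metrics API; DataFusion's real operators produce async record-batch streams.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical wrapper that counts the rows an operator emits,
/// in the spirit of an `outputRows` metric.
struct CountingStream<I> {
    inner: I,
    output_rows: Arc<AtomicUsize>,
}

impl<I, T> Iterator for CountingStream<I>
where
    I: Iterator<Item = Vec<T>>,
{
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Vec<T>> {
        let batch = self.inner.next()?;
        // Relaxed suffices here: this sketch only reads the counter after the stream is drained.
        self.output_rows.fetch_add(batch.len(), Ordering::Relaxed);
        Some(batch)
    }
}

/// Wraps `inner` and returns the wrapper together with a handle to its row counter.
fn with_row_count<I>(inner: I) -> (CountingStream<I>, Arc<AtomicUsize>) {
    let counter = Arc::new(AtomicUsize::new(0));
    (CountingStream { inner, output_rows: Arc::clone(&counter) }, counter)
}
```

Comparing these counters across runs, operator by operator, points at the first operator whose output changes.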

@Dandandan
Contributor

Dandandan commented Jul 3, 2021

We had some problems before where errors during execution caused queries to return empty results, because the errors were ignored elsewhere.

It might be something similar: an error happens somewhere during execution, so only part of the data is produced. In that case, could the randomness come from the streams of batches being produced in parallel?
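A minimal Rust sketch of the failure mode described above, assuming each partition runs on its own thread and sends `Result`-wrapped batches over a shared channel. The functions are hypothetical, not DataFusion APIs. If the consumer drops `Err` values, a failed partition shows up as missing rows rather than as an error. The sketch does not model why the failure point would vary between runs.

```rust
use std::sync::mpsc;
use std::thread;

type Batch = Result<Vec<u32>, String>;

/// Hypothetical executor: each partition's batches are sent from its own thread.
fn run_partitions(partitions: Vec<Vec<Batch>>) -> mpsc::Receiver<Batch> {
    let (tx, rx) = mpsc::channel();
    for batches in partitions {
        let tx = tx.clone();
        thread::spawn(move || {
            for b in batches {
                // Stop producing if the consumer has hung up.
                if tx.send(b).is_err() {
                    return;
                }
            }
        });
    }
    // The original `tx` is dropped when this function returns, so iterating
    // `rx` ends once every producer thread has finished.
    rx
}

/// Buggy pattern: errors are silently skipped, so the query "succeeds" with fewer rows.
fn count_rows_ignoring_errors(rx: mpsc::Receiver<Batch>) -> usize {
    rx.into_iter().filter_map(Result::ok).map(|b| b.len()).sum()
}

/// Safer pattern: the first error aborts the query instead of returning a partial result.
fn count_rows(rx: mpsc::Receiver<Batch>) -> Result<usize, String> {
    let mut total = 0;
    for batch in rx {
        total += batch?.len();
    }
    Ok(total)
}
```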

@andygrove
Member Author

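The issue seems to be in HashJoinExec. Here are the metrics from two runs: its lineitem-side input (CoalesceBatchesExec) reports 3115341 rows both times, but the join emits 3115255 rows in one run and 3115252 in the other.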

HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })], metrics=outputRows=3115255
  CoalesceBatchesExec: target_batch_size=4096, metrics=outputRows=3115341
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })], metrics=outputRows=3115252
  CoalesceBatchesExec: target_batch_size=4096, metrics=outputRows=3115341
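In TPC-H, `l_orderkey` is a foreign key to the unique `o_orderkey`, so every filtered lineitem row should match exactly one order. The join's output count should therefore equal its lineitem-side input count, whatever partition count RepartitionExec uses. A small reference for that invariant is sketched below as a partitioned hash join in plain Rust. It is an illustration, not DataFusion's HashJoinExec, and which side is used to build the hash table is a choice made for the sketch.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Assign a join key to one of `n` partitions (n > 0) by hash, like hash repartitioning.
fn partition_of(key: u64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Output row count of an inner equi-join, computed as a partitioned hash join.
fn partitioned_join_count(probe_keys: &[u64], build_keys: &[u64], n: usize) -> usize {
    // Repartition both sides on the join key so matching keys meet in one partition.
    let mut probe_parts = vec![Vec::new(); n];
    let mut build_parts = vec![Vec::new(); n];
    for &k in probe_keys {
        probe_parts[partition_of(k, n)].push(k);
    }
    for &k in build_keys {
        build_parts[partition_of(k, n)].push(k);
    }
    let mut total = 0;
    for (probe, build) in probe_parts.iter().zip(&build_parts) {
        // Build phase: key -> number of build rows carrying that key.
        let mut table: HashMap<u64, usize> = HashMap::new();
        for &k in build {
            *table.entry(k).or_insert(0) += 1;
        }
        // Probe phase: each probe row yields one output row per matching build row.
        for k in probe {
            total += table.get(k).copied().unwrap_or(0);
        }
    }
    total
}
```

For a foreign-key to primary-key join like this one, the result equals the number of probe rows for any partition count, which is the check the HashJoinExec metrics above fail.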

@andygrove
Member Author

I created #664 to add metrics to help with this. I am out of time for today.

@Dandandan
Contributor

@andygrove I am wondering if this is still the case on latest master.

#827 fixed an issue that could trigger something like this on large datasets.

@andygrove
Member Author

@Dandandan This does seem to be resolved now. I ran it half a dozen times just now and got consistent results:

+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623115          | 934713         |
| SHIP       | 622979          | 934534         |
+------------+-----------------+----------------+
Query 12 iteration 1 took 23521.0 ms

It is also worth noting that this query now runs almost 2x as fast as when I filed this issue 🚀

I am closing this issue.

@andygrove
Member Author

I ran the same query with Apache Spark, and the results match DataFusion's.

+----------+---------------+--------------+
|l_shipmode|high_line_count|low_line_count|
+----------+---------------+--------------+
|      MAIL|         623115|        934713|
|      SHIP|         622979|        934534|
+----------+---------------+--------------+

Iteration 0 took 23305 ms
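As a cross-check, the four counts that DataFusion and Spark now agree on sum to exactly the 3115341 rows reported on HashJoinExec's lineitem-side input in the failing runs. That is what this query should produce: every filtered lineitem row joins with exactly one order, and the two CASE expressions are complementary for non-null priorities, so each joined row is counted once, in either `high_line_count` or `low_line_count`.

```math
\underbrace{623115 + 934713}_{\text{MAIL}} + \underbrace{622979 + 934534}_{\text{SHIP}} = 1557828 + 1557513 = 3115341
```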
