Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit nested loop join record batch size #12634

Closed

Conversation

mhilton
Copy link
Contributor

@mhilton mhilton commented Sep 26, 2024

Which issue does this PR close?

Closes #12633.

Rationale for this change

Some joins use an excessive amount of memory due to creating very large record batches. This will reduce that memory use.

What changes are included in this PR?

From the high level there are two changes introduced by this PR.

The first is to process the probe-side input batches in smaller sizes. The processing loop only processes as many rows of the probe-side input that are likely to fit in a record batch. This is somewhat pessimistic and assumes that for each probe-side row there will be one output row per build-side row (INNER joins excepted). It is possible that this could be tuned in the future to balance processing speed with memory use. In order to make progress at least one probe-side row will be processed on each loop.

The second change is to introduce an output buffer. This is used to consolidate small record batches where the JOIN condition has low selectivity. If the join condition has a high selectivity and therefore produces large batches the output buffer breaks these into smaller batches for further processing. The output buffer will always produce one batch, even if that batch is empty.

Are these changes tested?

There is a new test that ensures the output batches from NestedLoopJoinExec are no bigger than the configured batch size.

Existing tests are assumed to be sufficient to show that the behaviour hasn't changed.

Repeated the example from #12633 gives:

> SHOW datafusion.execution.batch_size;
+---------------------------------+-------+
| name                            | value |
+---------------------------------+-------+
| datafusion.execution.batch_size | 8192  |
+---------------------------------+-------+
1 row(s) fetched. 
Elapsed 0.039 seconds.

> CREATE TABLE test AS VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9);
0 row(s) fetched. 
Elapsed 0.010 seconds.

> EXPLAIN ANALYZE WITH test_t AS (SELECT concat(t1.column1, t2.column1, t3.column1, t4.column1, t5.column1) AS v FROM test t1, test t2, test t3, test t4, test t5) SELECT * FROM test_t tt1 FULL OUTER JOIN test_t tt2 ON tt1.v<>tt2.v;
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                  |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | NestedLoopJoinExec: join_type=Full, filter=v@0 != v@1, metrics=[output_rows=9999900000, build_input_batches=10000, build_input_rows=100000, input_batches=10000, input_rows=100000, output_batches=1300010, build_mem_used=2492500, build_time=170.59826ms, join_time=309.369402772s] |
|                   |   CoalescePartitionsExec, metrics=[output_rows=100000, elapsed_compute=30.75µs]                                                                                                                                                                                                       |
|                   |     ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=67.949286ms]                                                        |
|                   |       CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=139.458µs, join_time=8.338651ms]                                                             |
|                   |         MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                     |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.661829ms, repartition_time=1ns, send_time=10.136821ms]                                                                                                                           |
|                   |           ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=348.255µs]                                                                                                       |
|                   |             CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=9.917µs, join_time=464.211µs]                                                              |
|                   |               MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                               |
|                   |               ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=33.044µs]                                                                                                                           |
|                   |                 CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=1.375µs, join_time=53.299µs]                                                               |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                   CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=1.083µs, join_time=244.708µs]                                                                |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                         |
|                   |   ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=262.67843ms]                                                          |
|                   |     CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=5.916µs, join_time=60.39301ms]                                                                 |
|                   |       MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                       |
|                   |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.857489ms, repartition_time=1ns, send_time=31.12258491s]                                                                                                                            |
|                   |         ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=408.628µs]                                                                                                         |
|                   |           CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=792ns, join_time=926.525µs]                                                                  |
|                   |             MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                 |
|                   |             ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=44.416µs]                                                                                                                             |
|                   |               CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=416ns, join_time=95.039µs]                                                                   |
|                   |                 MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                             |
|                   |                 CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=417ns, join_time=4.499µs]                                                                      |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                           |
|                   |                                                                                                                                                                                                                                                                                       |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 32.145 seconds.

Which is a mean batch size of 7692.17 rows.

Are there any user-facing changes?

No users should not notice any bahvioural difference.

Nested loop join creates a single output batch for each (right side)
input batch. When performing an outer join the size of the output
batch can be as large as number of left data rows * batch rows. If
the size of the left data is large then this can produce unreasonably
large output batches. Attempt to reduce the size of the output
batches by only processing a subset of the input batch at a time
where the output could be very large. The trade-off is that this
can produce a ;arge number of very small batches instead if the
left data is large but there is a highly selective filter.
Use buffering to keep the size of output batches from nested loop
join around the configured batch size. Small record batches are
buffered until there is enough rows available to fill a full batch
at which point the small batches are combined into a single batch.
Larger batches have batch sized slices taken from them until they
become smaller than the configured batch size.
Add a test that the nested loop join keeps the output batches smaller
than the configured batch size.
@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Sep 26, 2024
Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mhilton I'm planning to review it this week
@korowa FYI

@comphead
Copy link
Contributor

@mhilton would be that possible to create a unit test reproducing the problem? This will also be important to prevent regression. The repro can be on small batch size up to 5

Add a test that exercises the large batch size issue described in
issue apache#12633. This was a code review request.
@mhilton
Copy link
Contributor Author

mhilton commented Sep 27, 2024

@mhilton would be that possible to create a unit test reproducing the problem? This will also be important to prevent regression. The repro can be on small batch size up to 5

I have added test_issue_12633 which covers this. I've checked that it fails without the changes from this PR.

let right_batch = self.outer_record_batch.as_ref().unwrap();
let num_rows = match (self.join_type, left_data.batch().num_rows()) {
// An inner join will only produce 1 output row per input row.
(JoinType::Inner, _) | (_, 0) => self.output_buffer.needed_rows(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible for an Inner Join produce multiple output rows per input row? I believe the "1 output row per input row" statement only holds true for equijoins. NestedLoopJoin works on non equijoins by definition.

/// NestedLoopJoinExec is build-probe join operator, whose main task is to
/// perform joins without any equijoin conditions in `ON` clause.

For example, for left=[1, 2] and right=[1, 2, 3] with the ON clause left<>right, it produces [(1, 2), (2, 1), (1, 3), (2, 3)]. The row 3 from the right side produces 2 rows.

It seems impossible to predict the number of output rows without running the join.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right of course, I'm not sure why I got it in my head that they would be 1-1. I'll fix that.

Stop assuming that an INNER join cannot produce more output rows
than input. Use the same row count logic for all join types.
@alihan-synnada
Copy link
Contributor

Hello. I ran some benchmarks and it seem this solution can slow down the execution by a considerable amount. Can you run benchmarks on your end too? I'm working on an alternative solution that may potentially run better. I'll open a PR if it ends up working.

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alihan-synnada is totally correct, we need to get some performance numbers before moving on

@austin362667
Copy link
Contributor

austin362667 commented Oct 1, 2024

@mhilton Thanks for your work.
Here is the benchmark against the IMDB dataset; I hope it will be helpful.
Query 2 is notably slower, so perhaps we can dig into it further.

SELECT MIN(mc.note) AS production_note, MIN(t.title) AS movie_title, MIN(t.production_year) AS movie_year FROM company_type AS ct, info_type AS it, movie_companies AS mc, movie_info_idx AS mi_idx, title AS t WHERE ct.kind = 'production companies' AND it.info = 'bottom 10 rank' AND mc.note  not like '%(as Metro-Goldwyn-Mayer Pictures)%' AND t.production_year between 2005 and 2010 AND ct.id = mc.company_type_id AND t.id = mc.movie_id AND t.id = mi_idx.movie_id AND mc.movie_id = mi_idx.movie_id AND it.id = mi_idx.info_type_id;
austin@Machine ~/D/arrow-datafusion (imdb-benchmark-on-pr#12634)> ./benchmarks/bench.sh compare imdb-benchmark imdb-benchmark-on-pr#12634
Comparing imdb-benchmark and imdb-benchmark-on-pr#12634
--------------------
Benchmark imdb.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ imdb-benchmark ┃ imdb-benchmark-on-pr#12634 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │       427.68ms │                   251.46ms │ +1.70x faster │
│ QQuery 2     │       239.03ms │                   590.69ms │  2.47x slower │
│ QQuery 3     │       186.16ms │                   268.15ms │  1.44x slower │
│ QQuery 4     │       229.37ms │                   326.47ms │  1.42x slower │
│ QQuery 5     │       223.12ms │                   309.27ms │  1.39x slower │
│ QQuery 6     │       224.09ms │                   323.87ms │  1.45x slower │
│ QQuery 7     │       218.43ms │                   315.13ms │  1.44x slower │
│ QQuery 8     │       312.08ms │                   358.15ms │  1.15x slower │
│ QQuery 9     │       918.78ms │                  1006.36ms │  1.10x slower │
│ QQuery 10    │       796.41ms │                   714.25ms │ +1.12x faster │
│ QQuery 11    │       935.68ms │                   820.09ms │ +1.14x faster │
│ QQuery 12    │       268.38ms │                   231.93ms │ +1.16x faster │
│ QQuery 13    │       223.23ms │                   183.57ms │ +1.22x faster │
│ QQuery 14    │       313.61ms │                   254.16ms │ +1.23x faster │
│ QQuery 15    │       966.59ms │                   818.66ms │ +1.18x faster │
│ QQuery 16    │       985.37ms │                   816.81ms │ +1.21x faster │
│ QQuery 17    │       919.12ms │                   867.88ms │ +1.06x faster │
│ QQuery 18    │      2927.84ms │                  2749.69ms │ +1.06x faster │
│ QQuery 19    │      2895.11ms │                  2816.35ms │     no change │
│ QQuery 20    │      2759.74ms │                  2707.31ms │     no change │
│ QQuery 21    │      3128.71ms │                  2859.25ms │ +1.09x faster │
│ QQuery 22    │      3057.45ms │                  2773.62ms │ +1.10x faster │
│ QQuery 23    │      3573.51ms │                  3020.14ms │ +1.18x faster │
│ QQuery 24    │       920.38ms │                   785.96ms │ +1.17x faster │
│ QQuery 25    │       833.97ms │                   772.89ms │ +1.08x faster │
│ QQuery 26    │      1327.76ms │                  1226.86ms │ +1.08x faster │
│ QQuery 27    │       790.45ms │                   830.99ms │  1.05x slower │
│ QQuery 28    │       858.53ms │                   901.15ms │     no change │
│ QQuery 29    │      7372.84ms │                  6747.32ms │ +1.09x faster │
│ QQuery 30    │      7402.80ms │                  5700.26ms │ +1.30x faster │
│ QQuery 31    │      1349.99ms │                  1343.04ms │     no change │
│ QQuery 32    │      1416.36ms │                  1158.56ms │ +1.22x faster │
│ QQuery 33    │      1452.93ms │                  1384.48ms │     no change │
│ QQuery 34    │      1493.34ms │                  1429.02ms │     no change │
│ QQuery 35    │      1092.89ms │                   926.61ms │ +1.18x faster │
│ QQuery 36    │       910.28ms │                   813.24ms │ +1.12x faster │
│ QQuery 37    │       900.57ms │                   912.84ms │     no change │
│ QQuery 38    │       280.75ms │                   248.47ms │ +1.13x faster │
│ QQuery 39    │       286.03ms │                   270.67ms │ +1.06x faster │
│ QQuery 40    │       272.78ms │                   264.77ms │     no change │
│ QQuery 41    │       768.78ms │                   737.98ms │     no change │
│ QQuery 42    │       844.27ms │                   900.34ms │  1.07x slower │
│ QQuery 43    │      1428.58ms │                  1439.04ms │     no change │
│ QQuery 44    │       902.23ms │                  1050.91ms │  1.16x slower │
│ QQuery 45    │       952.23ms │                   986.88ms │     no change │
│ QQuery 46    │       733.12ms │                   738.83ms │     no change │
│ QQuery 47    │       824.63ms │                   706.66ms │ +1.17x faster │
│ QQuery 48    │       882.41ms │                   802.16ms │ +1.10x faster │
│ QQuery 49    │      1008.04ms │                   971.09ms │     no change │
│ QQuery 50    │       962.39ms │                   922.04ms │     no change │
│ QQuery 51    │       999.78ms │                   914.23ms │ +1.09x faster │
│ QQuery 52    │      1230.14ms │                  1059.77ms │ +1.16x faster │
│ QQuery 53    │      1072.94ms │                  1066.73ms │     no change │
│ QQuery 54    │      1227.30ms │                  1175.94ms │     no change │
│ QQuery 55    │       487.65ms │                   399.71ms │ +1.22x faster │
│ QQuery 56    │     19721.02ms │                 16432.02ms │ +1.20x faster │
│ QQuery 57    │     17101.19ms │                 17297.19ms │     no change │
│ QQuery 58    │     16990.06ms │                 17562.84ms │     no change │
│ QQuery 59    │     17443.75ms │                 16147.33ms │ +1.08x faster │
│ QQuery 60    │     12064.59ms │                  8756.85ms │ +1.38x faster │
│ QQuery 61    │     25606.02ms │                 33042.57ms │  1.29x slower │
│ QQuery 62    │     25240.80ms │                 28170.18ms │  1.12x slower │
│ QQuery 63    │     25238.73ms │                 27432.88ms │  1.09x slower │
│ QQuery 64    │      9405.30ms │                 10152.95ms │  1.08x slower │
│ QQuery 65    │     28449.72ms │                 27702.03ms │     no change │
│ QQuery 66    │      2910.74ms │                  2996.31ms │     no change │
│ QQuery 67    │      1835.37ms │                  1439.12ms │ +1.28x faster │
│ QQuery 68    │      1864.57ms │                  1528.06ms │ +1.22x faster │
│ QQuery 69    │      2362.05ms │                  2074.52ms │ +1.14x faster │
│ QQuery 70    │      1833.31ms │                  2282.09ms │  1.24x slower │
│ QQuery 71    │      1946.16ms │                  2036.51ms │     no change │
│ QQuery 72    │      2014.85ms │                  2397.57ms │  1.19x slower │
│ QQuery 73    │       985.92ms │                   937.37ms │     no change │
│ QQuery 74    │      1227.04ms │                  1027.96ms │ +1.19x faster │
│ QQuery 75    │      1434.08ms │                  1349.56ms │ +1.06x faster │
│ QQuery 76    │       895.58ms │                   980.36ms │  1.09x slower │
│ QQuery 77    │       975.93ms │                   969.45ms │     no change │
│ QQuery 78    │      1163.51ms │                  1062.58ms │ +1.09x faster │
│ QQuery 79    │      1335.79ms │                  1155.68ms │ +1.16x faster │
│ QQuery 80    │      1070.18ms │                  1112.60ms │     no change │
│ QQuery 81    │      1340.66ms │                  1188.29ms │ +1.13x faster │
│ QQuery 82    │      1518.97ms │                  1249.35ms │ +1.22x faster │
│ QQuery 83    │      1905.73ms │                  1149.41ms │ +1.66x faster │
│ QQuery 84    │      1515.77ms │                  1040.90ms │ +1.46x faster │
│ QQuery 85    │      1151.02ms │                  1303.31ms │  1.13x slower │
│ QQuery 86    │      2680.10ms │                  2422.67ms │ +1.11x faster │
│ QQuery 87    │      2209.13ms │                  2036.14ms │ +1.08x faster │
│ QQuery 88    │      1494.10ms │                  1412.88ms │ +1.06x faster │
│ QQuery 89    │      1453.42ms │                  1484.47ms │     no change │
│ QQuery 90    │      1976.67ms │                  1556.59ms │ +1.27x faster │
│ QQuery 91    │      1091.85ms │                   961.81ms │ +1.14x faster │
│ QQuery 92    │      1194.71ms │                  1036.54ms │ +1.15x faster │
│ QQuery 93    │      1064.62ms │                  1279.49ms │  1.20x slower │
│ QQuery 94    │      1040.22ms │                  1201.87ms │  1.16x slower │
│ QQuery 95    │      1045.82ms │                  1160.86ms │  1.11x slower │
│ QQuery 96    │      1213.03ms │                  1033.81ms │ +1.17x faster │
│ QQuery 97    │      1078.71ms │                  1147.42ms │  1.06x slower │
│ QQuery 98    │      1071.27ms │                  1072.74ms │     no change │
│ QQuery 99    │      1159.91ms │                  1350.06ms │  1.16x slower │
│ QQuery 100   │      1845.79ms │                  1845.12ms │     no change │
│ QQuery 101   │      1804.21ms │                  1902.28ms │  1.05x slower │
│ QQuery 102   │      2440.43ms │                  2293.14ms │ +1.06x faster │
│ QQuery 103   │      1564.05ms │                  1441.52ms │ +1.09x faster │
│ QQuery 104   │      1410.36ms │                  1476.32ms │     no change │
│ QQuery 105   │      1765.29ms │                  1557.54ms │ +1.13x faster │
│ QQuery 106   │      1862.78ms │                  1641.91ms │ +1.13x faster │
│ QQuery 107   │      1962.28ms │                  1468.31ms │ +1.34x faster │
│ QQuery 108   │      1685.42ms │                  1661.01ms │     no change │
│ QQuery 109   │       348.77ms │                   255.79ms │ +1.36x faster │
│ QQuery 110   │       380.69ms │                   262.99ms │ +1.45x faster │
│ QQuery 111   │       555.95ms │                   490.05ms │ +1.13x faster │
│ QQuery 112   │       409.35ms │                   406.78ms │     no change │
│ QQuery 113   │       553.14ms │                   501.00ms │ +1.10x faster │
└──────────────┴────────────────┴────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                         ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (imdb-benchmark)               │ 336927.11ms │
│ Total Time (imdb-benchmark-on-pr#12634)   │ 333311.69ms │
│ Average Time (imdb-benchmark)             │   2981.66ms │
│ Average Time (imdb-benchmark-on-pr#12634) │   2949.66ms │
│ Queries Faster                            │          58 │
│ Queries Slower                            │          25 │
│ Queries with No Change                    │          30 │
└───────────────────────────────────────────┴─────────────┘

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am running the standard tpch benchmarks now as well to see what they show.

I left a comment about how to potentially avoid the slowdown, but it would indeed make this code (even more) complicated

if self.total_rows < self.target_batch_size {
return None;
}
if self.total_rows == self.target_batch_size {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we are guaranteed that the rows will always be exactly the target batch size?

Maybe it it would be safer to use something like

Suggested change
if self.total_rows == self.target_batch_size {
if self.total_rows >= self.target_batch_size {

num_rows,
right_batch.num_rows() - self.outer_record_batch_row,
);
let sliced_right_batch =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the core concern / potential loss of performance (joining one row at a time)

An alternate solution might be to

  1. Try to join the entire batch, but return an error if the number of matched rows exceeds some threshold (the target batch size? 2x the target batchsize?)
  2. If the "too many matched rows" error is returned, then fallback to joining one row at a time

This is indeed more complicated, but it would mean we only pay the performance hit when a large number of matching rows are produced (aka when the join is increasing cardinality significantly)

Copy link
Contributor

@comphead comphead Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We def need to go to batch
Reg to this nice video
https://www.youtube.com/watch?v=RcEW0P8iVTc
it shows how the NLJ cost can drastically be decreased by using chunk based approach

I'll try to look at it some day when sort out SMJ problem(which is more promising now)

@alamb
Copy link
Contributor

alamb commented Oct 3, 2024

Here are my numbers for tpch which shows maybe a tiny slowdown (though to be honest I would have thought these queries used HashJoin (I haven't looked at if / why they are using NLJ)

--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ limit-nested-loop-join-record-ba… ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  211.87ms │                          217.66ms │     no change │
│ QQuery 2     │  118.69ms │                          117.45ms │     no change │
│ QQuery 3     │  129.28ms │                          113.61ms │ +1.14x faster │
│ QQuery 4     │   88.87ms │                           87.13ms │     no change │
│ QQuery 5     │  162.00ms │                          154.86ms │     no change │
│ QQuery 6     │   41.92ms │                           41.09ms │     no change │
│ QQuery 7     │  198.63ms │                          211.70ms │  1.07x slower │
│ QQuery 8     │  169.88ms │                          164.14ms │     no change │
│ QQuery 9     │  233.57ms │                          242.03ms │     no change │
│ QQuery 10    │  211.21ms │                          222.28ms │  1.05x slower │
│ QQuery 11    │   93.98ms │                           90.89ms │     no change │
│ QQuery 12    │  143.02ms │                          142.96ms │     no change │
│ QQuery 13    │  216.01ms │                          214.19ms │     no change │
│ QQuery 14    │   76.94ms │                           92.43ms │  1.20x slower │
│ QQuery 15    │  108.46ms │                          110.15ms │     no change │
│ QQuery 16    │   81.20ms │                           90.09ms │  1.11x slower │
│ QQuery 17    │  222.14ms │                          218.50ms │     no change │
│ QQuery 18    │  330.92ms │                          326.56ms │     no change │
│ QQuery 19    │  130.54ms │                          138.10ms │  1.06x slower │
│ QQuery 20    │  138.87ms │                          142.13ms │     no change │
│ QQuery 21    │  260.53ms │                          280.13ms │  1.08x slower │
│ QQuery 22    │   63.30ms │                           64.16ms │     no change │
└──────────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                       ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main_base)                                  │ 3431.83ms │
│ Total Time (limit-nested-loop-join-record-batch-size)   │ 3482.24ms │
│ Average Time (main_base)                                │  155.99ms │
│ Average Time (limit-nested-loop-join-record-batch-size) │  158.28ms │
│ Queries Faster                                          │         1 │
│ Queries Slower                                          │         6 │
│ Queries with No Change                                  │        15 │
└─────────────────────────────────────────────────────────┴───────────┘

@alihan-synnada
Copy link
Contributor

alihan-synnada commented Oct 4, 2024

I wondered if buffering batches and joining them was an issue so I ran some test with just slicing batches without any buffering or joining. I set the number build rows to 1000 and ran benchmarks on just NestedLoopJoinStream with different batch sizes (128, 1024, 8192), selectivities (1%, 50%, 99%) and ratios of rows in build side by rows in probe side (1, 10, 100)

Edit: Filter used is left <> right AND right % 100 < selectivity

slice_indices_then_build_batch
build_batch_then_slice_batch

Building the output batch first and then slicing it down to target batch size while returning batches is the fastest solution but it still uses probe rows x build rows memory. It does help other operators down the line receive smaller batches, so it might have a merit.

Slicing down to target batch sizes solves the memory issue but comes at a huge time cost. We could compromise and meet in the middle, slice the indices to a multiple of target batch size and then split them down to target batch size while returning the batches in the end. The more calls we make to apply_join_filter_to_indices and adjust_indices_by_join_type the slower it gets.

I dug into these functions and realized that the way indices are constructed in adjust_indices_by_join_type is inefficient as they use iter + collect. Using something like MutableArrayData instead gives about a 15% boost in all cases and even more with smaller batch sizes with the slice-first-build-later strategy.

@alamb
Copy link
Contributor

alamb commented Oct 16, 2024

@alihan-synnada has an alternate proposal in #12969

@mhilton
Copy link
Contributor Author

mhilton commented Oct 18, 2024

This has been superseded by #12969

@mhilton mhilton closed this Oct 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

NestedLoopJoinExec can create excessively large record batches
5 participants