[BUG] test_single_sort_in_part is failed in nightly UCX and AQE (no UCX) integration #2477

Closed
jlowe opened this issue May 21, 2021 · 12 comments
Assignees
Labels
bug Something isn't working P0 Must have for release

Comments

@jlowe
Member

jlowe commented May 21, 2021

The test_single_sort_in_part test failed with a comparison error in a recent UCX EGX standalone integration run:

11:08:43  =================================== FAILURES ===================================
11:08:43  ___ test_single_sort_in_part[Column<b'a DESC NULLS FIRST'>-Float(not_null)] ____
11:08:43  
11:08:43  data_gen = Float(not_null), order = Column<b'a DESC NULLS FIRST'>
11:08:43  
11:08:43      @pytest.mark.parametrize('data_gen', orderable_gens + orderable_not_null_gen, ids=idfn)
11:08:43      @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn)
11:08:43      def test_single_sort_in_part(data_gen, order):
11:08:43  >       assert_gpu_and_cpu_are_equal_collect(
11:08:43                  lambda spark : unary_op_df(spark, data_gen).sortWithinPartitions(order),
11:08:43                  conf = allow_negative_scale_of_decimal_conf)
11:08:43  
11:08:43  ../../src/main/python/sort_test.py:112: 
11:08:43  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
11:08:43  ../../src/main/python/asserts.py:364: in assert_gpu_and_cpu_are_equal_collect
11:08:43      _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first)
11:08:43  ../../src/main/python/asserts.py:356: in _assert_gpu_and_cpu_are_equal
11:08:43      assert_equal(from_cpu, from_gpu)
11:08:43  ../../src/main/python/asserts.py:93: in assert_equal
11:08:43      _assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
11:08:43  ../../src/main/python/asserts.py:41: in _assert_equal
11:08:43      _assert_equal(cpu[index], gpu[index], float_check, path + [index])
11:08:43  ../../src/main/python/asserts.py:34: in _assert_equal
11:08:43      _assert_equal(cpu[field], gpu[field], float_check, path + [field])
11:08:43  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
11:08:43  
11:08:43  cpu = inf, gpu = 7.168535671669734e+33
11:08:43  float_check = <function get_float_check.<locals>.<lambda> at 0x7f3945ac5430>
11:08:43  path = [2, 'a']
11:08:43  
11:08:43      def _assert_equal(cpu, gpu, float_check, path):
11:08:43          t = type(cpu)
11:08:43          if (t is Row):
11:08:43              assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
11:08:43              if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
11:08:43                  for field in cpu.__fields__:
11:08:43                      _assert_equal(cpu[field], gpu[field], float_check, path + [field])
11:08:43              else:
11:08:43                  for index in range(len(cpu)):
11:08:43                      _assert_equal(cpu[index], gpu[index], float_check, path + [index])
11:08:43          elif (t is list):
11:08:43              assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
11:08:43              for index in range(len(cpu)):
11:08:43                  _assert_equal(cpu[index], gpu[index], float_check, path + [index])
11:08:43          elif (t is pytypes.GeneratorType):
11:08:43              index = 0
11:08:43              # generator has no zip :( so we have to do this the hard way
11:08:43              done = False
11:08:43              while not done:
11:08:43                  sub_cpu = None
11:08:43                  sub_gpu = None
11:08:43                  try:
11:08:43                      sub_cpu = next(cpu)
11:08:43                  except StopIteration:
11:08:43                      done = True
11:08:43      
11:08:43                  try:
11:08:43                      sub_gpu = next(gpu)
11:08:43                  except StopIteration:
11:08:43                      done = True
11:08:43      
11:08:43                  if done:
11:08:43                      assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
11:08:43                  else:
11:08:43                      _assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
11:08:43      
11:08:43                  index = index + 1
11:08:43          elif (t is dict):
11:08:43              # TODO eventually we need to split this up so we can do the right thing for float/double
11:08:43              # values stored under the map some where, especially for NaNs
11:08:43              assert cpu == gpu, "GPU and CPU map values are different at {}".format(path)
11:08:43          elif (t is int):
11:08:43              assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
11:08:43          elif (t is float):
11:08:43              if (math.isnan(cpu)):
11:08:43                  assert math.isnan(gpu), "GPU and CPU float values are different at {}".format(path)
11:08:43              else:
11:08:43  >               assert float_check(cpu, gpu), "GPU and CPU float values are different {}".format(path)
11:08:43  E               AssertionError: GPU and CPU float values are different [2, 'a']
11:08:43  
11:08:43  ../../src/main/python/asserts.py:75: AssertionError
11:08:43  ----------------------------- Captured stdout call -----------------------------
11:08:43  ### CPU RUN ###
11:08:43  ### GPU RUN ###
11:08:43  ### COLLECT: GPU TOOK 0.18548274040222168 CPU TOOK 0.16509771347045898 ###
11:08:43  =========================== short test summary info ============================
11:08:43  FAILED ../../src/main/python/sort_test.py::test_single_sort_in_part[Column<b'a DESC NULLS FIRST'>-Float(not_null)]
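
For context on the failing assertion: float_check comes from get_float_check in asserts.py, whose implementation is not shown in this log. A relative-tolerance comparison along the following lines (a sketch only, with an illustrative approx_equal name and tolerance, not the project's actual code) rejects this pair, because an infinity can never be within a relative tolerance of a finite value:

```python
import math

# Sketch of a relative-tolerance float comparison similar in spirit to the
# float_check used above; NOT the project's actual get_float_check code.
def approx_equal(cpu, gpu, rel_tol=1e-6):
    if math.isinf(cpu) or math.isinf(gpu):
        # An infinity only ever matches an infinity of the same sign.
        return cpu == gpu
    return math.isclose(cpu, gpu, rel_tol=rel_tol)

# The pair from the failure above: the CPU produced inf, the GPU a large finite value.
assert not approx_equal(float('inf'), 7.168535671669734e+33)
```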
@jlowe jlowe added bug Something isn't working ? - Needs Triage Need team to review and classify labels May 21, 2021
@abellina abellina self-assigned this May 21, 2021
@abellina
Collaborator

I'll attempt to repro/triage this one.

@abellina abellina added the P0 Must have for release label May 21, 2021
@abellina
Collaborator

Marking P1 as we don't know anything about it.

@abellina
Collaborator

I haven't been able to reproduce locally yet, with several executors running and UCX enabled. I verified I have the same cudf jar as in the failed job.

I am trying to see how repeatable it is in the EGX environment.

@sameerz sameerz removed the ? - Needs Triage Need team to review and classify label May 25, 2021
@abellina
Collaborator

@sameerz I still haven't been able to reproduce, and the UCX job hasn't failed since this was filed. Will keep looking at it.

@abellina
Collaborator

Still no repro success; my summary so far:

  • This test does not cause a shuffle: Scan ExistingRDD > GpuRowToColumnar > GpuSort > GpuColumnarToRow, so it's not related to UCX, though the timing was suspect (right after the Active Message merge).
  • I tried going back to jars from May 20, May 19, and latest.
  • I tried adding many iterations for the failing test test_single_sort_in_part[Column<b'a DESC NULLS FIRST'>-Float(not_null)], also setting the seed parameter to a counter (so in my case 1K iterations = 1K seeds), and running with smaller batch sizes so we get many batches per partition, with many rows (up to 2M) in the generated float dataframe.
  • I didn't go back pre-OutOfCore sort, since the error happened well after it was merged.
  • Tried locally on an RTX 6000 and on a T4 in EGX (manually, and via the CI).

@revans2 can you think of a dimension I haven't looked at here?

@revans2
Collaborator

revans2 commented May 27, 2021

I have absolutely no idea. The only thing I can think of is that it must be a race condition somewhere and it is very, very rare to lose that race.

@abellina
Collaborator

abellina commented Jun 1, 2021

Closing this as we have not been able to reproduce it.

@abellina abellina closed this as completed Jun 1, 2021
@abellina abellina changed the title [BUG] test_single_sort_in_part failed in nightly UCX integration [BUG] test_single_sort_in_part is failed in nightly UCX and AQE (no UCX) integration Jun 2, 2021
@abellina abellina reopened this Jun 2, 2021
@abellina
Collaborator

abellina commented Jun 2, 2021

Opening again, since we saw another occurrence in a different CI job that runs with AQE, but not with UCX.

@abellina
Collaborator

abellina commented Jun 9, 2021

Made some progress on this today. First, I can reproduce a similar error by killing an executor process during the test (which may or may not be the root cause). Second, in the run that failed most recently (the AQE job log) I clearly see this behavior:

13:02:47  src/main/python/sort_test.py::test_single_nested_orderby_with_limit[Column<b'a DESC NULLS LAST'>1-Array(String)] XFAIL [ 88%]
13:02:49  src/main/python/sort_test.py::test_single_nested_orderby_with_limit[Column<b'a DESC NULLS LAST'>1-Map(String(not_null),Map(String(not_null),String))] 21/06/02 18:03:40 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 163712 ms exceeds timeout 120000 ms
**13:02:49  21/06/02 18:03:40 ERROR TaskSchedulerImpl: Lost executor 0 on 10.233.106.251: Executor heartbeat timed out after 163712 ms**
13:02:51  XFAIL [ 88%]
13:02:59  src/main/python/sort_test.py::test_single_sort_in_part[Column<b'a ASC NULLS FIRST'>-Byte] FAILED [ 88%]

I am pretty convinced this is the reason. When an executor dies and comes back between the CPU session and the GPU session, we are executing each sub-test against a differently sized cluster. For the function under test, sortWithinPartitions, it makes sense that this yields different results, since the slices of the original dataframe are partition slices (unless we override defaultParallelism). Spark picks the number of cores available in the cluster as the value for defaultParallelism, and in this scenario that value differs between the CPU and GPU runs while the executor is being re-launched. The sort itself works fine, but it operates on differently shaped slices, so when we compare the whole result row by row we see mismatches: rows that end up in different slices can be sorted into very different positions.

One could "fix" this is by setting defaultParallelism, so we ask Spark to generate i.e. 10 map partitions (always), regardless of where they get executed. So far that works locally. But what we really need is executor logs, which we do for some builds and we should now get with the AQE build. At this rate I am more interested in tests before this test that could have caused the executor to miss its heartbeat => that seems like the real root cause.

@abellina
Collaborator

I confirmed that killing an executor changes the number of partitions in the tests, and in this case causes the sort results to not match. I am adding a numSlices parameter so we can hard-code the number of partitions.

The task isn't done, but we should monitor executors failing via #2698, and focus on the test that actually caused the failure to begin with.
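
For illustration, the change could take roughly this shape; make_float_df and its num_slices argument are hypothetical names, not the real helpers in the integration tests:

```python
import random
from pyspark.sql import SparkSession

# Hypothetical stand-in for a data-gen helper that accepts a num_slices
# argument; the real helpers and their signatures live in the integration
# tests, this is only the shape of the idea.
def make_float_df(spark, length=2048, num_slices=12):
    data = [(random.uniform(-1e9, 1e9),) for _ in range(length)]
    # Hard-coding the slice count keeps the partition boundaries identical
    # between the CPU and GPU sessions, even if an executor is re-launched.
    rdd = spark.sparkContext.parallelize(data, numSlices=num_slices)
    return spark.createDataFrame(rdd, "a double")

spark = SparkSession.builder.getOrCreate()
df = make_float_df(spark, num_slices=12)
print(df.rdd.getNumPartitions())  # expected: 12
```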

@abellina
Collaborator

Linking the Spark issue that we see as the culprit in Spark 3.0.2: apache/spark#31540

@abellina
Collaborator

Closing this, as I think we have handled this test. There could be other failures in CI that task retries mask, but we've disabled task retries for Spark 3.1.1+, so we can open a new issue if we see this again.
