[CH] improve mutex usage in shuffle writer #3722

Closed
taiyang-li opened this issue Nov 15, 2023 · 3 comments · Fixed by #3728
Labels
enhancement New feature or request

Comments

@taiyang-li
Contributor

Description

As the title says.

@taiyang-li taiyang-li added the enhancement New feature or request label Nov 15, 2023
taiyang-li added a commit to bigo-sg/gluten that referenced this issue Nov 15, 2023
@taiyang-li
Contributor Author

taiyang-li commented Nov 15, 2023

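Proposed optimization

  1. Each Partition object takes its lock in only two methods, which protect the member variable blocks:
    • addBlock: appends a block to blocks
    • spill: serializes blocks through the writer and then clears it
  2. Remove methods that are no longer needed:
    • clear: clearing blocks is already done in spill
    • empty: checking whether blocks is empty is already done in spill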
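A minimal C++ sketch of the proposed Partition shape follows. It assumes a simplified `Block` (a `std::vector<int>` standing in for a ClickHouse block) and a writer passed in as a callback; neither matches the real Gluten types. Only `addBlock` and `spill` take the lock, and `spill` iterates, serializes, and clears inside a single critical section, which is why separate `empty()` and `clear()` methods become unnecessary.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a ClickHouse block; not the real Gluten type.
using Block = std::vector<int>;

// Sketch of the proposed Partition: `blocks_` is guarded by one mutex,
// and only addBlock() and spill() acquire it.
class Partition {
public:
    // Append a block to the buffered blocks.
    void addBlock(Block block) {
        std::lock_guard<std::mutex> lock(mutex_);
        blocks_.push_back(std::move(block));
    }

    // Serialize every buffered block through `writer`, then clear the buffer,
    // all inside one critical section. Returns how many blocks were spilled;
    // an empty partition spills nothing, so no separate empty()/clear() is needed.
    std::size_t spill(const std::function<void(const Block &)> & writer) {
        std::lock_guard<std::mutex> lock(mutex_);
        const std::size_t spilled = blocks_.size();
        for (const Block & block : blocks_)
            writer(block);
        blocks_.clear();
        return spilled;
    }

private:
    std::mutex mutex_;
    std::vector<Block> blocks_;
};
```

Because the emptiness check and the clear both happen inside `spill`, a caller such as `evictPartitions` can simply call `spill` on every partition; an empty partition spills zero blocks.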

@taiyang-li
Contributor Author

taiyang-li commented Nov 15, 2023

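Problems with how the original code uses locks

  1. Partition::clear is not guaranteed to succeed: if it fails to acquire the lock, blocks.clear is never executed, even though those blocks may already have been sent to Celeborn. This can cause a synchronization problem.
    [Screenshot]

  2. Partition::empty is not thread-safe. It is called in CelebornPartitionWriter::evictPartitions, which can cause a synchronization problem.
    [Screenshot]

  3. CelebornPartitionWriter::evictPartitions checks whether a partition is empty, spills it if it is not, and clears it once the spill completes. These three operations are not atomic, so they can race when multiple threads are involved.
    [Screenshot]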
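To make point 1 concrete, here is a hypothetical reconstruction of the old pattern, not the actual Gluten code: `clear()` uses `try_lock` and returns without clearing when the lock is busy, and `empty()` reads `blocks_` without any lock. `LegacyPartition` and the `clearWhileLockHeld` helper are illustration-only names; the helper holds the partition lock on one thread and calls `clear()` from another.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using Block = std::vector<int>;  // hypothetical stand-in for a ClickHouse block

// Hypothetical reconstruction of the pattern in points 1 and 2.
class LegacyPartition {
public:
    void addBlock(Block block) {
        std::lock_guard<std::mutex> lock(mutex_);
        blocks_.push_back(std::move(block));
    }

    // Returns false, leaving blocks_ untouched, if another thread holds the lock.
    bool clear() {
        std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
        if (!lock.owns_lock())
            return false;
        blocks_.clear();
        return true;
    }

    // Unlocked read, as in point 2: a data race if another thread mutates blocks_.
    bool empty() const { return blocks_.empty(); }

    std::mutex & mutex() { return mutex_; }  // exposed only for the demonstration

private:
    std::mutex mutex_;
    std::vector<Block> blocks_;
};

// Simulates point 1: while one thread is busy inside the partition (holding
// its lock), a clear() issued from a second thread is silently skipped.
bool clearWhileLockHeld(LegacyPartition & p) {
    bool cleared = true;
    std::lock_guard<std::mutex> hold(p.mutex());
    std::thread t([&] { cleared = p.clear(); });
    t.join();
    return cleared;
}
```

Switching `clear()` to a blocking lock would remove the skipped-clear case, but the check, spill, and clear sequence in point 3 would still not be atomic as a whole; only doing all three under one lock acquisition closes that gap.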

@taiyang-li
Contributor Author

taiyang-li commented Nov 16, 2023

I found another problem: when the production job application_1691660805290_5264904 (sql: d_8832_0) ran, executor 1283 hung.

The thread dump shows:

io.glutenproject.vectorized.CHShuffleSplitterJniWrapper.evict(Native Method)
org.apache.spark.shuffle.CHCelebornHashBasedColumnarShuffleWriter$$anon$1.spill(CHCelebornHashBasedColumnarShuffleWriter.scala:85)
io.glutenproject.memory.memtarget.spark.RegularMemoryConsumer.spill(RegularMemoryConsumer.java:62)
org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:225)
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188) => holding Monitor(org.apache.spark.memory.TaskMemoryManager@250651367})
org.apache.spark.memory.MemoryConsumer.acquireMemory(MemoryConsumer.java:136)
io.glutenproject.memory.memtarget.spark.RegularMemoryConsumer.borrow(RegularMemoryConsumer.java:108)
io.glutenproject.memory.alloc.CHManagedCHReservationListener.reserve(CHManagedCHReservationListener.java:86) => holding Monitor(io.glutenproject.memory.alloc.CHManagedCHReservationListener@43784731})
io.glutenproject.memory.alloc.CHManagedCHReservationListener.reserveOrThrow(CHManagedCHReservationListener.java:48)
io.glutenproject.vectorized.CHShuffleSplitterJniWrapper.split(Native Method)
org.apache.spark.shuffle.CHCelebornHashBasedColumnarShuffleWriter.internalWrite(CHCelebornHashBasedColumnarShuffleWriter.scala:100)
org.apache.spark.shuffle.CelebornHashBasedColumnarShuffleWriter.write(CelebornHashBasedColumnarShuffleWriter.scala:84)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:60)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$644/1449317960.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

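The hang is caused by a deadlock. A likely sequence is:

  • During split, the C++ shuffle writer acquires the lock of some partition and then allocates memory.
  • Allocating memory in C++ invokes the before_alloc hook, which calls ReservationListenerWrapper::reserveOrThrow.
  • The JVM memory manager finds that there is not enough memory and calls spill on the C++ shuffle writer.
  • spill tries to acquire the same lock of the same partition again, which deadlocks.

In short, the deadlock happens because the same thread tries to acquire a lock it already holds. Fix: use std::recursive_mutex.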
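The re-entrant call chain and the `std::recursive_mutex` fix can be sketched as follows. This is a simplified model, not the Gluten implementation: the `on_reserve` callback is a hypothetical stand-in for the `before_alloc` → `reserveOrThrow` → JVM spill path, and it calls back into `spill()` on the same thread while `addBlock()` still holds the partition lock.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

using Block = std::vector<int>;  // hypothetical stand-in for a ClickHouse block

class Partition {
public:
    // Hypothetical hook for the before_alloc -> reserveOrThrow -> JVM spill
    // chain. It runs synchronously on the calling thread.
    std::function<void(Partition &)> on_reserve;

    void addBlock(Block block) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (on_reserve)
            on_reserve(*this);  // the memory reservation may re-enter spill() here
        blocks_.push_back(std::move(block));
    }

    // Second acquisition on the same thread: with std::mutex this would
    // deadlock (formally undefined behaviour); std::recursive_mutex allows it.
    std::size_t spill() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        const std::size_t n = blocks_.size();
        blocks_.clear();  // stand-in for serializing the blocks to Celeborn
        spilled_ += n;
        return n;
    }

    std::size_t buffered() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return blocks_.size();
    }

    std::size_t spilledTotal() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return spilled_;
    }

private:
    std::recursive_mutex mutex_;
    std::vector<Block> blocks_;
    std::size_t spilled_ = 0;
};
```

The recursive lock only stops the hang: `spill()` still runs while `addBlock()` is partway through, so every locked method must tolerate `blocks_` changing underneath it. The changes listed at the end of this thread go further, removing the locks and guarding memory spill against recursive calls.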

taiyang-li added a commit to bigo-sg/gluten that referenced this issue Nov 16, 2023
liuneng1994 pushed a commit that referenced this issue Nov 30, 2023
What changes were proposed in this pull request?

(Fixes: #3722)

Changes

Improve the code style of parts of the shuffle writer.
Make sure the column buffers do not exceed the split size in the shuffle writer.
Remove all locks in the shuffle writer, since write/spill/stop are all executed in a single JVM thread.
Protect memory spill from being called recursively.
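The last two changes can be illustrated with a lock-free writer that rejects re-entrant spills. This is a minimal sketch under stated assumptions, not the merged code: `ShuffleWriter`, `FlagGuard`, and the `on_reserve` hook are hypothetical (the `split`/`evict` names only mirror the JNI methods in the thread dump), and the plain `bool` flag is safe only because, as the PR notes, write, spill, and stop all run on a single JVM thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

using Block = std::vector<int>;  // hypothetical stand-in for a ClickHouse block

// Sets a flag for the lifetime of a scope and resets it even if an exception escapes.
class FlagGuard {
public:
    explicit FlagGuard(bool & flag) : flag_(flag) { flag_ = true; }
    ~FlagGuard() { flag_ = false; }
    FlagGuard(const FlagGuard &) = delete;
    FlagGuard & operator=(const FlagGuard &) = delete;

private:
    bool & flag_;
};

// Hypothetical lock-free writer: no mutex, because all calls are assumed to
// arrive on one thread. A plain flag blocks recursive spills.
class ShuffleWriter {
public:
    // Hypothetical hook standing in for a memory reservation that can ask the
    // JVM to spill; it may call evict() on this same writer.
    std::function<void(ShuffleWriter &)> on_reserve;

    void split(Block block) {
        reserve();
        buffered_.push_back(std::move(block));
    }

    // Spill all buffered blocks and return the number of rows written.
    // A nested call made while a spill is already running returns 0.
    std::size_t evict() {
        if (spilling_)
            return 0;
        FlagGuard guard(spilling_);
        std::size_t rows = 0;
        for (const Block & block : buffered_) {
            reserve();             // serialization also needs memory
            rows += block.size();  // stand-in for writing the block out
        }
        buffered_.clear();
        return rows;
    }

private:
    void reserve() {
        if (on_reserve)
            on_reserve(*this);
    }

    std::vector<Block> buffered_;
    bool spilling_ = false;
};
```

Without the guard, the nested `evict()` would clear `buffered_` while the outer loop is still iterating over it, which is undefined behaviour rather than a clean failure.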