"Multiple concurrent writes to Dict detected!" with DTables.reduce #437

Closed
StevenWhitaker opened this issue Sep 27, 2023 · 11 comments · Fixed by #443

@StevenWhitaker

I occasionally get the error in the title with the following example. I'm not sure whether this issue belongs in DTables.jl, but I'm posting it here because the other issues I posted there were migrated here :)

Contents of mwe.jl:

using Distributed
nworkers = 1
addprocs(nworkers - nprocs() + 1)  # bring the session up to `nworkers` worker process(es); worker id 2 in a fresh session

@everywhere using DTables

remotecall_fetch(2) do
    N = 100
    dt = DTable((a = 1:N, b = rand(N)))
    fetch(reduce(+, dt; cols = [:a]))  # sum column :a; expected result is (a = 5050,)
end

Results:

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
(a = 5050,)

julia> include("mwe.jl")
ERROR: LoadError: On worker 2:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
AssertionError: Multiple concurrent writes to Dict detected!
Stacktrace:
  [1] rehash!
    @ ./dict.jl:208
  [2] _setindex!
    @ ./dict.jl:355 [inlined]
  [3] get!
    @ ./dict.jl:477
  [4] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:116
  [5] reschedule_syncdeps!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/util.jl:100 [inlined]
  [6] #eager_submit_internal!#96
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:93
  [7] eager_submit_internal!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:11
  [8] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
  [9] invokelatest
    @ ./essentials.jl:816 [inlined]
 [10] #29
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:67 [inlined]
 [11] lock
    @ ./lock.jl:229
 [12] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:66 [inlined]
 [13] #28
    @ ./task.jl:514
Stacktrace:
  [1] exec!
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/dynamic.jl:108
  [2] eager_submit!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:126
  [3] eager_launch!
    @ ~/.julia/packages/Dagger/ZOt9H/src/submission.jl:195
  [4] enqueue!
    @ ~/.julia/packages/Dagger/ZOt9H/src/queue.jl:12 [inlined]
  [5] #spawn#88
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:322
  [6] spawn
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:286 [inlined]
  [7] #66
    @ ~/.julia/packages/Dagger/ZOt9H/src/thunk.jl:401 [inlined]
  [8] iterate
    @ ./generator.jl:47 [inlined]
  [9] collect
    @ ./array.jl:782
 [10] #65
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:146
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] #43
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:157
  [5] #156
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] #scoped#4
    @ ./deprecated.jl:116
 [13] scoped
    @ ./deprecated.jl:113
 [14] with_options
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:16
 [15] do_task
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1549
 [16] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1243 [inlined]
 [17] #130
    @ ./task.jl:134
  Root Thunk:  Thunk(id=22, #65(Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()))
  Inner Thunk: Thunk(id=25, #47(+, Thunk[24](#44, Any[:a, Thunk[22](#65, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()])]), Base._InitialValue()))
  This Thunk:  Thunk(id=25, #47(+, Thunk[24](#44, Any[:a, Thunk[22](#65, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, MemPool.DRef, Dagger.OSProc, Dagger.AnyScope}(NamedTuple{(:a, :b), Tuple{UnitRange{Int64}, Vector{Float64}}}, Dagger.UnitDomain(), MemPool.DRef(2, 30, 0x0000000000000360), Dagger.OSProc(2), Dagger.AnyScope(), false)], +, [:a], Base._InitialValue()])]), Base._InitialValue()))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:54 [inlined]
  [5] _broadcast_getindex_evalf
    @ ./broadcast.jl:683 [inlined]
  [6] _broadcast_getindex
    @ ./broadcast.jl:656 [inlined]
  [7] getindex
    @ ./broadcast.jl:610 [inlined]
  [8] copy
    @ ./broadcast.jl:912 [inlined]
  [9] materialize
    @ ./broadcast.jl:873 [inlined]
 [10] #50
    @ ~/.julia/packages/DTables/bA4g3/src/operations/operations.jl:115
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] #43
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/ZOt9H/src/processor.jl:157
  [5] #156
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] #scoped#4
    @ ./deprecated.jl:116
 [13] scoped
    @ ./deprecated.jl:113
 [14] with_options
    @ ~/.julia/packages/Dagger/ZOt9H/src/options.jl:16
 [15] do_task
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1549
 [16] macro expansion
    @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1243 [inlined]
 [17] #130
    @ ./task.jl:134
  This Thunk:  Thunk(id=26, #50([:a], Dagger.EagerThunk[EagerThunk (finished)]))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/ZOt9H/src/eager_thunk.jl:54 [inlined]
  [5] #13
    @ ~/tmp/mwe.jl:10
  [6] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
  [7] invokelatest
    @ ./essentials.jl:816
  [8] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
  [9] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [10] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [11] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(::Function, ::Distributed.Worker)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(::Function, ::Int64)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:7
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:7

Notice that not every run results in the error.

I also occasionally see the following error printed, but the remotecall_fetch still returns normally with the correct answer:

Unhandled Task ERROR: ArgumentError: destination has fewer elements than required
Stacktrace:
 [1] copyto!(dest::Vector{Dagger.Sch.ProcessorState}, src::Base.ValueIterator{Dict{Dagger.Processor, Dagger.Sch.ProcessorState}})
   @ Base ./abstractarray.jl:949
 [2] _collect
   @ ./array.jl:713 [inlined]
 [3] collect
   @ ./array.jl:707 [inlined]
 [4] macro expansion
   @ ~/.julia/packages/Dagger/ZOt9H/src/sch/Sch.jl:1189 [inlined]
 [5] (::Dagger.Sch.var"#126#133"{Dagger.Sch.ProcessorInternalState, UInt64, RemoteChannel{Channel{Any}}, Dagger.ThreadProc})()
   @ Dagger.Sch ./task.jl:134

This is with DTables v0.4.1 and Dagger v0.18.3.

@jpsamaroo
Member

jpsamaroo commented Sep 27, 2023

Yep, definitely a Dagger bug (and in the same newly upgraded submission logic)! I see the bug: I forgot to add locking at

return remotecall_fetch(eager_submit_internal!, 1, (ntasks, uid, future, finalizer_ref, f, args, options, true))

(but I do take the lock correctly in the else branch). I plan to push a fix tonight, and I'll also take a look at that copyto! error to see if it's related.
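To illustrate the invariant the fix restores, here is a minimal hedged sketch in plain Julia (not Dagger's actual internals; scheduler_lock, shared_deps, and submit_to_dict! are made-up names): every path that writes to the shared scheduler Dict, including the one reached via remotecall_fetch, must hold the same lock, since unsynchronized concurrent writes are exactly what trips the AssertionError in the trace above.

const scheduler_lock = ReentrantLock()
const shared_deps = Dict{Int,Vector{Int}}()  # illustrative shared state

function submit_to_dict!(id::Int, deps::Vector{Int})
    # Safe only while every other writer takes the same lock.
    lock(scheduler_lock) do
        ds = get!(shared_deps, id, Int[])
        append!(ds, deps)
    end
end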

Thanks again for the excellent reporting!

@StevenWhitaker
Author

Thanks, I'm just glad you're able to fix these issues pretty quickly!

@StevenWhitaker
Author

I'm finding a possibly related error in some code I have that looks similar to the OP example.

Code excerpt:

# Excerpt from my code; `dt`, `cols`, and `sum_cols` are defined elsewhere.
gdt = groupby(dt, cols)
gkeys = sort!(collect(keys(gdt)))
sums = map(gkeys) do key
    reduce(+, gdt[key]; cols = sum_cols)  # one reduction per group
end .|> fetch

Error (it's printed to a log, so the nice formatting is lost, unfortunately):

      From worker 2:    └  Dagger.ThunkFailedException{RemoteException}(Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), RemoteException(3, CapturedException(CapturedException(ConcurrencyViolationError("lock must be held"), Any[(concurrency_violation at condition.jl:8, 1), (assert_havelock at condition.jl:25 [inlined], 1), (assert_havelock at condition.jl:48 [inlined], 1), (assert_havelock at condition.jl:72 [inlined], 1), (_wait2 at condition.jl:83, 1), (#wait#621 at condition.jl:127, 1), (wait at condition.jl:125 [inlined], 1), (wait_for_conn at cluster.jl:195, 1), (check_worker_state at cluster.jl:170, 1), (send_msg_ at messages.jl:172, 1), (send_msg at messages.jl:122 [inlined], 1), (#remotecall_fetch#159 at remotecall.jl:460, 1), (remotecall_fetch at remotecall.jl:454, 1), (#remotecall_fetch#162 at remotecall.jl:492 [inlined], 1), (remotecall_fetch at remotecall.jl:492 [inlined], 1), (#171 at datastore.jl:424 [inlined], 1), (forwardkeyerror at datastore.jl:409, 1), (poolget at datastore.jl:423, 1), (move at chunks.jl:98, 1), (move at chunks.jl:96 [inlined], 1), (move at chunks.jl:102, 1), (#fetch#70 at eager_thunk.jl:21, 1), (fetch at eager_thunk.jl:11 [inlined], 1), (#fetch#75 at eager_thunk.jl:58 [inlined], 1), (fetch at eager_thunk.jl:54 [inlined], 1), (_broadcast_getindex_evalf at broadcast.jl:683 [inlined], 1), (_broadcast_getindex at broadcast.jl:656 [inlined], 1), (getindex at broadcast.jl:610 [inlined], 1), (copy at broadcast.jl:912 [inlined], 1), (materialize at broadcast.jl:873 [inlined], 1), (#50 at operations.jl:115, 1), (#invokelatest#2 at essentials.jl:819 [inlined], 1), (invokelatest at essentials.jl:816 [inlined], 1), (#43 at processor.jl:162, 1)]), Any[(wait at task.jl:349 [inlined], 1), (fetch at task.jl:369 [inlined], 1), (#execute!#42 at processor.jl:172, 1), (execute! at processor.jl:157, 1), (#156 at Sch.jl:1551 [inlined], 1), (#21 at options.jl:17 [inlined], 1), (#1 at ScopedValues.jl:163, 1), (with_logstate at logging.jl:514, 1), (with_logger at logging.jl:626 [inlined], 1), (enter_scope at payloadlogger.jl:17 [inlined], 1), (with at ScopedValues.jl:162, 1), (#scoped#4 at deprecated.jl:116, 1), (scoped at deprecated.jl:113, 1), (with_options at options.jl:16, 1), (do_task at Sch.jl:1549, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#130 at task.jl:134, 1)])))
      From worker 2:    [ 2023-09-28T08:31:34.312 ] pid: 2137 proc: 2 Info:    > Dagger.ThunkFailedException{RemoteException}(Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), Thunk[154](#50, Any[[column names], Dagger.EagerThunk[EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished), EagerThunk (finished)]]), RemoteException(3, CapturedException(CapturedException(ConcurrencyViolationError("lock must be held"), Any[(concurrency_violation at condition.jl:8, 1), (assert_havelock at condition.jl:25 [inlined], 1), (assert_havelock at condition.jl:48 [inlined], 1), (assert_havelock at condition.jl:72 [inlined], 1), (_wait2 at condition.jl:83, 1), (#wait#621 at condition.jl:127, 1), (wait at condition.jl:125 [inlined], 1), (wait_for_conn at cluster.jl:195, 1), (check_worker_state at cluster.jl:170, 1), (send_msg_ at messages.jl:172, 1), (send_msg at messages.jl:122 [inlined], 1), (#remotecall_fetch#159 at remotecall.jl:460, 1), (remotecall_fetch at remotecall.jl:454, 1), (#remotecall_fetch#162 at remotecall.jl:492 [inlined], 1), (remotecall_fetch at remotecall.jl:492 [inlined], 1), (#171 at datastore.jl:424 [inlined], 1), (forwardkeyerror at datastore.jl:409, 1), (poolget at datastore.jl:423, 1), (move at chunks.jl:98, 1), (move at chunks.jl:96 [inlined], 1), (move at chunks.jl:102, 1), (#fetch#70 at eager_thunk.jl:21, 1), (fetch at eager_thunk.jl:11 [inlined], 1), (#fetch#75 at eager_thunk.jl:58 [inlined], 1), (fetch at eager_thunk.jl:54 [inlined], 1), (_broadcast_getindex_evalf at broadcast.jl:683 [inlined], 1), (_broadcast_getindex at broadcast.jl:656 [inlined], 1), (getindex at broadcast.jl:610 [inlined], 1), (copy at broadcast.jl:912 [inlined], 1), (materialize at broadcast.jl:873 [inlined], 1), (#50 at operations.jl:115, 1), (#invokelatest#2 at essentials.jl:819 [inlined], 1), (invokelatest at essentials.jl:816 [inlined], 1), (#43 at processor.jl:162, 1)]), Any[(wait at task.jl:349 [inlined], 1), (fetch at task.jl:369 [inlined], 1), (#execute!#42 at processor.jl:172, 1), (execute! at processor.jl:157, 1), (#156 at Sch.jl:1551 [inlined], 1), (#21 at options.jl:17 [inlined], 1), (#1 at ScopedValues.jl:163, 1), (with_logstate at logging.jl:514, 1), (with_logger at logging.jl:626 [inlined], 1), (enter_scope at payloadlogger.jl:17 [inlined], 1), (with at ScopedValues.jl:162, 1), (#scoped#4 at deprecated.jl:116, 1), (scoped at deprecated.jl:113, 1), (with_options at options.jl:16, 1), (do_task at Sch.jl:1549, 1), (macro expansion at Sch.jl:1243 [inlined], 1), (#130 at task.jl:134, 1)])))
      From worker 2:    [ 2023-09-28T08:31:34.444 ] pid: 2137 proc: 2 Info:  Base.StackTraces.StackFrame[fetch(t::Dagger.ThunkFuture; proc::Dagger.OSProc, raw::Bool) at eager_thunk.jl:16, fetch at eager_thunk.jl:11 [inlined], #fetch#75 at eager_thunk.jl:58 [inlined], fetch at eager_thunk.jl:54 [inlined], |> at operators.jl:907 [inlined], _broadcast_getindex_evalf at broadcast.jl:683 [inlined], _broadcast_getindex at broadcast.jl:656 [inlined], getindex at broadcast.jl:610 [inlined], copyto_nonleaf!(dest::Vector{NamedTuple{(column names), NTuple{12, Float64}}}, bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Tuple{Base.OneTo{Int}}, typeof(|>), Tuple{Base.Broadcast.Extruded{Vector{Dagger.EagerThunk}, Tuple{Bool}, Tuple{Int}}, Base.RefValue{typeof(fetch)}}}, iter::Base.OneTo{Int}, state::Int, count::Int) at broadcast.jl:1068, copy at broadcast.jl:920 [inlined], materialize(bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, typeof(|>), Tuple{Vector{Dagger.EagerThunk}, Base.RefValue{typeof(fetch)}}}) at broadcast.jl:873, stacktrace through my code, (::Distributed.var"#110#112"{Distributed.CallMsg{:call_fetch}})() at process_messages.jl:285, run_work_thunk(thunk::Distributed.var"#110#112"{Distributed.CallMsg{:call_fetch}}, print_error::Bool) at process_messages.jl:70, macro expansion at process_messages.jl:285 [inlined], (::Distributed.var"#109#111"{Distributed.CallMsg{:call_fetch}, Distributed.MsgHeader, Sockets.TCPSocket})() at task.jl:514]

I think the key part of the error is ConcurrencyViolationError("lock must be held"). Since it involves locking, I thought it might be related to the OP. I tried isolating the code into an MWE but ended up getting the "Multiple concurrent writes to Dict detected!" error instead.

@jpsamaroo
Member

Aside from the most obvious issue (which I have fixed locally), I'm also seeing a variety of concurrency issues, and am trying to narrow them down.

@StevenWhitaker
Author

Thanks for the update, hopefully the other issues can be resolved soon as well!

@StevenWhitaker
Author

@jpsamaroo Any updates on this front?

@jpsamaroo
Member

Sorry, not yet. I'm in the middle of getting ready to move across the US, so I will have to get back to this over the weekend or next week. I did find a variety of nearly identical segfaults across the stack, so there is definitely a common source; I just need to find it.

@jpsamaroo
Member

I've narrowed this down to an issue with the usage of WeakRef within WeakChunk: not using WeakRef (just storing the Chunk directly) fixes the segfaults I was seeing. I'm going to try to put together a workaround that uses WeakKeyDict instead, but if that doesn't work, I'll just drop the usage of WeakRef until I can get this figured out and fixed.
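For context, a minimal illustration of WeakRef semantics in plain Julia (this is not Dagger's WeakChunk implementation; Payload is a made-up type): an object reachable only through a WeakRef can be garbage-collected, after which the WeakRef reads back nothing. That is why weakly held Chunks need either a strong reference somewhere or a GC-aware structure like WeakKeyDict.

mutable struct Payload
    data::Vector{Float64}
end

obj = Payload(rand(3))
w = WeakRef(obj)
@show w.value === obj   # true: obj is still strongly referenced by the global binding

obj = nothing
GC.gc()
@show w.value           # may now be nothing: only the weak reference remained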

@StevenWhitaker
Author

Great, thanks for the update and for your work on this; I really appreciate it!

I'm in the middle of getting ready to move across the US

I got to do that a year ago; it's definitely a lot of work, so I understand how busy you must be! Hopefully that all goes smoothly for you!

@jpsamaroo
Member

I've found the issue: WeakRef serialization is not implemented, and we were serializing WeakRefs during task submission (which is why this only started occurring recently). I'm working on a workaround, but I'm running into other fun bugs in the process, so this may take a few more days. Thank you for your patience!

@jpsamaroo
Member

Ok, I've got a fix locally for this that gets the following example working:

using Dagger, DTables
using Distributed
addprocs(1)

@everywhere using DTables

remotecall_fetch(2) do
    N = 2
    dt = DTable((a = 1:N, b = rand(N)))
    @sync for i in 1:20
        Threads.@spawn begin
            println("Iter $i")
            fetch(reduce(+, dt; cols = [:a]))
        end
    end
end

I'll push the full fix for it soon!

Another fun issue that was found in the process of debugging this: Chunks hash equivalently when they refer to the same data, even if the Chunks are different objects. This makes many internal parts of Dagger much simpler to work with, since such Chunks do refer to the same data in the end and are therefore functionally interchangeable. However, when we serialize the same Chunk across the wire, we get back different Chunk objects on the other end.

This means we need to be careful when working with deserialized Chunks and not naively insert them as keys into Dicts used for GC object retention. When we need to do this, we should first check whether the Dict already has an equivalent Chunk, and if so, use that Chunk instead of the one we're currently looking at (which is a logical duplicate of it). Otherwise we might "bump out" an equivalent Chunk from the Dict that really needed to stay there to keep the referenced remote data alive, causing the GC to delete things we were relying on. Oh, the joys of remote garbage collection 😄
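As a hedged sketch of that retention pattern (retained and retain! are made-up names for the example, not Dagger's actual bookkeeping): before using a freshly deserialized chunk as a Dict key, look up the isequal-equivalent key that is already stored and reuse it, so the original key object, and whatever it keeps alive, stays in place.

const retained = Dict{Any,Int}()   # chunk => refcount, purely illustrative

function retain!(chunk)
    # getkey returns the key already stored in the Dict that is isequal to
    # chunk, or chunk itself if no equivalent key exists yet; reusing it
    # avoids replacing the key object the Dict (and remote GC) depends on.
    canonical = getkey(retained, chunk, chunk)
    retained[canonical] = get(retained, canonical, 0) + 1
    return canonical
end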
