Some possible bug in multi-threaded update of pools #406

Open

bkamins opened this issue Apr 11, 2024 · 4 comments
@bkamins
Member

bkamins commented Apr 11, 2024

Found in: https://github.com/JuliaData/DataFrames.jl/actions/runs/8635821381/job/23674443172

I can reproduce it by starting Julia with julia --check-bounds=yes -t 4 and running the following code:

using DataFrames, CategoricalArrays, Test, Random

@testset "CategoricalArray thread safety" begin
    # These tests do not actually trigger multithreading bugs,
    # but at least they check that the code that disables multithreading
    # with CategoricalArray when levels are different works
    Random.seed!(35)
    df = DataFrame(x=rand(1:10, 100),
                   y=categorical(rand(10:15, 100)),
                   z=categorical(rand(0:20, 100)))
    df.y2 = reverse(df.y) # Same levels
    gd = groupby(df, :x)

    @test combine(gd, :y => (y -> y[1]) => :res) ==
        combine(gd, [:y, :y2] => ((y, x) -> y[1]) => :res) ==
        combine(gd, [:y, :x] => ((y, x) -> y[1]) => :res) ==
        combine(gd, [:y, :z] => ((y, z) -> y[1]) => :res) ==
        combine(gd, :y => (y -> unwrap(y[1])) => :res)

    @test combine(gd, [:x, :y, :y2] =>
                          ((x, y, y2) -> x[1] <= 5 ? y[1] : y2[1]) => :res) ==
        combine(gd, [:x, :y, :y2] =>
                        ((x, y, y2) -> x[1] <= 5 ? unwrap(y[1]) : unwrap(y2[1])) => :res)

    @test combine(gd, [:x, :y, :z] =>
                          ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res) ==
        combine(gd, [:x, :y, :z] =>
                        ((x, y, z) -> x[1] <= 5 ? unwrap(y[1]) : unwrap(z[1])) => :res)
end

Note that, unfortunately, the error does not always occur.

The relevant part of the error is:

          nested task error: BoundsError: attempt to access 7-element Vector{UInt32} at index [14]
          Stacktrace:
           [1] throw_boundserror(A::Vector{UInt32}, I::Tuple{Int64})
             @ Base .\essentials.jl:14
           [2] getindex
             @ .\essentials.jl:891 [inlined]
           [3] update_refs!(A::CategoricalVector{Int64, UInt32, Int64, CategoricalValue{Int64, UInt32}, Union{}}, newlevels::Vector{Int64})
             @ CategoricalArrays \CategoricalArrays\0yLZN\src\array.jl:472
           [4] merge_pools!(A::CategoricalVector{Int64, UInt32, Int64, CategoricalValue{Int64, UInt32}, Union{}}, B::CategoricalValue{Int64, UInt32}; updaterefs::Bool, updatepool::Bool)
             @ CategoricalArrays \CategoricalArrays\0yLZN\src\array.jl:489
           [5] merge_pools!
             @ \CategoricalArrays\0yLZN\src\array.jl:477 [inlined]
           [6] setindex!
             @ \CategoricalArrays\0yLZN\src\array.jl:500 [inlined]
           [7] fill_row!(row::@NamedTuple{x1::CategoricalValue{Int64, UInt32}}, outcols::Tuple{CategoricalVector{Int64, UInt32, Int64, CategoricalValue{Int64, UInt32}, Union{}}}, i::Int64, colstart::Int64, colnames::Tuple{Symbol})
@nalimilan
Member

nalimilan commented Apr 19, 2024

Reduced reproducer:

using DataFrames, CategoricalArrays, Test, Random
Random.seed!(35)
df = DataFrame(x=rand(1:10, 100),
                y=categorical(rand(10:15, 100)),
                z=categorical(rand(0:20, 100)))
df.y2 = reverse(df.y) # Same levels
gd = groupby(df, :x)

combine(gd, [:x, :y, :z] => ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res)

The problem is that combine calls setindex! on the same CategoricalArray from two different threads at the same time, and both threads expand the pool and recompute reference codes in parallel. I'm not sure why it only happens on Julia nightly; it's probably just random.
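For illustration, here is a minimal sketch (not taken from the issue) of the same unsafe pattern without DataFrames: several tasks call setindex! on one CategoricalVector with values whose pool has different levels, so each assignment may trigger merge_pools! concurrently. Depending on scheduling this may error, silently corrupt refs, or appear to work:

using CategoricalArrays

dest = categorical(fill(1, 10_000))      # shared destination with a single level
src = categorical(collect(101:10_100))   # values with levels absent from dest's pool

Threads.@threads for i in eachindex(dest)
    dest[i] = src[i]   # may expand dest's pool and recompute refs concurrently
end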

I can easily fix this using a lock (tested locally). But it's harder to define a more general policy regarding thread safety. Taking a lock for expensive operations is fine, but it's unacceptable for getindex. Yet getindex needs the pool and refs to match, which is temporarily not the case while merging two pools and recomputing refs (just as when calling levels!). Maybe a reasonable policy would be to guarantee thread safety except when reordering levels, which includes mixing read and write operations on arrays with different levels.

EDIT: Adding new levels while doing write operations from another thread is also unsafe as the Dict may be in an invalid state. Not sure whether there exists an efficient parallel dict implementation, or whether we would have to take a lock too (which would have a significant performance cost).
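For concreteness, a rough sketch of the lock idea (hypothetical helper name; not the actual fix that was tested locally): serialize only the rare pool-mutating path so that concurrent merges cannot interleave, leaving plain assignments untouched.

using CategoricalArrays

const POOL_LOCK = ReentrantLock()   # hypothetical lock guarding pool mutation

# hypothetical wrapper around the internal merge_pools! seen in the stack trace above
function locked_merge_pools!(A::CategoricalArray, v; kwargs...)
    lock(POOL_LOCK) do
        CategoricalArrays.merge_pools!(A, v; kwargs...)
    end
end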

@bkamins
Member Author

bkamins commented Apr 19, 2024

Thank you for looking into this. I will also think about what could be done.

@bkamins
Member Author

bkamins commented Apr 20, 2024

I thought a bit about it. Possible solutions are as follows:

  • by default, guarantee as much thread safety as possible, but not in all cases, and be explicit about the limitations (just as we added the warning to https://github.com/JuliaData/PooledArrays.jl)
  • both here, and maybe also in https://github.com/JuliaData/PooledArrays.jl, add a thread-safe mode, i.e. a separate set of operations that are slower but thread safe (see the sketch after this list). Then the user could choose this version when they know the operation is performed in a multi-threaded context. It is a bit heavy-handed, but maybe it is acceptable? Unfortunately we do not have a general mechanism to signal a multi-threaded context (I think @jpsamaroo proposed such a general mechanism some time ago, but I do not think it is available).
  • an alternative, again not very nice, is for DataFrames.jl to detect pooled and categorical vectors and manually disable multi-threading for unsafe operations on them.
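A rough sketch of what the second bullet could look like (hypothetical type; not an existing API in CategoricalArrays.jl or PooledArrays.jl): a wrapper that keeps reads lock-free but serializes writes, which the user opts into only when the array is written from multiple threads.

# hypothetical opt-in thread-safe wrapper (illustrative sketch only)
struct ThreadSafeVector{T, A<:AbstractVector{T}} <: AbstractVector{T}
    data::A
    lock::ReentrantLock
end
ThreadSafeVector(data::AbstractVector) = ThreadSafeVector(data, ReentrantLock())

Base.size(w::ThreadSafeVector) = size(w.data)
Base.getindex(w::ThreadSafeVector, i::Int) = w.data[i]   # reads stay lock-free
Base.setindex!(w::ThreadSafeVector, x, i::Int) =
    lock(w.lock) do
        w.data[i] = x   # writes (which may add levels) are serialized
    end

Usage would be something like ThreadSafeVector(categorical(rand(1:5, 100))); every assignment then pays the cost of one lock acquisition.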

@nalimilan
Member

Yes, that would be OK if we can manage to make at least a reasonable set of operations thread-safe. That's already the case for read operations, but it's hard to achieve for writes that may add levels. This seems problematic in particular due to combine using multithreading.

It seems silly not to make CategoricalArrays mostly thread-safe, as in theory that should be possible without sacrificing performance in all cases except when reordering existing levels. Indeed, apart from this, the only problem is when adding levels (at the end) and the dict needs to be resized, which puts it temporarily in an inconsistent state. But this situation is relatively rare so it would even be acceptable performance-wise to create a new dict instead to ensure that another thread can access a valid dict (old or new) at any moment. Unfortunately, I don't see a clean way to check whether the dict is going to be resized before adding a new key. (A super-simple solution would be to call sizehint!(d, 256) and say that it's thread-safe up to 256 levels, with a way to configure this, but that's quite ad-hoc.)
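To illustrate the ad-hoc idea in parentheses, a minimal sketch (assuming a rehash only happens once the reserved capacity is exceeded):

# reserve capacity in the level-lookup dict up front so that adding levels below the
# threshold should not trigger a rehash (the step that leaves the dict inconsistent)
invindex = Dict{String, UInt32}()
sizehint!(invindex, 256)

for (i, lvl) in enumerate(string.('a':'z'))
    invindex[lvl] = UInt32(i)   # stays well below the reserved capacity, so no rehash
end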

We could use ConcurrentDict from ConcurrentCollections.jl, which allows multithreaded access. I've done a few tests: it doesn't seem slower for setindex! with a String key, but it is about 60% slower for Int. Given that the main use case is String, that may be acceptable.
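A rough sketch of that kind of test (assuming ConcurrentCollections.jl's ConcurrentDict supports the Dict-style setindex!/getindex used here):

using ConcurrentCollections

invindex = ConcurrentDict{String, UInt32}()   # level => refcode, tolerates concurrent access

Threads.@threads for i in 1:1_000
    lvl = "level_$(mod1(i, 50))"
    invindex[lvl] = UInt32(mod1(i, 50))   # concurrent insertions from several threads
end

invindex["level_1"]   # reads need no explicit lock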

That would still not fix setindex! with a CategoricalValue as in the OP, though. As I said, we can take a lock when assigning a CategoricalValue that requires changing levels, as it's a rare event; but if another thread makes an assignment which does not require changing levels, it may end up using outdated refcodes. I wonder whether, with some thought, we could fix this by taking a lock only when the CategoricalValue is from a non-equal pool (which is slow anyway).

  • both here, and maybe also in https://github.com/JuliaData/PooledArrays.jl, add a thread-safe mode, i.e. a separate set of operations that are slower but thread safe. Then the user could choose this version when they know the operation is performed in a multi-threaded context. It is a bit heavy-handed, but maybe it is acceptable? Unfortunately we do not have a general mechanism to signal a multi-threaded context (I think @jpsamaroo proposed such a general mechanism some time ago, but I do not think it is available).

That's interesting. Indeed, if combine could set this, we wouldn't have to pay the overhead in non-multithreaded contexts. But the result would be that we would have to take a lock on each setindex! operation, which would essentially be equivalent to disabling multithreading in the combine example above (and even slower due to the overhead).

  • an alternative, again not very nice, is for DataFrames.jl to detect pooled and categorical vectors and manually disable multi-threading for unsafe operations on them.

Yeah, it would make sense to add some traits to ArrayInterface.jl to detect whether reads and/or writes are thread-safe. Then combine could disable multithreading if one of the output (resp. input) column types isn't thread-safe for writes (resp. reads).
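A hypothetical sketch of such a trait (the names are made up; nothing like this exists in ArrayInterface.jl today): combine-like code could query it and fall back to single-threaded processing when a column is not safe to write concurrently.

using CategoricalArrays

# hypothetical trait: is setindex! on this array type safe from several threads?
threadsafe_writes(::Type{<:AbstractArray}) = true        # plain arrays: writing distinct elements is safe
threadsafe_writes(::Type{<:CategoricalArray}) = false    # writes may mutate the shared pool

function process_columns!(f, cols::Vector)
    if all(c -> threadsafe_writes(typeof(c)), cols)
        Threads.@threads for col in cols   # safe: process columns in parallel
            f(col)
        end
    else
        foreach(f, cols)                   # fall back to single-threaded processing
    end
end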
