diff --git a/Project.toml b/Project.toml index 419a5516..56b55132 100644 --- a/Project.toml +++ b/Project.toml @@ -31,9 +31,9 @@ AbstractTrees = "^0.3.0, 0.4" ArraysOfArrays = "^0.5.3, ^0.6" Arrow = "2 - 2.5" BitIntegers = "^0.2.6, ^0.3" -CodecLz4 = "^0.3.0, ^0.4.0" -CodecXz = "^0.6.0, ^0.7.0" -CodecZstd = "^0.6.0, ^0.7.0" +CodecLz4 = "^0.3, ^0.4" +CodecXz = "^0.6, ^0.7" +CodecZstd = "^0.6, ^0.7, ^0.8" HTTP = "^0.9.7, 1" IterTools = "^1" LRUCache = "^1.3.0" diff --git a/src/RNTuple/highlevel.jl b/src/RNTuple/highlevel.jl index a1b95594..41848e24 100644 --- a/src/RNTuple/highlevel.jl +++ b/src/RNTuple/highlevel.jl @@ -19,9 +19,10 @@ struct RNTupleField{R, F, O, E} <: AbstractVector{E} function RNTupleField(rn::R, field::F) where {R, F} O = _field_output_type(F) E = eltype(O) - buffers = Vector{O}(undef, Threads.nthreads()) - thread_locks = [ReentrantLock() for _ in 1:Threads.nthreads()] - buffer_ranges = [0:-1 for _ in 1:Threads.nthreads()] + Nthreads = _maxthreadid() + buffers = Vector{O}(undef, Nthreads) + thread_locks = [ReentrantLock() for _ in 1:Nthreads] + buffer_ranges = [0:-1 for _ in 1:Nthreads] new{R, F, O, E}(rn, field, buffers, thread_locks, buffer_ranges) end end diff --git a/src/UnROOT.jl b/src/UnROOT.jl index 3b6b5dbd..14236ec5 100644 --- a/src/UnROOT.jl +++ b/src/UnROOT.jl @@ -64,6 +64,13 @@ include("RNTuple/displays.jl") # show(devnull, df) # show(devnull, df[1]) # end +# + +_maxthreadid() = @static if VERSION < v"1.9" + Threads.nthreads() +else + Threads.maxthreadid() +end if VERSION >= v"1.9" let diff --git a/src/iteration.jl b/src/iteration.jl index b4714e9b..b6f8832b 100644 --- a/src/iteration.jl +++ b/src/iteration.jl @@ -123,11 +123,12 @@ mutable struct LazyBranch{T,J,B} <: AbstractVector{T} _buffer = VectorOfVectors(T(), Int32[1]) T = SubArray{eltype(T), 1, T, Tuple{UnitRange{Int64}}, true} end + Nthreads = _maxthreadid() return new{T,J,typeof(_buffer)}(f, b, length(b), b.fBasketEntry, - [_buffer for _ in 1:Threads.nthreads()], - [ReentrantLock() for _ in 1:Threads.nthreads()], - [0:-1 for _ in 1:Threads.nthreads()]) + [_buffer for _ in 1:Nthreads], + [ReentrantLock() for _ in 1:Nthreads], + [0:-1 for _ in 1:Nthreads]) end end LazyBranch(f::ROOTFile, s::AbstractString) = LazyBranch(f, f[s]) diff --git a/test/rntuple_tests.jl b/test/rntuple_tests.jl index 1506b0f7..9b3d9b6b 100644 --- a/test/rntuple_tests.jl +++ b/test/rntuple_tests.jl @@ -157,8 +157,17 @@ end Threads.@threads for i in eachindex(field) @inbounds accumulator[Threads.threadid()] += field[i] end + # test we've hit each thread's buffer - @test all(!isempty, field.buffers) + @test all( + map(eachindex(field.buffers)) do b + if !isassigned(field.buffers, b) + return true + else + return !isempty(field.buffers[b]) + end + + end) @test sum(accumulator) == sum(1:5e4) accumulator .= 0 diff --git a/test/runtests.jl b/test/runtests.jl index 4edb1547..babb8fc2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -4,7 +4,7 @@ using StaticArrays using InteractiveUtils using MD5 -const nthreads = Threads.nthreads() +const nthreads = UnROOT._maxthreadid() nthreads == 1 && @warn "Running on a single thread. Please re-run the test suite with at least two threads (`julia --threads 2 ...`)" const SAMPLES_DIR = joinpath(@__DIR__, "samples") @@ -766,18 +766,12 @@ end if get(ENV, "CI", "false") == "true" - if nthreads >= 1 - @test Threads.nthreads()>1 - else + if nthreads == 1 @warn "CI wasn't run with multiple threads" end end - nmus = if isdefined(Threads, :maxthreadid) - zeros(Int, Threads.maxthreadid()) - else - zeros(Int, Threads.nthreads()) - end + nmus = zeros(Int, nthreads) Threads.@threads for i in 1:length(t) nmus[Threads.threadid()] += length(t.Muon_pt[i])