diff --git a/.github/workflows/BenchmarksAndMicroIntegration.yml b/.github/workflows/BenchmarksAndMicroIntegration.yml
new file mode 100644
index 00000000..67b8b421
--- /dev/null
+++ b/.github/workflows/BenchmarksAndMicroIntegration.yml
@@ -0,0 +1,40 @@
+name: Benchmarks and MicroIntegration
+
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+
+jobs:
+  test:
+    name: Benchmarks and MicroIntegration
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+    steps:
+      - uses: actions/checkout@v2
+      - uses: julia-actions/setup-julia@v1
+        with:
+          version: 1
+          arch: x64
+      - uses: julia-actions/julia-buildpkg@latest
+      - name: setup environment
+        shell: julia --color=yes --project=perf {0}
+        run: |
+          using Pkg
+          try
+            # Force it to use this PR's version of the package.
+            pkg"add Turing#hg/new-libtask2" # TODO: remove this when Turing is updated
+            Pkg.develop(PackageSpec(path=".")) # the resolver may fail with the main deps
+            Pkg.update()
+          catch err
+            err isa Pkg.Resolve.ResolverError || rethrow()
+            # If we can't resolve, this version is incompatible by SemVer, and that is
+            # fine: it means we marked this as a breaking change, so we don't need to
+            # worry about mistakenly introducing one, as we made it intentionally.
+            @info "Not compatible with this release. No problem." exception=err
+            exit(0) # Exit immediately, as a success
+          end
+      - name: run
+        run: julia --color=yes --project=perf perf/runtests.jl
diff --git a/Project.toml b/Project.toml
index 7166d770..544296e8 100644
--- a/Project.toml
+++ b/Project.toml
@@ -3,10 +3,11 @@ uuid = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
 license = "MIT"
 desc = "Tape based task copying in Turing"
 repo = "https://github.com/TuringLang/Libtask.jl.git"
-version = "0.6.2"
+version = "0.6.3"
 
 [deps]
 IRTools = "7869d1d1-7146-5819-86e3-90919afe41df"
+LRUCache = "8ac3fa9e-de4c-5943-b1dc-09c6b5f20637"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
 Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
diff --git a/perf/Project.toml b/perf/Project.toml
new file mode 100644
index 00000000..9e9ab49b
--- /dev/null
+++ b/perf/Project.toml
@@ -0,0 +1,14 @@
+[deps]
+AbstractMCMC = "80f14c24-f653-4e6a-9b94-39d6b0f70001"
+AdvancedPS = "576499cb-2369-40b2-a588-c64705576edc"
+BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
+DynamicPPL = "366bfd00-2699-11ea-058f-f148b4cae6d8"
+Libtask = "6f1fad26-d15e-5dc8-ae53-837a1d7b8c9f"
+Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+Turing = "fce5fe82-541a-59a6-adf8-730c64b5f9a0"
+
+[compat]
+julia = "1.3"
+
+[targets]
+test = ["Test", "BenchmarkTools"]
diff --git a/perf/p0.jl b/perf/p0.jl
new file mode 100644
index 00000000..b2ae3c57
--- /dev/null
+++ b/perf/p0.jl
@@ -0,0 +1,47 @@
+# ]add Turing#hg/new-libtask2
+
+using Libtask
+using Turing, DynamicPPL, AdvancedPS
+using BenchmarkTools
+
+@model gdemo(x, y) = begin
+    # Assumptions
+    σ ~ InverseGamma(2, 3)
+    μ ~ Normal(0, sqrt(σ))
+    # Observations
+    x ~ Normal(μ, sqrt(σ))
+    y ~ Normal(μ, sqrt(σ))
+end
+
+
+# Case 1: Sample from the prior.
+
+m = Turing.Core.TracedModel(gdemo(1.5, 2.), SampleFromPrior(), VarInfo())
+
+f = m.evaluator[1];
+
+args = m.evaluator[2:end];
+
+@show "Directly call..."
+@btime f(args...)
+# (2.0, VarInfo (2 variables (μ, σ), dimension 2; logp: -6.162))
+
+@show "CTask construction..."
+t = @btime Libtask.CTask(f, args...)
+# schedule(t.task) # works fine!
+# @show Libtask.result(t.tf.tape)
+@show "Step in a tape..."
+@btime Libtask.step_in(t.tf.tape, args) + +# Case 2: SMC sampler + +m = Turing.Core.TracedModel(gdemo(1.5, 2.), Sampler(SMC(50)), VarInfo()); +@show "Directly call..." +@btime m.evaluator[1](m.evaluator[2:end]...) + +@show "CTask construction..." +t = @btime Libtask.CTask(m.evaluator[1], m.evaluator[2:end]...); +# schedule(t.task) +# @show Libtask.result(t.tf.tape) +@show "Step in a tape..." +@btime Libtask.step_in(t.tf.tape, m.evaluator[2:end]) diff --git a/perf/p1.jl b/perf/p1.jl new file mode 100644 index 00000000..4ecd2ec8 --- /dev/null +++ b/perf/p1.jl @@ -0,0 +1,39 @@ +using Turing, Test, AbstractMCMC, DynamicPPL, Random + +import AbstractMCMC.AbstractSampler + +function check_numerical(chain, + symbols::Vector, + exact_vals::Vector; + atol=0.2, + rtol=0.0) + for (sym, val) in zip(symbols, exact_vals) + E = val isa Real ? + mean(chain[sym]) : + vec(mean(chain[sym], dims=1)) + @info (symbol=sym, exact=val, evaluated=E) + @test E ≈ val atol=atol rtol=rtol + end +end + +function check_MoGtest_default(chain; atol=0.2, rtol=0.0) + check_numerical(chain, + [:z1, :z2, :z3, :z4, :mu1, :mu2], + [1.0, 1.0, 2.0, 2.0, 1.0, 4.0], + atol=atol, rtol=rtol) +end + +@model gdemo_d(x, y) = begin + s ~ InverseGamma(2, 3) + m ~ Normal(0, sqrt(s)) + x ~ Normal(m, sqrt(s)) + y ~ Normal(m, sqrt(s)) + return s, m +end + +alg = CSMC(15) +chain = sample(gdemo_d(1.5, 2.0), alg, 5_000) + +@show chain + +check_numerical(chain, [:s, :m], [49/24, 7/6], atol=0.1) diff --git a/perf/p2.jl b/perf/p2.jl new file mode 100644 index 00000000..44fd61a7 --- /dev/null +++ b/perf/p2.jl @@ -0,0 +1,63 @@ +using Turing, Test, AbstractMCMC, DynamicPPL, Random, Turing.RandomMeasures, Libtask + +@model infiniteGMM(x) = begin + # Hyper-parameters, i.e. concentration parameter and parameters of H. + α = 1.0 + μ0 = 0.0 + σ0 = 1.0 + + # Define random measure, e.g. Dirichlet process. + rpm = DirichletProcess(α) + + # Define the base distribution, i.e. expected value of the Dirichlet process. + H = Normal(μ0, σ0) + + # Latent assignment. + z = tzeros(Int, length(x)) + + # Locations of the infinitely many clusters. + μ = tzeros(Float64, 0) + + for i in 1:length(x) + + # Number of clusters. + K = maximum(z) + nk = Vector{Int}(map(k -> sum(z .== k), 1:K)) + + # Draw the latent assignment. + z[i] ~ ChineseRestaurantProcess(rpm, nk) + + # Create a new cluster? + if z[i] > K + push!(μ, 0.0) + + # Draw location of new cluster. + μ[z[i]] ~ H + end + + # Draw observation. + x[i] ~ Normal(μ[z[i]], 1.0) + end +end + +# Generate some test data. +Random.seed!(1) + +data = vcat(randn(10), randn(10) .- 5, randn(10) .+ 10) +data .-= mean(data) +data /= std(data) + +# MCMC sampling +Random.seed!(2) +iterations = 500 +model_fun = infiniteGMM(data) + +m = Turing.Core.TracedModel(model_fun, Sampler(SMC(50)), VarInfo()) +f = m.evaluator[1] +args = m.evaluator[2:end] + +t = Libtask.CTask(f, args...) 
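+# `t.tf` is the `TapedFunction` built by `CTask`; `step_in` below replays its
+# recorded tape with `args` instead of re-tracing the model evaluator.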
+
+Libtask.step_in(t.tf.tape, args)
+
+@show Libtask.result(t.tf.tape)
diff --git a/perf/runtests.jl b/perf/runtests.jl
new file mode 100644
index 00000000..9856db08
--- /dev/null
+++ b/perf/runtests.jl
@@ -0,0 +1,3 @@
+include("p0.jl")
+include("p1.jl")
+include("p2.jl")
diff --git a/src/Libtask.jl b/src/Libtask.jl
index 12a8c516..59795b98 100644
--- a/src/Libtask.jl
+++ b/src/Libtask.jl
@@ -3,6 +3,8 @@ module Libtask
 using IRTools
 using MacroTools
 
+using LRUCache
+
 export CTask, consume, produce
 export TArray, tzeros, tfill, TRef
diff --git a/src/tapedfunction.jl b/src/tapedfunction.jl
index 083cfccc..10d27dc3 100644
--- a/src/tapedfunction.jl
+++ b/src/tapedfunction.jl
@@ -6,6 +6,11 @@ mutable struct Tape
     owner
 end
 
+"""
+    Instruction
+
+An `Instruction` represents a single function call recorded on a `Tape`.
+"""
 mutable struct Instruction{F} <: AbstractInstruction
     fun::F
     input::Tuple
@@ -46,6 +51,10 @@ function Base.show(io::IO, box::Box)
     println(io, "Box($(box.val))")
 end
 
+function Base.show(io::IO, instruction::AbstractInstruction)
+    println(io, "A $(typeof(instruction))")
+end
+
 function Base.show(io::IO, instruction::Instruction)
     fun = instruction.fun
     tape = instruction.tape
@@ -53,6 +62,9 @@ function Base.show(io::IO, instruction::Instruction)
 end
 
 function Base.show(io::IO, tp::Tape)
+    # We collect all the output in an extra IOBuffer and write it out in one
+    # go, so that printing a tape is not interleaved with other output when a
+    # task context switch happens half-way through.
     buf = IOBuffer()
     print(buf, "$(length(tp))-element Tape")
     isempty(tp) || println(buf, ":")
@@ -66,10 +78,30 @@ function Base.show(io::IO, tp::Tape)
 end
 
 function (instr::Instruction{F})() where F
-    output = instr.fun(map(val, instr.input)...)
-    instr.output.val = output
+    # Catch run-time errors, report them with a backtrace, then rethrow.
+    try
+        output = instr.fun(map(val, instr.input)...)
+        instr.output.val = output
+    catch e
+        showerror(stderr, e, catch_backtrace())
+        rethrow()
+    end
 end
 
+function _new end
+function (instr::Instruction{typeof(_new)})()
+    # Catch run-time errors, report them with a backtrace, then rethrow.
+    try
+        expr = Expr(:new, map(val, instr.input)...)
+        output = eval(expr)
+        instr.output.val = output
+    catch e
+        showerror(stderr, e, catch_backtrace())
+        rethrow()
+    end
+end
+
+
 function increase_counter!(t::Tape)
     t.counter > length(t) && return
     # instr = t[t.counter]
@@ -101,6 +133,19 @@ function run_and_record!(tape::Tape, f, args...)
     return output
 end
 
+function run_and_record!(tape::Tape, ::typeof(_new), args...)
+    output = try
+        expr = Expr(:new, map(val, args)...)
+        box(eval(expr))
+    catch e
+        @warn e
+        Box{Any}(nothing)
+    end
+    ins = Instruction(_new, args, output, tape)
+    push!(tape, ins)
+    return output
+end
+
 function unbox_condition(ir)
     for blk in IRTools.blocks(ir)
         vars = keys(blk)
@@ -169,9 +214,15 @@ function intercept(ir; recorder=:run_and_record!)
 
     for (x, st) in ir
         x == tape && continue
-        Meta.isexpr(st.expr, :call) || continue
-        new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
-        ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
+        if Meta.isexpr(st.expr, :call)
+            new_args = (x == args_var) ? st.expr.args : _replace_args(st.expr.args, arg_pairs)
+            ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, new_args...)
+        elseif Meta.isexpr(st.expr, :new)
+            args = st.expr.args
+            ir[x] = IRTools.xcall(@__MODULE__, recorder, tape, _new, args...)
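+            # (`:new` expressions construct objects without a function call, so
+            # they are recorded via the `_new` marker above and replayed by
+            # `Instruction{typeof(_new)}` when the tape runs)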
+        else
+            @warn "Unknown IR code: " st
+        end
     end
     # the real return value will be in the last instruction on the tape
     IRTools.return!(ir, tape)
@@ -190,6 +241,13 @@ mutable struct TapedFunction
     end
 end
 
+function reset!(tf::TapedFunction, ir::IRTools.IR, tape::Tape)
+    tf.ir = ir
+    tf.tape = tape
+    setowner!(tape, tf)
+    return tf
+end
+
 function (tf::TapedFunction)(args...)
     if isempty(tf.tape)
         ir = IRTools.@code_ir tf.func(args...)
diff --git a/src/tapedtask.jl b/src/tapedtask.jl
index e6da976d..72af7d2e 100644
--- a/src/tapedtask.jl
+++ b/src/tapedtask.jl
@@ -16,9 +16,22 @@ struct TapedTask
     end
 end
 
+const TRCache = LRU{Any, Any}(maxsize=10)
+
 function TapedTask(tf::TapedFunction, args...)
-    tf.owner != nothing && error("TapedFunction is owned to another task.")
-    isempty(tf.tape) && tf(args...)
+    tf.owner !== nothing && error("TapedFunction is owned by another task.")
+    if isempty(tf.tape)
+        cache_key = (tf.func, typeof.(args)...)
+        if haskey(TRCache, cache_key)
+            ir, tape = TRCache[cache_key]
+            # Here we don't need to change the initial arguments of the tape;
+            # they will be set when we `step_in` to the tape.
+            reset!(tf, ir, copy(tape, Dict{UInt64, Any}(); partial=false))
+        else
+            tf(args...)
+            TRCache[cache_key] = (tf.ir, tf.tape)
+        end
+    end
     produce_ch = Channel()
     consume_ch = Channel{Int}()
     task = @task try
@@ -199,14 +212,16 @@ function Base.copy(x::Instruction, on_tape::Tape, roster::Dict{UInt64, Any})
     Instruction(x.fun, input, output, on_tape)
 end
 
-function Base.copy(t::Tape, roster::Dict{UInt64, Any})
+function Base.copy(t::Tape, roster::Dict{UInt64, Any}; partial=true)
     old_data = t.tape
-    new_data = Vector{AbstractInstruction}()
-    new_tape = Tape(new_data, t.counter, t.owner)
+    len = partial ? length(old_data) - t.counter + 1 : length(old_data)
+    start = partial ? t.counter : 1
+    new_data = Vector{AbstractInstruction}(undef, len)
+    new_tape = Tape(new_data, 1, t.owner)
 
-    for x in old_data
+    for (i, x) in enumerate(old_data[start:end])
         new_ins = copy(x, new_tape, roster)
-        push!(new_data, new_ins)
+        new_data[i] = new_ins
     end
 
     return new_tape
diff --git a/src/tarray.jl b/src/tarray.jl
index 507f59c5..9c3c2fba 100644
--- a/src/tarray.jl
+++ b/src/tarray.jl
@@ -37,6 +37,9 @@ TArray{T,N}(::UndefInitializer, d::Vararg{<:Integer,N}) where {T,N} = TArray{T,N}
 TArray{T,N}(dim::NTuple{N,Int}) where {T,N} = TArray(T, dim)
 TArray(T::Type, dim) = TArray(Array{T}(undef, dim))
 
+localize(x) = x
+localize(x::AbstractArray) = TArray(x)
+getdata(x) = x
 getdata(x::TArray) = x.data
 
 tape_copy(x::TArray) = TArray(deepcopy(x.data))
@@ -166,70 +169,70 @@ end
 
 # Other methods from stdlib
 
 Base.view(x::TArray, inds...; kwargs...) =
-    Base.view(getdata(x), inds...; kwargs...) |> TArray
-Base.:-(x::TArray) = (-getdata(x)) |> TArray
-Base.transpose(x::TArray) = transpose(getdata(x)) |> TArray
-Base.adjoint(x::TArray) = adjoint(getdata(x)) |> TArray
-Base.repeat(x::TArray; kw...) = repeat(getdata(x); kw...) |> TArray
+    Base.view(getdata(x), inds...; kwargs...) |> localize
+Base.:-(x::TArray) = (-getdata(x)) |> localize
+Base.transpose(x::TArray) = transpose(getdata(x)) |> localize
+Base.adjoint(x::TArray) = adjoint(getdata(x)) |> localize
+Base.repeat(x::TArray; kw...) = repeat(getdata(x); kw...) |> localize
 
 Base.hcat(xs::Union{TArray{T,1}, TArray{T,2}}...) where T =
-    hcat(getdata.(xs)...) |> TArray
+    hcat(getdata.(xs)...) |> localize
 Base.vcat(xs::Union{TArray{T,1}, TArray{T,2}}...) where T =
-    vcat(getdata.(xs)...) |> TArray
+    vcat(getdata.(xs)...) |> localize
 Base.cat(xs::Union{TArray{T,1}, TArray{T,2}}...; dims) where T =
-    cat(getdata.(xs)...; dims = dims) |> TArray
+    cat(getdata.(xs)...; dims = dims) |> localize
 
-Base.reshape(x::TArray, dims::Union{Colon,Int}...) = reshape(getdata(x), dims) |> TArray
+Base.reshape(x::TArray, dims::Union{Colon,Int}...) = reshape(getdata(x), dims) |> localize
 Base.reshape(x::TArray, dims::Tuple{Vararg{Union{Int,Colon}}}) =
-    reshape(getdata(x), Base._reshape_uncolon(getdata(x), dims)) |> TArray
-Base.reshape(x::TArray, dims::Tuple{Vararg{Int}}) = reshape(getdata(x), dims) |> TArray
-
-Base.permutedims(x::TArray, perm) = permutedims(getdata(x), perm) |> TArray
-Base.PermutedDimsArray(x::TArray, perm) = PermutedDimsArray(getdata(x), perm) |> TArray
-Base.reverse(x::TArray; dims) = reverse(getdata(x), dims = dims) |> TArray
-
-Base.sum(x::TArray; dims = :) = sum(getdata(x), dims = dims) |> TArray
-Base.sum(f::Union{Function,Type},x::TArray) = sum(f.(getdata(x))) |> TArray
-Base.prod(x::TArray; dims=:) = prod(getdata(x); dims=dims) |> TArray
-Base.prod(f::Union{Function, Type}, x::TArray) = prod(f.(getdata(x))) |> TArray
-
-Base.findfirst(x::TArray, args...) = findfirst(getdata(x), args...) |> TArray
-Base.maximum(x::TArray; dims = :) = maximum(getdata(x), dims = dims) |> TArray
-Base.minimum(x::TArray; dims = :) = minimum(getdata(x), dims = dims) |> TArray
-
-Base.:/(x::TArray, y::TArray) = getdata(x) / getdata(y) |> TArray
-Base.:/(x::AbstractArray, y::TArray) = x / getdata(y) |> TArray
-Base.:/(x::TArray, y::AbstractArray) = getdata(x) / y |> TArray
-Base.:\(x::TArray, y::TArray) = getdata(x) \ getdata(y) |> TArray
-Base.:\(x::AbstractArray, y::TArray) = x \ getdata(y) |> TArray
-Base.:\(x::TArray, y::AbstractArray) = getdata(x) \ y |> TArray
-Base.:*(x::TArray, y::TArray) = getdata(x) * getdata(y) |> TArray
-Base.:*(x::AbstractArray, y::TArray) = x * getdata(y) |> TArray
-Base.:*(x::TArray, y::AbstractArray) = getdata(x) * y |> TArray
+    reshape(getdata(x), Base._reshape_uncolon(getdata(x), dims)) |> localize
+Base.reshape(x::TArray, dims::Tuple{Vararg{Int}}) = reshape(getdata(x), dims) |> localize
+
+Base.permutedims(x::TArray, perm) = permutedims(getdata(x), perm) |> localize
+Base.PermutedDimsArray(x::TArray, perm) = PermutedDimsArray(getdata(x), perm) |> localize
+Base.reverse(x::TArray; dims) = reverse(getdata(x), dims = dims) |> localize
+
+Base.sum(x::TArray; dims = :) = sum(getdata(x), dims = dims) |> localize
+Base.sum(f::Union{Function,Type}, x::TArray) = sum(f.(getdata(x))) |> localize
+Base.prod(x::TArray; dims=:) = prod(getdata(x); dims=dims) |> localize
+Base.prod(f::Union{Function, Type}, x::TArray) = prod(f.(getdata(x))) |> localize
+
+Base.findfirst(x::TArray, args...) = findfirst(getdata(x), args...) |> localize
+Base.maximum(x::TArray; dims = :) = maximum(getdata(x), dims = dims) |> localize
+Base.minimum(x::TArray; dims = :) = minimum(getdata(x), dims = dims) |> localize
+
+Base.:/(x::TArray, y::TArray) = getdata(x) / getdata(y) |> localize
+Base.:/(x::AbstractArray, y::TArray) = x / getdata(y) |> localize
+Base.:/(x::TArray, y::AbstractArray) = getdata(x) / y |> localize
+Base.:\(x::TArray, y::TArray) = getdata(x) \ getdata(y) |> localize
+Base.:\(x::AbstractArray, y::TArray) = x \ getdata(y) |> localize
+Base.:\(x::TArray, y::AbstractArray) = getdata(x) \ y |> localize
+Base.:*(x::TArray, y::TArray) = getdata(x) * getdata(y) |> localize
+Base.:*(x::AbstractArray, y::TArray) = x * getdata(y) |> localize
+Base.:*(x::TArray, y::AbstractArray) = getdata(x) * y |> localize
 
 # broadcast
 Base.BroadcastStyle(::Type{<:TArray}) = Broadcast.ArrayStyle{TArray}()
-Broadcast.broadcasted(::Broadcast.ArrayStyle{TArray}, f, args...) = f.(getdata.(args)...) |> TArray
+Broadcast.broadcasted(::Broadcast.ArrayStyle{TArray}, f, args...) = f.(getdata.(args)...) |> localize
 
 import LinearAlgebra
 import LinearAlgebra: \, /, inv, det, logdet, logabsdet, norm
 
-LinearAlgebra.inv(x::TArray) = inv(getdata(x)) |> TArray
-LinearAlgebra.det(x::TArray) = det(getdata(x)) |> TArray
-LinearAlgebra.logdet(x::TArray) = logdet(getdata(x)) |> TArray
-LinearAlgebra.logabsdet(x::TArray) = logabsdet(getdata(x)) |> TArray
+LinearAlgebra.inv(x::TArray) = inv(getdata(x)) |> localize
+LinearAlgebra.det(x::TArray) = det(getdata(x)) |> localize
+LinearAlgebra.logdet(x::TArray) = logdet(getdata(x)) |> localize
+LinearAlgebra.logabsdet(x::TArray) = logabsdet(getdata(x)) |> localize
 LinearAlgebra.norm(x::TArray, p::Real = 2) =
-    LinearAlgebra.norm(getdata(x), p) |> TArray
+    LinearAlgebra.norm(getdata(x), p) |> localize
 
 import LinearAlgebra: dot
-dot(x::TArray, ys::TArray) = dot(getdata(x), getdata(ys)) |> TArray
-dot(x::AbstractArray, ys::TArray) = dot(x, getdata(ys)) |> TArray
-dot(x::TArray, ys::AbstractArray) = dot(getdata(x), ys) |> TArray
+dot(x::TArray, ys::TArray) = dot(getdata(x), getdata(ys)) |> localize
+dot(x::AbstractArray, ys::TArray) = dot(x, getdata(ys)) |> localize
+dot(x::TArray, ys::AbstractArray) = dot(getdata(x), ys) |> localize
 
 using Statistics
-Statistics.mean(x::TArray; dims = :) = mean(getdata(x), dims = dims) |> TArray
-Statistics.std(x::TArray; kw...) = std(getdata(x), kw...) |> TArray
+Statistics.mean(x::TArray; dims = :) = mean(getdata(x), dims = dims) |> localize
+Statistics.std(x::TArray; kw...) = std(getdata(x); kw...) |> localize
 
 # TODO
 # * NNlib
diff --git a/test/runtests.jl b/test/runtests.jl
index 2749827a..24c28f6e 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -1,6 +1,7 @@
 using Libtask
 using Test
 
+include("tf.jl")
 include("ctask.jl")
 include("tarray.jl")
 include("tref.jl")
diff --git a/test/tf.jl b/test/tf.jl
new file mode 100644
index 00000000..53ac57ca
--- /dev/null
+++ b/test/tf.jl
@@ -0,0 +1,17 @@
+using Libtask
+# `S` is defined at the top level: type definitions are not allowed inside a `@testset`.
+mutable struct S
+    i::Int
+    S(x, y) = new(x + y)
+end
+
+@testset "tapedfunction" begin
+    # Test case 1: stack allocated objects are deep copied.
+    @testset "Instruction{typeof(_new)}" begin
+        tf = Libtask.TapedFunction(S)
+        s1 = tf(1, 2)
+        @test s1.i == 3
+        newins = findall(x -> isa(x, Libtask.Instruction{typeof(Libtask._new)}), tf.tape.tape)
+        @test length(newins) == 1
+    end
+end
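
Appendix (reviewer sketch, not part of the patch): a minimal illustration of the behaviour the `TRCache` and `copy(::Tape; partial)` changes above are aimed at, using only the exported `CTask`/`consume`/`produce` API. The produced values follow Libtask's documented task-copying semantics; the cache-related comments describe this PR's intended behaviour rather than verified output.

using Libtask

# A toy producer: each `produce` hands one value to the consumer.
function f()
    t = 1
    while true
        produce(t)
        t = 1 + t
    end
end

ttask = CTask(f)       # the first construction traces `f` into a tape
@show consume(ttask)   # 1
@show consume(ttask)   # 2

# With the new `partial=true` default of `copy(::Tape, roster)`, a copied
# task resumes from the tape's current counter instead of replaying all
# instructions from the beginning.
ttask2 = copy(ttask)
@show consume(ttask2)  # 3
@show consume(ttask)   # 3 (the original and the copy advance independently)

# Constructing another task from the same function and argument types should
# hit `TRCache` ((func, argtypes...) => (ir, tape)) and skip re-tracing.
ttask3 = CTask(f)
@show consume(ttask3)  # 1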