From 9f09ea93e3b3958144d71794fda107e183e06d79 Mon Sep 17 00:00:00 2001
From: Alexey Stukalov
Date: Tue, 4 Aug 2015 16:35:43 +0300
Subject: [PATCH] add ParallelPopulationOptimizer

---
 src/BlackBoxOptim.jl                        |   2 +
 src/optimization_methods.jl                 |   1 +
 src/parallel_population_optimizer.jl        | 362 +++++++++++++++++++++
 test/helper.jl                              |   5 +
 test/runtests.jl                            |   1 +
 test/test_parallel_population_optimizer.jl  |  26 ++
 test/test_smoketest_bboptimize.jl           |   8 +-
 7 files changed, 401 insertions(+), 4 deletions(-)
 create mode 100644 src/parallel_population_optimizer.jl
 create mode 100644 test/test_parallel_population_optimizer.jl

diff --git a/src/BlackBoxOptim.jl b/src/BlackBoxOptim.jl
index 3d3980be..02d36430 100644
--- a/src/BlackBoxOptim.jl
+++ b/src/BlackBoxOptim.jl
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat
 export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
   bboptimize, bbsetup, compare_optimizers,
+  ParallelPopulationOptimizer,
   DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
   adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -177,6 +178,7 @@ include("resampling_memetic_search.jl")
 include("simultaneous_perturbation_stochastic_approximation.jl")
 include("generating_set_search.jl")
 include("direct_search_with_probabilistic_descent.jl")
+include("parallel_population_optimizer.jl")
 
 # Fitness
 # include("fitness/fitness_types.jl") FIXME merge it with fitness.jl
diff --git a/src/optimization_methods.jl b/src/optimization_methods.jl
index 8c95e473..992d2d68 100644
--- a/src/optimization_methods.jl
+++ b/src/optimization_methods.jl
@@ -14,6 +14,7 @@ const ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
   :simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
   :generating_set_search => GeneratingSetSearcher,
   :probabilistic_descent => direct_search_probabilistic_descent,
+  :parallel_population_optimizer => parallel_population_optimizer,
 )
 
 const MethodNames = sort!(collect(keys(ValidMethods)))
diff --git a/src/parallel_population_optimizer.jl b/src/parallel_population_optimizer.jl
new file mode 100644
index 00000000..9117c86f
--- /dev/null
+++ b/src/parallel_population_optimizer.jl
@@ -0,0 +1,362 @@
+const ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
+  :WorkerMethod => :adaptive_de_rand_1_bin_radiuslimited, # worker population optimization method
+  :NWorkers => 2, # number of workers
+  :Workers => Vector{Int}(), # IDs of worker processes, takes precedence over NWorkers
+  :MigrationSize => 1, # number of "migrant" individuals to send to the master
+  :MigrationPeriod => 100, # number of worker iterations before sending migrants
+  :ArchiveCapacity => 10, # ParallelPseudoEvaluator archive capacity
+  :ToWorkerChannelCapacity => 100, # how many unread messages the master->worker channel can store
+  :FromWorkersChannelCapacity => 1000 # how many unread messages the workers->master channel can store
+)
+
+# metrics from the worker optimizer
+type WorkerMetrics
+  num_evals::Int # number of function evals
+  num_steps::Int # number of steps
+  num_better::Int # number of steps that improved the best fitness
+  num_migrated::Int # number of migrants the worker received
+  num_better_migrated::Int # number of migrants accepted (because fitness improved)
+
+  WorkerMetrics() = new(0, 0, 0, 0, 0)
+end
+
+function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
+  a.num_evals = b.num_evals
+  a.num_steps = b.num_steps
+  a.num_better = b.num_better
+  a.num_migrated = b.num_migrated
+  a.num_better_migrated = b.num_better_migrated
+  return a
+end
+
+# fake evaluator for ParallelPopulationOptimizer;
+# it doesn't evaluate anything itself, but stores some
+# metrics from the workers' evaluators
+type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
+  problem::P
+  archive::TopListArchive{F}
+  workers_metrics::Vector{WorkerMetrics} # function evals per worker etc
+  last_fitness::F
+end
+
+num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
+num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)
+
+function ParallelPseudoEvaluator{P<:OptimizationProblem}(
+    problem::P, nworkers::Int;
+    ArchiveCapacity::Int = 10)
+  ParallelPseudoEvaluator{fitness_type(problem), P}(
+    problem,
+    TopListArchive(fitness_scheme(problem), numdims(problem), ArchiveCapacity),
+    WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
+    nafitness(fitness_scheme(problem)))
+end
+
+const FINAL_CANDIDATE = -12345 # special terminating candidate with worker index, sent by the master
+const ERROR_CANDIDATE = -67890 # special error candidate with worker index, sent by the worker
+
+# message with the candidate passed between the workers and the master
+immutable CandidateMessage{F}
+  worker::Int # origin of the candidate
+  metrics::WorkerMetrics # current metrics of the origin worker
+  candi::Candidate{F}
+end
+
+typealias WorkerChannel{F} Channel{CandidateMessage{F}}
+typealias WorkerChannelRef{F} RemoteRef{WorkerChannel{F}}
+
+# Parallel population optimizer:
+# starts nworkers parallel population optimizers.
+# At regular intervals, the workers send randomly chosen members of their populations to the master process,
+# and the master redirects them to the other workers
+type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
+  final_fitnesses::Vector{RemoteRef{Channel{Any}}} # references to the @spawnat ID run_worker()
+  from_workers::WorkerChannelRef{F}
+  to_workers::Vector{WorkerChannelRef{F}}
+  is_started::RemoteRef{Channel{Bool}}
+  evaluator::ParallelPseudoEvaluator{F, P}
+end
+
+nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)
+
+# reads the worker's message, stores the worker metrics and updates the best fitness in the archive
+function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
+  copy!(ppe.workers_metrics[msg.worker], msg.metrics)
+  if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only the candidates with known fitness
+    add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
+  end
+end
+
+# check that the workers are still running:
+# their RemoteRefs should not be ready,
+# but if there was an exception in the worker,
+# it would be rethrown here when fetched
+function check_workers_running{T}(workers::AbstractVector{RemoteRef{T}})
+  for i in eachindex(workers)
+    if isready(workers[i])
+      fetch(workers[i]) # fetch the worker result, this should trigger an exception
+      # no exception, but the worker should not be ready yet
+      error("Worker #$i has finished before the master shutdown")
+    end
+  end
+  return true
+end
+
+# outer parallel population optimizer constructor that
+# also spawns the worker tasks
+function ParallelPopulationOptimizer{P<:OptimizationProblem}(
+    problem::P, optimizer_generator::Function; NWorkers::Int = 1, Workers::Vector{Int} = Vector{Int}(),
+    MigrationSize::Int = 1, MigrationPeriod::Int = 100,
+    ArchiveCapacity::Int = 10,
+    ToWorkerChannelCapacity::Int = 1000,
+    FromWorkersChannelCapacity::Int = (isempty(Workers) ? NWorkers : length(Workers)) * ToWorkerChannelCapacity)
+  F = fitness_type(problem)
+  info("Constructing parallel optimizer...")
+  if isempty(Workers)
+    # take the first NWorkers workers
+    Workers = workers()[1:NWorkers]
+  end
+  if length(Workers) <= 1
+    throw(ArgumentError("ParallelPopulationOptimizer requires at least 2 worker processes"))
+  end
+  from_workers = RemoteRef(() -> WorkerChannel{F}(FromWorkersChannelCapacity))
+  to_workers = WorkerChannelRef{F}[RemoteRef(() -> WorkerChannel{F}(ToWorkerChannelCapacity), id) for id in Workers]
+  workers_ready = RemoteRef(() -> Channel{Int}(length(to_workers))) # FIXME do we need to wait for the worker?
+  is_started = RemoteRef(() -> Channel{Bool}(1))
+  final_fitnesses = map(function(i)
+      procid = Workers[i]
+      info("  Spawning worker #$i at process #$procid...")
+      @spawnat procid run_worker(i, workers_ready, is_started, problem, optimizer_generator,
+                                 from_workers, to_workers[i],
+                                 MigrationSize, MigrationPeriod)
+    end, 1:length(to_workers))
+  # wait until all the workers are started
+  info("Waiting for the workers to be ready...")
+  # FIXME is it required?
+  nready = 0
+  while nready < length(to_workers)
+    check_workers_running(final_fitnesses)
+    worker_id = take!(workers_ready)
+    info("  Worker #$worker_id is ready")
+    nready += 1
+  end
+  info("All workers ready")
+  ParallelPopulationOptimizer{F, P}(final_fitnesses, from_workers, to_workers, is_started,
+                                    ParallelPseudoEvaluator(problem, length(to_workers);
+                                                            ArchiveCapacity = ArchiveCapacity))
+end
+
+function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
+  param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
+  params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
+  worker_method = params[:WorkerMethod]
+  info("Using $worker_method as the worker method for parallel optimization")
+  optimizer_func = ValidMethods[worker_method]
+
+  ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
+                              NWorkers = params[:NWorkers], Workers = params[:Workers],
+                              MigrationSize = params[:MigrationSize],
+                              MigrationPeriod = params[:MigrationPeriod],
+                              ArchiveCapacity = params[:ArchiveCapacity],
+                              ToWorkerChannelCapacity = params[:ToWorkerChannelCapacity],
+                              FromWorkersChannelCapacity = params[:FromWorkersChannelCapacity])
+end
+
+# redirects the candidate to another worker
+function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
+  # pick a random receiving worker other than the origin
+  #println("redirecting from $(msg.worker)")
+  recv_ix = sample(1:(length(ppopt.to_workers)-1))
+  if recv_ix >= msg.worker # skip the index of the origin worker
+    recv_ix += 1
+  end
+  @assert (recv_ix != msg.worker) && (1 <= recv_ix <= nworkers(ppopt))
+  msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
+  msg.candi.tag = 0
+  put!(ppopt.to_workers[recv_ix], msg)
+  #println("redirecting done")
+end
+
+function step!{F}(ppopt::ParallelPopulationOptimizer{F})
+  #println("main#: n_evals=$(num_evals(ppopt.evaluator))")
+  if !isready(ppopt.is_started) put!(ppopt.is_started, true) end # if it's the first iteration
+  last_better = num_better(ppopt.evaluator)
+  candidate = take!(ppopt.from_workers)::CandidateMessage{F}
+  if candidate.worker == ERROR_CANDIDATE
+    check_workers_running(ppopt.final_fitnesses)
+    error("A worker reported an exception, but all workers appear to be running")
+  end
+  #println("candidate=$candidate")
+  store!(ppopt.evaluator, candidate)
+  redirect(ppopt, candidate)
+  return num_better(ppopt.evaluator) - last_better
+end
+
+# finalize the master: wait for the workers to shut down
+# and collect their best candidates
+function finalize!{F}(ppopt::ParallelPopulationOptimizer{F})
+  info("Finalizing parallel optimizer...")
+  # send the special terminating candidate to each worker
+  for to_worker in ppopt.to_workers
+    put!(to_worker, CandidateMessage{F}(FINAL_CANDIDATE, WorkerMetrics(),
+                                        Candidate{F}(Individual())))
+  end
+  # wait until all workers finish;
+  # the last candidates they send are the best in their populations
+  info("Waiting for the workers to finish...")
+  for i in eachindex(ppopt.final_fitnesses)
+    msg = fetch(ppopt.final_fitnesses[i])::CandidateMessage{F}
+    @assert msg.worker == i
+    store!(ppopt.evaluator, msg) # store the best candidate
+    info("Worker #$(msg.worker) finished")
+  end
+  info("Parallel optimizer finished. Metrics per worker: $(ppopt.evaluator.workers_metrics)")
+end
+
+# wraps the worker's population optimizer
+# and communicates with the master
+type PopulationOptimizerWrapper{F,O<:PopulationOptimizer,E<:Evaluator}
+  id::Int # worker's ID
+  optimizer::O
+  evaluator::E
+  to_master::WorkerChannelRef{F} # outgoing candidates
+  from_master::WorkerChannelRef{F} # incoming candidates
+  migrationSize::Int # size of the migrating group
+  migrationPeriod::Int # number of iterations between the migrations
+
+  metrics::WorkerMetrics # current metrics
+  is_stopping::Bool # if the optimizer is being shut down
+  can_receive::Condition # condition recv_task waits for
+  can_run::Condition # condition run!() task waits for
+  recv_task::Task # task that continuously executes recv_immigrants!()
+
+  function Base.call{F,O,E}(::Type{PopulationOptimizerWrapper},
+                            id::Int, optimizer::O, evaluator::E,
+                            to_master::WorkerChannelRef{F}, from_master::WorkerChannelRef{F},
+                            migrationSize::Int = 1, migrationPeriod::Int = 100)
+    res = new{F,O,E}(id, optimizer, evaluator,
+                     to_master, from_master,
+                     migrationSize, migrationPeriod,
+                     WorkerMetrics(), false, Condition(), Condition())
+    # "background" migrants-receiving task
+    res.recv_task = @schedule while !res.is_stopping
+      wait(res.can_receive)
+      recv_immigrants!(res)
+      notify(res.can_run)
+    end
+    return res
+  end
+end
+
+function send_emigrants{F}(wrapper::PopulationOptimizerWrapper{F})
+  pop = population(wrapper.optimizer)
+  # prepare the group of emigrants
+  migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
+  for migrant_ix in migrant_ixs
+    migrant = acquire_candi(pop, migrant_ix)
+    # send them outward
+    wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
+    put!(wrapper.to_master, CandidateMessage{F}(wrapper.id, wrapper.metrics, migrant))
+    # FIXME check that the reuse of the candidate does not affect
+    # the migrants while they wait to be sent
+    release_candi(pop, migrant)
+  end
+end
+
+function take_master_message!{F}(wrapper::PopulationOptimizerWrapper{F})
+  take!(wrapper.from_master)::CandidateMessage{F}, isready(wrapper.from_master)
+end
+
+# receive migrants (called from the "background" task)
+function recv_immigrants!{F}(wrapper::PopulationOptimizerWrapper{F})
+  pop = population(wrapper.optimizer)
+  # receive all the immigrants waiting in the channel
+  continue_recv = true
+  while continue_recv
+    msg, continue_recv = take_master_message!(wrapper) # here it can hibernate waiting for an immigrant to appear
+    if msg.worker == FINAL_CANDIDATE # special index sent by the master to indicate termination
+      wrapper.is_stopping = true
+      break
+    end
+    # assign the immigrant to a random position in the population
+    migrant_ix = sample(1:popsize(pop))
+    candidates = sizehint!(Vector{candidate_type(pop)}(), 2)
+    push!(candidates, acquire_candi(pop, migrant_ix))
+    push!(candidates, acquire_candi(pop, msg.candi))
+    candidates[end].index = migrant_ix # override the incoming index
+    rank_by_fitness!(wrapper.evaluator, candidates)
+    wrapper.metrics.num_migrated += 1
+    wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
+  end
+end
+
+# run the wrapper (called in the "main" task)
+function run!(wrapper::PopulationOptimizerWrapper)
+  while !wrapper.is_stopping
+    if istaskdone(wrapper.recv_task)
+      error("recv_task has completed prematurely")
+    end
+    wrapper.metrics.num_steps += 1
+    #println("$(wrapper.metrics.num_steps)-th iteration")
+    if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
+      send_emigrants(wrapper) # send our emigrants before receiving the incoming ones
+      notify(wrapper.can_receive) # switch to the migrants-receiving task
+      wait(wrapper.can_run)
+    end
+    # normal ask/tell sequence
+    candidates = ask(wrapper.optimizer)
+    rank_by_fitness!(wrapper.evaluator, candidates)
+    wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
+  end
+  finalize!(wrapper.optimizer, wrapper.evaluator)
+end
+
+# returns the candidate message with the final metrics and the best candidate
+function final_fitness{F}(wrapper::PopulationOptimizerWrapper{F})
+  @assert wrapper.is_stopping
+  # send the best candidate
+  pop = population(wrapper.optimizer)
+  best_candi = acquire_candi(pop)
+  copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
+  best_candi.fitness = best_fitness(wrapper.evaluator.archive)
+  best_candi.index = -1 # we don't know it
+  best_candi.tag = 0
+  # the final metrics and the best candidate are returned from run_worker()
+  # and fetched by the master in finalize!()
+  wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
+  CandidateMessage{F}(wrapper.id, wrapper.metrics, best_candi)
+end
+
+# Function that the master process spawns at each worker process.
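+# A rough sketch of the exchange between run_worker() below and the master
+# (illustrative summary only; the code below is authoritative):
+#   put!(worker_ready, id)   -- announce that this worker is initialized
+#   fetch(is_ready)          -- block until the master releases all the workers
+#   run!(wrapper)            -- ask/tell loop; every migrationPeriod steps
+#                               send_emigrants() pushes candidates to the master
+#   final_fitness(wrapper)   -- the best candidate and final metrics, fetched by
+#                               the master in finalize!() via the @spawnat RemoteRef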
+# Creates and runs the worker wrapper
+function run_worker{F}(id::Int,
+                       worker_ready::RemoteRef{Channel{Int}},
+                       is_ready::RemoteRef{Channel{Bool}},
+                       problem::OptimizationProblem,
+                       optimizer_generator::Function,
+                       to_master::WorkerChannelRef{F},
+                       from_master::WorkerChannelRef{F},
+                       migrationSize, migrationPeriod)
+  info("Initializing parallel optimization worker #$id at process $(myid())")
+  wrapper = PopulationOptimizerWrapper(id,
+                                       optimizer_generator(problem),
+                                       ProblemEvaluator(problem),
+                                       to_master, from_master,
+                                       migrationSize, migrationPeriod)
+  # (the immigrants-receiving task is created by the wrapper constructor)
+  put!(worker_ready, id)
+  info("Waiting for the master start signal...")
+  fetch(is_ready) # wait until the master and the other workers are ready
+  info("Running worker #$id")
+  try
+    run!(wrapper)
+  catch e
+    # send the error candidate to notify about an error and to release
+    # the master from waiting for worker messages
+    put!(wrapper.to_master,
+         CandidateMessage{F}(ERROR_CANDIDATE, WorkerMetrics(),
+                             Candidate{F}(Individual())))
+    rethrow(e)
+  end
+  info("Worker #$id stopped")
+  final_fitness(wrapper) # return the best candidate and the final metrics
+end
diff --git a/test/helper.jl b/test/helper.jl
index 673dc15a..3351ae01 100644
--- a/test/helper.jl
+++ b/test/helper.jl
@@ -3,3 +3,8 @@ using FactCheck
 using Compat
 
 NumTestRepetitions = 100
+
+if nprocs() < 4
+  addprocs(4-nprocs()) # add procs for the parallel population optimizer
+end
+@everywhere using BlackBoxOptim
diff --git a/test/runtests.jl b/test/runtests.jl
index 30d76582..575cd2c3 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -18,6 +18,7 @@ my_tests = [
   "test_differential_evolution.jl",
   "test_adaptive_differential_evolution.jl",
   "test_natural_evolution_strategies.jl",
+  "test_parallel_population_optimizer.jl",
 
   "test_toplevel_bboptimize.jl",
   "test_smoketest_bboptimize.jl",
diff --git a/test/test_parallel_population_optimizer.jl b/test/test_parallel_population_optimizer.jl
new file mode 100644
index 00000000..e6b5c89e
--- /dev/null
+++ b/test/test_parallel_population_optimizer.jl
@@ -0,0 +1,26 @@
+facts("Parallel population optimizer") do
+
+@everywhere using BlackBoxOptim
+
+context("optimizing small problem") do
+  rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2
+
+  res = bboptimize(rosenbrock2d; Method = :parallel_population_optimizer,
+                   SearchSpace = [(-5.0, 5.0), (-2.0, 2.0)], MaxTime = 100.0,
+                   ShowTrace = true, MigrationSize = 2, MigrationPeriod = 100)
+  @fact size(best_candidate(res)) => (2,)
+  @fact typeof(best_fitness(res)) => Float64
+  @fact best_fitness(res) => less_than(100.0)
+end
+
+context("propagating exceptions from workers to the master") do
+  # 0.01 chance of a DomainError per evaluation
+  bogus(x) = sqrt(x[1]-0.01)
+  # (a small chance that the exception is raised prematurely, already during bbsetup())
+  res = bbsetup(bogus; Method = :parallel_population_optimizer,
+                SearchSpace = [(0.0, 1.0)], MaxTime = 100.0,
+                ShowTrace = true, MigrationSize = 2, MigrationPeriod = 100)
+  @fact_throws RemoteException bboptimize(res)
+end
+
+end
diff --git a/test/test_smoketest_bboptimize.jl b/test/test_smoketest_bboptimize.jl
index e9a1c5ab..b27b1cfb 100644
--- a/test/test_smoketest_bboptimize.jl
+++ b/test/test_smoketest_bboptimize.jl
@@ -1,8 +1,8 @@
-function rosenbrock2d(x)
-  return (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2
-end
-
 facts("bboptimize smoketest") do
+  function rosenbrock2d(x)
+    return (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2
+  end
+
   for(m in keys(BlackBoxOptim.ValidMethods))
     context("testing $(m) method to ensure it works") do
       res = bboptimize(rosenbrock2d; Method = m,