Skip to content

Commit

Permalink
AsyncTask rewritten in terms of new Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
wkjarosz committed Mar 2, 2024
1 parent c36693e commit 6b0383c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 73 deletions.
30 changes: 15 additions & 15 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,20 +644,20 @@ void HDRViewApp::load_image(const string filename, string_view buffer)
// convert the buffer (if any) to a string so the async thread has its own copy
// then load from the string or filename depending on whether the buffer is empty
m_pending_images.emplace_back(
std::make_shared<PendingImage>(filename,
[buffer_str = string(buffer), filename](AtomicProgress &prog)
{
if (buffer_str.empty())
{
std::ifstream is{filename, std::ios_base::binary};
return Image::load(is, filename);
}
else
{
std::istringstream is{buffer_str};
return Image::load(is, filename);
}
}));
std::make_shared<PendingImages>(filename,
[buffer_str = string(buffer), filename](AtomicProgress &prog)
{
if (buffer_str.empty())
{
std::ifstream is{filename, std::ios_base::binary};
return Image::load(is, filename);
}
else
{
std::istringstream is{buffer_str};
return Image::load(is, filename);
}
}));

// remove any instances of filename from the recent files list until we know it has loaded successfully
m_recent_files.erase(std::remove(m_recent_files.begin(), m_recent_files.end(), filename), m_recent_files.end());
Expand All @@ -672,7 +672,7 @@ void HDRViewApp::load_image(const string filename, string_view buffer)
void HDRViewApp::add_pending_images()
{
// Criterion to check if a image is ready, and copy it into our m_images vector if so
auto removable = [this](shared_ptr<PendingImage> p)
auto removable = [this](shared_ptr<PendingImages> p)
{
if (p->images.ready())
{
Expand Down
6 changes: 3 additions & 3 deletions src/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ class HDRViewApp
int m_current = -1, m_reference = -1;

using ImageLoadTask = AsyncTask<vector<ImagePtr>>;
struct PendingImage
struct PendingImages
{
string filename;
ImageLoadTask images;
PendingImage(const string &f, ImageLoadTask::TaskFunc func) : filename(f), images(func) { images.compute(); }
PendingImages(const string &f, ImageLoadTask::TaskFunc func) : filename(f), images(func) { images.compute(); }
};
vector<shared_ptr<PendingImage>> m_pending_images;
vector<shared_ptr<PendingImages>> m_pending_images;

float m_exposure = 0.f, m_exposure_live = 0.f, m_gamma = 2.2f, m_gamma_live = 2.f;
AxisScale_ m_x_scale = AxisScale_Asinh, m_y_scale = AxisScale_Linear;
Expand Down
75 changes: 30 additions & 45 deletions src/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,38 @@
#pragma once

#include "progress.h"
#include <chrono>
#include "scheduler.h"
#include <functional>
#include <future>

template <typename T>
class AsyncTask
{
public:
#if defined(FORCE_SERIAL) or defined(__EMSCRIPTEN__)
// In emscripten, even if compiled with pthread support, we shouldn't use these simple async-based tasks since the
// async would block on the mail thread, which is a no-no.
static const auto policy = std::launch::deferred;
#else
static const auto policy = std::launch::async;
#endif

using TaskFunc = std::function<T(AtomicProgress &progress)>;
using NoProgressTaskFunc = std::function<T(void)>;
using NoProgressTaskFunc = std::function<T()>;

/*!
* Create an asynchronous task that can report back on its progress
* @param compute The function to execute asynchronously
*/
AsyncTask(TaskFunc compute) :
m_compute(
[compute](AtomicProgress &prog)
AsyncTask(TaskFunc f, Scheduler *s = nullptr) :
m_func(
[f](AtomicProgress &prog)
{
T ret = compute(prog);
T ret = f(prog);
prog.set_done();
return ret;
}),
m_progress(true)
m_progress(true), m_threadpool(s)
{
}

/*!
* Create an asynchronous task without progress updates
* @param compute The function to execute asynchronously
*/
AsyncTask(NoProgressTaskFunc compute) :
m_compute([compute](AtomicProgress &) { return compute(); }), m_progress(false)
AsyncTask(NoProgressTaskFunc f, Scheduler *s = nullptr) :
m_func([f](AtomicProgress &) { return f(); }), m_progress(false), m_threadpool(s)
{
}

Expand All @@ -56,8 +47,19 @@ class AsyncTask
void compute()
{
// start only if not done and not already started
if (!m_future.valid() && !m_ready)
m_future = std::async(policy, m_compute, std::ref(m_progress));
if (!m_started)
{
auto callback = [](int, int, void *payload)
{
AsyncTask *self = (AsyncTask *)payload;
self->m_value = self->m_func(self->m_progress);
};

auto pool = m_threadpool ? m_threadpool : Scheduler::singleton();
m_task = pool->parallelizeAsync(1, this, callback);

m_started = true;
}
}

/*!
Expand All @@ -68,12 +70,7 @@ class AsyncTask
*/
T &get()
{
if (m_ready)
return m_value;

m_value = m_future.valid() ? m_future.get() : m_compute(m_progress);

m_ready = true;
m_task.wait();
return m_value;
}

Expand All @@ -95,25 +92,13 @@ class AsyncTask
/*!
* @return true if the computation has finished
*/
bool ready() const
{
if (m_ready)
return true;

if (!m_future.valid())
return false;

auto status = m_future.wait_for(std::chrono::seconds(0));

// pretend that the computation is ready for deferred execution since we will compute it on-demand in
// get() anyway
return (status == std::future_status::ready || status == std::future_status::deferred);
}
bool ready() const { return m_started && m_task.ready(); }

private:
TaskFunc m_compute;
std::future<T> m_future;
T m_value;
AtomicProgress m_progress;
bool m_ready = false;
T m_value;
TaskFunc m_func;
Scheduler::TaskTracker m_task;
AtomicProgress m_progress;
bool m_started = false;
Scheduler *m_threadpool = nullptr;
};
11 changes: 2 additions & 9 deletions src/image.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ void Channel::update_stats()
// cancel and wait for the current task
spdlog::trace("Canceling and waiting for outdated stats computation.");
async_stats.cancel();
async_stats.get();
// async_stats.get();

// create the new task
spdlog::trace("Scheduling new stats computation");
Expand All @@ -358,13 +358,6 @@ void Channel::update_stats()
return ret;
});

// async_tracker = do_async(
// [this, desired_settings](AtomicProgress &prog)
// {
// async_stats2 = make_shared<PixelStats>(*this, desired_settings.exposure, desired_settings.x_scale,
// desired_settings.y_scale, prog);
// },
// progress);
async_settings = desired_settings;
async_stats.compute();
};
Expand Down Expand Up @@ -616,7 +609,7 @@ void Image::finalize()
}

// update the stats/histograms for all channels
for (auto &c : channels) c.update_stats();
// for (auto &c : channels) c.update_stats();
}

string Image::to_string() const
Expand Down
2 changes: 2 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ Scheduler::TaskTracker Scheduler::async(int numUnits, void *data, TaskFn f, Task
return result;
}

bool Scheduler::TaskTracker::ready() const { return !task || task->dependencies.load() == 0; }

void Scheduler::TaskTracker::wait()
{
if (!task)
Expand Down
5 changes: 4 additions & 1 deletion src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
//

// scheduler.h

#pragma once

#include "progress.h"
#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -92,7 +95,7 @@ class Scheduler
}

// Non-blocking check whether the computation is finished.
bool ready() const { return !task; }
bool ready() const;

// Wait for the task to complete. Calling wait will make the calling thread to temporarily
// enter the task scheduler and participate to the computation.
Expand Down

0 comments on commit 6b0383c

Please sign in to comment.