-
Notifications
You must be signed in to change notification settings - Fork 0
/
AsyncReader.hpp
135 lines (126 loc) · 3.87 KB
/
AsyncReader.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#pragma once
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine>
#include <cstdint>
#include <iostream>
#include <list>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <stdexcept>
#include <thread>
#include <utility>

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
/// @brief Coroutine-awaitable front end for a sender that is started over and over.
///
/// On construction, and again each time an awaited value is consumed, a copy of
/// the stored sender is spawned on the supplied async scope. The value it
/// completes with is parked in a small slot buffer and handed to the coroutine
/// that is (or later becomes) suspended on asyncRead().
///
/// Usage constraints that follow from the implementation:
///  - At most one coroutine may be suspended on a reader at a time; a second
///    one gets std::runtime_error("Only one co_await is supported").
///  - The spawned continuation captures `this`, so the reader has to stay alive
///    (and at the same address) until the work spawned on the scope is done.
///  - NOTE(review): only the value completion is hooked (via `then`); nothing in
///    this class resumes a suspended awaiter if the sender completes otherwise.
///
/// @tparam T Value type produced by the sender and returned from `co_await`.
/// @tparam U Sender type; must be able to complete with `set_value(T)`.
template<typename T, stdexec::sender_of<stdexec::set_value_t(T)> U>
class AsyncReader
{
private:
    /// Number of slots in the result buffer.
    static constexpr std::uint32_t BACKBUFFER_SIZE = 2;

public:
    using Sender = U;
    using Result = T;

    /// @brief Stores the sender and scope and immediately starts the first read.
    /// @param sender Sender that is copied and spawned for every read.
    /// @param scope  Scope the reads are spawned on; dereferenced without a null check.
    AsyncReader(Sender sender, exec::async_scope* scope)
        : m_sender(std::move(sender))
        , m_scope(scope)
    {
        asyncReadImpl();
    }

    /// @brief Awaitable returned by asyncRead(); yields the next Result.
    struct [[nodiscard]] Awaiter
    {
        AsyncReader* reader {nullptr};
        std::coroutine_handle<> awaiting_coroutine = std::noop_coroutine();

        explicit Awaiter(AsyncReader* reader)
            : reader(reader)
        {}

        /// @return true when a value is already waiting, so no suspension is needed.
        bool await_ready() const { return reader->isReady(); }

        /// @brief Registers the coroutine to be resumed once a value arrives.
        /// @return false if a value showed up in the meantime (continue without
        ///         suspending), true if the coroutine stays suspended.
        /// @throws std::runtime_error if another awaiter is already registered.
        bool await_suspend(std::coroutine_handle<> handle)
        {
            // Everything below happens under the buffer lock on purpose:
            //  - The "already awaited?" check and the registration must be one
            //    atomic step, otherwise two concurrent co_awaits could both pass
            //    the check and the second would overwrite the first.
            //  - The "value present?" check and the registration must be one
            //    atomic step with respect to setResult(). Without the lock:
            //      [ThreadA] sees no value, releases the lock
            //      [ThreadB] setResult() stores the value
            //      [ThreadB] resumeAwaitingCoroutine() finds nobody registered
            //      [ThreadA] registers itself (too late, never resumed)
            std::lock_guard lock(reader->m_backbuffer_mutex);
            if(reader->m_awaiting_coroutine.load() != nullptr)
            {
                throw std::runtime_error("Only one co_await is supported");
            }
            awaiting_coroutine = handle;
            if(reader->m_backbuffer[reader->getReadIndex()].has_value())
            {
                return false;
            }
            reader->m_awaiting_coroutine.store(this);
            return true;
        }

        /// @brief Takes the value out of the buffer and kicks off the next read.
        /// @return The value produced by the read that just completed.
        Result await_resume()
        {
            // The read slot is expected to hold a value here: every path that
            // leads to await_resume() observed or produced one.
            Result result = *reader->clear();
            reader->asyncReadImpl();
            return result;
        }
    };

    /// @brief Creates the awaitable for the next value: `T v = co_await reader.asyncRead();`
    Awaiter asyncRead()
    {
        return Awaiter{this};
    }

private:
    /// Spawns one run of the sender; its value is stored and the awaiter (if any) resumed.
    void asyncReadImpl()
    {
        m_scope->spawn(m_sender | stdexec::then([this](Result result) {
            setResult(std::move(result));
            resumeAwaitingCoroutine();
        }));
    }

    /// Stores a freshly produced value in the write slot and advances the head.
    /// @throws std::runtime_error("Data lost") if the write slot is still occupied.
    void setResult(Result result)
    {
        std::lock_guard lock(m_backbuffer_mutex);
        auto& slot = m_backbuffer[getWriteIndex()];
        if(slot.has_value())
        {
            throw std::runtime_error("Data lost");
        }
        slot = std::move(result);
        stepBackbuffer();
    }

    /// @return true when the read slot currently holds a value.
    bool isReady() const
    {
        std::lock_guard lock(m_backbuffer_mutex);
        return m_backbuffer[getReadIndex()].has_value();
    }

    /// Moves the content of the read slot out and leaves the slot empty.
    std::optional<Result> clear()
    {
        std::lock_guard lock(m_backbuffer_mutex);
        return std::exchange(m_backbuffer[getReadIndex()], std::nullopt);
    }

    /// Resumes the registered awaiter, if there is one, and unregisters it.
    /// Must be called without holding the buffer lock, so the lock is not held
    /// for as long as the resumed coroutine runs.
    void resumeAwaitingCoroutine()
    {
        if(Awaiter* awaiter = m_awaiting_coroutine.exchange(nullptr); awaiter != nullptr)
        {
            // Copy the handle first: the awaiter object may be destroyed while
            // the coroutine runs, so it must not be touched during resume().
            const std::coroutine_handle<> handle = awaiter->awaiting_coroutine;
            handle.resume();
        }
    }

    /// Index of the slot the next produced value is written to.
    std::uint32_t getWriteIndex() const
    {
        return m_head_index;
    }

    /// Index of the slot written most recently (one behind the head, wrapping).
    std::uint32_t getReadIndex() const
    {
        return (m_head_index + BACKBUFFER_SIZE - 1) % BACKBUFFER_SIZE;
    }

    /// Advances the head to the next slot, wrapping around at the end.
    void stepBackbuffer()
    {
        m_head_index = (m_head_index + 1) % BACKBUFFER_SIZE;
    }

    Sender m_sender;
    exec::async_scope* m_scope {nullptr};
    // Guards m_backbuffer and m_head_index, and the awaiter registration in await_suspend().
    mutable std::mutex m_backbuffer_mutex;
    std::array<std::optional<Result>, BACKBUFFER_SIZE> m_backbuffer {};
    std::uint32_t m_head_index{0};
    // Awaiter currently suspended on this reader, or nullptr.
    std::atomic<Awaiter*> m_awaiting_coroutine {nullptr};
};