Skip to content

Commit

Permalink
240104-2
Browse files Browse the repository at this point in the history
  • Loading branch information
yulon committed Jan 4, 2024
1 parent 5b4d359 commit 51758fe
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 112 deletions.
14 changes: 0 additions & 14 deletions include/rua/binary/bytes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1180,20 +1180,6 @@ bytes_util<Span>::rfind(bytes_pattern pat, size_t start_pos) const {
const_bytes_finder::rfind(*$this(), std::move(pat), {}, start_pos));
}

class writeable_bytes : public bytes_ref {
public:
constexpr writeable_bytes(std::nullptr_t = nullptr) : bytes_ref(), $b() {}

writeable_bytes(bytes_ref br) : bytes_ref(br), $b() {}

writeable_bytes(bytes &&b) : bytes_ref(), $b(std::move(b)) {
*static_cast<bytes_ref *>(this) = $b;
}

private:
bytes $b;
};

template <typename Derived, size_t Size = size_of<Derived>::value>
class enable_bytes_accessor
: public bytes_util<enable_bytes_accessor<Derived, Size>> {
Expand Down
84 changes: 19 additions & 65 deletions include/rua/conc/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,89 +14,43 @@ namespace rua {

class mutex {
public:
class unlocker {
public:
constexpr unlocker() : $mtx(nullptr) {}

~unlocker() {
(*this)();
}

unlocker(const unlocker &) = delete;

unlocker &operator=(const unlocker &) = delete;

unlocker(unlocker &&src) : $mtx(exchange(src.$mtx, nullptr)) {}

unlocker &operator=(unlocker &&src) noexcept {
$mtx = exchange(src.$mtx, nullptr);
return *this;
}

explicit operator bool() const noexcept {
return $mtx;
}

void operator()() {
if (!$mtx) {
return;
}

assert($mtx->$c.load());

auto wtr_opt = $mtx->$wtrs.pop_back_if(
[this]() -> bool { return --$mtx->$c > 0; });
if (!wtr_opt) {
return;
}
(*wtr_opt)->fulfill(std::move(*this));

assert(!$mtx);
}

private:
mutex *$mtx;

explicit unlocker(mutex &mtx) : $mtx(&mtx) {}

friend mutex;
};

constexpr mutex() : $c(0), $wtrs() {}

mutex(const mutex &) = delete;

mutex &operator=(const mutex &) = delete;

unlocker try_lock() noexcept {
bool try_lock() noexcept {
size_t old_val = 0;
return $c.compare_exchange_strong(old_val, 1) ? unlocker(*this)
: unlocker();
return $c.compare_exchange_strong(old_val, 1);
}

future<unlocker> lock() {
future<> lock() {
if (++$c == 1) {
return unlocker(*this);
return meet_expected;
}

auto prm = new newable_promise<unlocker>;

auto is_emplaced = $wtrs.emplace_front_if_non_empty_or(
[this]() -> bool { return $c.load() > 1; }, prm);
if (!is_emplaced) {
return future<unlocker>(*prm);
auto prm = new newable_promise<>;
if ($wtrs.emplace_front_if_non_empty_or(
[this]() -> bool { return $c.load() > 1; }, prm)) {
return future<>(*prm);
}

prm->unuse();
return meet_expected;
}

return unlocker(*this);
void unlock() {
assert($c.load());

auto wtr_opt = $wtrs.pop_back_if([this]() -> bool { return --$c > 0; });
if (!wtr_opt) {
return;
}
(*wtr_opt)->fulfill();
}

private:
std::atomic<size_t> $c;
lockfree_list<promise<unlocker> *> $wtrs;

friend unlocker;
lockfree_list<promise<> *> $wtrs;
};

} // namespace rua
Expand Down
9 changes: 4 additions & 5 deletions include/rua/conc/once.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ class once {
if ($executed.load()) {
return {};
}
auto ul = $mtx.try_lock();
if (ul) {
if ($mtx.try_lock()) {
if ($executed.load()) {
return {};
}
std::forward<Callable>(callable)(std::forward<Args>(args)...);
$executed.store(true);
ul();
$mtx.unlock();
return {};
}
return $mtx.lock() >> [this](mutex::unlocker ul) {
return $mtx.lock() >> [this]() {
assert($executed.load());
ul();
$mtx.unlock();
};
}

Expand Down
30 changes: 19 additions & 11 deletions include/rua/io/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class stream {

auto w_cac = $w_cac();
if (!w_cac.size()) {
return {};
return meet_expected;
}

return unbuf_async_write(w_cac) >> [=](size_t wn) mutable -> future<> {
Expand All @@ -119,6 +119,9 @@ class stream {
if ($->$w_cac_start == $->$w_cac_end) {
$->$w_cac_start = 0;
$->$w_cac_end = 0;
if ($->$w_buf.size() > 4096) {
$->$w_buf.resize(4096);
}
return meet_expected;
}

Expand All @@ -131,32 +134,37 @@ class stream {
return err_stream_was_closed;
}

assert($w_cac_start == 0);
assert($w_cac_end <= $w_buf.size());

auto $ = self();

if (!$w_buf) {
$w_buf.reset(4096);
}

auto w_buf_tail_n = $w_buf.size() - $w_cac_end;
auto data_n = data.size();
auto no_flush = w_buf_tail_n > data_n;

if (data_n > w_buf_tail_n) {
$w_buf.resize((($w_cac_end + data_n - 1) / 1024 + 1) * 1024);
assert($w_buf.size() > 4096);
}

auto cp_n = $w_buf($w_cac_end).copy(data);
assert(cp_n == data_n);
$w_cac_end += cp_n;

assert($w_cac_start == 0);
assert($w_cac_end <= $w_buf.size());
if ($w_cac_end < $w_buf.size()) {
assert(cp_n == data.size());
if (no_flush) {
return cp_n;
}

return flush() >> [=]() -> future<size_t> {
assert($->$w_cac_start == 0);
assert($->$w_cac_end == 0);

if (cp_n == data.size()) {
return cp_n;
}

return $->write(data(cp_n)) >>
[=](size_t wn) -> future<size_t> { return cp_n + wn; };
return data_n;
};
}

Expand Down
32 changes: 20 additions & 12 deletions include/rua/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,31 @@ inline mutex &log_mutex() {
}

template <typename... Args>
inline void log(Args &&...args) {
auto &p = log_printer();
if (!p) {
return;
inline future<> log(Args &&...args) {
if (!sout()) {
return future<>();
}
auto ul = *log_mutex().lock();
*p.println(std::forward<Args>(args)...);
auto s = make_move_only(sprint(std::forward<Args>(args)..., eol::sys));
return log_mutex().lock() >> [s]() {
return sout().write_all(as_bytes(*s)) >> [](expected<> exp) {
log_mutex().unlock();
return exp;
};
};
}

template <typename... Args>
inline void err_log(Args &&...args) {
auto &p = err_log_printer();
if (!p) {
return;
inline future<> err_log(Args &&...args) {
if (!serr()) {
return future<>();
}
auto ul = *log_mutex().lock();
*p.println(std::forward<Args>(args)...);
auto s = make_move_only(sprint(std::forward<Args>(args)..., eol::sys));
return log_mutex().lock() >> [s]() {
return serr().write_all(as_bytes(*s)) >> [](expected<> exp) {
log_mutex().unlock();
return exp;
};
};
}

} // namespace rua
Expand Down
11 changes: 8 additions & 3 deletions include/rua/printer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

namespace rua {

template <typename... Args>
inline std::string sprint(Args &&...args) {
return join({to_temp_string(args)...}, ' ');
}

class printer {
public:
constexpr printer(std::nullptr_t = nullptr) : $w(nullptr), $eol(eol::lf) {}
Expand All @@ -26,12 +31,12 @@ class printer {
}

template <typename... Args>
future<> print(Args &&...args) {
return $w->write_all(as_bytes(join({to_temp_string(args)...}, ' ')));
future<size_t> print(Args &&...args) {
return $w->write(as_bytes(sprint(std::forward<Args>(args)...)));
}

template <typename... Args>
future<> println(Args &&...args) {
future<size_t> println(Args &&...args) {
return print(std::forward<Args>(args)..., $eol);
}

Expand Down
4 changes: 2 additions & 2 deletions test/ucontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <doctest/doctest.h>

static void log(rua::string_view str) {
static size_t log_sz = 0;
/*static size_t log_sz = 0;
if (log_sz) {
*rua::printer(rua::sout()).print(std::string(log_sz, '\b'));
log_sz = 0;
Expand All @@ -14,7 +14,7 @@ static void log(rua::string_view str) {
return;
}
log_sz = str.size();
*rua::printer(rua::sout()).print(str);
*rua::printer(rua::sout()).print(str);*/
}

TEST_CASE("ucontext") {
Expand Down

0 comments on commit 51758fe

Please sign in to comment.