Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented custom byte_vector for local storage #245

Open
wants to merge 17 commits into
base: v0.7-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ target_include_directories(
$<INSTALL_INTERFACE:include>
)
target_link_libraries(
ygm INTERFACE MPI::MPI_CXX Threads::Threads stdc++fs ${YGM_CEREAL_TARGET}
ygm INTERFACE MPI::MPI_CXX Threads::Threads stdc++fs rt ${YGM_CEREAL_TARGET}
)

option(TEST_WITH_SLURM "Run tests with Slurm" OFF)
Expand Down
2 changes: 1 addition & 1 deletion include/ygm/collective.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void bcast(T &to_bcast, int root, const comm &cm) {
ASSERT_MPI(
MPI_Bcast(&to_bcast, sizeof(T), MPI_BYTE, root, cm.get_mpi_comm()));
} else {
std::vector<std::byte> packed;
ygm::detail::byte_vector packed;
cereal::YGMOutputArchive oarchive(packed);
if (cm.rank() == root) {
oarchive(to_bcast);
Expand Down
18 changes: 10 additions & 8 deletions include/ygm/comm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include <ygm/detail/byte_vector.hpp>
#include <ygm/detail/comm_environment.hpp>
#include <ygm/detail/comm_router.hpp>
#include <ygm/detail/comm_stats.hpp>
Expand Down Expand Up @@ -182,7 +184,7 @@ class comm {
private:
void comm_setup(MPI_Comm comm);

size_t pack_header(std::vector<std::byte> &packed, const int dest,
size_t pack_header(ygm::detail::byte_vector &packed, const int dest,
size_t size);

std::pair<uint64_t, uint64_t> barrier_reduce_counts();
Expand All @@ -195,23 +197,23 @@ class comm {

void flush_to_capacity();

void post_new_irecv(std::shared_ptr<std::byte[]> &recv_buffer);
void post_new_irecv(std::shared_ptr<ygm::detail::byte_vector> &recv_buffer);

template <typename Lambda, typename... PackArgs>
size_t pack_lambda(std::vector<std::byte> &packed, Lambda l,
size_t pack_lambda(ygm::detail::byte_vector &packed, Lambda l,
const PackArgs &...args);

template <typename Lambda, typename... PackArgs>
void pack_lambda_broadcast(Lambda l, const PackArgs &...args);

template <typename Lambda, typename RemoteLogicLambda, typename... PackArgs>
size_t pack_lambda_generic(std::vector<std::byte> &packed, Lambda l,
size_t pack_lambda_generic(ygm::detail::byte_vector &packed, Lambda l,
RemoteLogicLambda rll, const PackArgs &...args);

void queue_message_bytes(const std::vector<std::byte> &packed,
void queue_message_bytes(const ygm::detail::byte_vector &packed,
const int dest);

void handle_next_receive(std::shared_ptr<std::byte[]> buffer,
void handle_next_receive(std::shared_ptr<ygm::detail::byte_vector> &buffer,
const size_t buffer_size);

bool process_receive_queue();
Expand All @@ -234,13 +236,13 @@ class comm {
MPI_Comm m_comm_barrier;
MPI_Comm m_comm_other;

std::vector<std::vector<std::byte>> m_vec_send_buffers;
std::vector<ygm::detail::byte_vector> m_vec_send_buffers;
size_t m_send_buffer_bytes = 0;
std::deque<int> m_send_dest_queue;

std::deque<mpi_irecv_request> m_recv_queue;
std::deque<mpi_isend_request> m_send_queue;
std::vector<std::shared_ptr<std::vector<std::byte>>> m_free_send_buffers;
std::vector<std::shared_ptr<ygm::detail::byte_vector>> m_free_send_buffers;

size_t m_pending_isend_bytes = 0;

Expand Down
165 changes: 165 additions & 0 deletions include/ygm/detail/byte_vector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#pragma once

#include <cstring>
#include <string>
#include <cstddef>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>
#include <stdexcept>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>

namespace ygm::detail {
class byte_vector {
using value_type = std::byte;
using pointer = std::byte*;
using reference = std::byte&;
using difference_type = std::ptrdiff_t;
public:
class Byte_Iterator {
public:

Byte_Iterator() : m_bv(nullptr), i(0) {}
Byte_Iterator(byte_vector* v, size_t i) : m_bv(v), i(i) {}

reference operator*() { return (*m_bv)[i]; }
const reference operator*() const { return (*m_bv)[i]; }
pointer operator->() { return &(*m_bv)[i]; }
const pointer operator->() const { return &(*m_bv)[i]; }
reference operator[](int n) { return (*m_bv)[i + n]; }
const reference operator[](int n) const { return (*m_bv)[i + n]; }

Byte_Iterator& operator++() { ++i; return *this; }
Byte_Iterator& operator--() { --i; return *this; }
Byte_Iterator operator++(int) { Byte_Iterator r(*this); ++i; return r; }
Byte_Iterator operator--(int) { Byte_Iterator r(*this); --i; return r; }

Byte_Iterator& operator+=(int n) { i += n; return *this; }
Byte_Iterator& operator-=(int n) { i -= n; return *this; }

difference_type operator-(Byte_Iterator const& r) const { return i - r.i; }

bool operator<(Byte_Iterator const& r) const { return i < r.i; }
bool operator<=(Byte_Iterator const& r) const { return i <= r.i; }
bool operator>(Byte_Iterator const& r) const { return i > r.i; }
bool operator>=(Byte_Iterator const& r) const { return i >= r.i; }
bool operator!=(Byte_Iterator const& r) const { return i != r.i; }
bool operator==(Byte_Iterator const& r) const { return i == r.i; }

private:
const byte_vector* m_bv;
size_t i;
};

byte_vector() : m_data(nullptr), m_size(0), m_capacity(0) {}

byte_vector(size_t set_capacity) : m_size(0) {
m_capacity = get_page_aligned_size(set_capacity);
// now that the shm_file is the correct size we can memory map to it.
m_data = (pointer) mmap(NULL, m_capacity, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
if (m_data == MAP_FAILED) {
std::cerr << strerror(errno) << std::endl;
throw std::runtime_error("mmap failed");
}
}

~byte_vector() {
munmap(m_data, m_capacity);
}

byte_vector(byte_vector&) = default;
byte_vector(const byte_vector&) = default;
byte_vector(byte_vector&&) = default;

const reference operator[](int i) const { return m_data[i]; }
reference operator[](int i) { return m_data[i]; }

pointer data() const { return m_data; }
bool empty() const { return m_size == 0; }
void clear() { m_size = 0; }
size_t size() const { return m_size; }
size_t capacity() const { return m_capacity; }

Byte_Iterator begin() { return Byte_Iterator(this, 0); }
Byte_Iterator end() { return Byte_Iterator(this, m_size); }

void swap(byte_vector& other) {
std::swap(m_data, other.m_data);
std::swap(m_size, other.m_size);
std::swap(m_capacity, other.m_capacity);
}

void reserve(size_t cap) {
size_t new_capacity = get_page_aligned_size(cap);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth implementing an exponentially growing size to get the number of calls to mremap to be logarithmic in the buffer size rather than linear.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be experimented with before implementing.

if(m_data == nullptr) {
m_capacity = new_capacity;
// now that the shm_file is the correct size we can memory map to it.
m_data = (pointer) mmap(NULL, m_capacity, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
if (m_data == MAP_FAILED) {
throw std::runtime_error("mmap failed");
}
return;
}
// if max osx handler
#if __APPLE__
pointer temp = (pointer) mmap(NULL, new_capacity, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
if (temp == MAP_FAILED) {
std::cerr << strerror(errno) << std::endl;
throw std::runtime_error("mmap failed");
}
memcpy(temp, m_data, m_size);
munmap(m_data, m_capacity);
m_data = temp;
#else
m_data = (pointer) mremap(m_data, m_capacity, new_capacity, MREMAP_MAYMOVE);
if(m_data == MAP_FAILED) {
std::cerr << m_capacity << "," << cap << "," << new_capacity << ", " << strerror(errno) << std::endl;
throw std::runtime_error("mremap failed");
}
#endif
m_capacity = new_capacity;
}

void resize(size_t s) {
if(s > m_capacity) this->reserve(s);
m_size = s;
}

void push_bytes(const void* d, size_t s) {
if(s > m_capacity - m_size) {
size_t new_capacity = std::max(m_capacity * 2, m_size + s);
this->reserve(new_capacity);
}
memcpy(m_data + m_size, d, s);
m_size += s;
}

void push_bytes(void* d, size_t s) {
if(s > m_capacity - m_size) {
size_t new_capacity = std::max(m_capacity * 2, m_size + s);
this->reserve(new_capacity);
}
memcpy(m_data + m_size, d, s);
m_size += s;
}

private:
size_t get_page_aligned_size(size_t s) const {
auto pagesize = getpagesize();
auto num_pages = s / pagesize;
if (s % pagesize != 0) num_pages++;
return num_pages * pagesize;
}

pointer m_data;
size_t m_size;
size_t m_capacity;
};

} // end ygm namespace
Loading