Support free-fragment recycling in shared-segment. Add fingerprint object management. #569

Status: Open · wants to merge 2 commits into main
6 changes: 4 additions & 2 deletions Makefile
@@ -476,6 +476,10 @@ $(BINDIR)/$(UNITDIR)/splinter_shmem_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_shmem_oom_test: $(UTIL_SYS) \
Collaborator Author: Split the OOM-related test case(s) out of unit/splinter_shmem_test.

$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_ipc_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ)

@@ -495,8 +499,6 @@ $(BINDIR)/$(UNITDIR)/splinterdb_heap_id_mgmt_test: $(COMMON_TESTOBJ) \
$(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \
$(LIBDIR)/libsplinterdb.so



########################################
# Convenience mini unit-test targets
unit/util_test: $(BINDIR)/$(UNITDIR)/util_test
2 changes: 1 addition & 1 deletion include/splinterdb/splinterdb.h
@@ -175,7 +175,7 @@ splinterdb_open(const splinterdb_config *cfg, splinterdb **kvs);
// Close a splinterdb
//
// This will flush all data to disk and release all resources
void
int
Collaborator Author: To percolate errors found by shm-destroy when large fragments that were not freed are still found hanging around.

Contributor: Good change. Can you add a comment defining the meaning of the return value? E.g.:

"Returns 0 on success, non-zero otherwise."

Or:

"Returns:

  • 0 on success,
  • a positive integer when all data has been persisted but not all resources could be released, and
  • a negative number when not all data could be persisted and the database was unable to shut down safely."
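A sketch of how the suggested contract might read in the header (illustrative wording for this reviewer proposal, not the merged text):

   // Close a splinterdb
   //
   // This will flush all data to disk and release all resources.
   //
   // Returns:
   //  - 0 on success;
   //  - a positive value when all data was persisted but some resources
   //    (e.g., unfreed large fragments in the shared segment) could not
   //    be released;
   //  - a negative value when not all data could be persisted and the
   //    database could not shut down safely.
   int
   splinterdb_close(splinterdb **kvs);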

splinterdb_close(splinterdb **kvs);

// Register the current thread so that it can be used with splinterdb.
1 change: 0 additions & 1 deletion src/PackedArray.c
@@ -380,7 +380,6 @@ void PACKEDARRAY_JOIN(__PackedArray_unpack_, PACKEDARRAY_IMPL_BITS_PER_ITEM)(con
#include "poison.h"

#define PACKEDARRAY_MALLOC(size) platform_malloc(size)
#define PACKEDARRAY_FREE(p) platform_free(p)
Collaborator Author: Unused interface.


void PackedArray_pack(uint32* a, const uint32 offset, const uint32* in, uint32 count, size_t bitsPerItem)
{
18 changes: 9 additions & 9 deletions src/btree.c
@@ -3103,6 +3103,12 @@ btree_pack_link_extent(btree_pack_req *req,
req->num_edges[height] = 0;
}

static inline bool
btree_pack_can_fit_tuple(btree_pack_req *req)
{
return req->num_tuples < req->max_tuples;
}

static inline btree_node *
btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot)
{
@@ -3167,8 +3173,8 @@ btree_pack_loop(btree_pack_req *req, // IN/OUT
log_trace_key(tuple_key, "btree_pack_loop (bottom)");

if (req->hash) {
platform_assert(req->num_tuples < req->max_tuples);
req->fingerprint_arr[req->num_tuples] =
platform_assert(btree_pack_can_fit_tuple(req));
Collaborator Author (@gapisback, Dec 9, 2023): Packaged the condition from deleted L3170 into this simple bool checker fn, which already existed at deleted L3220 below; the fn has been relocated a few lines above.

Contributor: Nice

fingerprint_start(&req->fingerprint)[req->num_tuples] =
Collaborator Author: Here's where you will start to see the use of the fingerprint object and its accessor interfaces.

req->hash(key_data(tuple_key), key_length(tuple_key), req->seed);
}

@@ -3216,12 +3222,6 @@ btree_pack_post_loop(btree_pack_req *req, key last_key)
mini_release(&req->mini, last_key);
}

static bool32
btree_pack_can_fit_tuple(btree_pack_req *req, key tuple_key, message data)
{
return req->num_tuples < req->max_tuples;
}

static void
btree_pack_abort(btree_pack_req *req)
{
@@ -3259,7 +3259,7 @@ btree_pack(btree_pack_req *req)

while (iterator_can_next(req->itor)) {
iterator_curr(req->itor, &tuple_key, &data);
if (!btree_pack_can_fit_tuple(req, tuple_key, data)) {
if (!btree_pack_can_fit_tuple(req)) {
platform_error_log("%s(): req->num_tuples=%lu exceeded output size "
"limit, req->max_tuples=%lu\n",
__func__,
24 changes: 15 additions & 9 deletions src/btree.h
@@ -151,9 +151,9 @@ typedef struct btree_pack_req {
btree_config *cfg;
iterator *itor; // the itor which is being packed
uint64 max_tuples;
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
uint32 *fingerprint_arr; // IN/OUT: hashes of the keys in the tree
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
fp_hdr fingerprint; // IN/OUT: hashes of the keys in the tree
Collaborator Author: The raw in-place array is now replaced by the fingerprint object, which carries inside it a platform_memfrag{} handle to track the allocated memory fragment's size and to free it reliably.
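Pieced together from the calls visible in this diff, the intended lifecycle of the object is roughly as follows (a sketch; signatures are inferred from the usages below):

   fp_hdr fp; // fingerprint object, embedded by value in btree_pack_req

   fingerprint_init(&fp, hid, max_tuples);  // allocates the hash array
   if (fingerprint_is_empty(&fp)) {         // OOM is detected via is_empty(),
      return STATUS_NO_MEMORY;              // not via a NULL array pointer
   }
   uint32 *arr = fingerprint_start(&fp);    // accessor for the raw array
   arr[0] = req->hash(key_data(k), key_length(k), req->seed);
   fingerprint_deinit(hid, &fp);            // frees the tracked fragment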


// internal data
uint16 height;
@@ -168,6 +168,7 @@
uint64 num_tuples; // no. of tuples in the output tree
uint64 key_bytes; // total size of keys in tuples of the output tree
uint64 message_bytes; // total size of msgs in tuples of the output tree
uint64 line; // Caller's line #
} btree_pack_req;

struct btree_async_ctxt;
@@ -325,6 +326,10 @@ btree_iterator_init(cache *cc,
void
btree_iterator_deinit(btree_iterator *itor);

/*
* Initialize BTree Pack request structure. May allocate memory for fingerprint
* array.
*/
static inline platform_status
btree_pack_req_init(btree_pack_req *req,
cache *cc,
@@ -343,26 +348,27 @@
req->hash = hash;
req->seed = seed;
if (hash != NULL && max_tuples > 0) {
req->fingerprint_arr =
TYPED_ARRAY_ZALLOC(hid, req->fingerprint_arr, max_tuples);

fingerprint_init(&req->fingerprint, hid, max_tuples); // Allocates memory
Collaborator Author: The inline memory allocation on old L345 is henceforth replaced by init()'ing the fingerprint object, and so on.


// When we run with shared-memory configured, we expect that it is sized
// big-enough to not get OOMs from here. Hence, only a debug_assert().
debug_assert(req->fingerprint_arr,
debug_assert(!fingerprint_is_empty(&req->fingerprint),
"Unable to allocate memory for %lu tuples",
max_tuples);
if (!req->fingerprint_arr) {
if (fingerprint_is_empty(&req->fingerprint)) {
Collaborator Author: You can no longer check for a NULL array ptr to detect OOM. You must consult the is_empty() method to figure out whether memory was allocated.

return STATUS_NO_MEMORY;
}
}
return STATUS_OK;
}

// Free memory if any was allocated for fingerprint array.
static inline void
btree_pack_req_deinit(btree_pack_req *req, platform_heap_id hid)
{
if (req->fingerprint_arr) {
platform_free(hid, req->fingerprint_arr);
if (!fingerprint_is_empty(&req->fingerprint)) {
fingerprint_deinit(hid, &req->fingerprint);
Collaborator Author: deinit() will free the memory.

}
}

42 changes: 29 additions & 13 deletions src/clockcache.c
@@ -1818,20 +1818,25 @@ clockcache_init(clockcache *cc, // OUT
cc->heap_id = hid;

/* lookup maps addrs to entries, entry contains the entries themselves */
cc->lookup =
TYPED_ARRAY_MALLOC(cc->heap_id, cc->lookup, allocator_page_capacity);
platform_memfrag memfrag_cc_lookup;
cc->lookup = TYPED_ARRAY_MALLOC_MF(
&memfrag_cc_lookup, cc->heap_id, cc->lookup, allocator_page_capacity);
if (!cc->lookup) {
goto alloc_error;
}
cc->lookup_size = memfrag_size(&memfrag_cc_lookup);
Collaborator Author (@gapisback, Apr 14, 2023): Here's the first instance of a pair of init() / deinit() calls which now need to communicate the size of the memory fragment allocated by init().

As done on this line, a few common structures now gain a new size field to track the memory fragment's size. These structures are the kind that are allocated / init'ed in one function, while the deinit() method is called much later, in a separate function.
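In outline, the new init() / deinit() contract looks like this (structure and function names here are illustrative, not part of the patch):

   typedef struct widget {
      uint32 *lookup;      // separately allocated array
      size_t  lookup_size; // # of bytes in the fragment backing 'lookup'
   } widget;

   void
   widget_init(widget *w, platform_heap_id hid, uint64 nitems)
   {
      platform_memfrag memfrag_w_lookup;
      w->lookup = TYPED_ARRAY_MALLOC_MF(
         &memfrag_w_lookup, hid, w->lookup, nitems);
      w->lookup_size = memfrag_size(&memfrag_w_lookup); // saved for the free
   }

   void
   widget_deinit(widget *w, platform_heap_id hid)
   {
      // The fragment's size travels through the structure to the free.
      platform_free_mem(hid, w->lookup, w->lookup_size);
      w->lookup = NULL;
   }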


for (i = 0; i < allocator_page_capacity; i++) {
cc->lookup[i] = CC_UNMAPPED_ENTRY;
}

cc->entry =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->entry, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_entry;
cc->entry = TYPED_ARRAY_ZALLOC_MF(
&memfrag_cc_entry, cc->heap_id, cc->entry, cc->cfg->page_capacity);
if (!cc->entry) {
goto alloc_error;
}
cc->entry_size = memfrag_size(&memfrag_cc_entry);
Collaborator Author (@gapisback, Dec 9, 2023): Alternative: One thing I considered was to require that all such structures have a commonly named mf_size field, and to have this caller-macro stash memfrag->size into that field behind the scenes.

But that solution hides some of these machinations behind macros, which might make code tracking difficult.

Also, not all structures need, or even have, an mf_size field. You only need to track mf_size if the free is done in some entirely different function.

For most usages, where allocation and free are done in the same function, we just use memfrag->size directly to do the free. In such cases, the allocating structure (or memory ptr) will not need, and will not have, an mf_size field.

Hence, this alternative hasn't been implemented.
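To contrast the two cases described above, a minimal sketch of the same-function usage, where no mf_size field is needed (variable names are illustrative):

   platform_memfrag memfrag_buf;
   uint64 *buf = TYPED_ARRAY_ZALLOC_MF(&memfrag_buf, hid, buf, nitems);
   if (buf != NULL) {
      // ... use buf strictly within this function ...
      // The on-stack memfrag still knows the fragment's size, so nothing
      // needs to be stashed in a structure field before the free.
      platform_free_mem(hid, buf, memfrag_size(&memfrag_buf));
   }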


platform_status rc = STATUS_NO_MEMORY;

@@ -1860,11 +1865,13 @@ clockcache_init(clockcache *cc, // OUT
cc->refcount = platform_buffer_getaddr(&cc->rc_bh);

/* Separate ref counts for pins */
cc->pincount =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->pincount, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_pincount;
cc->pincount = TYPED_ARRAY_ZALLOC_MF(
&memfrag_cc_pincount, cc->heap_id, cc->pincount, cc->cfg->page_capacity);
if (!cc->pincount) {
goto alloc_error;
}
cc->pincount_size = memfrag_size(&memfrag_cc_pincount);

/* The hands and associated page */
cc->free_hand = 0;
@@ -1873,13 +1880,16 @@
cc->per_thread[thr_i].free_hand = CC_UNMAPPED_ENTRY;
cc->per_thread[thr_i].enable_sync_get = TRUE;
}
platform_memfrag memfrag_cc_batch_busy;
cc->batch_busy =
TYPED_ARRAY_ZALLOC(cc->heap_id,
cc->batch_busy,
cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH);
TYPED_ARRAY_ZALLOC_MF(&memfrag_cc_batch_busy,
cc->heap_id,
cc->batch_busy,
(cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH));
if (!cc->batch_busy) {
goto alloc_error;
}
cc->batch_busy_size = memfrag_size(&memfrag_cc_batch_busy);

return STATUS_OK;

@@ -1907,10 +1917,12 @@ clockcache_deinit(clockcache *cc) // IN/OUT
}

if (cc->lookup) {
platform_free(cc->heap_id, cc->lookup);
platform_free_mem(cc->heap_id, cc->lookup, cc->lookup_size);
cc->lookup = NULL;
}
if (cc->entry) {
platform_free(cc->heap_id, cc->entry);
platform_free_mem(cc->heap_id, cc->entry, cc->entry_size);
cc->entry = NULL;
}

debug_only platform_status rc = STATUS_TEST_FAILED;
@@ -1929,11 +1941,15 @@
cc->refcount = NULL;
}

platform_memfrag mf = {0};
if (cc->pincount) {
platform_free_volatile(cc->heap_id, cc->pincount);
memfrag_init(&mf, cc->heap_id, (void *)cc->pincount, cc->pincount_size);
platform_free_volatile(cc->heap_id, &mf);
}
if (cc->batch_busy) {
platform_free_volatile(cc->heap_id, cc->batch_busy);
memfrag_init(
&mf, cc->heap_id, (void *)cc->batch_busy, cc->batch_busy_size);
platform_free_volatile(cc->heap_id, &mf);
}
}

6 changes: 4 additions & 2 deletions src/clockcache.h
@@ -139,15 +139,17 @@ struct clockcache {

// Stats
cache_stats stats[MAX_THREADS];
size_t lookup_size;
size_t entry_size;
size_t pincount_size;
size_t batch_busy_size;
Collaborator Author: Here is an example of a structure with 4 member ptr-fields, each of which requires a separately allocated memory fragment.

Each fragment's size has to be tracked independently, so it is difficult to have just one structure-wide mf_size field to track the allocations.

(This point continues the alternate-solution remark on L1837 above.)

};


/*
*-----------------------------------------------------------------------------
* Function declarations
*-----------------------------------------------------------------------------
*/

void
clockcache_config_init(clockcache_config *cache_config,
io_config *io_cfg,
13 changes: 8 additions & 5 deletions src/memtable.c
@@ -307,15 +307,19 @@ memtable_context_create(platform_heap_id hid,
process_fn process,
void *process_ctxt)
{
platform_memfrag memfrag_ctxt = {0};
Collaborator Author: Here is the 1st example of the new memory allocation interface(s), some of which require an on-stack platform_memfrag structure to be declared, named as an extension of the ptr-handle variable name for which the allocation is being done.

Here, we are allocating memory for the ctxt variable, so the macro expects a memfrag_ctxt to be declared on-stack; otherwise, compilation will fail.

You will see many such patterns of this coding style.

I chose this approach so that we do not perturb the call to memory allocation on L312 below, while still bringing the requirement of platform_memfrag * into the memory allocation interfaces.

The approach chosen reduces code churn to some extent in this review packet.
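Condensed, the idiom is (a sketch of the convention described above, assuming the macro expands to reference memfrag_<variable> by name):

   platform_memfrag memfrag_ctxt = {0}; // name must be memfrag_<variable>
   memtable_context *ctxt =
      TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, ctxt, mt, cfg->max_memtables);
   // The macro refers to 'memfrag_ctxt' behind the scenes; a missing or
   // misnamed on-stack handle fails to compile, enforcing the convention.
   ctxt->mf_size = memfrag_size(&memfrag_ctxt);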

memtable_context *ctxt =
TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, ctxt, mt, cfg->max_memtables);
ctxt->cc = cc;
ctxt->mf_size = memfrag_size(&memfrag_ctxt);
Collaborator Author: And the 2nd pattern you will see: after a successful memory allocation, code will (and needs to):

  1. Save off the size of the allocated memory fragment using memfrag_size().
  2. Consistently, all structures that need to track this fragment size have a field named mf_size.
  3. Much later, platform_free() is invoked using this structure->mf_size field.

I have tried to keep this name the same in all places. If you find something not named like so, flag it and I will fix it.

ctxt->cc = cc;
memmove(&ctxt->cfg, cfg, sizeof(ctxt->cfg));

platform_mutex_init(
&ctxt->incorporation_mutex, platform_get_module_id(), hid);
ctxt->rwlock = TYPED_MALLOC(hid, ctxt->rwlock);
platform_memfrag memfrag_rwlock = {0};
ctxt->rwlock = TYPED_MALLOC_MF(&memfrag_rwlock, hid, ctxt->rwlock);
platform_batch_rwlock_init(ctxt->rwlock);
ctxt->rwlock_mf_size = memfrag_size(&memfrag_rwlock);

for (uint64 mt_no = 0; mt_no < cfg->max_memtables; mt_no++) {
uint64 generation = mt_no;
@@ -343,9 +347,8 @@ memtable_context_destroy(platform_heap_id hid, memtable_context *ctxt)
}

platform_mutex_destroy(&ctxt->incorporation_mutex);
platform_free(hid, ctxt->rwlock);

platform_free(hid, ctxt);
platform_free_mem(hid, ctxt->rwlock, ctxt->rwlock_mf_size);
platform_free_mem(hid, ctxt, ctxt->mf_size);
Collaborator Author: NOTE: Here is an example where two memory fragment sizes have to be tracked:

  1. rwlock_mf_size - tracks the size of the fragment for the rwlock field.
  2. mf_size - tracks the size of the fragment holding the ctxt structure.

Hence, the two slightly differently named size fields.

}

void
4 changes: 3 additions & 1 deletion src/memtable.h
@@ -131,7 +131,9 @@ typedef struct memtable_context {
// read lock to read and write lock to modify.
volatile uint64 generation_retired;

bool32 is_empty;
bool is_empty;
size_t mf_size; // # of bytes of memory allocated to this struct
size_t rwlock_mf_size; // # of bytes of memory allocated to rwlock

// Effectively thread local, no locking at all:
btree_scratch scratch[MAX_THREADS];
3 changes: 2 additions & 1 deletion src/merge.c
@@ -545,6 +545,7 @@ merge_iterator_create(platform_heap_id hid,
== ARRAY_SIZE(merge_itor->ordered_iterators),
"size mismatch");

platform_memfrag memfrag_merge_itor;
Collaborator Author: Another idiomatic usage example: to allocate memory for a field named merge_itor, declare an on-stack platform_memfrag memfrag_merge_itor, which the allocator macro uses.

merge_itor = TYPED_ZALLOC(PROCESS_PRIVATE_HEAP_ID, merge_itor);
if (merge_itor == NULL) {
return STATUS_NO_MEMORY;
@@ -598,7 +599,7 @@ platform_status
merge_iterator_destroy(platform_heap_id hid, merge_iterator **merge_itor)
{
merge_accumulator_deinit(&(*merge_itor)->merge_buffer);
platform_free(PROCESS_PRIVATE_HEAP_ID, *merge_itor);
platform_free_heap(PROCESS_PRIVATE_HEAP_ID, *merge_itor);
*merge_itor = NULL;

return STATUS_OK;
14 changes: 9 additions & 5 deletions src/pcq.h
@@ -14,8 +14,9 @@

typedef struct {
uint32 num_elems;
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
size_t mf_size; // of memory fragment allocated for this struct
void *elems[];
} pcq;

@@ -28,9 +29,11 @@
{
pcq *q;

platform_memfrag memfrag_q;
q = TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, q, elems, num_elems);
if (q != NULL) {
q->num_elems = num_elems;
q->mf_size = memfrag_size(&memfrag_q);
}

return q;
@@ -61,11 +64,12 @@
return pcq_count(q) == q->num_elems;
}

// Deallocate a PCQ
// Deallocate a PCQ, and NULL out input handle
static inline void
pcq_free(platform_heap_id hid, pcq *q)
pcq_free(platform_heap_id hid, pcq **q)
{
platform_free(hid, q);
platform_free_mem(hid, *q, (*q)->mf_size);
*q = NULL;
}
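Callers change accordingly; a hypothetical call site:

   pcq *q = pcq_alloc(hid, 128);  // q->mf_size is recorded at allocation
   // ... enqueue / dequeue ...
   pcq_free(hid, &q);             // frees via (*q)->mf_size and NULLs q
   platform_assert(q == NULL);    // the stale handle cannot be reused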

// Enqueue an elem to a PCQ. Element must not be NULL