Skip to content

Commit

Permalink
work around trunk.c race
Browse files Browse the repository at this point in the history
  • Loading branch information
rtjohnso committed Nov 22, 2024
1 parent 0f73b69 commit ea674ca
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 9 deletions.
113 changes: 104 additions & 9 deletions src/trunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -3152,6 +3152,87 @@ trunk_inc_branch_range(trunk_handle *spl,
}
}

static inline void
trunk_perform_gc_tasks(trunk_handle *spl, bool immediate)
{
uint64 my_idx = spl->gc_task_queue_head;
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
uint64 enqueue_time = task->enqueue_time;
int i = 0;
while (i < 2 && enqueue_time != 0
&& (immediate
|| TRUNK_GC_DELAY < platform_timestamp_elapsed(enqueue_time)))
{
if (__sync_bool_compare_and_swap(&task->enqueue_time, enqueue_time, 0)) {
__sync_fetch_and_add(&spl->gc_task_queue_head, 1);
switch (task->type) {
case TRUNK_GC_TYPE_ROUTING_FILTER_ZAP:
routing_filter_zap(spl->cc, &task->args.filter);
break;
case TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE:
btree_dec_ref_range(
spl->cc,
&spl->cfg.btree_cfg,
task->args.btree_dec_ref_range.root_addr,
key_buffer_key(&task->args.btree_dec_ref_range.min_key),
key_buffer_key(&task->args.btree_dec_ref_range.max_key));
key_buffer_deinit(&task->args.btree_dec_ref_range.min_key);
key_buffer_deinit(&task->args.btree_dec_ref_range.max_key);
break;
default:
platform_default_log("Unknown GC task type %d\n", task->type);
break;
}
}

my_idx = spl->gc_task_queue_head;
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
task = &spl->gc_task_queue[my_idx];
enqueue_time = task->enqueue_time;
i++;
}
}

static inline void
trunk_enqueue_routing_filter_zap(trunk_handle *spl, routing_filter *filter)
{
trunk_perform_gc_tasks(spl, FALSE);

uint64 my_idx = __sync_fetch_and_add(&spl->gc_task_queue_tail, 1);
platform_assert(my_idx - spl->gc_task_queue_head < TRUNK_GC_TASK_QUEUE_SIZE);
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
platform_assert(task->enqueue_time == 0);

task->type = TRUNK_GC_TYPE_ROUTING_FILTER_ZAP;
task->args.filter = *filter;
task->enqueue_time = platform_get_timestamp();
}

static inline void
trunk_enqueue_btree_dec_ref_range(trunk_handle *spl,
uint64 btree_root_addr,
key start_key,
key end_key)
{
trunk_perform_gc_tasks(spl, FALSE);

uint64 my_idx = __sync_fetch_and_add(&spl->gc_task_queue_tail, 1);
platform_assert(my_idx - spl->gc_task_queue_head < TRUNK_GC_TASK_QUEUE_SIZE);
my_idx = my_idx % TRUNK_GC_TASK_QUEUE_SIZE;
trunk_gc_task *task = &spl->gc_task_queue[my_idx];
platform_assert(task->enqueue_time == 0);

task->type = TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE;
task->args.btree_dec_ref_range.root_addr = btree_root_addr;
key_buffer_init_from_key(
&task->args.btree_dec_ref_range.min_key, spl->heap_id, start_key);
key_buffer_init_from_key(
&task->args.btree_dec_ref_range.max_key, spl->heap_id, end_key);
task->enqueue_time = platform_get_timestamp();
}

static inline void
trunk_zap_branch_range(trunk_handle *spl,
trunk_branch *branch,
Expand All @@ -3163,8 +3244,10 @@ trunk_zap_branch_range(trunk_handle *spl,
platform_assert((key_is_null(start_key) && key_is_null(end_key))
|| (type != PAGE_TYPE_MEMTABLE && !key_is_null(start_key)));
platform_assert(branch->root_addr != 0, "root_addr=%lu", branch->root_addr);
btree_dec_ref_range(
spl->cc, &spl->cfg.btree_cfg, branch->root_addr, start_key, end_key);
trunk_enqueue_btree_dec_ref_range(
spl, branch->root_addr, start_key, end_key);
// btree_dec_ref_range(
// spl->cc, &spl->cfg.btree_cfg, branch->root_addr, start_key, end_key);
}

/*
Expand Down Expand Up @@ -3914,8 +3997,9 @@ trunk_dec_filter(trunk_handle *spl, routing_filter *filter)
if (filter->addr == 0) {
return;
}
cache *cc = spl->cc;
routing_filter_zap(cc, filter);
trunk_enqueue_routing_filter_zap(spl, filter);
// cache *cc = spl->cc;
// routing_filter_zap(cc, filter);
}

/*
Expand Down Expand Up @@ -4885,13 +4969,15 @@ trunk_branch_iterator_deinit(trunk_handle *spl,
if (itor->root_addr == 0) {
return;
}
cache *cc = spl->cc;
btree_config *btree_cfg = &spl->cfg.btree_cfg;
key min_key = itor->min_key;
key max_key = itor->max_key;
key min_key = itor->min_key;
key max_key = itor->max_key;
btree_iterator_deinit(itor);
if (should_dec_ref) {
btree_dec_ref_range(cc, btree_cfg, itor->root_addr, min_key, max_key);
trunk_enqueue_btree_dec_ref_range(spl, itor->root_addr, min_key, max_key);
// cache *cc = spl->cc;
// btree_config *btree_cfg = &spl->cfg.btree_cfg;
// btree_dec_ref_range(cc, btree_cfg, itor->root_addr, min_key,
// max_key);
}
}

Expand Down Expand Up @@ -7872,6 +7958,10 @@ trunk_prepare_for_shutdown(trunk_handle *spl)
platform_status rc = task_perform_until_quiescent(spl->ts);
platform_assert_status_ok(rc);

while (spl->gc_task_queue_head < spl->gc_task_queue_tail) {
trunk_perform_gc_tasks(spl, TRUE);
}

// destroy memtable context (and its memtables)
memtable_context_destroy(spl->heap_id, spl->mt_ctxt);

Expand Down Expand Up @@ -7934,6 +8024,11 @@ trunk_destroy(trunk_handle *spl)
srq_deinit(&spl->srq);
trunk_prepare_for_shutdown(spl);
trunk_for_each_node(spl, trunk_node_destroy, NULL);

while (spl->gc_task_queue_head < spl->gc_task_queue_tail) {
trunk_perform_gc_tasks(spl, TRUE);
}

mini_unkeyed_dec_ref(spl->cc, spl->mini.meta_head, PAGE_TYPE_TRUNK, FALSE);
// clear out this splinter table from the meta page.
allocator_remove_super_addr(spl->al, spl->id);
Expand Down
34 changes: 34 additions & 0 deletions src/trunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,35 @@ typedef struct trunk_compacted_memtable {
trunk_compact_bundle_req *req;
} trunk_compacted_memtable;

/* The trunk_gc infrastructure is to compensate for a race condition in this
* version of the trunk. The problem is that the trunk can under some
* circumstances garbage-collect a branch or filter while there still exist
* readers that could access it. A new version of the trunk without the race
* will be merged soon. In the meantime, the gc system just delays garbage
* collection of branches and filters by 10 seconds, which should be pleanty
* enough time to ensure that any old readers will have finished. */

typedef enum trunk_gc_type {
TRUNK_GC_TYPE_ROUTING_FILTER_ZAP,
TRUNK_GC_TYPE_BTREE_DEC_REF_RANGE,
} trunk_gc_type;

typedef struct trunk_gc_task {
uint64_t enqueue_time;
trunk_gc_type type;
union {
routing_filter filter;
struct {
uint64 root_addr;
key_buffer min_key;
key_buffer max_key;
} btree_dec_ref_range;
} args;
} trunk_gc_task;

#define TRUNK_GC_TASK_QUEUE_SIZE (10 * 1024)
#define TRUNK_GC_DELAY (10ULL * 1000 * 1000 * 1000) // 10s

struct trunk_handle {
volatile uint64 root_addr;
uint64 super_block_idx;
Expand Down Expand Up @@ -224,6 +253,11 @@ struct trunk_handle {
// space rec queue
srq srq;

// gc
uint64 gc_task_queue_head;
uint64 gc_task_queue_tail;
trunk_gc_task gc_task_queue[TRUNK_GC_TASK_QUEUE_SIZE];

trunk_compacted_memtable compacted_memtable[/*cfg.mt_cfg.max_memtables*/];
};

Expand Down

0 comments on commit ea674ca

Please sign in to comment.