Skip to content

Commit

Permalink
CORTX-27600 dtm0/log: initial implementation for dtx0 and pruner (Sea…
Browse files Browse the repository at this point in the history
…gate#1503)

Problem:
Currently, dtm0 log entries are stored in a simple list, which reduces
the performance because for each pmsg we have to traverse this list to
find the entry to update it with the information about the received pmsg.

Solution:
In the dtm0 log we store the entries in the btree.
This solves the above issue.

Also, we add a simple pruner which deletes the old records from the log
if they are All-P (i.e. received pmsgs from all participants).
Without pmsgs and redo_lists (TBD in next PRs), the pruner just deletes
records from the log as soon as they are added there.

In contrary to the old dtm0 code currently in main, which requires compilation
option to be enabled, this code will be always ON.
It will help us to debug the code earlier.

The patch contains UTs for the log and the pruner.

Signed-off-by: Maxim Medved max.medved@seagate.com
Co-authored-by: Ivan Alekhin ivan.alekhin@seagate.com
Co-authored-by: Madhavrao Vemuri madhav.vemuri@seagate.com
Co-authored-by: Andriy Tkachuk andriy.tkachuk@seagate.com
  • Loading branch information
max-seagate authored and kiwionly2 committed Aug 30, 2022
1 parent e055c46 commit 51776f7
Show file tree
Hide file tree
Showing 54 changed files with 2,667 additions and 182 deletions.
1 change: 1 addition & 0 deletions be/btree.c
Original file line number Diff line number Diff line change
Expand Up @@ -4113,6 +4113,7 @@ static bool fkvv_invariant(const struct nd *node)
M0_BT_COB_OBJECT_INDEX,
M0_BT_COB_FILEATTR_BASIC,
M0_BT_CONFDB,
M0_BT_DTM0_LOG,
M0_BT_UT_KV_OPS))) &&
_0C(h->fkvv_ksize != 0);
}
Expand Down
1 change: 1 addition & 0 deletions be/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum m0_btree_types {
M0_BT_COB_FILEATTR_OMG,
M0_BT_COB_BYTECOUNT,
M0_BT_CONFDB,
M0_BT_DTM0_LOG,
M0_BT_UT_KV_OPS,
M0_BT_NR
};
Expand Down
35 changes: 35 additions & 0 deletions be/linux_kernel/stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ M0_INTERNAL bool m0_be_op_is_done(struct m0_be_op *op)
return true;
}

M0_INTERNAL int m0_be_op_tick_ret(struct m0_be_op *op,
struct m0_fom *fom,
int next_state)
{
return 0;
}

M0_INTERNAL struct m0_be_allocator *m0_be_seg_allocator(struct m0_be_seg *seg)
{
return NULL;
Expand Down Expand Up @@ -187,6 +194,11 @@ M0_INTERNAL void m0_be_tx_open(struct m0_be_tx *tx)
tx->t_sm.sm_state = M0_BTS_ACTIVE;
}

M0_INTERNAL int m0_be_tx_exclusive_open_sync(struct m0_be_tx *tx)
{
return -ENOSYS;
}

M0_INTERNAL void
m0_be_tx_capture(struct m0_be_tx *tx, const struct m0_be_reg *reg)
{
Expand Down Expand Up @@ -293,6 +305,12 @@ M0_INTERNAL struct m0_be_seg *m0_be_domain_seg0_get(struct m0_be_domain *dom)
return NULL;
}

M0_INTERNAL struct m0_be_seg *
m0_be_domain_seg_first(const struct m0_be_domain *dom)
{
return NULL;
}

int m0_be_tx_fol_add(struct m0_be_tx *tx, struct m0_fol_rec *rec)
{
return M0_ERR(-EINVAL);
Expand Down Expand Up @@ -327,5 +345,22 @@ M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
{
}

void m0_be_0type_add_credit(struct m0_be_domain *dom,
const struct m0_be_0type *zt,
const char *suffix,
const struct m0_buf *data,
struct m0_be_tx_credit *credit)
{
}

int m0_be_0type_add(struct m0_be_0type *zt,
struct m0_be_domain *dom,
struct m0_be_tx *tx,
const char *suffix,
const struct m0_buf *data)
{
return -ENOSYS;
}


#undef M0_TRACE_SUBSYSTEM
2 changes: 1 addition & 1 deletion be/seg0.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int m0_be_0type_del(struct m0_be_0type *zt,
const char *suffix)
{
struct m0_be_seg *seg;
struct m0_buf *opt;
struct m0_buf *opt = NULL;
char keyname[256] = {};
int rc;

Expand Down
9 changes: 9 additions & 0 deletions be/seg0.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ int m0_be_0type_del(struct m0_be_0type *zt,
struct m0_be_tx *tx,
const char *suffix);


/**
* Returns a record with a given m0_be_0type and a given suffix.
* @return -ENOENT no such record exists.
*/
int m0_be_0type_get(struct m0_be_0type *zt,
struct m0_be_domain *dom,
const char *suffix,
struct m0_buf *data);
/**
* <hr> <!------------------------------------------------------------>
* @section seg0-metadata Meta-segment (seg0), systematic BE storage startup.
Expand Down
2 changes: 2 additions & 0 deletions be/ut/helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,15 @@ void m0_be_ut_backend_cfg_default(struct m0_be_domain_cfg *cfg)
extern struct m0_be_0type m0_be_cob0;
extern struct m0_be_0type m0_be_active_record0;
extern struct m0_be_0type m0_be_dtm0;
extern struct m0_be_0type m0_dtm0_log0;

static struct m0_atomic64 dom_key = { .a_value = 0xbef11e };
static const struct m0_be_0type *zts[] = {
&m0_stob_ad_0type,
&m0_be_cob0,
&m0_be_active_record0,
&m0_be_dtm0,
&m0_dtm0_log0,
};
struct m0_reqh *reqh = cfg->bc_engine.bec_reqh;

Expand Down
57 changes: 57 additions & 0 deletions cas/cas.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "mdservice/fsync_fops.h" /* m0_fsync_fom_ops */
#include "mdservice/fsync_fops_xc.h" /* m0_fop_fsync_xc */
#include "cas/client.h" /* m0_cas_sm_conf_init */
#include "lib/memory.h" /* M0_ALLOC_PTR */

struct m0_fom_type_ops;
struct m0_sm_conf;
Expand Down Expand Up @@ -285,6 +286,62 @@ M0_INTERNAL bool m0_crv_is_none(const struct m0_crv *crv)
return memcmp(crv, &M0_CRV_INIT_NONE, sizeof(*crv)) == 0;
}

M0_INTERNAL bool m0_cas_fop_is_redoable(struct m0_fop *fop)
{
struct m0_fop_type *cas_fopt = &cas_put_fopt;
return fop->f_type == cas_fopt;
}

M0_INTERNAL int m0_cas_fop2redo(const struct m0_fop *fop,
struct m0_dtm0_redo *redo)
{
struct m0_cas_op *op = m0_fop_data(fop);
struct m0_buf payload = {};
int rc;

/* TODO: encode fop opcode and fop reply as well. */

rc = m0_xcode_obj_enc_to_buf(&M0_XCODE_OBJ(m0_cas_op_xc, op),
&payload.b_addr, &payload.b_nob);
if (rc != 0)
return M0_ERR(rc);

rc = m0_dtm0_redo_init(redo, &op->cg_descriptor,
&payload, M0_DTX0_PAYLOAD_CAS);
m0_buf_free(&payload);

return M0_RC(rc);
}

M0_INTERNAL int m0_cas_redo2fop(struct m0_fop *fop,
const struct m0_dtm0_redo *redo)
{
struct m0_buf *payload = redo->dtr_payload.dtp_data.ab_elems;
struct m0_cas_op *op;
struct m0_fop_type *cas_fopt;
int rc;

M0_PRE(redo->dtr_payload.dtp_type == M0_DTX0_PAYLOAD_CAS);
M0_PRE(redo->dtr_payload.dtp_data.ab_count == 1);

/* TODO: Select the right fop type based on encoded req type. */
cas_fopt = &cas_put_fopt;

M0_ALLOC_PTR(op);
if (op == NULL)
return M0_ERR(-ENOMEM);

rc = m0_xcode_obj_dec_from_buf(&M0_XCODE_OBJ(m0_cas_op_xc, op),
payload->b_addr, payload->b_nob);
if (rc == 0)
m0_fop_init(fop, cas_fopt, op, &m0_fop_release);
else
m0_free(op);

return M0_RC(rc);
}


#undef M0_TRACE_SUBSYSTEM

/** @} end of cas group */
Expand Down
22 changes: 22 additions & 0 deletions cas/cas.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include "dix/layout_xc.h"
#include "dtm0/tx_desc.h" /* tx_desc */
#include "dtm0/tx_desc_xc.h" /* xc for tx_desc */
#include "dtm0/dtm0.h" /* m0_dtx0_descriptor */
#include "dtm0/dtm0_xc.h" /* xc for m0_dtx0_descriptor */

/**
* @page cas-fspec The catalogue service (CAS)
Expand Down Expand Up @@ -398,6 +400,12 @@ struct m0_cas_op {
* Transaction descriptor associated with CAS operation.
*/
struct m0_dtm0_tx_desc cg_txd;

/**
* Transaction descriptor (new DTM0) associated with this CAS
* operation.
*/
struct m0_dtx0_descriptor cg_descriptor;
} M0_XCA_RECORD M0_XCA_DOMAIN(rpc);

/**
Expand Down Expand Up @@ -475,6 +483,11 @@ M0_INTERNAL void m0_cas__ut_svc_be_set(struct m0_reqh_service *svc,
struct m0_be_domain *dom);
M0_INTERNAL struct m0_be_domain *
m0_cas__ut_svc_be_get(struct m0_reqh_service *svc);
struct m0_dtm0_domain;
M0_INTERNAL void m0_cas__ut_svc_dtm0_domain_set(struct m0_reqh_service *svc,
struct m0_dtm0_domain *dod);
M0_INTERNAL struct m0_be_domain *
m0_cas__ut_svc_be_get(struct m0_reqh_service *svc);
M0_INTERNAL int m0_cas_fom_spawn(
struct m0_fom *lead,
struct m0_fom_thralldom *thrall,
Expand Down Expand Up @@ -542,6 +555,15 @@ M0_INTERNAL struct m0_dtm0_ts m0_crv_ts(const struct m0_crv *crv);
M0_INTERNAL void m0_crv_ts_set(struct m0_crv *crv,
const struct m0_dtm0_ts *ts);

struct m0_fop;
struct m0_dtm0_redo;

M0_INTERNAL bool m0_cas_fop_is_redoable(struct m0_fop *fop);
M0_INTERNAL int m0_cas_fop2redo(const struct m0_fop *fop,
struct m0_dtm0_redo *redo);
M0_INTERNAL int m0_cas_redo2fop(struct m0_fop *fop,
const struct m0_dtm0_redo *redo);

/** @} end of cas_dfspec */
#endif /* __MOTR_CAS_CAS_H__ */

Expand Down
47 changes: 47 additions & 0 deletions cas/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,47 @@ static int cas_req_prep(struct m0_cas_req *req,
return M0_RC(rc);
}

static int
m0_dtm0_tx_desc2dtx0_descriptor_prep(const struct m0_dtm0_tx_desc *txd,
struct m0_dtx0_descriptor *descriptor)
{
static int i = 0;

/*
* TODO: Check if txd is valid or not. If txd is not valid then
* just fill the descriptor with some random data.
* (txd is not valid == it was zeroed).
*/
descriptor->dtd_id.dti_originator_sdev_fid.f_key = i++;
descriptor->dtd_id.dti_originator_sdev_fid.f_container = 1;
descriptor->dtd_id.dti_timestamp = m0_time_now();
M0_LOG(M0_DEBUG, "DTX id: " DTID1_F, DTID1_P(&descriptor->dtd_id));

/**
* @todo update all participants of this request in the descriptor,
* until then originator only is added as a participant.
*/
if (txd->dtd_ps.dtp_nr == 0) {
descriptor->dtd_participants.dtpa_participants_nr = 1;
M0_ALLOC_ARR(descriptor->dtd_participants.dtpa_participants, 1);
if (descriptor->dtd_participants.dtpa_participants == NULL)
return -ENOMEM;
descriptor->dtd_participants.dtpa_participants[0] =
descriptor->dtd_id.dti_originator_sdev_fid;
return 0;
}

descriptor->dtd_participants.dtpa_participants_nr = txd->dtd_ps.dtp_nr;
M0_ALLOC_ARR(descriptor->dtd_participants.dtpa_participants,
txd->dtd_ps.dtp_nr);
if (descriptor->dtd_participants.dtpa_participants == NULL)
return -ENOMEM;
for (i = 0; i < txd->dtd_ps.dtp_nr; i++)
descriptor->dtd_participants.dtpa_participants[i] =
txd->dtd_ps.dtp_pa->p_fid;
return 0;
}

M0_INTERNAL int m0_cas_put(struct m0_cas_req *req,
struct m0_cas_id *index,
const struct m0_bufvec *keys,
Expand Down Expand Up @@ -1683,6 +1724,12 @@ M0_INTERNAL int m0_cas_put(struct m0_cas_req *req,
rc = m0_dtx0_txd_copy(dtx, &op->cg_txd);
if (rc != 0)
return M0_ERR(rc);

rc = m0_dtm0_tx_desc2dtx0_descriptor_prep(&op->cg_txd,
&op->cg_descriptor);
if (rc != 0)
return M0_ERR(rc);

rc = creq_fop_create_and_prepare(req, &cas_put_fopt, op, &next_state);
if (rc == 0) {
cas_fop_send(req);
Expand Down
1 change: 1 addition & 0 deletions cas/index_gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ static void cgc_start_fom(struct m0_fom *fom0, struct m0_fop *fop)
m0_fom_init(fom0, &fop->f_type->ft_fom_type,
&cgc_fom_ops, fop, NULL, fom->cg_reqh);
fom0->fo_local = true;
fom0->fo_local_update = true;
fom->cg_ctg_op_initialized = false;
m0_long_lock_link_init(&fom->cg_dead_index, fom0,
&fom->cg_dead_index_addb2);
Expand Down
Loading

0 comments on commit 51776f7

Please sign in to comment.