UCP/CORE/GTEST: Fix possible purge of requests while discarding is in progress after error handling
dmitrygx committed Nov 19, 2021
1 parent 3edb6f3 commit b2a935b
Showing 2 changed files with 111 additions and 68 deletions.
8 changes: 8 additions & 0 deletions src/ucp/core/ucp_ep.c
@@ -1196,6 +1196,14 @@ static void ucp_ep_discard_lanes(ucp_ep_h ep, ucs_status_t discard_status)
ucp_lane_index_t lane;
uct_ep_h uct_ep;

if (ep->flags & UCP_EP_FLAG_FAILED) {
/* Avoid calling ucp_ep_discard_lanes_callback(), which would purge the UCP
 * endpoint's requests, if the discard-and-purge process has already been
 * started for this endpoint. Doing so could complete send requests before
 * the UCT lanes using them are flushed and destroyed. */
return;
}

discard_arg = ucs_malloc(sizeof(*discard_arg), "discard_lanes_arg");
if (discard_arg == NULL) {
ucs_error("ep %p: failed to allocate memory for discarding lanes"
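For context, here is a minimal sketch of the double-discard sequence that the guard above short-circuits. The call flow is an assumption inferred from the new comment and from the test changes below; ucp_ep_set_failed() and ucp_ep_close_nb() appear in this commit's test code:

/* 1. A transport error is handled: the endpoint is marked as failed
 *    (UCP_EP_FLAG_FAILED) and discarding of its UCT lanes begins. */
UCS_ASYNC_BLOCK(&ep->worker->async);
ucp_ep_set_failed(ep, UCP_NULL_LANE, UCS_ERR_CONNECTION_RESET);
UCS_ASYNC_UNBLOCK(&ep->worker->async);

/* 2. The application then force-closes the same endpoint, which reaches
 *    ucp_ep_discard_lanes() a second time. Without the guard, this call
 *    could purge (complete) send requests while the first discard is still
 *    flushing the UCT lanes; with the guard, it returns immediately. */
void *close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);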
171 changes: 103 additions & 68 deletions test/gtest/ucp/test_ucp_sockaddr.cc
@@ -406,6 +406,20 @@ class test_ucp_sockaddr : public ucp_test {
&params);
}

void* recv(entity& to, void *contig_buffer, size_t length,
ucp_tag_message_h message, ucp_tag_recv_nbx_callback_t cb,
void *user_data)
{
ucp_request_param_t params = {};

params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_USER_DATA;
params.user_data = user_data;
params.cb.recv = cb;
return ucp_tag_msg_recv_nbx(to.worker(), contig_buffer, length,
message, &params);
}
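A usage sketch for this new overload, mirroring the probe-then-receive flow used by do_force_close_during_rndv() below; message_wait(), mem_buffer, and rtag_complete_check_data_cbx are existing test helpers, and the pattern seed is illustrative:

// Probe for a pending tagged message, then receive it into a local buffer
ucp_tag_recv_info_t info;
ucp_tag_message_h message = message_wait(receiver(), 0, 0, &info);
ASSERT_NE((void*)NULL, message);

mem_buffer recv_buffer(info.length, UCS_MEMORY_TYPE_HOST);
recv_buffer.pattern_fill(2, info.length);
void *rreq = recv(receiver(), recv_buffer.ptr(), info.length, message,
                  rtag_complete_check_data_cbx,
                  reinterpret_cast<void*>(&recv_buffer));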

void* recv(entity& to, void *contig_buffer, size_t length,
ucp_stream_recv_nbx_callback_t cb, void *user_data)
{
@@ -770,6 +784,89 @@ class test_ucp_sockaddr : public ucp_test {
return false;
}

void do_force_close_during_rndv(bool fail_send_ep)
{
constexpr size_t length = 4 * UCS_KBYTE;

listen_and_communicate(false, SEND_DIRECTION_BIDI);

mem_buffer send_buffer(length, UCS_MEMORY_TYPE_HOST);
send_buffer.pattern_fill(1, length);
void *sreq = send(sender(), send_buffer.ptr(), length,
send_recv_type(), scomplete_reset_data_cbx,
reinterpret_cast<void*>(&send_buffer));

ucp_ep_h ep = sender().revoke_ep();

// Wait for the TAG RNDV/RTS packet to be sent and for the send request to
// be scheduled for tracking until the RNDV/ATS packet arrives from the peer
ucp_tag_message_h message;
ucp_tag_recv_info_t info;
message = message_wait(receiver(), 0, 0, &info);
ASSERT_NE((void*)NULL, message);
ASSERT_EQ(UCS_INPROGRESS, ucp_request_check_status(sreq));

std::map<uct_iface_h, uct_iface_ops_t> sender_uct_ops;

// Prevent the UCT endpoints from being destroyed during discard, so that
// the receiver does not detect the error before the sender could
// invalidate the data
for (auto lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (lane == ucp_ep_get_cm_lane(ep)) {
continue;
}

uct_iface_h uct_iface = ep->uct_eps[lane]->iface;
auto res = sender_uct_ops.emplace(uct_iface, uct_iface->ops);
if (res.second) {
uct_iface->ops.ep_flush =
reinterpret_cast<uct_ep_flush_func_t>(
ucs_empty_function_return_no_resource);
uct_iface->ops.ep_pending_add =
reinterpret_cast<uct_ep_pending_add_func_t>(
ucs_empty_function_return_busy);
}
}

if (fail_send_ep) {
UCS_ASYNC_BLOCK(&ep->worker->async);
ucp_ep_set_failed(ep, UCP_NULL_LANE, UCS_ERR_CONNECTION_RESET);
UCS_ASYNC_UNBLOCK(&ep->worker->async);
}
void *close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);

// Progress the sender's worker to check that it does not complete UCP
// requests before the UCT endpoints being discarded are closed
request_progress(sreq, { &sender() }, 0.5);

// Restore the UCT endpoints' flush and pending_add functions to the
// originals to allow the discarding to complete
for (auto &elem : sender_uct_ops) {
elem.first->ops.ep_flush = elem.second.ep_flush;
elem.first->ops.ep_pending_add = elem.second.ep_pending_add;
}

mem_buffer recv_buffer(length, UCS_MEMORY_TYPE_HOST);
recv_buffer.pattern_fill(2, length);
void *rreq = recv(receiver(), recv_buffer.ptr(), length,
message, rtag_complete_check_data_cbx,
reinterpret_cast<void*>(&recv_buffer));

if (fail_send_ep) {
// Progress the receiver to try receiving the data sent by sender
ucs_status_t status = request_progress(rreq, { &receiver() });
ASSERT_NE(UCS_INPROGRESS, status);
} else {
request_progress(sreq, { &sender() });
request_progress(rreq, { &receiver() });
}

{
scoped_log_handler slh(wrap_errors_logger);
std::vector<void*> reqs = { sreq, rreq, close_req };
requests_wait(reqs);
}
}

protected:
static unsigned m_err_count;
};
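The new helper above swaps entries in each interface's uct_iface_ops_t by hand and restores them once progress has been verified. A hedged RAII sketch of the same interception pattern; the class name is illustrative and not part of this commit:

// Hypothetical RAII helper: stall ep_flush/ep_pending_add on an iface for
// the object's lifetime, restoring the original function pointers on exit
class scoped_iface_stall {
public:
    scoped_iface_stall(uct_iface_h iface) :
        m_iface(iface), m_orig(iface->ops)
    {
        m_iface->ops.ep_flush = reinterpret_cast<uct_ep_flush_func_t>(
                ucs_empty_function_return_no_resource);
        m_iface->ops.ep_pending_add =
                reinterpret_cast<uct_ep_pending_add_func_t>(
                        ucs_empty_function_return_busy);
    }

    ~scoped_iface_stall()
    {
        m_iface->ops.ep_flush       = m_orig.ep_flush;
        m_iface->ops.ep_pending_add = m_orig.ep_pending_add;
    }

private:
    uct_iface_h     m_iface;
    uct_iface_ops_t m_orig;
};

With one such guard object per lane, the restore step could not be skipped on an early return, unlike the manual restore loop used in the test.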
@@ -965,6 +1062,13 @@ UCS_TEST_P(test_ucp_sockaddr, err_handle_without_err_cb)
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, force_close_during_rndv,
(send_recv_type() != SEND_RECV_TAG), "RNDV_THRESH=0")
{
constexpr size_t length = 4 * UCS_KBYTE;

listen_and_communicate(false, SEND_DIRECTION_BIDI);

mem_buffer recv_buffer(length, UCS_MEMORY_TYPE_HOST);
recv_buffer.pattern_fill(2, length);
void *rreq = recv(receiver(), recv_buffer.ptr(), length,
rtag_complete_check_data_cbx,
reinterpret_cast<void*>(&recv_buffer));

mem_buffer send_buffer(length, UCS_MEMORY_TYPE_HOST);
send_buffer.pattern_fill(1, length);
void *sreq = send(sender(), send_buffer.ptr(), length,
send_recv_type(), scomplete_reset_data_cbx,
reinterpret_cast<void*>(&send_buffer));

ucp_ep_h ep = sender().revoke_ep();

// Wait for the TAG RNDV/RTS packet to be sent and for the send request to be
// scheduled for tracking until the RNDV/ATS packet arrives from the peer
ucs_time_t deadline = ucs::get_deadline();
do {
ucp_worker_progress(sender().worker());
} while (ucs_hlist_is_empty(&ucp_ep_ext_gen(ep)->proto_reqs) &&
(ucs_get_time() < deadline));
EXPECT_FALSE(ucs_hlist_is_empty(&ucp_ep_ext_gen(ep)->proto_reqs));

std::map<uct_iface_h, uct_iface_ops_t> sender_uct_ops;

// Prevent the UCT endpoints from being destroyed during discard, so that the
// receiver does not detect the error before the sender could invalidate the data
for (auto lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (lane == ucp_ep_get_cm_lane(ep)) {
continue;
}

uct_iface_h uct_iface = ep->uct_eps[lane]->iface;
auto res = sender_uct_ops.emplace(uct_iface, uct_iface->ops);
if (res.second) {
uct_iface->ops.ep_flush =
reinterpret_cast<uct_ep_flush_func_t>(
ucs_empty_function_return_no_resource);
uct_iface->ops.ep_pending_add =
reinterpret_cast<uct_ep_pending_add_func_t>(
ucs_empty_function_return_busy);
}
}

void *close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);

// Progress the sender's worker to check that it does not complete UCP
// requests before the UCT endpoints being discarded are closed
request_progress(sreq, { &sender() }, 0.5);

// Restore the UCT endpoints' flush and pending_add functions to the
// originals to allow the discarding to complete
for (auto &elem : sender_uct_ops) {
elem.first->ops.ep_flush = elem.second.ep_flush;
elem.first->ops.ep_pending_add = elem.second.ep_pending_add;
}

request_progress(sreq, { &sender() });
request_progress(rreq, { &receiver() });
do_force_close_during_rndv(false);
}

{
scoped_log_handler slh(wrap_errors_logger);
std::vector<void*> reqs = { sreq, rreq, close_req };
requests_wait(reqs);
}
UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, fail_and_force_close_during_rndv,
(send_recv_type() != SEND_RECV_TAG), "RNDV_THRESH=0")
{
do_force_close_during_rndv(true);
}

UCS_TEST_SKIP_COND_P(test_ucp_sockaddr, listener_invalid_params,
