Skip to content

Commit

Permalink
Remove failure from initial do_capture read
Browse files Browse the repository at this point in the history
  • Loading branch information
zbud-msft committed Aug 27, 2024
1 parent dc2e7bc commit 3d5f62d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
15 changes: 4 additions & 11 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ capture_service::do_capture()
zmq_msg_close(&msg);
} else if (rc > 1) { // If there are events already published to XSUB when XSUB connects to XPUB, we can receive events before subscription message
string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
SWSS_LOG_INFO("Receiving event from source: %s, will read second part of event", event_source.c_str());
SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str());
zmq_msg_close(&msg);
int more = 0;
size_t more_size = sizeof(more);
Expand All @@ -431,7 +431,7 @@ capture_service::do_capture()
rc = zmq_msg_recv(&msg_part, cap_sub_sock, 0);
if(rc > 0) {
string event_data((const char*)zmq_msg_data(&msg_part),zmq_msg_size(&msg_part));
SWSS_LOG_INFO("Received second part of event: %s", event_data.c_str());
SWSS_LOG_DEBUG("Received second part of event: %s", event_data.c_str());
zmq_msg_close(&msg_part);
internal_event_t event;
if(deserialize(event_data, event) == 0) {
Expand All @@ -442,25 +442,18 @@ capture_service::do_capture()
m_pre_exist_id[rid] = seq;
m_events.push_back(event_data);
}
rc = 1;
} else {
SWSS_LOG_ERROR("Unable to deserialize first event");
rc = -1;
SWSS_LOG_DEBUG("Unable to deserialize first event");
}
} else {
SWSS_LOG_ERROR("Unable to read second part of first event, rc=%d", rc);
SWSS_LOG_DEBUG("Unable to read second part of first event, rc=%d", rc);
zmq_msg_close(&msg_part);
rc = -1;
}
} else { // No second part to read
rc = 1;
}
} else {
zmq_msg_close(&msg);
SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc);
}

RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");
init_done = true;
}

Expand Down
23 changes: 6 additions & 17 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,8 @@ static const test_data_t ldata[] = {
},
};


void run_cap(void *zctx, bool &term, string &read_source,
int &cnt, bool &should_read_control)
int &cnt)
{
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
string source;
Expand All @@ -165,11 +164,10 @@ void run_cap(void *zctx, bool &term, string &read_source,
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));

if(should_read_control) {
zmq_msg_t msg;
zmq_msg_init(&msg);
EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
}
zmq_msg_t msg;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, mock_cap, 0);
EXPECT_EQ(1, rc); // read control character

while(!term) {
string source;
Expand Down Expand Up @@ -228,7 +226,6 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
TEST(eventd, proxy)
{
printf("Proxy TEST started\n");
bool should_read_control = false;
bool term_sub = false;
bool term_cap = false;
string rd_csource, rd_source, wr_source("hello");
Expand All @@ -246,7 +243,7 @@ TEST(eventd, proxy)
EXPECT_EQ(0, pxy->init());

/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));

/* subscriber in a thread */
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));
Expand Down Expand Up @@ -283,17 +280,9 @@ TEST(eventd, proxy)

zmq_close(mock_pub);

/* Do control test */

should_read_control = true;

/* capture in a thread */
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));

delete pxy;
pxy = NULL;

thrcc.join();
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
Expand Down

0 comments on commit 3d5f62d

Please sign in to comment.