Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
zbud-msft committed Aug 23, 2024
1 parent ea2020a commit dc2e7bc
Showing 1 changed file with 53 additions and 2 deletions.
55 changes: 53 additions & 2 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ capture_service::do_capture()
zmq_msg_t msg;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);
RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");

/*
* When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
* When capture service begins to read, the very first message that it will read is this
Expand All @@ -409,8 +409,59 @@ capture_service::do_capture()
*
* This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
*
* There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events
before the subscription message
*/
init_done = true;


if(rc == 1) { // Expected case to receive subscription message as very first message
SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB");
zmq_msg_close(&msg);
} else if (rc > 1) { // If there are events already published to XSUB when XSUB connects to XPUB, we can receive events before subscription message
string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
SWSS_LOG_INFO("Receiving event from source: %s, will read second part of event", event_source.c_str());
zmq_msg_close(&msg);
int more = 0;
size_t more_size = sizeof(more);
zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size);
// Read multi-part message
if(more) {
zmq_msg_t msg_part;
zmq_msg_init(&msg_part);
rc = zmq_msg_recv(&msg_part, cap_sub_sock, 0);
if(rc > 0) {
string event_data((const char*)zmq_msg_data(&msg_part),zmq_msg_size(&msg_part));
SWSS_LOG_INFO("Received second part of event: %s", event_data.c_str());
zmq_msg_close(&msg_part);
internal_event_t event;
if(deserialize(event_data, event) == 0) {
runtime_id_t rid;
sequence_t seq;

if(validate_event(event, rid, seq)) {
m_pre_exist_id[rid] = seq;
m_events.push_back(event_data);
}
rc = 1;
} else {
SWSS_LOG_ERROR("Unable to deserialize first event");
rc = -1;
}
} else {
SWSS_LOG_ERROR("Unable to read second part of first event, rc=%d", rc);
zmq_msg_close(&msg_part);
rc = -1;
}
} else { // No second part to read
rc = 1;
}
} else {
zmq_msg_close(&msg);
SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc);
}

RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");
init_done = true;
}

while (m_ctrl != START_CAPTURE) {
Expand Down

0 comments on commit dc2e7bc

Please sign in to comment.