Skip to content

Commit

Permalink
Improve code and UT
Browse files Browse the repository at this point in the history
  • Loading branch information
liuh-80 committed Sep 27, 2023
1 parent 12fda22 commit 46c0ba9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
14 changes: 7 additions & 7 deletions common/asyncdbupdater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,13 @@ void AsyncDBUpdater::dbUpdateThread()

while (m_runThread)
{
m_dbUpdateDataNotifyCv.wait(cvLock);

size_t count;
count = queueSize();
if (count == 0)
{
count = queueSize();
if (!count)
{
continue;
}
// when queue is empty, wait notification, when data come, continue to check queue size again
m_dbUpdateDataNotifyCv.wait(cvLock);
continue;
}

for (size_t ie = 0; ie < count; ie++)
Expand Down Expand Up @@ -101,6 +99,8 @@ void AsyncDBUpdater::dbUpdateThread()
}
}
}

SWSS_LOG_DEBUG("AsyncDBUpdater dbUpdateThread end: %s", m_tableName.c_str());
}

size_t AsyncDBUpdater::queueSize()
Expand Down
26 changes: 20 additions & 6 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,21 @@ static void producerWorker(string tableName, string endpoint, bool dbPersistence
p.del(keys);
}

// wait all data been received by consumer and all persist data write to redis
while (!allDataReceived && (p.dbUpdaterQueueSize() > 0))
// wait all data been received by consumer
while (!allDataReceived)
{
sleep(1);
}

if (dbPersistence)
{
// wait all persist data write to redis
while (p.dbUpdaterQueueSize() > 0)
{
sleep(1);
}
}

cout << "Producer thread ended: " << tableName << endl;
}

Expand Down Expand Up @@ -180,13 +189,17 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence
}
}

// wait all persist data write to redis
while (c.dbUpdaterQueueSize() > 0)
allDataReceived = true;

if (dbPersistence)
{
sleep(1);
// wait all persist data write to redis
while (c.dbUpdaterQueueSize() > 0)
{
sleep(1);
}
}

allDataReceived = true;
cout << "Consumer thread ended: " << tableName << endl;
}

Expand All @@ -203,6 +216,7 @@ static void testMethod(bool producerPersistence)
delCount = 0;
batchSetCount = 0;
batchDelCount = 0;
allDataReceived = false;

// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);
Expand Down

0 comments on commit 46c0ba9

Please sign in to comment.