Skip to content

Commit

Permalink
Merge pull request #94 from sy-c/fix-daemon-1
Browse files Browse the repository at this point in the history
v2.7.3
  • Loading branch information
sy-c authored Sep 30, 2024
2 parents c6a0c39 + 872c03b commit 7e35caf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
3 changes: 3 additions & 0 deletions doc/releaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,6 @@ This file describes the main feature changes for each InfoLogger released versio
- increased default value of rxMaxConnections from 1024 to 2048. This is the number of concurrent clients allowed to connect to o2-infologger-daemon. Can be changed in configuration file.
- o2-infologger-server: improved logging per thread/database.
- o2-infologger-newdb: added option to skip user creation

# v2.7.3 - 30/09/2024
- o2-infologger-daemon: removed limitation of 1024 connections (was because of hard limit in select() system call, replaced now by poll()).
57 changes: 41 additions & 16 deletions src/infoLoggerD.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/poll.h>

#include <sys/stat.h>
#include <string.h>
Expand Down Expand Up @@ -197,6 +198,7 @@ int checkDirAndCreate(const char* dir, SimpleLog* log = NULL)
typedef struct {
int socket;
std::string buffer; // currently pending data
int pollIx; // index of client in poll structure
} t_clientConnection;

class InfoLoggerD : public Daemon
Expand All @@ -214,6 +216,8 @@ class InfoLoggerD : public Daemon

unsigned long long numberOfMessagesReceived = 0;
std::list<t_clientConnection> clients;
struct pollfd *fds = nullptr; // array for poll()
int nfds = 0; // size of array

TR_client_configuration cfgCx; // config for transport
TR_client_handle hCx = nullptr; // handle to server transport
Expand Down Expand Up @@ -451,6 +455,9 @@ InfoLoggerD::~InfoLoggerD()
for (auto c : clients) {
close(c.socket);
}
if (fds) {
free(fds);
}
if (hCx != nullptr) {
TR_client_stop(hCx);
}
Expand All @@ -462,28 +469,37 @@ Daemon::LoopStatus InfoLoggerD::doLoop()
return LoopStatus::Error;
}

fd_set readfds;
FD_ZERO(&readfds);
FD_SET(rxSocket, &readfds);
int nfds = rxSocket;
for (auto client : clients) {
FD_SET(client.socket, &readfds);
if (client.socket > nfds) {
nfds = client.socket;
bool updateFds = false; // raise this flag to renew structure (change in client list)
if (fds == nullptr) {
// create new poll structure based on current list of clients
size_t sz = sizeof(struct pollfd) * (clients.size() + 1);
fds = (struct pollfd *)malloc(sz);
if (fds == nullptr) {
return LoopStatus::Error;
}
memset(fds, 0, sz);
nfds = 1;
fds[0].fd = rxSocket;
fds[0].events = POLLIN;

for (auto &client : clients) {
if (client.socket<0) continue;
fds[nfds].fd = client.socket;
fds[nfds].events = POLLIN;
fds[nfds].revents = 0;
client.pollIx = nfds; // keep index of fds for this client, to be able to check poll result
nfds++;
}
}
nfds++;

struct timeval selectTimeout;
selectTimeout.tv_sec = 1;
selectTimeout.tv_usec = 0;

if (select(nfds, &readfds, NULL, NULL, &selectTimeout) > 0) {
// poll with 1 second timeout
if (poll(fds, nfds, 1000) > 0) {

// check existing clients
int cleanupNeeded = 0;
for (auto& client : clients) {
if (FD_ISSET(client.socket, &readfds)) {
if (client.socket<0) continue;
if ((client.pollIx >= 0) && (fds[client.pollIx].revents)) {
char newData[1024000]; // never read more than 10k at a time... allows to round-robin through clients
int bytesRead = read(client.socket, newData, sizeof(newData));
if (bytesRead > 0) {
Expand Down Expand Up @@ -544,10 +560,11 @@ Daemon::LoopStatus InfoLoggerD::doLoop()
if (cleanupNeeded) {
clients.remove_if([](t_clientConnection& c) { return (c.socket == -1); });
log.info("%d clients disconnected, now having %d/%d", cleanupNeeded, (int)clients.size(), configInfoLoggerD.rxMaxConnections);
updateFds = 1;
}

// handle new connection requests
if (FD_ISSET(rxSocket, &readfds)) {
if (fds[0].revents) {
struct sockaddr_un socketAddress;
socklen_t socketAddressLen = sizeof(socketAddress);
int tmpSocket = accept(rxSocket, (struct sockaddr*)&socketAddress, &socketAddressLen);
Expand All @@ -574,9 +591,17 @@ Daemon::LoopStatus InfoLoggerD::doLoop()
newClient.buffer.clear();
clients.push_back(newClient);
log.info("New client: %d/%d", (int)clients.size(), configInfoLoggerD.rxMaxConnections);
updateFds = 1;
}
}
}

if (updateFds) {
if (fds != nullptr) {
free(fds);
}
fds = nullptr;
}
}

return LoopStatus::Ok;
Expand Down

0 comments on commit 7e35caf

Please sign in to comment.