Skip to content

Commit

Permalink
Handle signals (INT,HUP) more appropriately in parallel processing mode:
Browse files Browse the repository at this point in the history
raise a flag that signals the worker threads they should terminate
  • Loading branch information
RJVB committed Jun 6, 2015
1 parent 5976a1d commit 0170dc7
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 18 deletions.
22 changes: 18 additions & 4 deletions ParallelProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,22 @@ int ParallelFileProcessor::run()
}
if( allDoneEvent ){
DWORD waitResult = ~WAIT_OBJECT_0;
while( nJobs >= 1 && size() > 0 && waitResult != WAIT_OBJECT_0 ){
while( nJobs >= 1 && !quitRequested() && size() > 0 && waitResult != WAIT_OBJECT_0 ){
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
if( nJobs ){
double perc = 100.0 * nProcessed / N;
if( perc >= prevPerc + 10 ){
fprintf( stderr, "%s %d%%", (prevPerc > 0)? " .." : "", int(perc + 0.5) );
fflush(stderr);
prevPerc = perc;
}
}
if( quitRequested() && !threadPool.empty() ){
// the WaitForSingleObject() call above was interrupted by the signal that
// led to quitRequested() being set and as a result the workers haven't yet
// had the chance to exit cleanly. Give them that chance now.
waitResult = WaitForSingleObject( allDoneEvent, 2000 );
}
}
fputc( '\n', stderr );
CloseHandle(allDoneEvent);
Expand Down Expand Up @@ -185,9 +192,9 @@ DWORD FileProcessor::Run(LPVOID arg)
if( PP ){
FileEntry entry;
nProcessed = 0;
while( PP->getFront(entry) ){
while( !PP->quitRequested() && PP->getFront(entry) ){
// create a scoped lock without closing it immediately
CRITSECTLOCK::Scope scp(PP->ioLock, 0);
// scp.verbose = true;
scope = &scp;
entry.compress(this);
_InterlockedIncrement(&PP->nProcessed);
Expand Down Expand Up @@ -268,8 +275,15 @@ bool unLockParallelProcessorIO(FileProcessor *worker)

int runParallelProcessor(ParallelFileProcessor *p)
{ int ret = -1;
if(p){
if( p ){
ret = p->run();
}
return ret;
}

void stopParallelProcessor(ParallelFileProcessor *p)
{
if( p ){
p->setQuitRequested(true);
}
}
1 change: 1 addition & 0 deletions ParallelProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ bool lockParallelProcessorIO(FileProcessor *worker);
// unlock the ioLock if it was previously locked by a call to lockParallelProcessorIO()
bool unLockParallelProcessorIO(FileProcessor *worker);
int runParallelProcessor(ParallelFileProcessor *p);
void stopParallelProcessor(ParallelFileProcessor *p);

#ifdef __cplusplus
}
Expand Down
11 changes: 11 additions & 0 deletions ParallelProcess_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ParallelProcessor
{
listLock = new CRITSECTLOCK(4000);
threadLock = new CRITSECTLOCK(4000);
quitRequestedFlag = false;
}
virtual ~ParallelProcessor()
{
Expand Down Expand Up @@ -68,9 +69,19 @@ class ParallelProcessor
}
return ret;
}
bool quitRequested()
{
return quitRequestedFlag;
}
bool setQuitRequested(bool val)
{ bool ret = quitRequestedFlag;
quitRequestedFlag = val;
return ret;
}
protected:
ItemQueue itemList;
CRITSECTLOCK *listLock, *threadLock;
bool quitRequestedFlag;
};

typedef struct folder_info FolderInfo;
Expand Down
60 changes: 46 additions & 14 deletions afsctool.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "afsctool.h"
#ifdef SUPPORT_PARALLEL
# include "ParallelProcess.h"
static ParallelFileProcessor *PP = NULL;
static bool exclusive_io = true;
static ParallelFileProcessor *PP = NULL;
static bool exclusive_io = true;
#endif

const char *sizeunit10_short[] = {"KB", "MB", "GB", "TB", "PB", "EB"};
Expand Down Expand Up @@ -73,6 +73,16 @@ char* getSizeStr(long long int size, long long int size_rounded)

#define xfree(x) if((x)){free((x)); (x)=NULL;}

static bool quitRequested = FALSE;

static void signal_handler(int sig)
{
fprintf( stderr, "Received signal %d: afsctool will quit\n", sig );
#ifdef SUPPORT_PARALLEL
stopParallelProcessor(PP);
#endif
}

#if SUPPORT_PARALLEL
void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker )
#else
Expand All @@ -98,6 +108,11 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
struct timeval times[2];
char *backupName = NULL;

if (quitRequested)
{
return;
}

times[0].tv_sec = inFileInfo->st_atimespec.tv_sec;
times[0].tv_usec = inFileInfo->st_atimespec.tv_nsec / 1000;
times[1].tv_sec = inFileInfo->st_mtimespec.tv_sec;
Expand Down Expand Up @@ -316,13 +331,13 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
goto bail;
}

signal(SIGINT, SIG_IGN);
signal(SIGHUP, SIG_IGN);

#ifdef SUPPORT_PARALLEL
if( exclusive_io && worker ){
locked = lockParallelProcessorIO(worker);
}
#else
signal(SIGINT, SIG_IGN);
signal(SIGHUP, SIG_IGN);
#endif
in = fopen(inFile, "w");
if (in == NULL)
Expand Down Expand Up @@ -438,8 +453,10 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf
free(backupName);
backupName = NULL;
}
#ifndef SUPPORT_PARALLEL
signal(SIGINT, SIG_DFL);
signal(SIGHUP, SIG_DFL);
signal(SIGHUP, SIG_DFL);
#endif
xfree(inBuf);
xfree(outBuf);
xfree(outdecmpfsBuf);
Expand All @@ -458,6 +475,11 @@ void decompressFile(const char *inFile, struct stat *inFileInfo)
ssize_t xattrnamesize, indecmpfsLen = 0, inRFLen = 0, getxattrret, RFpos = 0;
struct timeval times[2];

if (quitRequested)
{
return;
}

times[0].tv_sec = inFileInfo->st_atimespec.tv_sec;
times[0].tv_usec = inFileInfo->st_atimespec.tv_nsec / 1000;
times[1].tv_sec = inFileInfo->st_mtimespec.tv_sec;
Expand Down Expand Up @@ -1273,6 +1295,11 @@ void process_file(const char *filepath, const char *filetype, struct stat *filei
struct filetype_info *filetypeinfo = NULL;
bool filetype_found = FALSE;

if (quitRequested)
{
return;
}

xattrnamesize = listxattr(filepath, NULL, 0, XATTR_SHOWCOMPRESSION | XATTR_NOFOLLOW);

if (xattrnamesize > 0)
Expand Down Expand Up @@ -1447,8 +1474,9 @@ void process_folder(FTS *currfolder, struct folder_info *folderinfo)

do
{
if ((volume_search || strncasecmp("/Volumes/", currfile->fts_path, 9) != 0 || strlen(currfile->fts_path) < 9) &&
(strncasecmp("/dev/", currfile->fts_path, 5) != 0 || strlen(currfile->fts_path) < 5))
if (!quitRequested
&& (volume_search || strncasecmp("/Volumes/", currfile->fts_path, 9) != 0 || strlen(currfile->fts_path) < 9)
&& (strncasecmp("/dev/", currfile->fts_path, 5) != 0 || strlen(currfile->fts_path) < 5))
{
if (S_ISDIR(currfile->fts_statp->st_mode) && currfile->fts_ino != 2)
{
Expand Down Expand Up @@ -1580,7 +1608,7 @@ void process_folder(FTS *currfolder, struct folder_info *folderinfo)
}
else
fts_set(currfolder, currfile, FTS_SKIP);
} while ((currfile = fts_read(currfolder)) != NULL);
} while (!quitRequested && (currfile = fts_read(currfolder)) != NULL);
checkForHardLink(NULL, NULL, NULL);
fts_close(currfolder);
}
Expand Down Expand Up @@ -1865,6 +1893,7 @@ next_arg:;
}
#endif

// ignore signals due to exceeding CPU or file size limits
signal(SIGXCPU, SIG_IGN);
signal(SIGXFSZ, SIG_IGN);

Expand Down Expand Up @@ -2439,11 +2468,14 @@ next_arg:;
free(folderinfo.filetypeslist);

#ifdef SUPPORT_PARALLEL
if (PP)
{
fprintf( stderr, "Processed %d entries\n", runParallelProcessor(PP) );
releaseParallelProcessor(PP);
}
if (PP)
{
signal(SIGINT, signal_handler);
signal(SIGHUP, signal_handler);
fprintf( stderr, "Starting %d worker threads to process queue\n", nJobs );
fprintf( stderr, "Processed %d entries\n", runParallelProcessor(PP) );
releaseParallelProcessor(PP);
}
#endif
return 0;
}

0 comments on commit 0170dc7

Please sign in to comment.