From 0170dc760a1a6f125744c6e50f8cf31a2c3383f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20J=2EV=2E=20Bertin?= Date: Sun, 7 Jun 2015 01:22:53 +0200 Subject: [PATCH] Handle signals (INT,HUP) more appropriately in parallel processing mode: raise a flag that signals the worker threads they should terminate --- ParallelProcess.cpp | 22 ++++++++++++++--- ParallelProcess.h | 1 + ParallelProcess_p.h | 11 +++++++++ afsctool.c | 60 ++++++++++++++++++++++++++++++++++----------- 4 files changed, 76 insertions(+), 18 deletions(-) diff --git a/ParallelProcess.cpp b/ParallelProcess.cpp index 712c983..679d4e0 100644 --- a/ParallelProcess.cpp +++ b/ParallelProcess.cpp @@ -133,15 +133,22 @@ int ParallelFileProcessor::run() } if( allDoneEvent ){ DWORD waitResult = ~WAIT_OBJECT_0; - while( nJobs >= 1 && size() > 0 && waitResult != WAIT_OBJECT_0 ){ + while( nJobs >= 1 && !quitRequested() && size() > 0 && waitResult != WAIT_OBJECT_0 ){ waitResult = WaitForSingleObject( allDoneEvent, 2000 ); if( nJobs ){ double perc = 100.0 * nProcessed / N; if( perc >= prevPerc + 10 ){ fprintf( stderr, "%s %d%%", (prevPerc > 0)? " .." : "", int(perc + 0.5) ); + fflush(stderr); prevPerc = perc; } } + if( quitRequested() && !threadPool.empty() ){ + // the WaitForSingleObject() call above was interrupted by the signal that + // led to quitRequested() being set and as a result the workers haven't yet + // had the chance to exit cleanly. Give them that chance now. + waitResult = WaitForSingleObject( allDoneEvent, 2000 ); + } } fputc( '\n', stderr ); CloseHandle(allDoneEvent); @@ -185,9 +192,9 @@ DWORD FileProcessor::Run(LPVOID arg) if( PP ){ FileEntry entry; nProcessed = 0; - while( PP->getFront(entry) ){ + while( !PP->quitRequested() && PP->getFront(entry) ){ + // create a scoped lock without closing it immediately CRITSECTLOCK::Scope scp(PP->ioLock, 0); -// scp.verbose = true; scope = &scp; entry.compress(this); _InterlockedIncrement(&PP->nProcessed); @@ -268,8 +275,15 @@ bool unLockParallelProcessorIO(FileProcessor *worker) int runParallelProcessor(ParallelFileProcessor *p) { int ret = -1; - if(p){ + if( p ){ ret = p->run(); } return ret; } + +void stopParallelProcessor(ParallelFileProcessor *p) +{ + if( p ){ + p->setQuitRequested(true); + } +} diff --git a/ParallelProcess.h b/ParallelProcess.h index 41dcd19..5458d45 100644 --- a/ParallelProcess.h +++ b/ParallelProcess.h @@ -31,6 +31,7 @@ bool lockParallelProcessorIO(FileProcessor *worker); // unlock the ioLock if it was previously locked by a call to lockParallelProcessorIO() bool unLockParallelProcessorIO(FileProcessor *worker); int runParallelProcessor(ParallelFileProcessor *p); +void stopParallelProcessor(ParallelFileProcessor *p); #ifdef __cplusplus } diff --git a/ParallelProcess_p.h b/ParallelProcess_p.h index 142acd7..bfdb236 100644 --- a/ParallelProcess_p.h +++ b/ParallelProcess_p.h @@ -32,6 +32,7 @@ class ParallelProcessor { listLock = new CRITSECTLOCK(4000); threadLock = new CRITSECTLOCK(4000); + quitRequestedFlag = false; } virtual ~ParallelProcessor() { @@ -68,9 +69,19 @@ class ParallelProcessor } return ret; } + bool quitRequested() + { + return quitRequestedFlag; + } + bool setQuitRequested(bool val) + { bool ret = quitRequestedFlag; + quitRequestedFlag = val; + return ret; + } protected: ItemQueue itemList; CRITSECTLOCK *listLock, *threadLock; + bool quitRequestedFlag; }; typedef struct folder_info FolderInfo; diff --git a/afsctool.c b/afsctool.c index 7178bdf..f61e164 100644 --- a/afsctool.c +++ b/afsctool.c @@ -21,8 +21,8 @@ #include "afsctool.h" #ifdef SUPPORT_PARALLEL # include "ParallelProcess.h" - static ParallelFileProcessor *PP = NULL; - static bool exclusive_io = true; + static ParallelFileProcessor *PP = NULL; + static bool exclusive_io = true; #endif const char *sizeunit10_short[] = {"KB", "MB", "GB", "TB", "PB", "EB"}; @@ -73,6 +73,16 @@ char* getSizeStr(long long int size, long long int size_rounded) #define xfree(x) if((x)){free((x)); (x)=NULL;} +static bool quitRequested = FALSE; + +static void signal_handler(int sig) +{ + fprintf( stderr, "Received signal %d: afsctool will quit\n", sig ); +#ifdef SUPPORT_PARALLEL + stopParallelProcessor(PP); +#endif +} + #if SUPPORT_PARALLEL void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_info *folderinfo, void *worker ) #else @@ -98,6 +108,11 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf struct timeval times[2]; char *backupName = NULL; + if (quitRequested) + { + return; + } + times[0].tv_sec = inFileInfo->st_atimespec.tv_sec; times[0].tv_usec = inFileInfo->st_atimespec.tv_nsec / 1000; times[1].tv_sec = inFileInfo->st_mtimespec.tv_sec; @@ -316,13 +331,13 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf goto bail; } - signal(SIGINT, SIG_IGN); - signal(SIGHUP, SIG_IGN); - #ifdef SUPPORT_PARALLEL if( exclusive_io && worker ){ locked = lockParallelProcessorIO(worker); } +#else + signal(SIGINT, SIG_IGN); + signal(SIGHUP, SIG_IGN); #endif in = fopen(inFile, "w"); if (in == NULL) @@ -438,8 +453,10 @@ void compressFile(const char *inFile, struct stat *inFileInfo, struct folder_inf free(backupName); backupName = NULL; } +#ifndef SUPPORT_PARALLEL signal(SIGINT, SIG_DFL); -signal(SIGHUP, SIG_DFL); + signal(SIGHUP, SIG_DFL); +#endif xfree(inBuf); xfree(outBuf); xfree(outdecmpfsBuf); @@ -458,6 +475,11 @@ void decompressFile(const char *inFile, struct stat *inFileInfo) ssize_t xattrnamesize, indecmpfsLen = 0, inRFLen = 0, getxattrret, RFpos = 0; struct timeval times[2]; + if (quitRequested) + { + return; + } + times[0].tv_sec = inFileInfo->st_atimespec.tv_sec; times[0].tv_usec = inFileInfo->st_atimespec.tv_nsec / 1000; times[1].tv_sec = inFileInfo->st_mtimespec.tv_sec; @@ -1273,6 +1295,11 @@ void process_file(const char *filepath, const char *filetype, struct stat *filei struct filetype_info *filetypeinfo = NULL; bool filetype_found = FALSE; + if (quitRequested) + { + return; + } + xattrnamesize = listxattr(filepath, NULL, 0, XATTR_SHOWCOMPRESSION | XATTR_NOFOLLOW); if (xattrnamesize > 0) @@ -1447,8 +1474,9 @@ void process_folder(FTS *currfolder, struct folder_info *folderinfo) do { - if ((volume_search || strncasecmp("/Volumes/", currfile->fts_path, 9) != 0 || strlen(currfile->fts_path) < 9) && - (strncasecmp("/dev/", currfile->fts_path, 5) != 0 || strlen(currfile->fts_path) < 5)) + if (!quitRequested + && (volume_search || strncasecmp("/Volumes/", currfile->fts_path, 9) != 0 || strlen(currfile->fts_path) < 9) + && (strncasecmp("/dev/", currfile->fts_path, 5) != 0 || strlen(currfile->fts_path) < 5)) { if (S_ISDIR(currfile->fts_statp->st_mode) && currfile->fts_ino != 2) { @@ -1580,7 +1608,7 @@ void process_folder(FTS *currfolder, struct folder_info *folderinfo) } else fts_set(currfolder, currfile, FTS_SKIP); - } while ((currfile = fts_read(currfolder)) != NULL); + } while (!quitRequested && (currfile = fts_read(currfolder)) != NULL); checkForHardLink(NULL, NULL, NULL); fts_close(currfolder); } @@ -1865,6 +1893,7 @@ next_arg:; } #endif + // ignore signals due to exceeding CPU or file size limits signal(SIGXCPU, SIG_IGN); signal(SIGXFSZ, SIG_IGN); @@ -2439,11 +2468,14 @@ next_arg:; free(folderinfo.filetypeslist); #ifdef SUPPORT_PARALLEL - if (PP) - { - fprintf( stderr, "Processed %d entries\n", runParallelProcessor(PP) ); - releaseParallelProcessor(PP); - } + if (PP) + { + signal(SIGINT, signal_handler); + signal(SIGHUP, signal_handler); + fprintf( stderr, "Starting %d worker threads to process queue\n", nJobs ); + fprintf( stderr, "Processed %d entries\n", runParallelProcessor(PP) ); + releaseParallelProcessor(PP); + } #endif return 0; }