Skip to content

Commit

Permalink
Fixes Wdt#80 - Task#8474686
Browse files Browse the repository at this point in the history
Summary: * Don't overwrite by default
* really make sure we either create or open a file
* when create we fail unless overwrite is set and truncate it otherwise
* fix the timing of open and seek to count all of them not just successes
* perf record directory creation too
* openAndSetSize now private as it's risky (for retries)
* fixed perf reporting bug of missing \n when there is only 1 data point
* improved logging of option_type
* made resumption imply overwrite
* fixed the profiler test/script
* make sure failure in shell does print what fails

Reviewed By: @uddipta

Differential Revision: D2470377
  • Loading branch information
ldemailly committed Sep 25, 2015
1 parent 1607bd3 commit 5eaaf32
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 65 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.20.1509240)
project("WDT" LANGUAGES C CXX VERSION 1.20.1509241)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
set(CMAKE_CXX_STANDARD 11)
Expand Down Expand Up @@ -297,4 +297,7 @@ if (BUILD_TESTING)
add_test(NAME WdtFileListTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_file_list_test.py")

add_test(NAME WdtOverwriteTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_overwrite_test.py")

endif(BUILD_TESTING)
79 changes: 65 additions & 14 deletions FileCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ bool FileCreator::setFileSize(int fd, int64_t fileSize) {

int FileCreator::openAndSetSize(BlockDetails const *blockDetails) {
const auto &options = WdtOptions::get();
int fd = createFile(blockDetails->fileName);
int fd;
const bool doCreate = blockDetails->allocationStatus == NOT_EXISTS;
const bool isTooLarge = (blockDetails->allocationStatus == EXISTS_TOO_LARGE);
if (doCreate) {
fd = createFile(blockDetails->fileName);
} else {
fd = openExistingFile(blockDetails->fileName);
}
if (fd < 0) {
return -1;
}
Expand All @@ -67,14 +74,13 @@ int FileCreator::openAndSetSize(BlockDetails const *blockDetails) {
return -1;
}
if (options.isLogBasedResumption()) {
if (blockDetails->allocationStatus == EXISTS_TOO_LARGE) {
if (isTooLarge) {
LOG(WARNING) << "File size smaller in the sender side "
<< blockDetails->fileName
<< ", marking previous transferred chunks as invalid";
transferLogManager_.addInvalidationEntry(blockDetails->prevSeqId);
}
if (blockDetails->allocationStatus == EXISTS_TOO_LARGE ||
blockDetails->allocationStatus == NOT_EXISTS) {
if (isTooLarge || doCreate) {
transferLogManager_.addFileCreationEntry(
blockDetails->fileName, blockDetails->seqId, blockDetails->fileSize);
} else {
Expand Down Expand Up @@ -147,11 +153,33 @@ int FileCreator::openForBlocks(int threadIndex,
return -1;
}
}
return createFile(blockDetails->fileName);
return openExistingFile(blockDetails->fileName);
}

using std::string;

int FileCreator::openExistingFile(const string &relPathStr) {
// This should have been validated earlier and errored out
// instead of crashing here
WDT_CHECK(!relPathStr.empty());
WDT_CHECK(relPathStr[0] != '/');
WDT_CHECK(relPathStr.back() != '/');

string path(rootDir_);
path.append(relPathStr);

int openFlags = O_WRONLY;
START_PERF_TIMER
int res = open(path.c_str(), openFlags, 0644);
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
if (res < 0) {
PLOG(ERROR) << "failed opening file " << path;
return -1;
}
VLOG(1) << "successfully opened file " << path;
return res;
}

int FileCreator::createFile(const string &relPathStr) {
CHECK(!relPathStr.empty());
CHECK(relPathStr[0] != '/');
Expand All @@ -167,39 +195,62 @@ int FileCreator::createFile(const string &relPathStr) {
std::string dir;
if (p) {
dir.assign(relPathStr.data(), p);
if (!createDirRecursively(dir)) {
START_PERF_TIMER
const bool dirSuccess1 = createDirRecursively(dir);
RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE)
if (!dirSuccess1) {
// retry with force
LOG(ERROR) << "failed to create dir " << dir << " recursively, "
<< "trying to force directory creation";
if (!createDirRecursively(dir, true /* force */)) {
START_PERF_TIMER
const bool dirSuccess2 = createDirRecursively(dir, true /* force */);
RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE)
if (!dirSuccess2) {
LOG(ERROR) << "failed to create dir " << dir << " recursively";
return -1;
}
}
}
int openFlags = O_CREAT | O_WRONLY;
auto &options = WdtOptions::get();
// When doing download resumption we sometime open files that do already
// exist and we need to overwrite them anyway (files which have been
// discarded from the log for some reason)
if (options.overwrite || options.enable_download_resumption) {
// Make sure file size resumption will not get messed up if we
// expect to create this file
openFlags |= O_TRUNC;
} else {
// Make sure open will fail if we don't allow overwriting and
// the file happens to already exist
openFlags |= O_EXCL;
}
START_PERF_TIMER
int res = open(path.c_str(), openFlags, 0644);
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
if (res < 0) {
if (dir.empty()) {
PLOG(ERROR) << "failed creating file " << path;
return -1;
}
PLOG(ERROR) << "failed creating file " << path << ", trying to "
<< "force directory creation";
if (!createDirRecursively(dir, true /* force */)) {
LOG(ERROR) << "failed to create dir " << dir << " recursively";
return -1;
{
START_PERF_TIMER
const bool dirSuccess = createDirRecursively(dir, true /* force */);
RECORD_PERF_RESULT(PerfStatReport::DIRECTORY_CREATE)
if (!dirSuccess) {
LOG(ERROR) << "failed to create dir " << dir << " recursively";
return -1;
}
}
START_PERF_TIMER
res = open(path.c_str(), openFlags, 0644);
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
if (res < 0) {
PLOG(ERROR) << "failed creating file " << path;
return -1;
}
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
} else {
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
}
VLOG(1) << "successfully created file " << path;
return res;
Expand All @@ -210,7 +261,7 @@ bool FileCreator::createDirRecursively(const std::string dir, bool force) {
return true;
}

CHECK(dir.back() == '/');
WDT_CHECK(dir.back() == '/');

int64_t lastIndex = dir.size() - 1;
while (lastIndex > 0 && dir[lastIndex - 1] != '/') {
Expand Down
29 changes: 17 additions & 12 deletions FileCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,6 @@ class FileCreator {
delete[] threadConditionVariables_;
}

/**
* Opens the file and sets its size. If the existing file size is greater than
* required size, the file is truncated using ftruncate. Space is
* allocated using posix_fallocate.
*
* @param blockDetails block-details
*
* @return file descriptor in case of success, -1 otherwise
*/
int openAndSetSize(BlockDetails const *blockDetails);

/**
* This is used to open the file in block mode. If the current thread is the
* first one to try to open the file, then it allocates space using
Expand All @@ -94,18 +83,34 @@ class FileCreator {
}

private:
/**
* Opens the file and sets its size. If the existing file size is greater than
* required size, the file is truncated using ftruncate. Space is
* allocated using posix_fallocate.
*
* @param blockDetails block-details
*
* @return file descriptor in case of success, -1 otherwise
*/
int openAndSetSize(BlockDetails const *blockDetails);

/**
* Create a file and open for writing, recursively create subdirs.
* Subdirs are only created once due to createdDirs_ cache, but
* if an open fails where we assumed the directory already exists
* based on cache, we try creating the dir and open again before
* failing.
* failing. Will not overwrite existing files unless overwrite option
* is set.
*
* @param relPath path relative to root dir
*
* @return file descriptor or -1 on error
*/
int createFile(const std::string &relPath);
/**
* Open existing file
*/
int openExistingFile(const std::string &relPath);

/**
* sets the size of the file. If the size is greater then the
Expand Down
24 changes: 9 additions & 15 deletions FileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@ ErrorCode FileWriter::open() {
if (options.skip_writes) {
return OK;
}
if (blockDetails_->fileSize == blockDetails_->dataSize) {
// single block file
WDT_CHECK(blockDetails_->offset == 0);
fd_ = fileCreator_->openAndSetSize(blockDetails_);
} else {
// multi block file
fd_ = fileCreator_->openForBlocks(threadIndex_, blockDetails_);
if (fd_ >= 0 && blockDetails_->offset > 0) {
START_PERF_TIMER
if (lseek(fd_, blockDetails_->offset, SEEK_SET) < 0) {
PLOG(ERROR) << "Unable to seek " << blockDetails_->fileName;
close();
} else {
RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK)
}
// TODO: consider a working optimization for small files
fd_ = fileCreator_->openForBlocks(threadIndex_, blockDetails_);
if (fd_ >= 0 && blockDetails_->offset > 0) {
START_PERF_TIMER
const int ret = lseek(fd_, blockDetails_->offset, SEEK_SET);
RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK);
if (ret < 0) {
PLOG(ERROR) << "Unable to seek " << blockDetails_->fileName;
close();
}
}
if (fd_ == -1) {
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Make sure to do the following, before "arc diff":
fbconfig --clang --with-project-version clang:dev -r wdt
fbmake runtests --extended-tests
fbmake runtests --run-disabled --extended-tests
fbmake runtests_opt
fbmake opt
Expand Down
13 changes: 8 additions & 5 deletions Reporting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,12 @@ void ProgressReporter::logProgress(int64_t effectiveDataBytes, int progress,
folly::ThreadLocalPtr<PerfStatReport> perfStatReport;

const std::string PerfStatReport::statTypeDescription_[] = {
"Socket Read", "Socket Write", "File Open", "File Close",
"File Read", "File Write", "Sync File Range", "fsync",
"File Seek", "Throttler Sleep", "Receiver Wait Sleep"};
"Socket Read", "Socket Write",
"File Open", "File Close",
"File Read", "File Write",
"Sync File Range", "fsync",
"File Seek", "Throttler Sleep",
"Receiver Wait Sleep", "Directory creation"};

PerfStatReport::PerfStatReport() {
static_assert(
Expand Down Expand Up @@ -363,7 +366,7 @@ std::ostream& operator<<(std::ostream& os, const PerfStatReport& statReport) {
os << "p95 " << time << " ";
}
if (p99Count > runningCount && p99Count <= runningCount + count) {
os << "p99 " << time << "\n";
os << "p99 " << time;
}
runningCount += count;

Expand All @@ -373,7 +376,7 @@ std::ostream& operator<<(std::ostream& os, const PerfStatReport& statReport) {
}
buckets[currentBucketIndex] += count;
}

os << '\n';
for (int i = 0; i < numBuckets; i++) {
if (buckets[i] == 0) {
continue;
Expand Down
1 change: 1 addition & 0 deletions Reporting.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ class PerfStatReport {
RECEIVER_WAIT_SLEEP, // receiver sleep duration between sending wait cmd to
// sender. A high sum for this suggests threads
// were not properly load balanced
DIRECTORY_CREATE,
END
};

Expand Down
4 changes: 2 additions & 2 deletions WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ string WdtTransferRequest::generateUrl(bool genFull) const {
wdtUri.setQueryParam(TRANSFER_ID_PARAM, transferId);
wdtUri.setQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM,
folly::to<string>(protocolVersion));
const auto &options = WdtOptions::get();
const auto& options = WdtOptions::get();
if (options.url_backward_compatibility) {
wdtUri.setQueryParam(LEGACY_PROTOCOL_VERSION_PARAM,
folly::to<string>(LEGACY_PROTCOL_VERSION));
Expand All @@ -313,7 +313,7 @@ void WdtTransferRequest::serializePorts(WdtUri& wdtUri) const {
}
prevPort = ports[i];
}
const auto &options = WdtOptions::get();
const auto& options = WdtOptions::get();
if (hasHoles || options.url_backward_compatibility) {
wdtUri.setQueryParam(PORTS_PARAM, getSerializedPortsList());
} else {
Expand Down
4 changes: 2 additions & 2 deletions WdtConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 20
#define WDT_VERSION_BUILD 1509240
#define WDT_VERSION_BUILD 1509241
// Add -fbcode to version str
#define WDT_VERSION_STR "1.20.1509240-fbcode"
#define WDT_VERSION_STR "1.20.1509241-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR

Expand Down
3 changes: 2 additions & 1 deletion WdtFlags.cpp.inc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ WDT_OPT(namespace_receiver_limit, int32,
"A value of zero disables limits");

#ifdef WDT_SUPPORTS_ODIRECT
WDT_OPT(odirect_reads, bool,
WDT_OPT(odirect_reads, bool,
"Wdt can read files in O_DIRECT mode, set this flag to true"
" to make sender read all files in O_DIRECT");
#else
Expand All @@ -150,3 +150,4 @@ WDT_OPT(open_files_during_discovery, bool,
"If true, files are opened when they are discovered");
WDT_OPT(url_backward_compatibility, bool,
"If true, we send url that works with older version(<19)");
WDT_OPT(overwrite, bool, "Allow the receiver to overwrite existing files");
28 changes: 21 additions & 7 deletions WdtOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@
namespace facebook {
namespace wdt {

#define CHANGE_IF_NOT_SPECIFIED(option, specifiedOptions, value) \
if (specifiedOptions.find(#option) == specifiedOptions.end()) { \
option = value; \
/**
* Macro to change the default of some flags based on some other flag
* Example of usage:
* if (enable_download_resumption) {
* CHANGE_IF_NOT_SPECIFIED(overwrite, userSpecifiedOptions, true,
* "(download resumption)")
* }
*/
#define CHANGE_IF_NOT_SPECIFIED(option, specifiedOptions, value, msg) \
if (specifiedOptions.find(#option) == specifiedOptions.end()) { \
LOG(INFO) << "Setting " << #option << " to " << value << " " << msg; \
option = value; \
} else { \
LOG(INFO) << "Not overwriting user specified " << #option << " " << msg; \
}

const std::string WdtOptions::FLASH_OPTION_TYPE = "flash";
Expand All @@ -23,10 +34,13 @@ void WdtOptions::modifyOptions(
const std::string& optionType,
const std::set<std::string>& userSpecifiedOptions) {
if (optionType == DISK_OPTION_TYPE) {
CHANGE_IF_NOT_SPECIFIED(num_ports, userSpecifiedOptions, 1)
CHANGE_IF_NOT_SPECIFIED(block_size_mbytes, userSpecifiedOptions, -1)
CHANGE_IF_NOT_SPECIFIED(disable_preallocation, userSpecifiedOptions, true)
CHANGE_IF_NOT_SPECIFIED(resume_using_dir_tree, userSpecifiedOptions, true)
std::string msg("(disk option type)");
CHANGE_IF_NOT_SPECIFIED(num_ports, userSpecifiedOptions, 1, msg)
CHANGE_IF_NOT_SPECIFIED(block_size_mbytes, userSpecifiedOptions, -1, msg)
CHANGE_IF_NOT_SPECIFIED(disable_preallocation, userSpecifiedOptions, true,
msg)
CHANGE_IF_NOT_SPECIFIED(resume_using_dir_tree, userSpecifiedOptions, true,
msg)
return;
}
if (optionType != FLASH_OPTION_TYPE) {
Expand Down
5 changes: 5 additions & 0 deletions WdtOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ class WdtOptions {
*/
bool url_backward_compatibility{false};

/**
* If true, wdt can overwrite existing files
*/
bool overwrite{false};

/**
* @return whether files should be pre-allocated or not
*/
Expand Down
Loading

0 comments on commit 5eaaf32

Please sign in to comment.