Skip to content

Commit

Permalink
Merge pull request #61 from etschannen/release-5.2
Browse files Browse the repository at this point in the history
Release 5.2
  • Loading branch information
etschannen authored Mar 9, 2018
2 parents 2c92ef8 + 28ea983 commit f773b94
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 56 deletions.
99 changes: 69 additions & 30 deletions fdbclient/BackupContainer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,17 +564,14 @@ class BackupContainerFileSystem : public IBackupContainer {
// Force is required if there is not a restorable snapshot which both
// - begins at or after expireEndVersion
// - ends at or before restorableBeginVersion
bool forceNeeded = true;
state bool forceNeeded = true;
for(KeyspaceSnapshotFile &s : desc.snapshots) {
if(s.restorable.orDefault(false) && s.beginVersion >= expireEndVersion && s.endVersion <= restorableBeginVersion) {
forceNeeded = false;
break;
}
}

if(forceNeeded && !force)
throw backup_cannot_expire();

// Get metadata
state Optional<Version> expiredEnd;
state Optional<Version> logBegin;
Expand Down Expand Up @@ -604,52 +601,94 @@ class BackupContainerFileSystem : public IBackupContainer {
newLogBeginVersion = expireEndVersion;
}
else {
// If the last log overlaps the expiredEnd then use the log's begin version
// If the last log overlaps the expiredEnd then use the log's begin version and move the expiredEnd
// back to match it.
if(last.endVersion > expireEndVersion) {
newLogBeginVersion = last.beginVersion;
logs.pop_back();
expireEndVersion = newLogBeginVersion.get();
}
}
}

// If we have a new log begin version then potentially update the property but definitely set
// expireEndVersion to the new log begin because we will only be deleting files up to but not
// including that version.
if(newLogBeginVersion.present()) {
expireEndVersion = newLogBeginVersion.get();
// If the new version is greater than the existing one or the
// existing one is not present then write the new value
if(logBegin.orDefault(0) < newLogBeginVersion.get()) {
Void _ = wait(bc->logBeginVersion().set(newLogBeginVersion.get()));
}
}
else {
// Otherwise, if the old logBeginVersion is present and older than expireEndVersion then clear it because
// it refers to a version in a range we're about to delete and apparently continuity through
// expireEndVersion is broken.
if(logBegin.present() && logBegin.get() < expireEndVersion)
Void _ = wait(bc->logBeginVersion().clear());
}

// Delete files
state std::vector<Future<Void>> deletes;
// Make a list of files to delete
state std::vector<std::string> toDelete;

// Move filenames out of vector then destroy it to save memory
for(auto const &f : logs) {
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
logs.clear();

// Move filenames out of vector then destroy it to save memory
for(auto const &f : ranges) {
// Must recheck version because list returns data up to and including the given endVersion
if(f.version < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
ranges.clear();

for(auto const &f : desc.snapshots) {
if(f.endVersion < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
desc = BackupDescription();

// If some files to delete were found AND force is needed AND the force option is NOT set, then fail
if(!toDelete.empty() && forceNeeded && !force)
throw backup_cannot_expire();

// We are about to start deleting files, at which point no data prior to the expire end version can be
// safely assumed to exist. The [logBegin, logEnd) range from the container's metadata describes
// a range of log versions which can be assumed to exist, so if the range of data being deleted overlaps
// that range then the metadata range must be updated.

// If we're expiring the entire log range described by the metadata then clear both metadata values
if(logEnd.present() && logEnd.get() < expireEndVersion) {
if(logBegin.present())
Void _ = wait(bc->logBeginVersion().clear());
if(logEnd.present())
Void _ = wait(bc->logEndVersion().clear());
}
else {
// If we are expiring to a point within the metadata range then update the begin if we have a new
// log begin version (which we should!) or clear the metadata range if we do not (which would be
// repairing the metadata from an incorrect state)
if(logBegin.present() && logBegin.get() < expireEndVersion) {
if(newLogBeginVersion.present()) {
Void _ = wait(bc->logBeginVersion().set(newLogBeginVersion.get()));
}
else {
if(logBegin.present())
Void _ = wait(bc->logBeginVersion().clear());
if(logEnd.present())
Void _ = wait(bc->logEndVersion().clear());
}
}
}

// Delete files, but limit parallelism because the file list could use a lot of memory and the corresponding
// delete actor states would use even more if they all existed at the same time.
state std::list<Future<Void>> deleteFutures;

Void _ = wait(waitForAll(deletes));
while(!toDelete.empty() || !deleteFutures.empty()) {

// While there are files to delete and budget in the deleteFutures list, start a delete
while(!toDelete.empty() && deleteFutures.size() < CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES) {
deleteFutures.push_back(bc->deleteFile(toDelete.back()));
toDelete.pop_back();
}

// Wait for deletes to finish until only targetFuturesSize futures remain in flight.
// If there are no files left to start then this value is 0, otherwise it is one less
// than the delete concurrency limit.
state int targetFuturesSize = toDelete.empty() ? 0 : (CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES - 1);

while(deleteFutures.size() > targetFuturesSize) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
}

// Update the expiredEndVersion property.
Void _ = wait(bc->expiredEndVersion().set(expireEndVersion));
Expand Down
1 change: 1 addition & 0 deletions fdbclient/Knobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( TASKBUCKET_MAX_TASK_KEYS, 1000 ); if( randomize && BUGGIFY ) TASKBUCKET_MAX_TASK_KEYS = 20;

//Backup
init( BACKUP_CONCURRENT_DELETES, 100 );
init( BACKUP_SIMULATED_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) BACKUP_SIMULATED_LIMIT_BYTES = 1000;
init( BACKUP_GET_RANGE_LIMIT_BYTES, 1e6 );
init( BACKUP_LOCK_BYTES, 1e8 );
Expand Down
1 change: 1 addition & 0 deletions fdbclient/Knobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ClientKnobs : public Knobs {
int TASKBUCKET_MAX_TASK_KEYS;

// Backup
int BACKUP_CONCURRENT_DELETES;
int BACKUP_SIMULATED_LIMIT_BYTES;
int BACKUP_GET_RANGE_LIMIT_BYTES;
int BACKUP_LOCK_BYTES;
Expand Down
4 changes: 2 additions & 2 deletions fdbrpc/AsyncFileEIO.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags).detail("Mode", mode);
throw e;
}
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags);
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags).suppressFor(1.0);

if ((flags & OPEN_LOCK) && !lock_fd(r->result)) {
TraceEvent(SevError, "UnableToLockFile").detail("filename", filename).GetLastError();
Expand Down Expand Up @@ -264,7 +264,7 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
state eio_req* r = eio_close(fd, 0, eio_callback, &p);
Void _ = wait( p.getFuture() );
if (r->result) error( "CloseError", fd, r );
TraceEvent("AsyncFileClosed").detail("fd", fd);
TraceEvent("AsyncFileClosed").detail("fd", fd).suppressFor(1.0);
}

ACTOR static Future<int> read_impl( int fd, void* data, int length, int64_t offset ) {
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/ClusterController.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class ClusterControllerData {
};

bool workerAvailable( WorkerInfo const& worker, bool checkStable ) {
return IFailureMonitor::failureMonitor().getState(worker.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 );
return ( now() - startTime < 2 * FLOW_KNOBS->SERVER_REQUEST_INTERVAL ) || ( IFailureMonitor::failureMonitor().getState(worker.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 ) );
}

std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
Expand Down
1 change: 0 additions & 1 deletion fdbserver/DataDistribution.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,6 @@ struct DDTeamCollection {
}

if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) {
TraceEvent("getTeam").detail("wantsVariety", req.wantsNewServers).detail("bestOption", bestOption.get()->getDesc());
ASSERT( bestOption.present() );
req.reply.send( bestOption );
return Void();
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/DataDistributionQueue.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd

destination->addDataInFlightToTeam( +metrics.bytes );

TraceEvent("RelocateShardHasDestination", masterId)
TraceEvent(relocateShardInterval.severity, "RelocateShardHasDestination", masterId)
.detail("PairId", relocateShardInterval.pairID)
.detail("DestinationTeam", destination->getDesc());

Expand Down
4 changes: 4 additions & 0 deletions fdbserver/LeaderElection.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
state bool iAmLeader = false;
state UID prevChangeID;

if( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController) > ProcessClass::UnsetFit || asyncIsExcluded->get() ) {
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
}

nominees->set( vector<Optional<LeaderInfo>>( coordinators.clientLeaderServers.size() ) );

myInfo.serializedInfo = proposedSerializedInterface;
Expand Down
32 changes: 17 additions & 15 deletions fdbserver/Status.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,8 +790,21 @@ ACTOR static Future<StatusObject> processStatusFetcher(

if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address);
int64_t memLimit = parseInt64(extractAttribute(psxml, "MemoryLimit"));
memoryObj["limit_bytes"] = memLimit;

if(psxml.size() > 0) {
int64_t memLimit = parseInt64(extractAttribute(psxml, "MemoryLimit"));
memoryObj["limit_bytes"] = memLimit;

std::string version;
if (tryExtractAttribute(psxml, LiteralStringRef("Version"), version)) {
statusObj["version"] = version;
}

std::string commandLine;
if (tryExtractAttribute(psxml, LiteralStringRef("CommandLine"), commandLine)) {
statusObj["command_line"] = commandLine;
}
}
}

// if this process address is in the machine metrics
Expand All @@ -811,9 +824,10 @@ ACTOR static Future<StatusObject> processStatusFetcher(

StatusArray messages;

if (errors.count(address) && errors[address].size())
if (errors.count(address) && errors[address].size()) {
// returns status object with type and time of error
messages.push_back(getError(errors.at(address)));
}

// string of address used so that other fields of a NetworkAddress are not compared
std::string strAddress = address.toString();
Expand All @@ -838,18 +852,6 @@ ACTOR static Future<StatusObject> processStatusFetcher(
// Get roles for the worker's address as an array of objects
statusObj["roles"] = roles.getStatusForAddress(address);

if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address);

std::string version;
if (tryExtractAttribute(psxml, LiteralStringRef("Version"), version))
statusObj["version"] = version;

std::string commandLine;
if (tryExtractAttribute(psxml, LiteralStringRef("CommandLine"), commandLine))
statusObj["command_line"] = commandLine;
}

if (configuration.present()){
statusObj["excluded"] = configuration.get().isExcludedServer(address);
}
Expand Down
6 changes: 4 additions & 2 deletions fdbserver/workloads/ConsistencyCheck.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ struct ConsistencyCheckWorkload : TestWorkload
else if(!isRelocating)
{
TraceEvent("ConsistencyCheck_StorageServerUnavailable").detail("StorageServer", storageServers[j]).detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token);
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token).suppressFor(1.0);

//All shards should be available in quiescence
if(self->performQuiescentChecks)
Expand Down Expand Up @@ -977,7 +977,9 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}

TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
if(bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
}
}

//SOMEDAY: when background data distribution is implemented, include this test
Expand Down
2 changes: 1 addition & 1 deletion packaging/msi/FDBInstaller.wxs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{61C46988-7589-4B8A-9BB9-D850FD5B8B05}'
Id='{4B030686-EEAE-40E1-B69E-1394537456B2}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'
Expand Down
Loading

0 comments on commit f773b94

Please sign in to comment.