Skip to content

Commit

Permalink
Group requests to ensure balance between bandwidth utilization and bo…
Browse files Browse the repository at this point in the history
…okkeeping

This fixes issues: #5391 #5390 #4498 #1633 #4454
  • Loading branch information
mrow4a committed Jan 9, 2017
1 parent 0cc1ea8 commit f5298b5
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/libsync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set(libsync_SRCS
propagateupload.cpp
propagateuploadv1.cpp
propagateuploadng.cpp
propagateuploadbundle.cpp
propagateremotedelete.cpp
propagateremotemove.cpp
propagateremotemkdir.cpp
Expand Down
9 changes: 9 additions & 0 deletions src/libsync/capabilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ QByteArray Capabilities::uploadChecksumType() const
return QByteArray();
}

bool Capabilities::bundling() const
{
static const auto bundling = qgetenv("OWNCLOUD_BUNDLING");
if (bundling == "0") return false;
if (bundling == "1") return true;

return _capabilities["dav"].toMap()["bundling"].toByteArray() >= "1.0";
}

bool Capabilities::chunkingNg() const
{
static const auto chunkng = qgetenv("OWNCLOUD_CHUNKING_NG");
Expand Down
1 change: 1 addition & 0 deletions src/libsync/capabilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class OWNCLOUDSYNC_EXPORT Capabilities {
bool sharePublicLinkEnforceExpireDate() const;
int sharePublicLinkExpireDateDays() const;
bool shareResharing() const;
bool bundling() const;
bool chunkingNg() const;

/// disable parallel upload in chunking
Expand Down
7 changes: 7 additions & 0 deletions src/libsync/configfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
static const char geometryC[] = "geometry";
static const char timeoutC[] = "timeout";
static const char chunkSizeC[] = "chunkSize";
static const char smallFileSizeC[] = "smallFileSize";

static const char proxyHostC[] = "Proxy/host";
static const char proxyTypeC[] = "Proxy/type";
Expand Down Expand Up @@ -128,6 +129,12 @@ quint64 ConfigFile::chunkSize() const
return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB
}

quint64 ConfigFile::smallFileSize() const
{
QSettings settings(configFile(), QSettings::IniFormat);
return settings.value(QLatin1String(smallFileSizeC), 1*1000*1000).toLongLong(); // default to 1 MB
}

void ConfigFile::setOptionalDesktopNotifications(bool show)
{
QSettings settings(configFile(), QSettings::IniFormat);
Expand Down
1 change: 1 addition & 0 deletions src/libsync/configfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class OWNCLOUDSYNC_EXPORT ConfigFile

int timeout() const;
quint64 chunkSize() const;
quint64 smallFileSize() const;

void saveGeometry(QWidget *w);
void restoreGeometry(QWidget *w);
Expand Down
95 changes: 79 additions & 16 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
directories.push(qMakePair(QString(), _rootJob.data()));
QVector<PropagatorJob*> directoriesToRemove;
QString removedDirectory;
bool enableBundledRequests = account()->capabilities().bundling();
foreach(const SyncFileItemPtr &item, items) {

if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
Expand Down Expand Up @@ -389,13 +390,30 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
currentDirJob->append(dir);
}
directories.push(qMakePair(item->destination() + "/" , dir));
} else if (PropagateItemJob* current = createJob(item)) {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
directoriesToRemove.prepend(current);
removedDirectory = item->_file + "/";
} else {
directories.top().second->append(current);
} else {
// Ensure that only files under or equal to chunk size are being inserted to Normal Upload
if (enableBundledRequests && item->_size <= chunkSize()
&& item->_instruction == CSYNC_INSTRUCTION_NEW
&& item->_direction == SyncFileItem::Up ) {
// Get PropagateNormalUpload container job
PropagateNormalUpload* bundleJob = 0;
if (directories.top().second->_bundledUploadJob.isNull()) {
bundleJob = new PropagateNormalUpload(this);
directories.top().second->_bundledUploadJob.reset(bundleJob);
} else {
bundleJob = qobject_cast<PropagateNormalUpload*>(directories.top().second->_bundledUploadJob.data());
}

// Append Upload job
bundleJob->append(item);
} else if (PropagateItemJob* current = createJob(item)) {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
directoriesToRemove.prepend(current);
removedDirectory = item->_file + "/";
} else {
directories.top().second->append(current);
}
}
}
}
Expand Down Expand Up @@ -459,6 +477,20 @@ quint64 OwncloudPropagator::chunkSize()
return chunkSize;
}

quint64 OwncloudPropagator::smallFileSize()
{
// Small filesize item is the file which transfer time
// typicaly will be lower than its bookkeping time.
static uint smallFileSize;
if (!smallFileSize) {
smallFileSize = qgetenv("OWNCLOUD_SMALLFILE_SIZE").toUInt();
if (smallFileSize == 0) {
ConfigFile cfg;
smallFileSize = cfg.smallFileSize();
}
}
return smallFileSize;
}

bool OwncloudPropagator::localFileNameClash( const QString& relFile )
{
Expand Down Expand Up @@ -591,6 +623,13 @@ PropagatorJob::JobParallelism PropagateDirectory::parallelism()
return FullParallelism;
}

void PropagateDirectory::append(PropagatorJob *subJob) {
if (!subJob->isJobsContainer()){
// This is standard job, so increase global counter
_propagator->_standardJobsCount++;
}
_subJobs.append(subJob);
}

bool PropagateDirectory::scheduleNextJob()
{
Expand All @@ -601,15 +640,25 @@ bool PropagateDirectory::scheduleNextJob()
if (_state == NotYetStarted) {
_state = Running;

// At the begining of the Directory Job, update expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_firstJob)
_totalJobs++;
if(_bundledUploadJob){
// PropagateNormalUpload is not a standard job, since it is abstract object
PropagateNormalUpload* bundle = qobject_cast<PropagateNormalUpload*>(_bundledUploadJob.take());
append(bundle);
}

if (!_firstJob && _subJobs.isEmpty()) {
finalize();
return true;
}

// At the begining of the Directory Job, update expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_firstJob) {
// _firstJob is a standard job, since it does interact with server
_propagator->_standardJobsCount++;
_totalJobs++;
}

}

if (_firstJob && _firstJob->_state == NotYetStarted) {
Expand All @@ -626,14 +675,20 @@ bool PropagateDirectory::scheduleNextJob()

while (subJobsIterator.hasNext()) {
subJobsIterator.next();
// Get the state of the state of the sub job pointed by call next()
// Function value() will directly access the item through hash in the QList at that subjob
// Get the state of the sub job pointed by call next()
if (subJobsIterator.value()->_state == Finished) {
// If this items is finish, remove it from the _subJobs list as it is not needed anymore
// If this items is finished, remove it from the _subJobs as it is not needed anymore
// Note that in this case remove() from QVector will just perform memmove of pointer array items.
PropagatorJob * jobPointer = subJobsIterator.value();
PropagatorJob * job = subJobsIterator.value();
subJobsIterator.remove();
delete jobPointer;

// Delete only containers now, we need items in slotSubJobFinished
// Items will be deleted when one will call delete on parent container later
if (job->isJobsContainer()){
delete job;
} else {
_finishedSubJobs.append(job);
}
continue;
}

Expand Down Expand Up @@ -669,12 +724,20 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
_hasError = status;
}

PropagatorJob *job = qobject_cast<PropagatorJob *>(sender());
if (job && !job->isJobsContainer()){
// The finished job was an item, decrease global counter as it is finished
_propagator->_standardJobsCount--;
}

_runningNow--;
_jobsFinished++;

// We finished processing all the jobs
// check if we finished
if (_jobsFinished >= _totalJobs) {
Q_ASSERT(_propagator->_standardJobsCount>=0);
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
finalize();
} else {
Expand Down
32 changes: 27 additions & 5 deletions src/libsync/owncloudpropagator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class PropagatorJob : public QObject {
*/
virtual qint64 committedDiskSpace() const { return 0; }

/**
* As in the description, this class can be job or job container
* This flag will allow to detect it
*/
virtual bool isJobsContainer() const { return false; }

public slots:
virtual void abort() {}

Expand Down Expand Up @@ -185,11 +191,18 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
Q_OBJECT
public:
// e.g: create the directory
QScopedPointer<PropagateItemJob>_firstJob;
QScopedPointer<PropagateItemJob> _firstJob;

// e.g: create class which will handle bundled uploads and bandwidth utilization vs bookkeeping balance
QScopedPointer<PropagatorJob> _bundledUploadJob;

// all the sub files or sub directories.
QVector<PropagatorJob *> _subJobs;

// all the finished sub PropagatorJob items which are not PropagateItemJob.
// one might need PropagatorJobs (PropagateDirectory, PropagateNormalUpload)
QVector<PropagatorJob *> _finishedSubJobs;

SyncFileItemPtr _item;

int _jobsFinished; // number of jobs that have completed
Expand All @@ -199,16 +212,16 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {

explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)
, _firstJob(0), _bundledUploadJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)

{ }

virtual ~PropagateDirectory() {
qDeleteAll(_subJobs);
qDeleteAll(_finishedSubJobs);
}

void append(PropagatorJob *subJob) {
_subJobs.append(subJob);
}
void append(PropagatorJob *subJob);

virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
Expand All @@ -227,6 +240,8 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {

qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

bool isJobsContainer() const Q_DECL_OVERRIDE { return true; }

private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
Expand Down Expand Up @@ -281,6 +296,9 @@ class OwncloudPropagator : public QObject {
, _journal(progressDb)
, _finishedEmited(false)
, _bandwidthManager(this)
, _activeDBJobs(0)
, _dbJobsCount(0)
, _standardJobsCount(0)
, _anotherSyncNeeded(false)
, _account(account)
{ }
Expand All @@ -302,6 +320,9 @@ class OwncloudPropagator : public QObject {
Jobs can be several time on the list (example, when several chunks are uploaded in parallel)
*/
QList<PropagateItemJob*> _activeJobList;
qint8 _activeDBJobs; // number of active DB jobs running
qint64 _dbJobsCount; // number of all jobs in which db operations are major factor
qint64 _standardJobsCount; // number of all jobs which are rare or in which db operations are not a major factor

/** We detected that another sync is required after this one */
bool _anotherSyncNeeded;
Expand All @@ -327,6 +348,7 @@ class OwncloudPropagator : public QObject {

/** returns the size of chunks in bytes */
static quint64 chunkSize();
static quint64 smallFileSize();

AccountPtr account() const;

Expand Down
2 changes: 1 addition & 1 deletion src/libsync/propagatedownload.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PropagateDownloadFile : public PropagateItemJob {
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < _propagator->smallFileSize(); }

/**
* Whether an existing folder with the same name may be deleted before
Expand Down
Loading

0 comments on commit f5298b5

Please sign in to comment.