Skip to content

Commit

Permalink
Add folder items scheduler by specific predicate - item sizes ascending
Browse files Browse the repository at this point in the history
  • Loading branch information
mrow4a committed Dec 5, 2016
1 parent 4ac2b76 commit 353cd16
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 42 deletions.
57 changes: 33 additions & 24 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,12 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}

foreach(PropagatorJob* it, directoriesToRemove) {
it->setHighJobPriority();
_rootJob->append(it);
}

_rootJob->updateJobPredicateValues();

connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(_rootJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
Expand Down Expand Up @@ -574,20 +577,29 @@ OwncloudPropagator::DiskSpaceResult OwncloudPropagator::diskSpaceCheck() const

// ================================================================================

void PropagateDirectory::updateJobPredicateValues()
{
QMutableVectorIterator<PropagatorJob *> containerJobsIterator(_containerJobs);
while (containerJobsIterator.hasNext()) {
PropagatorJob * job = containerJobsIterator.next();
job->updateJobPredicateValues();
_subJobs.insertMulti(job->getJobPredicateValue(), job);
containerJobsIterator.remove();
}
}

PropagatorJob::JobParallelism PropagateDirectory::parallelism()
{
// If any of the non-finished sub jobs is not parallel, we have to wait

// FIXME! we should probably cache this result

if (_firstJob && _firstJob->_state != Finished) {
if (_firstJob->parallelism() != FullParallelism)
return WaitForFinished;
}

// FIXME: use the cached value of finished job
for (int i = 0; i < _subJobs.count(); ++i) {
if (_subJobs.at(i)->_state != Finished && _subJobs.at(i)->parallelism() != FullParallelism) {
QMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
if (subJobsIterator.value()->_state != Finished && subJobsIterator.value()->parallelism() != FullParallelism) {
return WaitForFinished;
}
}
Expand All @@ -604,6 +616,11 @@ bool PropagateDirectory::scheduleNextJob()
if (_state == NotYetStarted) {
_state = Running;

// at the begining of the Directory Job, update expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_firstJob)
_totalJobs++;

if (!_firstJob && _subJobs.isEmpty()) {
finalize();
return true;
Expand All @@ -618,30 +635,27 @@ bool PropagateDirectory::scheduleNextJob()
return false;
}

// cache the value of first unfinished subjob
bool stopAtDirectory = false;
int i = _firstUnfinishedSubJob;
int subJobsCount = _subJobs.count();
while (i < subJobsCount && _subJobs.at(i)->_state == Finished) {
_firstUnfinishedSubJob = ++i;
}

for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) {
if (_subJobs.at(i)->_state == Finished) {
QMutableMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
if (subJobsIterator.value()->_state == Finished) {
subJobsIterator.remove();
continue;
}

if (stopAtDirectory && qobject_cast<PropagateDirectory*>(_subJobs.at(i))) {
if (stopAtDirectory && qobject_cast<PropagateDirectory*>(subJobsIterator.value())) {
return false;
}

if (possiblyRunNextJob(_subJobs.at(i))) {
if (possiblyRunNextJob(subJobsIterator.value())) {
return true;
}

Q_ASSERT(_subJobs.at(i)->_state == Running);
Q_ASSERT(subJobsIterator.value()->_state == Running);

auto paral = _subJobs.at(i)->parallelism();
auto paral = subJobsIterator.value()->parallelism();
if (paral == WaitForFinished) {
return false;
}
Expand All @@ -666,14 +680,9 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
_runningNow--;
_jobsFinished++;

int totalJobs = _subJobs.count();
if (_firstJob) {
totalJobs++;
}

// We finished processing all the jobs
// check if we finished
if (_jobsFinished >= totalJobs) {
if (_jobsFinished >= _totalJobs) {
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
finalize();
} else {
Expand Down
82 changes: 72 additions & 10 deletions src/libsync/owncloudpropagator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,24 @@ class PropagatorJob : public QObject {
OwncloudPropagator *_propagator;

public:
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator), _state(NotYetStarted) {}
enum JobPriority {

/** Jobs can prioritized, so that they will be executed first, according to insertion order */
InsertionOrderHighPriority,

/** Jobs are normaly prioritized, so that they will be executed according to some predicate represented by integer*/
NormalPriority,\

/** To contruct predicate for this Priority, all subitem has to be classified and contenerised */
ContainerItemsPriority,
};

explicit PropagatorJob(OwncloudPropagator* propagator, JobPriority priority) : _propagator(propagator), _priority(priority), _state(NotYetStarted) {}

/*
* Keeps track of the priority of the object
*/
JobPriority _priority;

enum JobState {
NotYetStarted,
Expand Down Expand Up @@ -100,6 +117,23 @@ class PropagatorJob : public QObject {
*/
virtual qint64 committedDiskSpace() const { return 0; }

/**
* Returns job priority predicate value
* 0 if InsertionOrderHighPriority
* Priority if other type of priority
*/
virtual quint64 getJobPredicateValue() const { return 0; }

/**
* Updates job priorities for the given job
*/
virtual void updateJobPredicateValues() {}

/**
* Enforces job priority
*/
virtual void setHighJobPriority() { _priority = JobPriority::InsertionOrderHighPriority; }

public slots:
virtual void abort() {}

Expand Down Expand Up @@ -158,8 +192,8 @@ protected slots:
QScopedPointer<PropagateItemJob> _restoreJob;

public:
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item)
: PropagatorJob(propagator), _item(item) {}
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item, JobPriority priority)
: PropagatorJob(propagator, priority), _item(item) {}

bool scheduleNextJob() Q_DECL_OVERRIDE {
if (_state != NotYetStarted) {
Expand Down Expand Up @@ -187,27 +221,49 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
// e.g: create the directory
QScopedPointer<PropagateItemJob>_firstJob;

// all the sub files or sub directories.
QVector<PropagatorJob *> _subJobs;
/*
* All the sub files or sub directories. This map has to be updated with _containerJobs
* QMultiMap is ordered by the key value, or in case of equal keys (e.g 0) by insertion order.
* <quint64 predicate, PropagatorJob * job>, where predicate is obtained by call to getJobPredicateValue()
*/
QMultiMap<quint64, PropagatorJob *> _subJobs;

/*
* This vector is just temporary structure used to keep the "container" jobs like directory job.
* After the updateJobPredicateValues() is called on this directories, these container jobs will be inserted
* to _subJobs queue. Note: This has to be called after all items are added to the _subJobs for whole the sync!
*/
QVector<PropagatorJob *> _containerJobs;

SyncFileItemPtr _item;

int _jobsFinished; // number of jobs that have completed
int _totalJobs; // number of jobs that will be defined to be synced
int _runningNow; // number of subJobs running right now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _firstUnfinishedSubJob;

/*
* Keeps track of the all the sub files or sub directories priority.
*/
qint64 _subJobsPriority;

explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0)
: PropagatorJob(propagator, JobPriority::ContainerItemsPriority)
, _firstJob(0), _item(item), _jobsFinished(0), _totalJobs(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _subJobsPriority(0)
{ }

virtual ~PropagateDirectory() {
qDeleteAll(_subJobs);
qDeleteAll(_containerJobs);
}

void append(PropagatorJob *subJob) {
_subJobs.append(subJob);
_subJobsPriority += subJob->getJobPredicateValue();
if(subJob->_priority == JobPriority::ContainerItemsPriority){
_containerJobs.append(subJob);
} else {
_subJobs.insert(subJob->getJobPredicateValue(), subJob);
}
}

virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
Expand All @@ -227,6 +283,12 @@ class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {

qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

// this item is prioritized normaly, so get priority by its sub items total size
quint64 getJobPredicateValue() const Q_DECL_OVERRIDE { return _subJobsPriority; }

// This uses recursion to perform Depth-First Traversal of the directories with changes trees
// If the given (this) directory contains _containerJobs, it will call updateJob on that child dir job, otherwise does nothing
void updateJobPredicateValues();
private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
Expand All @@ -252,7 +314,7 @@ class PropagateIgnoreJob : public PropagateItemJob {
Q_OBJECT
public:
PropagateIgnoreJob(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority) {}
void start() Q_DECL_OVERRIDE {
SyncFileItem::Status status = _item->_status;
done(status == SyncFileItem::NoStatus ? SyncFileItem::FileIgnored : status, _item->_errorString);
Expand Down
5 changes: 4 additions & 1 deletion src/libsync/propagatedownload.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,16 @@ class PropagateDownloadFile : public PropagateItemJob {
Q_OBJECT
public:
PropagateDownloadFile(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
void start() Q_DECL_OVERRIDE;
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;

// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }

// this item is prioritized normaly, so get priority by its size
quint64 getJobPredicateValue() const Q_DECL_OVERRIDE { return _item->_size; }

/**
* Whether an existing folder with the same name may be deleted before
* the download.
Expand Down
2 changes: 1 addition & 1 deletion src/libsync/propagateremotedelete.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PropagateRemoteDelete : public PropagateItemJob {
QPointer<DeleteJob> _job;
public:
PropagateRemoteDelete (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;

Expand Down
2 changes: 1 addition & 1 deletion src/libsync/propagateremotemkdir.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PropagateRemoteMkdir : public PropagateItemJob {
friend class PropagateDirectory; // So it can access the _item;
public:
PropagateRemoteMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority), _deleteExisting(false) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;

Expand Down
2 changes: 1 addition & 1 deletion src/libsync/propagateremotemove.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PropagateRemoteMove : public PropagateItemJob {
QPointer<MoveJob> _job;
public:
PropagateRemoteMove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinishedInParentDirectory; }
Expand Down
5 changes: 4 additions & 1 deletion src/libsync/propagateupload.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class PropagateUploadFileCommon : public PropagateItemJob {

public:
PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _finished(false), _deleteExisting(false) {}

/**
* Whether an existing entity with the same name may be deleted before
Expand All @@ -211,6 +211,9 @@ class PropagateUploadFileCommon : public PropagateItemJob {

bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }

// this item is prioritized normaly, so get priority by its size
quint64 getJobPredicateValue() const Q_DECL_OVERRIDE { return _item->_size; }

private slots:
void slotComputeContentChecksum();
// Content checksum computed, compute the transmission checksum
Expand Down
8 changes: 5 additions & 3 deletions src/libsync/propagatorjobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ static const char checkSumAdlerC[] = "Adler32";
class PropagateLocalRemove : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority) {}
void start() Q_DECL_OVERRIDE;
private:
bool removeRecursively(const QString &path);
Expand All @@ -55,7 +56,7 @@ class PropagateLocalMkdir : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _deleteExistingFile(false) {}
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority), _deleteExistingFile(false) {}
void start() Q_DECL_OVERRIDE;

/**
Expand All @@ -77,7 +78,8 @@ class PropagateLocalMkdir : public PropagateItemJob {
class PropagateLocalRename : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item, JobPriority::InsertionOrderHighPriority) {}
void start() Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return WaitForFinishedInParentDirectory; }
};
Expand Down

0 comments on commit 353cd16

Please sign in to comment.