Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TaskProcessing] Add manager::runTask method #47522

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 122 additions & 42 deletions lib/private/TaskProcessing/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -716,55 +716,69 @@ public function scheduleTask(Task $task): void {
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}
$taskTypes = $this->getAvailableTaskTypes();
$inputShape = $taskTypes[$task->getTaskTypeId()]['inputShape'];
$inputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['inputShapeDefaults'];
$inputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['inputShapeEnumValues'];
$optionalInputShape = $taskTypes[$task->getTaskTypeId()]['optionalInputShape'];
$optionalInputShapeEnumValues = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeEnumValues'];
$optionalInputShapeDefaults = $taskTypes[$task->getTaskTypeId()]['optionalInputShapeDefaults'];
// validate input
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
// authenticate access to mentioned files
$ids = [];
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
/** @var list<int>|int $inputSlot */
$inputSlot = $task->getInput()[$key];
if (is_array($inputSlot)) {
$ids += $inputSlot;
} else {
$ids[] = $inputSlot;
}
}
}
foreach ($ids as $fileId) {
$this->validateFileId($fileId);
$this->validateUserAccessToFile($fileId, $task->getUserId());
}
// remove superfluous keys and set input
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
$task->setInput($inputWithDefaults);
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$task->setScheduledAt(time());
$provider = $this->getPreferredProvider($task->getTaskTypeId());
// calculate expected completion time
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
// create a db entity and insert into db table
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->insert($taskEntity);
// make sure the scheduler knows the id
$task->setId($taskEntity->getId());
$this->storeTask($task);
// schedule synchronous job if the provider is synchronous
$provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->jobList->add(SynchronousBackgroundJob::class, null);
}
}

public function runTask(Task $task): Task {
if (!$this->canHandleTask($task)) {
throw new \OCP\TaskProcessing\Exception\PreConditionNotMetException('No task processing provider is installed that can handle this task type: ' . $task->getTaskTypeId());
}

$provider = $this->getPreferredProvider($task->getTaskTypeId());
if ($provider instanceof ISynchronousProvider) {
$this->prepareTask($task);
$task->setStatus(Task::STATUS_SCHEDULED);
$this->storeTask($task);
$this->processTask($task, $provider);
$task = $this->getTask($task->getId());
} else {
$this->scheduleTask($task);
// poll task
while ($task->getStatus() === Task::STATUS_SCHEDULED || $task->getStatus() === Task::STATUS_RUNNING) {
sleep(1);
$task = $this->getTask($task->getId());
}
}
return $task;
}

public function processTask(Task $task, ISynchronousProvider $provider): bool {
try {
try {
$input = $this->prepareInputData($task);
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
}
try {
$this->setTaskStatus($task, Task::STATUS_RUNNING);
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress));
} catch (ProcessingException $e) {
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
} catch (\Throwable $e) {
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
$this->setTaskResult($task->getId(), $e->getMessage(), null);
return false;
}
$this->setTaskResult($task->getId(), null, $output);
} catch (NotFoundException $e) {
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
} catch (Exception $e) {
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
}
return true;
}

public function deleteTask(Task $task): void {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->delete($taskEntity);
Expand Down Expand Up @@ -1095,6 +1109,72 @@ public function setTaskStatus(Task $task, int $status): void {
$this->taskMapper->update($taskEntity);
}

/**
* Validate input, fill input default values, set completionExpectedAt, set scheduledAt
*
* @param Task $task
* @return void
* @throws UnauthorizedException
* @throws ValidationException
* @throws \OCP\TaskProcessing\Exception\Exception
*/
private function prepareTask(Task $task): void {
$taskTypes = $this->getAvailableTaskTypes();
$taskType = $taskTypes[$task->getTaskTypeId()];
$inputShape = $taskType['inputShape'];
$inputShapeDefaults = $taskType['inputShapeDefaults'];
$inputShapeEnumValues = $taskType['inputShapeEnumValues'];
$optionalInputShape = $taskType['optionalInputShape'];
$optionalInputShapeEnumValues = $taskType['optionalInputShapeEnumValues'];
$optionalInputShapeDefaults = $taskType['optionalInputShapeDefaults'];
// validate input
$this->validateInput($inputShape, $inputShapeDefaults, $inputShapeEnumValues, $task->getInput());
$this->validateInput($optionalInputShape, $optionalInputShapeDefaults, $optionalInputShapeEnumValues, $task->getInput(), true);
// authenticate access to mentioned files
$ids = [];
foreach ($inputShape + $optionalInputShape as $key => $descriptor) {
if (in_array(EShapeType::getScalarType($descriptor->getShapeType()), [EShapeType::File, EShapeType::Image, EShapeType::Audio, EShapeType::Video], true)) {
/** @var list<int>|int $inputSlot */
$inputSlot = $task->getInput()[$key];
if (is_array($inputSlot)) {
$ids += $inputSlot;
} else {
$ids[] = $inputSlot;
}
}
}
foreach ($ids as $fileId) {
$this->validateFileId($fileId);
$this->validateUserAccessToFile($fileId, $task->getUserId());
}
// remove superfluous keys and set input
$input = $this->removeSuperfluousArrayKeys($task->getInput(), $inputShape, $optionalInputShape);
$inputWithDefaults = $this->fillInputDefaults($input, $inputShapeDefaults, $optionalInputShapeDefaults);
$task->setInput($inputWithDefaults);
$task->setScheduledAt(time());
$provider = $this->getPreferredProvider($task->getTaskTypeId());
// calculate expected completion time
$completionExpectedAt = new \DateTime('now');
$completionExpectedAt->add(new \DateInterval('PT'.$provider->getExpectedRuntime().'S'));
$task->setCompletionExpectedAt($completionExpectedAt);
}

/**
* Store the task in the DB and set its ID in the \OCP\TaskProcessing\Task input param
*
* @param Task $task
* @return void
* @throws Exception
* @throws \JsonException
*/
private function storeTask(Task $task): void {
// create a db entity and insert into db table
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
$this->taskMapper->insert($taskEntity);
// make sure the scheduler knows the id
$task->setId($taskEntity->getId());
}

/**
* @param array $output
* @param ShapeDescriptor[] ...$specs the specs that define which keys to keep
Expand Down
57 changes: 16 additions & 41 deletions lib/private/TaskProcessing/SynchronousBackgroundJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,8 @@
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\BackgroundJob\QueuedJob;
use OCP\Files\GenericFileException;
use OCP\Files\NotPermittedException;
use OCP\Lock\LockedException;
use OCP\TaskProcessing\Exception\Exception;
use OCP\TaskProcessing\Exception\NotFoundException;
use OCP\TaskProcessing\Exception\ProcessingException;
use OCP\TaskProcessing\Exception\UnauthorizedException;
use OCP\TaskProcessing\Exception\ValidationException;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\ISynchronousProvider;
use OCP\TaskProcessing\Task;
Expand Down Expand Up @@ -57,46 +51,27 @@ protected function run($argument) {
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
continue;
}
try {
try {
$input = $this->taskProcessingManager->prepareInputData($task);
} catch (GenericFileException|NotPermittedException|LockedException|ValidationException|UnauthorizedException $e) {
$this->logger->warning('Failed to prepare input data for a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
}
try {
$this->taskProcessingManager->setTaskStatus($task, Task::STATUS_RUNNING);
$output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->taskProcessingManager->setTaskProgress($task->getId(), $progress));
} catch (ProcessingException $e) {
$this->logger->warning('Failed to process a TaskProcessing task with synchronous provider ' . $provider->getId(), ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
} catch (\Throwable $e) {
$this->logger->error('Unknown error while processing TaskProcessing task', ['exception' => $e]);
$this->taskProcessingManager->setTaskResult($task->getId(), $e->getMessage(), null);
// Schedule again
$this->jobList->add(self::class, $argument);
return;
}
$this->taskProcessingManager->setTaskResult($task->getId(), null, $output);
} catch (NotFoundException $e) {
$this->logger->info('Could not find task anymore after execution. Moving on.', ['exception' => $e]);
} catch (Exception $e) {
$this->logger->error('Failed to report result of TaskProcessing task', ['exception' => $e]);
if (!$this->taskProcessingManager->processTask($task, $provider)) {
// Schedule again
$this->jobList->add(self::class, $argument);
}
}

// check if this job needs to be scheduled again:
// if there is at least one preferred synchronous provider that has a scheduled task
$synchronousProviders = array_filter($providers, fn ($provider) =>
$provider instanceof ISynchronousProvider);
$taskTypes = array_values(array_map(fn ($provider) =>
$provider->getTaskTypeId(),
$synchronousProviders
));
$synchronousPreferredProviders = array_filter($synchronousProviders, function ($provider) {
$taskTypeId = $provider->getTaskTypeId();
$preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId);
return $provider->getId() === $preferredProvider->getId();
});
$taskTypes = array_values(
array_map(
fn ($provider) => $provider->getTaskTypeId(),
$synchronousPreferredProviders
)
);
$taskTypesWithTasks = array_filter($taskTypes, function ($taskType) {
try {
$this->taskProcessingManager->getNextScheduledTask([$taskType]);
Expand Down
27 changes: 27 additions & 0 deletions lib/public/TaskProcessing/IManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ public function getAvailableTaskTypes(): array;
*/
public function scheduleTask(Task $task): void;

/**
* Run the task and return the finished task
*
* @param Task $task The task to run
* @return Task The result task
* @throws PreConditionNotMetException If no or not the requested provider was registered but this method was still called
* @throws ValidationException the given task input didn't pass validation against the task type's input shape and/or the providers optional input shape specs
* @throws Exception storing the task in the database failed
* @throws UnauthorizedException the user scheduling the task does not have access to the files used in the input
* @since 30.0.0
*/
public function runTask(Task $task): Task;

/**
* Process task with a synchronous provider
*
* Prepare task input data and run the process method of the provider
* This should only be used by OC\TaskProcessing\SynchronousBackgroundJob::run() and OCP\TaskProcessing\IManager::runTask()
*
* @param Task $task
* @param ISynchronousProvider $provider
* @return bool True if the task has run successfully
* @throws Exception
* @since 30.0.0
*/
public function processTask(Task $task, ISynchronousProvider $provider): bool;

/**
* Delete a task that has been scheduled before
*
Expand Down
Loading