From 1f2b2024d713e984ad9b260f46b93f81c5824951 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:21:23 +0000 Subject: [PATCH 1/8] feat(taskprocessing): Add worker command with tests and registration Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- core/Command/TaskProcessing/WorkerCommand.php | 150 ++++++++++ core/register_command.php | 2 + .../TaskProcessing/WorkerCommandTest.php | 264 ++++++++++++++++++ 3 files changed, 416 insertions(+) create mode 100644 core/Command/TaskProcessing/WorkerCommand.php create mode 100644 tests/Core/Command/TaskProcessing/WorkerCommandTest.php diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php new file mode 100644 index 0000000000000..29d35be7975dc --- /dev/null +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -0,0 +1,150 @@ +setName('taskprocessing:worker') + ->setDescription('Run a dedicated worker for synchronous TaskProcessing providers') + ->addOption( + 'timeout', + 't', + InputOption::VALUE_OPTIONAL, + 'Duration in seconds after which the worker exits (0 = run indefinitely)', + 0 + ) + ->addOption( + 'interval', + 'i', + InputOption::VALUE_OPTIONAL, + 'Sleep duration in seconds between polling iterations when no task was processed', + 1 + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Process at most one task then exit' + ); + parent::configure(); + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $startTime = time(); + $timeout = (int)$input->getOption('timeout'); + $interval = (int)$input->getOption('interval'); + $once = $input->getOption('once') === true; + + if ($timeout > 0) { + $output->writeln('Task processing worker will stop after ' . $timeout . ' seconds'); + } + + while (true) { + // Stop if timeout exceeded + if ($timeout > 0 && ($startTime + $timeout) < time()) { + $output->writeln('Timeout reached, exiting...', OutputInterface::VERBOSITY_VERBOSE); + break; + } + + // Handle SIGTERM/SIGINT gracefully + try { + $this->abortIfInterrupted(); + } catch (InterruptedException $e) { + $output->writeln('Task processing worker stopped'); + break; + } + + $processedTask = $this->processNextTask($output); + + if ($once) { + break; + } + + if (!$processedTask) { + $output->writeln('No task processed, waiting ' . $interval . ' second(s)...', OutputInterface::VERBOSITY_VERBOSE); + sleep($interval); + } + } + + return 0; + } + + /** + * Attempt to process one task across all preferred synchronous providers. + * + * @return bool True if a task was processed, false if no task was found + */ + private function processNextTask(OutputInterface $output): bool { + $providers = $this->taskProcessingManager->getProviders(); + + foreach ($providers as $provider) { + if (!$provider instanceof ISynchronousProvider) { + continue; + } + + $taskTypeId = $provider->getTaskTypeId(); + + // Only use this provider if it is the preferred one for the task type + try { + $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId); + } catch (Exception $e) { + $this->logger->error('Failed to get preferred provider for task type ' . $taskTypeId, ['exception' => $e]); + continue; + } + + if ($provider->getId() !== $preferredProvider->getId()) { + continue; + } + + try { + $task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]); + } catch (NotFoundException) { + continue; + } catch (Exception $e) { + $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); + continue; + } + + $output->writeln( + 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); + + $this->taskProcessingManager->processTask($task, $provider); + + $output->writeln( + 'Finished processing task ' . $task->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); + + return true; + } + + return false; + } +} diff --git a/core/register_command.php b/core/register_command.php index 58aed05ba68a6..d28c1633c62bb 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -91,6 +91,7 @@ use OC\Core\Command\TaskProcessing\EnabledCommand; use OC\Core\Command\TaskProcessing\GetCommand; use OC\Core\Command\TaskProcessing\Statistics; +use OC\Core\Command\TaskProcessing\WorkerCommand; use OC\Core\Command\TwoFactorAuth\Cleanup; use OC\Core\Command\TwoFactorAuth\Enforce; use OC\Core\Command\TwoFactorAuth\State; @@ -255,6 +256,7 @@ $application->add(Server::get(Command\TaskProcessing\ListCommand::class)); $application->add(Server::get(Statistics::class)); $application->add(Server::get(Command\TaskProcessing\Cleanup::class)); + $application->add(Server::get(WorkerCommand::class)); $application->add(Server::get(RedisCommand::class)); $application->add(Server::get(DistributedClear::class)); diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php new file mode 100644 index 0000000000000..71ebdee0dbc95 --- /dev/null +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -0,0 +1,264 @@ +manager = $this->createMock(IManager::class); + $this->logger = $this->createMock(LoggerInterface::class); + $this->command = new WorkerCommand($this->manager, $this->logger); + } + + /** + * Helper to create a minimal ISynchronousProvider mock. + */ + private function createProvider(string $id, string $taskTypeId): ISynchronousProvider&MockObject { + $provider = $this->createMock(ISynchronousProvider::class); + $provider->method('getId')->willReturn($id); + $provider->method('getTaskTypeId')->willReturn($taskTypeId); + return $provider; + } + + /** + * Helper to create a Task mock with an id. + */ + private function createTask(int $id): Task&MockObject { + $task = $this->createMock(Task::class); + $task->method('getId')->willReturn($id); + return $task; + } + + public function testOnceExitsAfterNoTask(): void { + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([]); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testOnceProcessesOneTask(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + $task = $this->createTask(42); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testSkipsNonSynchronousProviders(): void { + // A provider that is NOT an ISynchronousProvider + $nonSyncProvider = $this->createMock(\OCP\TaskProcessing\IProvider::class); + $nonSyncProvider->method('getId')->willReturn('non_sync_provider'); + $nonSyncProvider->method('getTaskTypeId')->willReturn('some_type'); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$nonSyncProvider]); + + $this->manager->expects($this->never()) + ->method('getPreferredProvider'); + + $this->manager->expects($this->never()) + ->method('getNextScheduledTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testSkipsNonPreferredProviders(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('provider_a', $taskTypeId); + $preferredProvider = $this->createProvider('provider_b', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($preferredProvider); + + // provider_a is not preferred (provider_b is), so getNextScheduledTask is never called + $this->manager->expects($this->never()) + ->method('getNextScheduledTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testContinuesWhenNoTaskFound(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willThrowException(new NotFoundException()); + + $this->manager->expects($this->never()) + ->method('processTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testLogsErrorAndContinuesOnException(): void { + $taskTypeId = 'test_task_type'; + $provider = $this->createProvider('test_provider', $taskTypeId); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $exception = new Exception('DB error'); + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willThrowException($exception); + + $this->logger->expects($this->once()) + ->method('error') + ->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]); + + $this->manager->expects($this->never()) + ->method('processTask'); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testTimeoutExitsLoop(): void { + // Arrange: no providers so each iteration does nothing, but timeout=1 should exit quickly + $this->manager->method('getProviders')->willReturn([]); + + $input = new ArrayInput(['--timeout' => '1', '--interval' => '0'], $this->command->getDefinition()); + $output = new NullOutput(); + + $start = time(); + $result = $this->command->run($input, $output); + $elapsed = time() - $start; + + $this->assertSame(0, $result); + // Should have exited within a few seconds + $this->assertLessThanOrEqual(5, $elapsed); + } + + public function testProcessesFirstMatchingProvider(): void { + $taskTypeId1 = 'type_a'; + $taskTypeId2 = 'type_b'; + + $provider1 = $this->createProvider('provider_a', $taskTypeId1); + $provider2 = $this->createProvider('provider_b', $taskTypeId2); + $task = $this->createTask(7); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider1, $provider2]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId1) + ->willReturn($provider1); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId1]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider1) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } +} From 7a4b57684e23f5a8926a238c2c8d40d73ff58f5e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 19:32:06 +0000 Subject: [PATCH 2/8] feat(taskprocessing): Add --taskTypes whitelist option to taskprocessing:worker command Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- core/Command/TaskProcessing/WorkerCommand.php | 18 +++- .../TaskProcessing/WorkerCommandTest.php | 91 +++++++++++++++++++ 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 29d35be7975dc..569045ef3af08 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -50,6 +50,12 @@ protected function configure(): void { null, InputOption::VALUE_NONE, 'Process at most one task then exit' + ) + ->addOption( + 'taskTypes', + null, + InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, + 'Only process tasks of the given task type IDs (can be specified multiple times)' ); parent::configure(); } @@ -59,6 +65,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $timeout = (int)$input->getOption('timeout'); $interval = (int)$input->getOption('interval'); $once = $input->getOption('once') === true; + /** @var list $taskTypes */ + $taskTypes = $input->getOption('taskTypes'); if ($timeout > 0) { $output->writeln('Task processing worker will stop after ' . $timeout . ' seconds'); @@ -79,7 +87,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int break; } - $processedTask = $this->processNextTask($output); + $processedTask = $this->processNextTask($output, $taskTypes); if ($once) { break; @@ -97,9 +105,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** * Attempt to process one task across all preferred synchronous providers. * + * @param list $taskTypes When non-empty, only providers for these task type IDs are considered. * @return bool True if a task was processed, false if no task was found */ - private function processNextTask(OutputInterface $output): bool { + private function processNextTask(OutputInterface $output, array $taskTypes = []): bool { $providers = $this->taskProcessingManager->getProviders(); foreach ($providers as $provider) { @@ -109,6 +118,11 @@ private function processNextTask(OutputInterface $output): bool { $taskTypeId = $provider->getTaskTypeId(); + // If a task type whitelist was provided, skip providers not in the list + if (!empty($taskTypes) && !in_array($taskTypeId, $taskTypes, true)) { + continue; + } + // Only use this provider if it is the preferred one for the task type try { $preferredProvider = $this->taskProcessingManager->getPreferredProvider($taskTypeId); diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index 71ebdee0dbc95..7765a9a85f1f6 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -261,4 +261,95 @@ public function testProcessesFirstMatchingProvider(): void { $this->assertSame(0, $result); } + + public function testTaskTypesWhitelistFiltersProviders(): void { + $taskTypeId1 = 'type_a'; + $taskTypeId2 = 'type_b'; + + $provider1 = $this->createProvider('provider_a', $taskTypeId1); + $provider2 = $this->createProvider('provider_b', $taskTypeId2); + $task = $this->createTask(99); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider1, $provider2]); + + // Only type_b is whitelisted, so provider_a (type_a) must be skipped entirely + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId2) + ->willReturn($provider2); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId2]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider2) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true, '--taskTypes' => [$taskTypeId2]], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testTaskTypesWhitelistWithNoMatchingProviders(): void { + $provider = $this->createProvider('provider_a', 'type_a'); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + // Whitelist does not include type_a so nothing should be processed + $this->manager->expects($this->never()) + ->method('getPreferredProvider'); + + $this->manager->expects($this->never()) + ->method('getNextScheduledTask'); + + $input = new ArrayInput(['--once' => true, '--taskTypes' => ['type_b']], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + + public function testEmptyTaskTypesAllowsAllProviders(): void { + $taskTypeId = 'type_a'; + $provider = $this->createProvider('provider_a', $taskTypeId); + $task = $this->createTask(5); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider]); + + $this->manager->expects($this->once()) + ->method('getPreferredProvider') + ->with($taskTypeId) + ->willReturn($provider); + + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with([$taskTypeId]) + ->willReturn($task); + + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider) + ->willReturn(true); + + // No --taskTypes option provided + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } } From 6795544f205632c58fefe6ba3c13f494aa878022 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 19:53:50 +0000 Subject: [PATCH 3/8] fix: Fix Task mock error: use real Task instances; run autoloaderchecker Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- lib/composer/composer/autoload_classmap.php | 1 + lib/composer/composer/autoload_static.php | 1 + tests/Core/Command/TaskProcessing/WorkerCommandTest.php | 8 ++++---- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/composer/composer/autoload_classmap.php b/lib/composer/composer/autoload_classmap.php index 22c934ad525f3..ba1487488fda9 100644 --- a/lib/composer/composer/autoload_classmap.php +++ b/lib/composer/composer/autoload_classmap.php @@ -1416,6 +1416,7 @@ 'OC\\Core\\Command\\TaskProcessing\\GetCommand' => $baseDir . '/core/Command/TaskProcessing/GetCommand.php', 'OC\\Core\\Command\\TaskProcessing\\ListCommand' => $baseDir . '/core/Command/TaskProcessing/ListCommand.php', 'OC\\Core\\Command\\TaskProcessing\\Statistics' => $baseDir . '/core/Command/TaskProcessing/Statistics.php', + 'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => $baseDir . '/core/Command/TaskProcessing/WorkerCommand.php', 'OC\\Core\\Command\\TwoFactorAuth\\Base' => $baseDir . '/core/Command/TwoFactorAuth/Base.php', 'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => $baseDir . '/core/Command/TwoFactorAuth/Cleanup.php', 'OC\\Core\\Command\\TwoFactorAuth\\Disable' => $baseDir . '/core/Command/TwoFactorAuth/Disable.php', diff --git a/lib/composer/composer/autoload_static.php b/lib/composer/composer/autoload_static.php index 2e1dfc16d709d..993dbfc52e930 100644 --- a/lib/composer/composer/autoload_static.php +++ b/lib/composer/composer/autoload_static.php @@ -1457,6 +1457,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2 'OC\\Core\\Command\\TaskProcessing\\GetCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/GetCommand.php', 'OC\\Core\\Command\\TaskProcessing\\ListCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/ListCommand.php', 'OC\\Core\\Command\\TaskProcessing\\Statistics' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/Statistics.php', + 'OC\\Core\\Command\\TaskProcessing\\WorkerCommand' => __DIR__ . '/../../..' . '/core/Command/TaskProcessing/WorkerCommand.php', 'OC\\Core\\Command\\TwoFactorAuth\\Base' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Base.php', 'OC\\Core\\Command\\TwoFactorAuth\\Cleanup' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Cleanup.php', 'OC\\Core\\Command\\TwoFactorAuth\\Disable' => __DIR__ . '/../../..' . '/core/Command/TwoFactorAuth/Disable.php', diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index 7765a9a85f1f6..8d516460aad93 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -45,11 +45,11 @@ private function createProvider(string $id, string $taskTypeId): ISynchronousPro } /** - * Helper to create a Task mock with an id. + * Helper to create a real Task with a given id. */ - private function createTask(int $id): Task&MockObject { - $task = $this->createMock(Task::class); - $task->method('getId')->willReturn($id); + private function createTask(int $id): Task { + $task = new Task('test_task_type', [], 'testapp', null); + $task->setId($id); return $task; } From 2d24349ae547f0c6a83e245e82cd41c305fb5667 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 07:17:44 +0000 Subject: [PATCH 4/8] fix: Fix task type starvation in WorkerCommand::processNextTask by shuffling providers Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- core/Command/TaskProcessing/WorkerCommand.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 569045ef3af08..7e370c1ecb606 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -110,6 +110,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int */ private function processNextTask(OutputInterface $output, array $taskTypes = []): bool { $providers = $this->taskProcessingManager->getProviders(); + // Shuffle providers to avoid starvation: if providers are always iterated + // in the same order, a provider with a constant stream of tasks would + // prevent all subsequent providers from ever being processed. + shuffle($providers); foreach ($providers as $provider) { if (!$provider instanceof ISynchronousProvider) { From f212dc4664c770886619a73eaa63e6530ae63079 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 13:35:10 +0000 Subject: [PATCH 5/8] fix: Fix task type starvation: collect all eligible task types then pick the oldest task Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- core/Command/TaskProcessing/WorkerCommand.php | 62 ++++++++++++------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 7e370c1ecb606..6f419daab3ed4 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -105,16 +105,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** * Attempt to process one task across all preferred synchronous providers. * + * To avoid starvation, all eligible task types are first collected and then + * the oldest scheduled task across all of them is fetched in a single query. + * This ensures that tasks are processed in the order they were scheduled, + * regardless of which provider handles them. + * * @param list $taskTypes When non-empty, only providers for these task type IDs are considered. * @return bool True if a task was processed, false if no task was found */ private function processNextTask(OutputInterface $output, array $taskTypes = []): bool { $providers = $this->taskProcessingManager->getProviders(); - // Shuffle providers to avoid starvation: if providers are always iterated - // in the same order, a provider with a constant stream of tasks would - // prevent all subsequent providers from ever being processed. - shuffle($providers); + // Build a map of eligible taskTypeId => provider for all preferred synchronous providers + /** @var array $eligibleProviders */ + $eligibleProviders = []; foreach ($providers as $provider) { if (!$provider instanceof ISynchronousProvider) { continue; @@ -139,30 +143,40 @@ private function processNextTask(OutputInterface $output, array $taskTypes = []) continue; } - try { - $task = $this->taskProcessingManager->getNextScheduledTask([$taskTypeId]); - } catch (NotFoundException) { - continue; - } catch (Exception $e) { - $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); - continue; - } + $eligibleProviders[$taskTypeId] = $provider; + } - $output->writeln( - 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(), - OutputInterface::VERBOSITY_VERBOSE - ); + if (empty($eligibleProviders)) { + return false; + } - $this->taskProcessingManager->processTask($task, $provider); + // Fetch the oldest scheduled task across all eligible task types in one query. + // This naturally prevents starvation: regardless of how many tasks one provider + // has queued, another provider's older tasks will be picked up first. + try { + $task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders)); + } catch (NotFoundException) { + return false; + } catch (Exception $e) { + $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); + return false; + } - $output->writeln( - 'Finished processing task ' . $task->getId(), - OutputInterface::VERBOSITY_VERBOSE - ); + $taskTypeId = $task->getTaskTypeId(); + $provider = $eligibleProviders[$taskTypeId]; - return true; - } + $output->writeln( + 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); + + $this->taskProcessingManager->processTask($task, $provider); + + $output->writeln( + 'Finished processing task ' . $task->getId(), + OutputInterface::VERBOSITY_VERBOSE + ); - return false; + return true; } } From 47e96a0639abf26300997377d5ac10d733c4bb88 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:49:48 +0000 Subject: [PATCH 6/8] test(taskprocessing): fix broken multi-type assertions and add starvation-prevention test Co-authored-by: marcelklehr <986878+marcelklehr@users.noreply.github.com> --- .../TaskProcessing/WorkerCommandTest.php | 67 ++++++++++++++++--- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index 8d516460aad93..a43f9b83f8f1d 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -45,10 +45,10 @@ private function createProvider(string $id, string $taskTypeId): ISynchronousPro } /** - * Helper to create a real Task with a given id. + * Helper to create a real Task with a given id and optional task type. */ - private function createTask(int $id): Task { - $task = new Task('test_task_type', [], 'testapp', null); + private function createTask(int $id, string $type = 'test_task_type'): Task { + $task = new Task($type, [], 'testapp', null); $task->setId($id); return $task; } @@ -227,26 +227,31 @@ public function testTimeoutExitsLoop(): void { $this->assertLessThanOrEqual(5, $elapsed); } - public function testProcessesFirstMatchingProvider(): void { + public function testProcessesCorrectProviderForReturnedTaskType(): void { $taskTypeId1 = 'type_a'; $taskTypeId2 = 'type_b'; $provider1 = $this->createProvider('provider_a', $taskTypeId1); $provider2 = $this->createProvider('provider_b', $taskTypeId2); - $task = $this->createTask(7); + // Task has type_a, so provider1 must be chosen to process it + $task = $this->createTask(7, $taskTypeId1); $this->manager->expects($this->once()) ->method('getProviders') ->willReturn([$provider1, $provider2]); - $this->manager->expects($this->once()) + // Both providers are eligible, so getPreferredProvider is called for each + $this->manager->expects($this->exactly(2)) ->method('getPreferredProvider') - ->with($taskTypeId1) - ->willReturn($provider1); + ->willReturnMap([ + [$taskTypeId1, $provider1], + [$taskTypeId2, $provider2], + ]); + // All eligible task types are passed in a single query $this->manager->expects($this->once()) ->method('getNextScheduledTask') - ->with([$taskTypeId1]) + ->with($this->equalTo([$taskTypeId1, $taskTypeId2])) ->willReturn($task); $this->manager->expects($this->once()) @@ -262,13 +267,53 @@ public function testProcessesFirstMatchingProvider(): void { $this->assertSame(0, $result); } + public function testPicksOldestTaskAcrossMultipleEligibleProviders(): void { + $taskTypeId1 = 'type_a'; + $taskTypeId2 = 'type_b'; + + $provider1 = $this->createProvider('provider_a', $taskTypeId1); + $provider2 = $this->createProvider('provider_b', $taskTypeId2); + // getNextScheduledTask returns a type_b task (the globally oldest one) + $task = $this->createTask(3, $taskTypeId2); + + $this->manager->expects($this->once()) + ->method('getProviders') + ->willReturn([$provider1, $provider2]); + + $this->manager->expects($this->exactly(2)) + ->method('getPreferredProvider') + ->willReturnMap([ + [$taskTypeId1, $provider1], + [$taskTypeId2, $provider2], + ]); + + // Both eligible types are queried together to prevent starvation + $this->manager->expects($this->once()) + ->method('getNextScheduledTask') + ->with($this->equalTo([$taskTypeId1, $taskTypeId2])) + ->willReturn($task); + + // provider2 must handle the task because the task has type_b + $this->manager->expects($this->once()) + ->method('processTask') + ->with($task, $provider2) + ->willReturn(true); + + $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); + $output = new NullOutput(); + + $result = $this->command->run($input, $output); + + $this->assertSame(0, $result); + } + public function testTaskTypesWhitelistFiltersProviders(): void { $taskTypeId1 = 'type_a'; $taskTypeId2 = 'type_b'; $provider1 = $this->createProvider('provider_a', $taskTypeId1); $provider2 = $this->createProvider('provider_b', $taskTypeId2); - $task = $this->createTask(99); + $task = $this->createTask(99, $taskTypeId2); $this->manager->expects($this->once()) ->method('getProviders') @@ -323,7 +368,7 @@ public function testTaskTypesWhitelistWithNoMatchingProviders(): void { public function testEmptyTaskTypesAllowsAllProviders(): void { $taskTypeId = 'type_a'; $provider = $this->createProvider('provider_a', $taskTypeId); - $task = $this->createTask(5); + $task = $this->createTask(5, $taskTypeId); $this->manager->expects($this->once()) ->method('getProviders') From a7b18cef003d649e878619d1db321a1d7eb9f44d Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 19 Mar 2026 09:39:02 +0100 Subject: [PATCH 7/8] chore: Address review comments Signed-off-by: Marcel Klehr --- core/Command/TaskProcessing/WorkerCommand.php | 2 +- .../TaskProcessing/WorkerCommandTest.php | 40 ------------------- 2 files changed, 1 insertion(+), 41 deletions(-) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 6f419daab3ed4..653945a99741b 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -35,7 +35,7 @@ protected function configure(): void { 'timeout', 't', InputOption::VALUE_OPTIONAL, - 'Duration in seconds after which the worker exits (0 = run indefinitely)', + 'Duration in seconds after which the worker exits (0 = run indefinitely). You should regularly (e.g. every 5 minutes) restart this worker by using this option to make sure it picks up configuration changes.', 0 ) ->addOption( diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index a43f9b83f8f1d..2df0195d6a3ac 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -267,46 +267,6 @@ public function testProcessesCorrectProviderForReturnedTaskType(): void { $this->assertSame(0, $result); } - public function testPicksOldestTaskAcrossMultipleEligibleProviders(): void { - $taskTypeId1 = 'type_a'; - $taskTypeId2 = 'type_b'; - - $provider1 = $this->createProvider('provider_a', $taskTypeId1); - $provider2 = $this->createProvider('provider_b', $taskTypeId2); - // getNextScheduledTask returns a type_b task (the globally oldest one) - $task = $this->createTask(3, $taskTypeId2); - - $this->manager->expects($this->once()) - ->method('getProviders') - ->willReturn([$provider1, $provider2]); - - $this->manager->expects($this->exactly(2)) - ->method('getPreferredProvider') - ->willReturnMap([ - [$taskTypeId1, $provider1], - [$taskTypeId2, $provider2], - ]); - - // Both eligible types are queried together to prevent starvation - $this->manager->expects($this->once()) - ->method('getNextScheduledTask') - ->with($this->equalTo([$taskTypeId1, $taskTypeId2])) - ->willReturn($task); - - // provider2 must handle the task because the task has type_b - $this->manager->expects($this->once()) - ->method('processTask') - ->with($task, $provider2) - ->willReturn(true); - - $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); - $output = new NullOutput(); - - $result = $this->command->run($input, $output); - - $this->assertSame(0, $result); - } - public function testTaskTypesWhitelistFiltersProviders(): void { $taskTypeId1 = 'type_a'; $taskTypeId2 = 'type_b'; From 5ebcf58de99dc6eea04b10da393d9051bd14c461 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 19 Mar 2026 11:14:26 +0100 Subject: [PATCH 8/8] fix: Apply suggestions from code review Co-authored-by: Marcel Klehr Signed-off-by: Marcel Klehr --- core/Command/TaskProcessing/WorkerCommand.php | 2 +- tests/Core/Command/TaskProcessing/WorkerCommandTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 653945a99741b..09f3f3573d23c 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -3,7 +3,7 @@ declare(strict_types=1); /** - * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */ namespace OC\Core\Command\TaskProcessing; diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index 2df0195d6a3ac..3f472548cef49 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -3,7 +3,7 @@ declare(strict_types=1); /** - * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-FileCopyrightText: 2026 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */