Skip to content

feat: add support for chained jobs #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deptrac.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ on:

jobs:
deptrac:
uses: codeigniter4/.github/.github/workflows/deptrac.yml@main
uses: codeigniter4/.github/.github/workflows/deptrac.yml@CI46
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"cs": "php-cs-fixer fix --ansi --verbose --dry-run --diff",
"cs-fix": "php-cs-fixer fix --ansi --verbose --diff",
"style": "@cs-fix",
"deduplicate": "phpcpd app/ src/",
"deduplicate": "phpcpd src/ tests/",
"inspect": "deptrac analyze --cache-file=build/deptrac.cache",
"mutate": "infection --threads=2 --skip-initial-tests --coverage=build/phpunit",
"test": "phpunit"
Expand Down
15 changes: 15 additions & 0 deletions docs/basic-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ service('queue')->push('emails', 'email', ['message' => 'Email message goes here

We will be pushing `email` job to the `emails` queue.

### Sending chained jobs to the queue

Sending chained jobs is also simple and lets you specify the particular order of the job execution.

```php
service('queue')->chain(function($chain) {
$chain
->push('reports', 'generate-report', ['userId' => 123])
->push('emails', 'email', ['message' => 'Email message goes here', 'userId' => 123]);
});
```

In the example above, we will send jobs to the `reports` and `emails` queue. First, we will generate a report for given user with the `generate-report` job, after this, we will send an email with `email` job.
The `email` job will be executed only if the `generate-report` job was successful.

### Consuming the queue

Since we sent our sample job to queue `emails`, then we need to run the worker with the appropriate queue:
Expand Down
34 changes: 34 additions & 0 deletions docs/running-queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,40 @@ Note that there is no guarantee that the job will run exactly in 5 minutes. If m

We can also combine delayed jobs with priorities.

### Chained jobs

We can create sequences of jobs that run in a specific order. Each job in the chain will be executed after the previous job has completed successfully.

```php
service('queue')->chain(function($chain) {
$chain
->push('reports', 'generate-report', ['userId' => 123])
->setPriority('high') // optional
->push('emails', 'email', ['message' => 'Email message goes here', 'userId' => 123])
->setDelay(30); // optional
});
```

As you may notice, we can use the same options as in regular `push()` - we can set priority and delay, which are optional settings.

#### Important Differences from Regular `push()`

When using the `chain()` method, there are a few important differences compared to the regular `push()` method:

1. **Method Order**: Unlike the regular `push()` method where you set the priority and delay before pushing the job, in a chain you must set these properties after calling `push()` for each job:

```php
// Regular push() - priority set before pushing
service('queue')->setPriority('high')->push('queue', 'job', []);

// Chain push() - priority set after pushing
service('queue')->chain(function($chain) {
$chain->push('queue', 'job', [])->setPriority('high');
});
```

2. **Configuration Scope**: Each configuration (priority, delay) only applies to the job that was just added to the chain.

### Running many instances of the same queue

As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control.
Expand Down
4 changes: 4 additions & 0 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ parameters:
message: '#Call to an undefined method CodeIgniter\\Queue\\Models\\QueueJobFailedModel::truncate\(\).#'
paths:
- src/Handlers/BaseHandler.php
-
message: '#If condition is always true.#'
paths:
- src/Commands/QueueWork.php
universalObjectCratesClasses:
- CodeIgniter\Entity
- CodeIgniter\Entity\Entity
Expand Down
5 changes: 5 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Rector\EarlyReturn\Rector\Return_\PreparedValueToEarlyReturnRector;
use Rector\Php55\Rector\String_\StringClassNameToClassConstantRector;
use Rector\Php73\Rector\FuncCall\StringifyStrNeedlesRector;
use Rector\Php81\Rector\ClassMethod\NewInInitializerRector;
use Rector\PHPUnit\AnnotationsToAttributes\Rector\Class_\AnnotationWithValueToAttributeRector;
use Rector\PHPUnit\AnnotationsToAttributes\Rector\ClassMethod\DataProviderAnnotationToAttributeRector;
use Rector\PHPUnit\CodeQuality\Rector\Class_\YieldDataProviderRector;
Expand Down Expand Up @@ -93,6 +94,10 @@

// Supported from PHPUnit 10
DataProviderAnnotationToAttributeRector::class,

NewInInitializerRector::class => [
'src/Payloads/Payload.php',
],
]);

// auto import fully qualified class names
Expand Down
43 changes: 43 additions & 0 deletions src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use CodeIgniter\CLI\CLI;
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use Exception;
use Throwable;

Expand Down Expand Up @@ -247,6 +248,11 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
service('queue')->done($work, $config->keepDoneJobs);

CLI::write('The processing of this job was successful', 'green');

// Check chained jobs
if (isset($payload['metadata']) && $payload['metadata'] !== []) {
$this->processNextJobInChain($payload['metadata']);
}
} catch (Throwable $err) {
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
// Schedule for later
Expand All @@ -262,6 +268,43 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
}
}

/**
* Process the next job in the chain
*/
private function processNextJobInChain(array $payloadMetadata): void
{
$payloadMetadata = PayloadMetadata::fromArray($payloadMetadata);

if (! $payloadMetadata->hasChainedJobs()) {
return;
}

$nextPayload = $payloadMetadata->getChainedJobs()->shift();
$priority = $nextPayload->getPriority();
$delay = $nextPayload->getDelay();

if ($priority !== null) {
service('queue')->setPriority($priority);
}

if ($delay !== null) {
service('queue')->setDelay($delay);
}

if ($payloadMetadata->hasChainedJobs()) {
$nextPayload->setChainedJobs($payloadMetadata->getChainedJobs());
}

service('queue')->push(
$nextPayload->getQueue(),
$nextPayload->getJob(),
$nextPayload->getData(),
$nextPayload->getMetadata(),
);

CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green');
}

private function maxJobsCheck(int $maxJobs, int $countJobs): bool
{
if ($maxJobs > 0 && $countJobs >= $maxJobs) {
Expand Down
79 changes: 45 additions & 34 deletions src/Handlers/BaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@

namespace CodeIgniter\Queue\Handlers;

use Closure;
use CodeIgniter\I18n\Time;
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Entities\QueueJobFailed;
use CodeIgniter\Queue\Exceptions\QueueException;
use CodeIgniter\Queue\Models\QueueJobFailedModel;
use CodeIgniter\Queue\Payloads\ChainBuilder;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\Traits\HasQueueValidation;
use ReflectionException;
use Throwable;

Expand All @@ -27,13 +31,15 @@
*/
abstract class BaseHandler
{
use HasQueueValidation;

protected QueueConfig $config;
protected ?string $priority = null;
protected ?int $delay = null;

abstract public function name(): string;

abstract public function push(string $queue, string $job, array $data): bool;
abstract public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool;

abstract public function pop(string $queue, array $priorities): ?QueueJob;

Expand All @@ -45,38 +51,6 @@ abstract public function done(QueueJob $queueJob, bool $keepJob): bool;

abstract public function clear(?string $queue = null): bool;

/**
* Set priority for job queue.
*/
public function setPriority(string $priority): static
{
if (! preg_match('/^[a-z_-]+$/', $priority)) {
throw QueueException::forIncorrectPriorityFormat();
}

if (strlen($priority) > 64) {
throw QueueException::forTooLongPriorityName();
}

$this->priority = $priority;

return $this;
}

/**
* Set delay for job queue (in seconds).
*/
public function setDelay(int $delay): static
{
if ($delay < 0) {
throw QueueException::forIncorrectDelayValue();
}

$this->delay = $delay;

return $this;
}

/**
* Retry failed job.
*
Expand Down Expand Up @@ -104,7 +78,7 @@ public function retry(?int $id, ?string $queue): int
}

/**
* Delete failed job by ID.
* Delete a failed job by ID.
*/
public function forget(int $id): bool
{
Expand Down Expand Up @@ -150,6 +124,43 @@ public function listFailed(?string $queue): array
->findAll();
}

/**
* Set delay for job queue (in seconds).
*/
public function setDelay(int $delay): static
{
$this->validateDelay($delay);

$this->delay = $delay;

return $this;
}

/**
* Set priority for job queue.
*/
public function setPriority(string $priority): static
{
$this->validatePriority($priority);

$this->priority = $priority;

return $this;
}

/**
* Create a job chain on the specified queue
*
* @param Closure $callback Chain definition callback
*/
public function chain(Closure $callback): bool
{
$chainBuilder = new ChainBuilder($this);
$callback($chainBuilder);

return $chainBuilder->dispatch();
}

/**
* Log failed job.
*
Expand Down
7 changes: 4 additions & 3 deletions src/Handlers/DatabaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
use CodeIgniter\Queue\Enums\Status;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Models\QueueJobModel;
use CodeIgniter\Queue\Payload;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use ReflectionException;
use Throwable;

Expand All @@ -46,13 +47,13 @@ public function name(): string
*
* @throws ReflectionException
*/
public function push(string $queue, string $job, array $data): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
{
$this->validateJobAndPriority($queue, $job);

$queueJob = new QueueJob([
'queue' => $queue,
'payload' => new Payload($job, $data),
'payload' => new Payload($job, $data, $metadata),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
Expand Down
7 changes: 4 additions & 3 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payload;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use Exception;
use Predis\Client;
use Throwable;
Expand Down Expand Up @@ -58,7 +59,7 @@ public function name(): string
/**
* Add job to the queue.
*/
public function push(string $queue, string $job, array $data): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
{
$this->validateJobAndPriority($queue, $job);

Expand All @@ -69,7 +70,7 @@ public function push(string $queue, string $job, array $data): bool
$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'queue' => $queue,
'payload' => new Payload($job, $data),
'payload' => new Payload($job, $data, $metadata),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
Expand Down
7 changes: 4 additions & 3 deletions src/Handlers/RedisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payload;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use Redis;
use RedisException;
use Throwable;
Expand Down Expand Up @@ -75,7 +76,7 @@ public function name(): string
*
* @throws RedisException
*/
public function push(string $queue, string $job, array $data): bool
public function push(string $queue, string $job, array $data, ?PayloadMetadata $metadata = null): bool
{
$this->validateJobAndPriority($queue, $job);

Expand All @@ -86,7 +87,7 @@ public function push(string $queue, string $job, array $data): bool
$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'queue' => $queue,
'payload' => new Payload($job, $data),
'payload' => new Payload($job, $data, $metadata),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
Expand Down
Loading
Loading