Aller au contenu principal
Version: 5.x

Tâches asynchrones (Queue)

Introduction

Les tâches asynchrones permettent de différer l'exécution de processus lourds (envoi d'emails, génération de rapports, traitement d'images) en les plaçant dans une file d'attente. Bow Framework supporte plusieurs backends de queue : Sync, Database, Redis, Beanstalkd, RabbitMQ, Amazon SQS et Kafka.

Création d'une tâche

Utilisez la commande add:task pour générer une nouvelle tâche :

php bow add:task SendWelcomeEmail

Cette commande crée le fichier app/Tasks/SendWelcomeEmail.php :

app/Tasks/SendWelcomeEmail.php
<?php

namespace App\Tasks;

use Bow\Queue\QueueTask;

class SendWelcomeEmail extends QueueTask
{
/**
* Les données de la tâche
*/
private string $email;
private string $name;

/**
* Constructeur
*/
public function __construct(string $email, string $name)
{
$this->email = $email;
$this->name = $name;
}

/**
* Exécute la tâche
*/
public function process(): void
{
// Envoyer l'email de bienvenue
Mail::to($this->email)
->subject("Bienvenue, {$this->name} !")
->send("emails.welcome", ["name" => $this->name]);
}
}

Configuration

La configuration des queues se trouve dans config/queue.php :

config/queue.php
<?php

return [
// Driver par défaut
"default" => app_env("QUEUE_DRIVER", "sync"),

"connections" => [
// Exécution synchrone (pour le développement)
"sync" => [
"queue" => "default",
],

// Base de données
"database" => [
"queue" => "default",
"table" => "queues",
],

// Redis
"redis" => [
"queue" => "default",
"block_timeout" => 5,
],

// Beanstalkd
"beanstalkd" => [
"hostname" => "127.0.0.1",
"port" => 11300,
"timeout" => 10,
"queue" => "default",
],

// RabbitMQ
"rabbitmq" => [
"queue" => "default",
"host" => app_env("RABBITMQ_HOST", "127.0.0.1"),
"port" => app_env("RABBITMQ_PORT", 5672),
"user" => app_env("RABBITMQ_USER", "guest"),
"password" => app_env("RABBITMQ_PASSWORD", "guest"),
"vhost" => app_env("RABBITMQ_VHOST", "/"),
],

// Amazon SQS
"sqs" => [
"queue" => "default",
"url" => app_env("SQS_URL"),
"region" => app_env("AWS_REGION"),
"version" => "latest",
"credentials" => [
"key" => app_env("AWS_KEY"),
"secret" => app_env("AWS_SECRET"),
],
],

// Apache Kafka
"kafka" => [
"host" => "localhost",
"port" => 9092,
"topic" => "default",
"group_id" => "bow_queue_group",
"auto_offset_reset" => "earliest",
"enable_auto_commit" => "true",
],
],
];

Migration pour le driver database

Si vous utilisez le driver database, créez la table de queue :

php bow add:migration create_queues_table
migrations/Version_CreateQueuesTable.php
<?php

use Bow\Database\Migration\Table;
use Bow\Database\Migration\Migration;

class Version_CreateQueuesTable extends Migration
{
public function up(): void
{
$this->create("queues", function (Table $table) {
$table->uuid("id")->primary();
$table->string("queue");
$table->longText("payload");
$table->integer("attempts")->default(0);
$table->integer("delay")->default(0);
$table->enum("status", ["pending", "processing", "completed", "failed"])
->default("pending");
$table->timestamp("available_at");
$table->timestamps();

$table->index("queue");
$table->index("status");
});
}

public function rollback(): void
{
$this->dropIfExists("queues");
}
}

Dispatcher une tâche

Avec la fonction helper

use App\Tasks\SendWelcomeEmail;

// Dispatcher immédiatement
queue(new SendWelcomeEmail("john@example.com", "John"));

Depuis un contrôleur

app/Controllers/UserController.php
<?php

namespace App\Controllers;

use App\Tasks\SendWelcomeEmail;
use App\Models\User;
use Bow\Http\Request;

class UserController extends Controller
{
public function store(Request $request)
{
$user = User::create([
"name" => $request->get("name"),
"email" => $request->get("email"),
"password" => app_hash($request->get("password")),
]);

// Envoyer l'email en arrière-plan
queue(new SendWelcomeEmail($user->email, $user->name));

return response_json([
"message" => "Utilisateur créé avec succès",
"user" => $user
], 201);
}
}

Propriétés des tâches

File d'attente (queue)

Par défaut, les tâches sont envoyées dans la queue default. Vous pouvez spécifier une queue différente :

class ProcessOrderTask extends QueueTask
{
protected string $queue = "orders";

// ...
}

Délai d'exécution (delay)

Différer l'exécution d'une tâche en secondes :

class SendReminderTask extends QueueTask
{
protected int $delay = 3600; // 1 heure

// ...
}

Ou dynamiquement :

$task = new SendReminderTask($userId);
$task->setDelay(1800); // 30 minutes
queue($task);

Tentatives (attempts & retry)

Configurer le nombre de tentatives et le délai entre les tentatives :

class ProcessPaymentTask extends QueueTask
{
// Nombre maximum de tentatives
protected int $attempts = 3;

// Délai entre les tentatives (en secondes)
protected int $retry = 60;

public function process(): void
{
// Traiter le paiement
}
}

Priorité

Les tâches avec une priorité plus élevée sont traitées en premier :

class UrgentNotificationTask extends QueueTask
{
protected int $priority = 10; // Haute priorité (défaut: 1)

// ...
}

Gestion des erreurs

Méthode onException

Implémentez onException() pour gérer les erreurs :

use Throwable;

class ProcessDataTask extends QueueTask
{
public function process(): void
{
// Traitement qui peut échouer
}

public function onException(Throwable $e): void
{
// Logger l'erreur
logger()->error("Échec de ProcessDataTask: " . $e->getMessage());

// Notifier l'administrateur
Mail::to("admin@example.com")
->subject("Erreur de tâche")
->send("emails.task-error", ["error" => $e]);
}
}

Supprimer une tâche échouée

Pour empêcher les nouvelles tentatives :

public function process(): void
{
try {
// Traitement
} catch (IrrecoverableException $e) {
// Ne pas réessayer cette tâche
$this->deleteTask();
throw $e;
}
}

Exécuter le worker

Commande de base

# Traiter toutes les queues
php bow worker:run

# Traiter une queue spécifique
php bow worker:run --queue=orders

# Spécifier une connexion
php bow worker:run --connection=redis

# Limiter le nombre de tâches
php bow worker:run --once

En production avec Supervisor

Créez un fichier de configuration Supervisor :

/etc/supervisor/conf.d/bow-worker.conf
[program:bow-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/bow worker:run --queue=default
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=2
redirect_stderr=true
stdout_logfile=/var/log/bow-worker.log
stopwaitsecs=3600

Démarrez le worker :

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start bow-worker:*

Exemple complet

Tâche de génération de rapport

app/Tasks/GenerateReportTask.php
<?php

namespace App\Tasks;

use Bow\Queue\QueueTask;
use App\Models\Report;
use App\Services\ReportGenerator;
use Throwable;

class GenerateReportTask extends QueueTask
{
protected string $queue = "reports";
protected int $attempts = 3;
protected int $retry = 120;

private int $reportId;
private string $format;

public function __construct(int $reportId, string $format = "pdf")
{
$this->reportId = $reportId;
$this->format = $format;
}

public function process(): void
{
$report = Report::findOrFail($this->reportId);

$generator = new ReportGenerator();
$filePath = $generator->generate($report, $this->format);

$report->update([
"status" => "completed",
"file_path" => $filePath,
"generated_at" => date("Y-m-d H:i:s"),
]);

// Notifier l'utilisateur
queue(new SendReportReadyNotification(
$report->user_id,
$filePath
));
}

public function onException(Throwable $e): void
{
$report = Report::find($this->reportId);

if ($report) {
$report->update([
"status" => "failed",
"error_message" => $e->getMessage(),
]);
}

logger()->error("Échec génération rapport #{$this->reportId}: " . $e->getMessage());
}
}

Dispatcher depuis un contrôleur

app/Controllers/ReportController.php
<?php

namespace App\Controllers;

use App\Tasks\GenerateReportTask;
use App\Models\Report;
use Bow\Http\Request;

class ReportController extends Controller
{
public function generate(Request $request)
{
$report = Report::create([
"user_id" => auth()->id(),
"type" => $request->get("type"),
"parameters" => $request->get("parameters"),
"status" => "pending",
]);

// Dispatcher la tâche
queue(new GenerateReportTask($report->id, $request->get("format", "pdf")));

return response_json([
"message" => "Génération du rapport en cours",
"report_id" => $report->id,
], 202);
}
}

Intégration avec le Scheduler

Combinez les tâches avec le planificateur pour des traitements récurrents :

app/Kernel.php
public function schedules(Scheduler $schedule): void
{
// Nettoyer les anciennes sessions toutes les heures
$schedule->task(App\Tasks\CleanupSessionsTask::class)
->hourly()
->description("Nettoyer les sessions expirées");

// Générer les rapports quotidiens
$schedule->task(App\Tasks\DailyReportTask::class)
->daily()
->at("06:00")
->description("Générer les rapports quotidiens");
}

Bonnes pratiques

Recommandations
  • Tâches idempotentes : Concevez vos tâches pour qu'elles puissent être exécutées plusieurs fois sans effets secondaires.
  • Données minimales : Passez uniquement les IDs au constructeur, récupérez les données complètes dans process().
  • Gestion des erreurs : Implémentez toujours onException() pour logger et notifier les échecs.
  • Timeouts : Configurez des timeouts appropriés dans Supervisor pour éviter les tâches bloquées.
  • Monitoring : Surveillez la taille des queues et le temps de traitement en production.
  • Queues séparées : Utilisez des queues distinctes pour les tâches critiques (paiements) et non-critiques (emails).
Attention
  • Les tâches sont sérialisées : n'incluez pas d'objets non-sérialisables (connexions DB, closures).
  • En mode sync, les tâches s'exécutent de manière synchrone (utile pour le développement).
  • Assurez-vous que le worker a les mêmes dépendances et configurations que l'application.

Il manque quelque chose ?

Si vous rencontrez des problèmes avec la documentation ou si vous avez des suggestions pour améliorer la documentation ou le projet en général, veuillez déposer une issue pour nous, ou envoyer un tweet mentionnant le compte Twitter @bowframework ou sur directement sur le github.