Laravelのジョブ・キューを活用する方法

はじめに

Webアプリケーション開発において、ユーザー体験を向上させるためには、時間のかかる処理をバックグラウンドで実行することが重要です。メール送信、大量データの処理、外部APIとの通信など、即時の完了が必要ないタスクはバックグラウンドに移すことで、アプリケーションのレスポンス時間を短縮できます。

Laravelは、このようなバックグラウンド処理を簡単に実装できるジョブ・キューシステムを提供しています。この記事では、Laravelのジョブ・キューの基本から応用まで、実践的な例を交えて解説します。

ジョブ・キューの基本概念

ジョブ・キューは、主に以下の要素で構成されています:

  1. ジョブ: 実行すべき処理を定義したクラス
  2. キュー: ジョブを一時的に保存しておく場所
  3. ワーカー: キューからジョブを取り出して実行するプロセス
  4. 接続: キューの保存先(データベース、Redis、Amazon SQSなど)

Laravelのキューシステムを使用することで、以下のメリットが得られます:

  • ユーザーのリクエスト処理時間の短縮
  • サーバーリソースの効率的な利用
  • 処理の失敗時の再試行機能
  • 処理のスケジューリング
  • ジョブの優先順位付け

環境設定

まずは、キューの設定を行いましょう。Laravelでは、.envファイルで簡単にキュードライバを設定できます。

QUEUE_CONNECTION=database

主なキュードライバには以下のものがあります:

  • sync: 同期的に処理(開発環境向け)
  • database: データベースに保存
  • redis: Redisに保存
  • beanstalkd: Beanstalkdに保存
  • sqs: Amazon SQSに保存

データベースをキュードライバとして使用する場合は、マイグレーションを実行する必要があります:

php artisan queue:table
php artisan migrate

基本的なジョブの作成と実行

ジョブクラスの作成

Artisanコマンドを使用して、新しいジョブクラスを作成します:

php artisan make:job ProcessPodcast

これにより、app/JobsディレクトリにProcessPodcastクラスが作成されます:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new job instance.
     */
    public function __construct($podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        // ここにジョブの処理を記述
        \Log::info('ポッドキャスト処理開始: ' . $this->podcast->title);
        
        // 時間のかかる処理をシミュレート
        sleep(5);
        
        // ポッドキャストの処理...
        $this->podcast->processed = true;
        $this->podcast->save();
        
        \Log::info('ポッドキャスト処理完了: ' . $this->podcast->title);
    }
}

ジョブのディスパッチ(実行)

ジョブを実行(キューに追加)するには、以下のいずれかの方法を使用します:

// 方法1: dispatchメソッドを使用
ProcessPodcast::dispatch($podcast);

// 方法2: Bus Facadeを使用
use Illuminate\Support\Facades\Bus;
Bus::dispatch(new ProcessPodcast($podcast));

// 方法3: 依存性注入を使用
public function store(Request $request, Dispatcher $dispatcher)
{
    // ...
    $dispatcher->dispatch(new ProcessPodcast($podcast));
    // ...
}

キューワーカーの実行

キューに追加されたジョブを処理するためには、キューワーカーを実行する必要があります:

php artisan queue:work

本番環境では、プロセスマネージャ(Supervisor など)を使用して、キューワーカーを常時実行状態に保つことをお勧めします。

キューワーカーの設定オプション

# 特定の接続を使用
php artisan queue:work redis

# 特定のキューを処理
php artisan queue:work --queue=high,default

# 単一のジョブのみを処理して終了
php artisan queue:work --once

# メモリ使用量の上限を設定
php artisan queue:work --memory=1024

# ワーカーの最大実行時間を設定
php artisan queue:work --timeout=60

高度なジョブの設定

キュー名の指定

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * ジョブを投入するキュー名
     */
    public $queue = 'podcasts';

    // ...
}

または、ディスパッチ時にキューを指定することもできます:

ProcessPodcast::dispatch($podcast)->onQueue('podcasts');

接続の指定

ProcessPodcast::dispatch($podcast)->onConnection('redis');

遅延実行

// 10秒後に実行
ProcessPodcast::dispatch($podcast)->delay(now()->addSeconds(10));

ジョブチェーン(連鎖的な実行)

Bus::chain([
    new ProcessPodcast($podcast),
    new OptimizePodcast($podcast),
    new ReleasePodcast($podcast),
])->dispatch();

バッチ処理

複数のジョブをバッチとして実行し、すべてのジョブが完了した時点で何らかの処理を行うことができます:

use Illuminate\Support\Facades\Batch;
use Throwable;

$batch = Bus::batch([
    new ProcessPodcast($podcasts[0]),
    new ProcessPodcast($podcasts[1]),
    new ProcessPodcast($podcasts[2]),
])->then(function (Batch $batch) {
    // すべてのジョブが完了したとき
})->catch(function (Batch $batch, Throwable $e) {
    // ジョブのひとつが失敗したとき
})->finally(function (Batch $batch) {
    // バッチが完了または失敗したとき
})->dispatch();

// バッチのIDを保存して、後で状態を確認できるようにする
$batchId = $batch->id;

エラーハンドリングと再試行

再試行回数の指定

class ProcessPodcast implements ShouldQueue
{
    /**
     * 再試行する最大回数
     */
    public $tries = 3;

    /**
     * タイムアウトまでの秒数
     */
    public $timeout = 90;

    // ...
}

再試行間隔の指定

/**
 * ジョブを再試行するまでの秒数を計算
 */
public function backoff()
{
    return [1, 5, 10]; // 1秒後、5秒後、10秒後に再試行
}

失敗したジョブの処理

/**
 * ジョブが失敗したときの処理
 */
public function failed(Throwable $exception)
{
    // 失敗を通知または記録する
    \Log::error('ジョブが失敗しました: ' . $exception->getMessage());
    
    // 管理者に通知
    \Notification::route('mail', 'admin@example.com')
        ->notify(new JobFailedNotification($this->podcast, $exception));
}

ジョブイベントとリスナー

ジョブのライフサイクルに関連するイベントをリッスンすることができます:

// AppServiceProvider::bootメソッド内など
Queue::before(function (JobProcessing $event) {
    // ジョブ処理前の処理
});

Queue::after(function (JobProcessed $event) {
    // ジョブ処理後の処理
});

Queue::failing(function (JobFailed $event) {
    // ジョブ失敗時の処理
});

実践的な使用例

メール送信

class SendWelcomeEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $user;

    public function __construct(User $user)
    {
        $this->user = $user;
    }

    public function handle(Mailer $mailer)
    {
        $mailer->to($this->user->email)
               ->send(new WelcomeEmail($this->user));
    }
}

// 使用例
SendWelcomeEmail::dispatch($newUser)->onQueue('emails');

画像処理

class ResizeUploadedImage implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $imagePath;
    protected $sizes;

    public function __construct(string $imagePath, array $sizes)
    {
        $this->imagePath = $imagePath;
        $this->sizes = $sizes;
    }

    public function handle()
    {
        $image = Image::make(storage_path('app/' . $this->imagePath));
        
        foreach ($this->sizes as $name => $size) {
            $resized = $image->resize($size['width'], $size['height']);
            $resized->save(storage_path('app/public/images/' . $name . '_' . basename($this->imagePath)));
        }
    }
}

// 使用例
ResizeUploadedImage::dispatch(
    'uploads/image.jpg',
    [
        'thumbnail' => ['width' => 100, 'height' => 100],
        'medium' => ['width' => 600, 'height' => null],
        'large' => ['width' => 1200, 'height' => null],
    ]
);

データのインポート

class ImportCsvData implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $filePath;
    protected $chunkSize;

    public function __construct(string $filePath, int $chunkSize = 1000)
    {
        $this->filePath = $filePath;
        $this->chunkSize = $chunkSize;
    }

    public function handle()
    {
        $csv = array_map('str_getcsv', file(storage_path('app/' . $this->filePath)));
        $header = array_shift($csv);
        
        // チャンクごとに処理
        foreach (array_chunk($csv, $this->chunkSize) as $chunk) {
            $records = [];
            foreach ($chunk as $row) {
                $records[] = array_combine($header, $row);
            }
            
            // バルクインサート
            DB::table('imported_data')->insert($records);
        }
    }
}

キューのモニタリングとデバッグ

失敗したジョブの確認

php artisan queue:failed

失敗したジョブの再実行

php artisan queue:retry all         # すべての失敗したジョブを再実行
php artisan queue:retry 5           # ID=5のジョブを再実行
php artisan queue:retry --queue=emails  # emailsキューの失敗したジョブを再実行

失敗したジョブの削除

php artisan queue:flush             # すべての失敗したジョブを削除
php artisan queue:forget 5          # ID=5のジョブを削除

キューモニタリングツール

  • Horizon: Laravelが提供するRedisキューのダッシュボード
  • Laravel Telescope: デバッグとパフォーマンス分析のためのツール

Horizonの活用

LaravelのスケーラブルなキューモニタリングツールであるHorizonを使用すると、キューの状態を視覚的に把握できます。

インストールと設定

composer require laravel/horizon
php artisan horizon:install

Horizonの設定はconfig/horizon.phpファイルで行います。

Horizonの起動

php artisan horizon

本番環境では、Supervisorなどのプロセスマネージャを使って、Horizonを常時実行状態に保つことをお勧めします。

ダッシュボードへのアクセス

/horizonパスにアクセスすると、ダッシュボードが表示されます。ダッシュボードでは、以下の情報を確認できます:

  • 処理済みジョブの数
  • 現在のキューの長さ
  • ジョブの処理時間
  • 失敗したジョブの詳細
  • ワーカープロセスの状態

まとめ

Laravelのジョブ・キューシステムを活用することで、時間のかかる処理をバックグラウンドに移し、アプリケーションのレスポンス時間を向上させることができます。基本的なジョブのディスパッチから、高度なバッチ処理、エラーハンドリング、モニタリングまで、Laravelは柔軟で強力なキューシステムを提供しています。

以下のようなケースでジョブ・キューを活用すると効果的です:

  • メール送信
  • レポート生成
  • ファイル処理
  • 外部APIとの通信
  • データのインポート/エクスポート
  • 定期的なバッチ処理

適切にジョブ・キューを設計することで、アプリケーションのスケーラビリティと信頼性が向上し、ユーザー体験の改善につながります。

コメント

タイトルとURLをコピーしました