Skip to content

队列

简介

在构建 Web 应用程序时,你可能需要执行一些耗时的任务,比如解析和存储上传的 CSV 文件。幸运的是,Laravel 允许你轻松创建可以在后台处理的队列任务。通过将耗时的任务移到队列中,你的应用程序可以以闪电般的速度响应 Web 请求,为客户提供更好的用户体验。

Laravel 队列为各种不同的队列后端提供了统一的 API,如 Amazon SQSRedis 甚至关系型数据库。

Laravel 的队列配置选项存储在你应用程序的 config/queue.php 配置文件中。在这个文件中,你会找到框架中包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及一个在本地开发期间使用的同步驱动程序。还包括一个 null 队列驱动程序,它会丢弃队列任务。

NOTE

Laravel 现在提供了 Horizon,这是一个用于 Redis 驱动队列的漂亮的仪表板和配置系统。查看完整的 Horizon 文档 以获取更多信息。

连接与队列

在开始使用 Laravel 队列之前,理解"连接"和"队列"之间的区别很重要。在你的 config/queue.php 配置文件中,有一个 connections 配置数组。这个选项定义了到后端队列服务的连接,如 Amazon SQS、Beanstalk 或 Redis。然而,任何给定的队列连接可能有多个"队列",可以被认为是不同的堆栈或成堆的排队任务。

注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是任务被发送到给定连接时将被分发到的默认队列。换句话说,如果你分发一个任务而没有明确定义它应该被分发到哪个队列,那么这个任务将被放置在连接配置的 queue 属性中定义的队列中:

php
    use App\Jobs\ProcessPodcast;

    // This job is sent to the default connection's default queue...
    ProcessPodcast::dispatch();

    // This job is sent to the default connection's "emails" queue...
    ProcessPodcast::dispatch()->onQueue('emails');

一些应用程序可能不需要将任务推送到多个队列,而是希望有一个简单的队列。但是,将任务推送到多个队列可以对希望优先处理或分段任务的应用程序特别有用,因为 Laravel 队列允许你指定哪些队列应该被处理,按优先级排序。例如,如果你将任务推送到 high 队列,你可以运行一个给它们更高处理优先级的工作进程:

shell
php artisan queue:work --queue=high,default

驱动程序说明与前提条件

数据库

要使用 database 队列驱动程序,你需要一个数据库表来保存任务。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移中;但是,如果你的应用程序没有包含这个迁移,你可以使用 make:queue-table Artisan 命令来创建它:

shell
php artisan make:queue-table

php artisan migrate

Redis

要使用 redis 队列驱动程序,你应该在你的 config/database.php 配置文件中配置一个 Redis 数据库连接。

WARNING

redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果你的 Redis 队列连接使用 Redis 集群,你的队列名称必须包含一个 键哈希标签。这是必需的,以确保给定队列的所有 Redis 键都放入同一个哈希槽中:

php
    'redis' => [
        'driver' => 'redis',
        'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
        'queue' => env('REDIS_QUEUE', '{default}'),
        'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
        'block_for' => null,
        'after_commit' => false,
    ],

阻塞

当使用 Redis 队列时,你可以使用 block_for 配置选项指定驱动程序在等待作业可用之前应阻塞的时间。

根据你的队列负载,调整这个值可以比持续轮询 Redis 数据库以获取新作业更有效。例如,你可以将该值设置为 5 以指示驱动程序应在等待作业可用时阻塞五秒:

php
    'redis' => [
        'driver' => 'redis',
        'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
        'block_for' => 5,
        'after_commit' => false,
    ],

WARNING

block_for 设置为 0 将导致队列工作进程无限期阻塞,直到有作业可用。这也将阻止处理下一个作业之前处理信号,如 SIGTERM

其他驱动程序前提条件

以下依赖项是必需的,用于列出的队列驱动程序。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展

创建任务

生成任务类

默认情况下,所有的可排队任务都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当你运行 make:job Artisan 命令时会自动创建:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 应将该任务推送到队列以异步执行。

NOTE

任务存根可以使用 存根发布 进行自定义。

类结构

任务类非常简单,通常只包含一个 handle 方法,当队列处理该任务时会调用该方法。让我们来看一个示例任务类。在这个示例中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:

php
    <?php

    namespace App\Jobs;

    use App\Models\Podcast;
    use App\Services\AudioProcessor;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Queue\Queueable;

    class ProcessPodcast implements ShouldQueue
    {
        use Queueable;

        /**
         * Create a new job instance.
         */
        public function __construct(
            public Podcast $podcast,
        ) {}

        /**
         * Execute the job.
         */
        public function handle(AudioProcessor $processor): void
        {
            // Process uploaded podcast...
        }
    }

在这个示例中,注意我们能够在排队任务的构造函数中传递一个 Eloquent 模型。由于任务使用的 Queueable trait,Eloquent 模型及其加载的关系将在任务处理时优雅地序列化和反序列化。

如果你的排队任务在其构造函数中接受一个 Eloquent 模型,那么只有该模型的标识符将被序列化到队列中。当任务实际处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许发送到队列驱动程序的作业负载大小更小。

handle 方法依赖注入

当队列处理任务时,将调用 handle 方法。注意,我们能够在任务的 handle 方法上类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果你想完全控制容器如何将依赖项注入 handle 方法,你可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收任务和容器。在回调中,你可以自由地调用 handle 方法。通常,你应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
    use App\Jobs\ProcessPodcast;
    use App\Services\AudioProcessor;
    use Illuminate\Contracts\Foundation\Application;

    $this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
        return $job->handle($app->make(AudioProcessor::class));
    });

WARNING

二进制数据,如原始图像内容,应该在传递给排队任务之前通过 base64_encode 函数传递。否则,当将作业放置到队列驱动程序时,作业可能无法正确序列化为 JSON。

排队关系

因为所有加载的 Eloquent 模型关系也会在任务排队时序列化,所以序列化的作业字符串有时可能会变得非常大。此外,当任务被反序列化并从数据库中重新检索模型关系时,它们将以其完整形式检索。任务排队过程中对模型进行序列化之前应用的任何先前关系约束都不会在任务反序列化时应用。因此,如果你希望使用给定关系的子集,你应该在排队任务中重新约束该关系。

或者,为了防止关系被序列化,你可以在设置属性值时调用模型上的 withoutRelations 方法。此方法将返回一个没有其加载关系的模型实例:

php
    /**
     * Create a new job instance.
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast->withoutRelations();
    }

如果你使用 PHP 构造函数属性提升并且希望指示 Eloquent 模型不应序列化其关系,你可以使用 WithoutRelations 属性:

php
    use Illuminate\Queue\Attributes\WithoutRelations;

    /**
     * Create a new job instance.
     */
    public function __construct(
        #[WithoutRelations]
        public Podcast $podcast
    ) {
    }

如果一个任务接收一个 Eloquent 模型集合或数组而不是单个模型,那么该集合中的模型在任务反序列化和执行时将不会重新获取其关系。这是为了防止对处理大量模型的任务造成过多的资源使用。

唯一任务

WARNING

唯一任务需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。此外,唯一任务约束不适用于批处理中的任务。

有时,你可能希望确保在任何给定时间点只有一个特定任务实例在队列上。你可以通过在你的任务类上实现 ShouldBeUnique 接口来实现这一点。此接口不要求你在类上定义任何其他方法:

php
    <?php

    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Contracts\Queue\ShouldBeUnique;

    class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
    {
        ...
    }

在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果队列上已经有同一任务的另一个实例并且尚未完成处理,那么该任务将不会被分发。

在某些情况下,你可能希望定义一个特定的"键"使任务唯一,或者你可能希望指定一个超时时间,超过该时间任务将不再保持唯一。为了实现这一点,你可以在你的任务类上定义 uniqueIduniqueFor 属性或方法:

php
    <?php

    use App\Models\Product;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Contracts\Queue\ShouldBeUnique;

    class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
    {
        /**
         * The product instance.
         *
         * @var \App\Product
         */
        public $product;

        /**
         * The number of seconds after which the job's unique lock will be released.
         *
         * @var int
         */
        public $uniqueFor = 3600;

        /**
         * Get the unique ID for the job.
         */
        public function uniqueId(): string
        {
            return $this->product->id;
        }
    }

在上面的示例中,UpdateSearchIndex 任务是由产品 ID 唯一的。因此,任何新分发的具有相同产品 ID 的任务都将被忽略,直到现有任务完成处理。此外,如果现有任务在一小时内未被处理,唯一锁将被释放,另一个具有相同唯一键的任务可以分发到队列中。

WARNING

如果你的应用程序从多个 Web 服务器或容器分发任务,你应该确保所有服务器都与同一中央缓存服务器通信,以便 Laravel 可以准确确定任务是否是唯一的。

保持任务唯一直到处理开始

默认情况下,唯一任务在任务完成处理或失败所有重试尝试后才会"解锁"。但是,可能会有一些情况下,你希望你的任务在处理之前立即解锁。为了实现这一点,你的任务应该实现 ShouldBeUniqueUntilProcessing 契约而不是 ShouldBeUnique 契约:

php
    <?php

    use App\Models\Product;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

    class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
    {
        // ...
    }

唯一任务锁

在后台,当分发 ShouldBeUnique 任务时,Laravel 会尝试使用 uniqueId 键获取。如果未能获取锁,则不会分发该任务。此锁在任务完成处理或失败所有重试尝试后释放。默认情况下,Laravel 将使用默认缓存驱动程序获取此锁。但是,如果你希望使用另一个驱动程序获取锁,你可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动程序:

php
    use Illuminate\Contracts\Cache\Repository;
    use Illuminate\Support\Facades\Cache;

    class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
    {
        ...

        /**
         * Get the cache driver for the unique job lock.
         */
        public function uniqueVia(): Repository
        {
            return Cache::driver('redis');
        }
    }

NOTE

如果你只需要限制任务的并发处理,请使用 WithoutOverlapping 任务中间件。

加密任务

Laravel 允许你通过 加密 来确保任务数据的隐私和完整性。要开始,只需在任务类上添加 ShouldBeEncrypted 接口。一旦此接口添加到类中,Laravel 将在将任务推送到队列之前自动加密你的任务:

php
    <?php

    use Illuminate\Contracts\Queue\ShouldBeEncrypted;
    use Illuminate\Contracts\Queue\ShouldQueue;

    class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
    {
        // ...
    }

任务中间件

任务中间件允许你在执行排队任务时包装自定义逻辑,从而减少任务本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,允许每五秒只有一个作业处理:

php
    use Illuminate\Support\Facades\Redis;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
            info('Lock obtained...');

            // Handle job...
        }, function () {
            // Could not obtain lock...

            return $this->release(5);
        });
    }

虽然此代码是有效的,但 handle 方法的实现变得混乱,因为它被 Redis 速率限制逻辑淹没了。此外,任何其他我们希望速率限制的作业都必须重复此速率限制逻辑。

而不是在 handle 方法中进行速率限制,我们可以定义一个任务中间件来处理速率限制。Laravel 没有任务中间件的默认位置,因此你可以随意将任务中间件放置在应用程序的任何位置。在这个示例中,我们将把中间件放在 app/Jobs/Middleware 目录中:

php
    <?php

    namespace App\Jobs\Middleware;

    use Closure;
    use Illuminate\Support\Facades\Redis;

    class RateLimited
    {
        /**
         * Process the queued job.
         *
         * @param  \Closure(object): void  $next
         */
        public function handle(object $job, Closure $next): void
        {
            Redis::throttle('key')
                    ->block(0)->allow(1)->every(5)
                    ->then(function () use ($job, $next) {
                        // Lock obtained...

                        $next($job);
                    }, function () use ($job) {
                        // Could not obtain lock...

                        $job->release(5);
                    });
        }
    }

如你所见,就像 路由中间件 一样,任务中间件接收正在处理的任务和应调用以继续处理任务的回调。

创建任务中间件后,可以通过从任务的 middleware 方法返回它们来将它们附加到任务上。此方法不存在于由 make:job Artisan 命令生成的任务中,因此你需要手动将其添加到你的任务类中:

php
    use App\Jobs\Middleware\RateLimited;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [new RateLimited];
    }

NOTE

任务中间件也可以分配给可排队的事件监听器、可邮件和可通知。

速率限制

虽然我们刚刚演示了如何编写自己的速率限制任务中间件,但 Laravel 实际上包含了一个速率限制中间件,你可以利用它来限制任务的速率。就像 路由速率限制器 一样,作业速率限制器是使用 RateLimiter Facade的 for 方法定义的。

例如,你可能希望允许用户每小时备份一次数据,而对高级客户没有任何限制。为了实现这一点,你可以在 AppServiceProviderboot 方法中定义一个 RateLimiter:

php
    use Illuminate\Cache\RateLimiting\Limit;
    use Illuminate\Support\Facades\RateLimiter;

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        RateLimiter::for('backups', function (object $job) {
            return $job->user->vipCustomer()
                        ? Limit::none()
                        : Limit::perHour(1)->by($job->user->id);
        });
    }

在上面的示例中,我们定义了一个每小时的速率限制;但是,你可以轻松地根据分钟定义速率限制,使用 perMinute 方法。此外,你可以将任何你想要的值传递给速率限制的 by 方法;但是,此值通常用于按客户分段速率限制:

php
    return Limit::perMinute(50)->by($job->user->id);

一旦定义了速率限制,你可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将其附加到你的任务上。每次任务超过速率限制时,此中间件将任务释放回队列,并根据速率限制持续时间适当延迟。

php
    use Illuminate\Queue\Middleware\RateLimited;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [new RateLimited('backups')];
    }

释放速率限制的任务回到队列中仍会增加任务的总尝试次数。你可能希望根据任务类上的 triesmaxExceptions 属性调整这些属性。或者,你可能希望使用 retryUntil 方法 定义任务应不再尝试的时间长度。

如果你不希望速率限制的任务被重试,你可以使用 dontRelease 方法:

php
    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new RateLimited('backups'))->dontRelease()];
    }

NOTE

如果你使用 Redis,你可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,它针对 Redis 进行了优化,比基本速率限制中间件更高效。

防止任务重叠

Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许你根据任意键防止任务重叠。这对于可能同时修改同一资源的排队任务非常有用,该资源一次只能由一个任务修改。

例如,让我们假设你有一个排队任务,它更新用户的信用分数,并且你希望防止同一用户 ID 的信用分数更新任务重叠。为了实现这一点,你可以从你的任务的 middleware 方法返回 WithoutOverlapping 中间件:

php
    use Illuminate\Queue\Middleware\WithoutOverlapping;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [new WithoutOverlapping($this->user->id)];
    }

任何重叠的同类任务都将被释放回队列。你还可以指定在释放重叠任务之前必须经过的秒数:

php
    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
    }

如果你希望立即删除任何重叠的任务,以便它们不会被重试,你可以使用 dontRelease 方法:

php
    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new WithoutOverlapping($this->order->id))->dontRelease()];
    }

WithoutOverlapping 中间件由 Laravel 的原子锁功能支持。有时,你的任务可能会以意外的方式失败或超时,从而无法释放锁。因此,你可以显式定义锁的过期时间,使用 expireAfter 方法。例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping 锁:

php
    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
    }

WARNING

WithoutOverlapping 中间件需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。

在任务类之间共享锁键

默认情况下,WithoutOverlapping 中间件只会防止同一类任务的重叠。因此,即使两个不同的任务类使用相同的锁键,它们也不会被阻止重叠。但是,你可以指示 Laravel 使用 shared 方法跨任务类应用该键:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

限制异常

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许你限制异常。一旦任务抛出给定数量的异常,所有进一步尝试执行该任务的尝试都将延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的任务特别有用。

例如,让我们假设一个排队任务与可能开始抛出异常的第三方 API 进行交互。为了限制异常,你可以从你的任务的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现 基于时间的尝试 的任务配对:

php
    use DateTime;
    use Illuminate\Queue\Middleware\ThrottlesExceptions;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [new ThrottlesExceptions(10, 5)];
    }

    /**
     * Determine the time at which the job should timeout.
     */
    public function retryUntil(): DateTime
    {
        return now()->addMinutes(5);
    }

传递给中间件的第一个构造函数参数是任务可以抛出的异常数量,而第二个构造函数参数是任务被限制后必须经过的分钟数,才能再次尝试该任务。在上面的代码示例中,如果任务在 5 分钟内抛出 10 个异常,我们将等待 5 分钟才会再次尝试该任务。

当任务抛出异常但尚未达到异常阈值时,任务通常会立即重试。但是,你可以在附加中间件到任务时调用 backoff 方法来指定这样的任务应该延迟多长时间:

php
    use Illuminate\Queue\Middleware\ThrottlesExceptions;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new ThrottlesExceptions(10, 5))->backoff(5)];
    }

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且任务的类名用作缓存"键"。你可以在附加中间件到任务时调用 by 方法来覆盖此键。这可能对你有用,如果你有多个任务与同一第三方服务交互,并且你希望它们共享一个限制"桶":

php
    use Illuminate\Queue\Middleware\ThrottlesExceptions;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new ThrottlesExceptions(10, 10))->by('key')];
    }

默认情况下,此中间件将限制每个异常。你可以在附加中间件到任务时调用 when 方法来修改此行为。只有当提供给 when 方法的闭包返回 true 时,异常才会被限制:

php
    use Illuminate\Http\Client\HttpClientException;
    use Illuminate\Queue\Middleware\ThrottlesExceptions;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new ThrottlesExceptions(10, 10))->when(
            fn (Throwable $throwable) => $throwable instanceof HttpClientException
        )];
    }

如果你希望限制的异常报告给你应用程序的异常处理程序,你可以这样做。你还可以选择提供一个闭包给 report 方法,只有当给定的闭包返回 true 时才会报告异常:

php
    use Illuminate\Http\Client\HttpClientException;
    use Illuminate\Queue\Middleware\ThrottlesExceptions;

    /**
     * Get the middleware the job should pass through.
     *
     * @return array<int, object>
     */
    public function middleware(): array
    {
        return [(new ThrottlesExceptions(10, 10))->report(
            fn (Throwable $throwable) => $throwable instanceof HttpClientException
        )];
    }

NOTE

如果你使用 Redis,你可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,它针对 Redis 进行了优化,比基本异常限制中间件更高效。

分发任务

一旦你编写了任务类,你可以使用任务本身的 dispatch 方法来分发它。传递给 dispatch 方法的参数将传递给任务的构造函数:

php
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use App\Models\Podcast;
    use Illuminate\Http\RedirectResponse;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
        /**
         * Store a new podcast.
         */
        public function store(Request $request): RedirectResponse
        {
            $podcast = Podcast::create(/* ... */);

            // ...

            ProcessPodcast::dispatch($podcast);

            return redirect('/podcasts');
        }
    }

如果你想有条件地分发任务,你可以使用 dispatchIfdispatchUnless 方法:

php
    ProcessPodcast::dispatchIf($accountActive, $podcast);

    ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台中同步执行任务,这在本地开发期间通常很方便。如果你想实际开始将任务排队以进行后台处理,你可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动程序。

延迟分发

如果你希望指定任务在队列工作进程可用于处理之前不应立即可用,你可以在分发任务时使用 delay 方法。例如,让我们指定任务在分发后 10 分钟内不应可用于处理:

php
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use App\Models\Podcast;
    use Illuminate\Http\RedirectResponse;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
        /**
         * Store a new podcast.
         */
        public function store(Request $request): RedirectResponse
        {
            $podcast = Podcast::create(/* ... */);

            // ...

            ProcessPodcast::dispatch($podcast)
                        ->delay(now()->addMinutes(10));

            return redirect('/podcasts');
        }
    }

WARNING

Amazon SQS 队列服务的最长延迟时间为 15 分钟。

在将响应发送到浏览器后进行调度

或者,如果您的 Web 服务器使用的是 FastCGI,则 dispatchAfterResponse 方法会将 job 的分派延迟到将 HTTP 响应发送到用户的浏览器之后。这仍将允许用户开始使用应用程序,即使排队的作业仍在执行。这通常只应用于需要大约一秒钟的工作,例如发送电子邮件。由于它们是在当前 HTTP 请求中处理的,因此以这种方式分派的作业不需要运行队列工作程序即可对其进行处理:

php
    use App\Jobs\SendNotification;

    SendNotification::dispatchAfterResponse();

你也可以 dispatch 一个闭包,并将 afterResponse 方法链接到 dispatch helper 上,以便在 HTTP 响应发送到浏览器后执行闭包:

php
    use App\Mail\WelcomeMessage;
    use Illuminate\Support\Facades\Mail;

    dispatch(function () {
        Mail::to('taylor@example.com')->send(new WelcomeMessage);
    })->afterResponse();

同步调度

如果您想立即(同步)dispatch 作业,则可以使用 dispatchSync 方法。使用此方法时,作业不会排队,而是在当前进程中立即执行:

php
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use App\Models\Podcast;
    use Illuminate\Http\RedirectResponse;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
        /**
         * Store a new podcast.
         */
        public function store(Request $request): RedirectResponse
        {
            $podcast = Podcast::create(/* ... */);

            // Create podcast...

            ProcessPodcast::dispatchSync($podcast);

            return redirect('/podcasts');
        }
    }

工作和数据库事务

虽然在 database 事务中分派 job 是完全可以的,但您应该特别小心以确保您的 job 实际上能够成功执行。在事务中分派作业时,该作业可能会在父事务提交之前由 worker 处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务中创建的任何模型或数据库记录在数据库中可能不存在。

值得庆幸的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:

php
    'redis' => [
        'driver' => 'redis',
        // ...
        'after_commit' => true,
    ],

after_commit 选项为 true 时,您可以在 database 事务中分派 job;但是,Laravel 将等到打开的父数据库事务提交后,再实际分派作业。当然,如果当前没有打开数据库事务,则将立即分派作业。

如果事务由于事务期间发生的异常而回滚,则在该事务期间调度的作业将被丢弃。

NOTE

after_commit 配置选项设置为 true 还将导致在提交所有打开的数据库事务后调度任何排队的事件侦听器、mailables、notifications 和 broadcast 事件。

工作和数据库事务

如果未将 after_commit 队列连接配置选项设置为 true,则仍可以指示在提交所有打开的数据库事务后应分派特定作业。为此,您可以将 afterCommit 方法链接到您的 dispatch 操作上:

php
    use App\Jobs\ProcessPodcast;

    ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,则可以指示应立即分派特定作业,而无需等待任何打开的数据库事务提交:

php
    ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链接

任务链接允许您指定在成功执行主任务后应按顺序运行的排队任务列表。如果序列中的一个作业失败,则其余作业将不会运行。要执行排队的作业链,你可以使用 Bus Facade 提供的 chain 方法。Laravel 的命令总线是一个较低级别的组件,排队作业调度构建在它之上:

php
    use App\Jobs\OptimizePodcast;
    use App\Jobs\ProcessPodcast;
    use App\Jobs\ReleasePodcast;
    use Illuminate\Support\Facades\Bus;

    Bus::chain([
        new ProcessPodcast,
        new OptimizePodcast,
        new ReleasePodcast,
    ])->dispatch();

除了链接 job 类实例之外,您还可以链接闭包:

php
    Bus::chain([
        new ProcessPodcast,
        new OptimizePodcast,
        function () {
            Podcast::update(/* ... */);
        },
    ])->dispatch();

WARNING

在作业中使用 $this->delete() 方法删除作业不会阻止处理链接的作业。只有当链中的作业失败时,链才会停止执行。

链连接和队列

如果要指定应用于链接作业的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定了应该使用的队列连接和队列名称,除非为排队的作业显式分配了不同的连接/队列:

php
    Bus::chain([
        new ProcessPodcast,
        new OptimizePodcast,
        new ReleasePodcast,
    ])->onConnection('redis')->onQueue('podcasts')->dispatch();

将 Job 添加到链中

有时,您可能需要从该链中的另一个作业中将作业预置或附加到现有作业链中。您可以使用 prependToChainappendToChain 方法完成此操作:

php
/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    // Prepend to the current chain, run job immediately after current job...
    $this->prependToChain(new TranscribePodcast);

    // Append to the current chain, run job at end of chain...
    $this->appendToChain(new TranscribePodcast);
}

Chain Failures 链故障

在链接作业时,你可以使用 catch 方法来指定一个闭包,如果链中的作业失败,应该调用该闭包。给定的回调将接收导致作业失败的 Throwable 实例:

php
    use Illuminate\Support\Facades\Bus;
    use Throwable;

    Bus::chain([
        new ProcessPodcast,
        new OptimizePodcast,
        new ReleasePodcast,
    ])->catch(function (Throwable $e) {
        // A job within the chain has failed...
    })->dispatch();

WARNING

由于链式回调稍后由 Laravel 队列序列化和执行,因此您不应在链式回调中使用 $this 变量。

自定义队列和连接

分派到特定队列

通过将作业推送到不同的队列,您可以对排队的作业进行“分类”,甚至可以确定分配给各种队列的工作线程数量的优先级。请记住,这不会将作业推送到队列配置文件定义的不同队列 “连接”,而只会推送到单个连接中的特定队列。要指定队列,请在分派作业时使用 onQueue 方法:

php
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use App\Models\Podcast;
    use Illuminate\Http\RedirectResponse;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
        /**
         * Store a new podcast.
         */
        public function store(Request $request): RedirectResponse
        {
            $podcast = Podcast::create(/* ... */);

            // Create podcast...

            ProcessPodcast::dispatch($podcast)->onQueue('processing');

            return redirect('/podcasts');
        }
    }

或者,您可以通过在作业的构造函数中调用 onQueue 方法来指定作业的队列:

php
    <?php

    namespace App\Jobs;

     use Illuminate\Contracts\Queue\ShouldQueue;
     use Illuminate\Foundation\Queue\Queueable;

    class ProcessPodcast implements ShouldQueue
    {
        use Queueable;

        /**
         * Create a new job instance.
         */
        public function __construct()
        {
            $this->onQueue('processing');
        }
    }

分派到特定连接

如果您的应用程序与多个队列连接交互,则可以使用 onConnection 方法指定要将作业推送到哪个连接:

php
    <?php

    namespace App\Http\Controllers;

    use App\Http\Controllers\Controller;
    use App\Jobs\ProcessPodcast;
    use App\Models\Podcast;
    use Illuminate\Http\RedirectResponse;
    use Illuminate\Http\Request;

    class PodcastController extends Controller
    {
        /**
         * Store a new podcast.
         */
        public function store(Request $request): RedirectResponse
        {
            $podcast = Podcast::create(/* ... */);

            // Create podcast...

            ProcessPodcast::dispatch($podcast)->onConnection('sqs');

            return redirect('/podcasts');
        }
    }

您可以将 onConnectiononQueue 方法链接在一起,以指定作业的连接和队列:

php
    ProcessPodcast::dispatch($podcast)
                  ->onConnection('sqs')
                  ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用 onConnection 方法来指定作业的连接:

php
    <?php

    namespace App\Jobs;

     use Illuminate\Contracts\Queue\ShouldQueue;
     use Illuminate\Foundation\Queue\Queueable;

    class ProcessPodcast implements ShouldQueue
    {
        use Queueable;

        /**
         * Create a new job instance.
         */
        public function __construct()
        {
            $this->onConnection('sqs');
        }
    }

指定最大作业尝试次数/超时值

Max Attempts

如果其中一个排队的作业遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定可以尝试作业的次数或时长。

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将适用于该 worker 处理的所有作业,除非正在处理的作业指定了可以尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,则将被视为“失败”作业。有关处理失败作业的更多信息,请参阅 失败作业文档.如果向 queue:work 命令提供 --tries=0,则作业将无限期重试。

您可以通过定义可以对 job 类本身尝试 job 的最大次数来采用更精细的方法。如果在作业上指定了最大尝试次数,则它将优先于命令行上提供的 --tries 值:

php
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
        /**
         * The number of times the job may be attempted.
         *
         * @var int
         */
        public $tries = 5;
    }

如果你需要对特定作业的最大尝试次数进行动态控制,你可以在作业上定义一个 tries 方法:

php
    /**
     * Determine number of times the job may be attempted.
     */
    public function tries(): int
    {
        return 5;
    }

基于时间的尝试

作为定义作业在失败之前可以尝试多少次的替代方法,您可以定义不应再尝试该作业的时间。这允许在给定的时间范围内尝试作业任意次数。要定义不应再尝试作业的时间,请将 retryUntil 方法添加到您的作业类中。此方法应返回 DateTime 实例:

php
    use DateTime;

    /**
     * Determine the time at which the job should timeout.
     */
    public function retryUntil(): DateTime
    {
        return now()->addMinutes(10);
    }

NOTE

您还可以在排队的事件侦听器上定义 tries 属性或 retryUntil 方法。

最大异常数

有时你可能希望指定一个作业可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是由 release 方法直接释放),则应该失败。为此,您可以在 job 类上定义 maxExceptions 属性:

php
    <?php

    namespace App\Jobs;

    use Illuminate\Support\Facades\Redis;

    class ProcessPodcast implements ShouldQueue
    {
        /**
         * The number of times the job may be attempted.
         *
         * @var int
         */
        public $tries = 25;

        /**
         * The maximum number of unhandled exceptions to allow before failing.
         *
         * @var int
         */
        public $maxExceptions = 3;

        /**
         * Execute the job.
         */
        public function handle(): void
        {
            Redis::throttle('key')->allow(10)->every(60)->then(function () {
                // Lock obtained, process the podcast...
            }, function () {
                // Unable to obtain lock...
                return $this->release(10);
            });
        }
    }

在此示例中,如果应用程序无法获得 Redis 锁,则作业将释放 10 秒,并将继续重试最多 25 次。但是,如果作业引发了三个未处理的异常,则作业将失败。

超时

通常,您大致知道预计排队作业需要多长时间。因此,Laravel 允许您指定 “timeout” 值。默认情况下,超时值为 60 秒。如果作业的处理时间超过超时值指定的秒数,则处理该作业的工作程序将退出并显示错误。通常,worker 将由服务器上配置的 process manager 自动重新启动。

可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,则将被标记为失败。

您还可以定义允许作业在 job 类本身上运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
    <?php

    namespace App\Jobs;

    class ProcessPodcast implements ShouldQueue
    {
        /**
         * The number of seconds the job can run before timing out.
         *
         * @var int
         */
        public $timeout = 120;
    }

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不遵守您指定的超时。因此,在使用这些功能时,您也应始终尝试使用其 API 指定超时。例如,在使用 Guzzle 时,您应该始终指定连接和请求超时值。

WARNING

必须安装 pcntl PHP 扩展才能指定作业超时。此外,作业的 “timeout” 值应始终小于其 “retry after” 值。否则,可能会在作业实际完成执行或超时之前重新尝试该作业。

超时失败

如果你想指示一个作业应该在超时时标记为失败,你可以在 job 类上定义 $failOnTimeout 属性:

php
/**
 * Indicate if the job should be marked as failed on timeout.
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时引发异常,则作业将自动释放回队列,以便再次尝试。该作业将继续释放,直到尝试达到应用程序允许的最大次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,可以在 job 类本身上定义最大尝试次数。有关运行队列 worker 的更多信息,请参见下文

手动释放任务

有时,您可能希望手动将作业释放回队列,以便以后可以再次尝试。您可以通过调用 release 方法来实现此目的:

php
    /**
     * Execute the job.
     */
    public function handle(): void
    {
        // ...

        $this->release();
    }

默认情况下,release 方法会将作业释放回队列中,以便立即处理。但是,您可以通过将 integer 或 date 实例传递给 release 方法,指示队列在经过给定的秒数之前不使作业可用于处理:

php
    $this->release(10);

    $this->release(now()->addSeconds(10));

手动使作业失败

有时,您可能需要手动将作业标记为 “failed”。为此,您可以调用 fail 方法:

php
    /**
     * Execute the job.
     */
    public function handle(): void
    {
        // ...

        $this->fail();
    }

如果由于捕获到的异常而要将作业标记为失败,则可以将异常传递给 fail 方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常:

php
    $this->fail($exception);

    $this->fail('Something went wrong.');

NOTE

有关失败作业的更多信息,请查看有关处理作业失败的文档

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在该批作业完成执行时执行一些操作。在开始之前,您应该创建数据库迁移以构建一个表,该表将包含有关作业批次的元信息,例如其完成百分比。可以使用 make:queue-batches-table Artisan 命令生成此迁移:

shell
php artisan make:queue-batches-table

php artisan migrate

定义可批处理作业

要定义可批处理作业,您应该照常创建可排队作业;但是,您应该将 Illuminate\Bus\Batchable 特征添加到 job 类中。此 trait 提供对 batch 方法的访问,该方法可用于检索作业正在执行的当前 batch:

php
    <?php

    namespace App\Jobs;

    use Illuminate\Bus\Batchable;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Queue\Queueable;

    class ImportCsv implements ShouldQueue
    {
        use Batchable, Queueable;

        /**
         * Execute the job.
         */
        public function handle(): void
        {
            if ($this->batch()->cancelled()) {
                // Determine if the batch has been cancelled...

                return;
            }

            // Import a portion of the CSV file...
        }
    }

分派批次

要分派一批作业,你应该使用 Bus Facade 的 batch 方法。当然,批处理在与完成回调结合使用时主要有用。因此,你可以使用 thencatchfinally 方法来定义批处理的完成回调。调用这些回调时,每个回调都将接收一个 Illuminate\Bus\Batch 实例。在此示例中,我们将假设我们正在对一批作业进行排队,每个作业处理 CSV 文件中给定数量的行:

php
    use App\Jobs\ImportCsv;
    use Illuminate\Bus\Batch;
    use Illuminate\Support\Facades\Bus;
    use Throwable;

    $batch = Bus::batch([
        new ImportCsv(1, 100),
        new ImportCsv(101, 200),
        new ImportCsv(201, 300),
        new ImportCsv(301, 400),
        new ImportCsv(401, 500),
    ])->before(function (Batch $batch) {
        // The batch has been created but no jobs have been added...
    })->progress(function (Batch $batch) {
        // A single job has completed successfully...
    })->then(function (Batch $batch) {
        // All jobs completed successfully...
    })->catch(function (Batch $batch, Throwable $e) {
        // First batch job failure detected...
    })->finally(function (Batch $batch) {
        // The batch has finished executing...
    })->dispatch();

    return $batch->id;

批处理的 ID(可以通过 $batch->id 属性访问)可用于在分派后查询 Laravel 命令总线以获取有关批处理的信息。

WARNING

由于批量回调由 Laravel 队列在稍后序列化和执行,因此您不应在回调中使用 $this 变量。此外,由于批处理作业包装在数据库事务中,因此不应在作业中执行触发隐式提交的数据库语句。

命名批处理

如果命名了 batch,一些工具(如 Laravel Horizon 和 Laravel Telescope)可能会为 batchs 提供更用户友好的调试信息。要为批处理分配任意名称,您可以在定义批处理时调用 name 方法:

php
    $batch = Bus::batch([
        // ...
    ])->then(function (Batch $batch) {
        // All jobs completed successfully...
    })->name('Import CSV')->dispatch();

批量连接和队列

如果要指定应用于批处理作业的连接和队列,可以使用 onConnectiononQueue 方法。所有批处理作业必须在同一连接和队列中执行:

php
    $batch = Bus::batch([
        // ...
    ])->then(function (Batch $batch) {
        // All jobs completed successfully...
    })->onConnection('redis')->onQueue('imports')->dispatch();

链和批次

您可以通过将链接的作业放在数组中来定义批处理中的一组链接作业。例如,我们可以并行执行两个任务链,并在两个任务链都完成处理后执行回调:

php
    use App\Jobs\ReleasePodcast;
    use App\Jobs\SendPodcastReleaseNotification;
    use Illuminate\Bus\Batch;
    use Illuminate\Support\Facades\Bus;

    Bus::batch([
        [
            new ReleasePodcast(1),
            new SendPodcastReleaseNotification(1),
        ],
        [
            new ReleasePodcast(2),
            new SendPodcastReleaseNotification(2),
        ],
    ])->then(function (Batch $batch) {
        // ...
    })->dispatch();

相反,您可以通过在链中定义 batchs 来运行中的 Batch of Job。例如,您可以先运行一批作业来发布多个播客,然后再运行一批作业来发送发布通知:

php
    use App\Jobs\FlushPodcastCache;
    use App\Jobs\ReleasePodcast;
    use App\Jobs\SendPodcastReleaseNotification;
    use Illuminate\Support\Facades\Bus;

    Bus::chain([
        new FlushPodcastCache,
        Bus::batch([
            new ReleasePodcast(1),
            new ReleasePodcast(2),
        ]),
        Bus::batch([
            new SendPodcastReleaseNotification(1),
            new SendPodcastReleaseNotification(2),
        ]),
    ])->dispatch();

将作业添加到批处理

有时,从批处理作业中向批处理中添加其他作业可能很有用。当您需要批处理数千个作业时,此模式非常有用,而这些作业在 Web 请求期间可能需要很长时间才能分派。因此,您可能希望分派初始批次的 “loader” 作业,这些作业会用更多的作业来水合该批次:

php
    $batch = Bus::batch([
        new LoadImportBatch,
        new LoadImportBatch,
        new LoadImportBatch,
    ])->then(function (Batch $batch) {
        // All jobs completed successfully...
    })->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 作业通过其他作业冻结批处理。为此,我们可以在 batch 实例上使用 add 方法,该实例可以通过作业的 batch 方法访问:

php
    use App\Jobs\ImportContacts;
    use Illuminate\Support\Collection;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            return;
        }

        $this->batch()->add(Collection::times(1000, function () {
            return new ImportContacts;
        }));
    }

WARNING

您只能从属于同一批次的作业中将作业添加到批次中。

检验批次

提供给批处理完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,可帮助您与给定的作业批次进行交互和检查:

php
    // The UUID of the batch...
    $batch->id;

    // The name of the batch (if applicable)...
    $batch->name;

    // The number of jobs assigned to the batch...
    $batch->totalJobs;

    // The number of jobs that have not been processed by the queue...
    $batch->pendingJobs;

    // The number of jobs that have failed...
    $batch->failedJobs;

    // The number of jobs that have been processed thus far...
    $batch->processedJobs();

    // The completion percentage of the batch (0-100)...
    $batch->progress();

    // Indicates if the batch has finished executing...
    $batch->finished();

    // Cancel the execution of the batch...
    $batch->cancel();

    // Indicates if the batch has been cancelled...
    $batch->cancelled();

从路由返回 Batch

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的某个路由返回它们,以检索包含有关批处理的信息(包括其完成进度)的 JSON 有效负载。这样可以方便地在应用程序的 UI 中显示有关批处理完成进度的信息。

要按 ID 检索批处理,您可以使用 Bus Facade 的 findBatch 方法:

php
    use Illuminate\Support\Facades\Bus;
    use Illuminate\Support\Facades\Route;

    Route::get('/batch/{batchId}', function (string $batchId) {
        return Bus::findBatch($batchId);
    });

取消批处理

有时,您可能需要取消给定批处理的执行。这可以通过在 Illuminate\Bus\Batch 实例上调用 cancel 方法来实现:

php
    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->user->exceedsImportLimit()) {
            return $this->batch()->cancel();
        }

        if ($this->batch()->cancelled()) {
            return;
        }
    }

正如您在前面的示例中可能已经注意到的那样,批处理作业通常应该在继续执行之前确定其相应的批处理是否已被取消。但是,为方便起见,您可以改为将 SkipIfBatchCancelled中间件分配给作业。顾名思义,如果相应的批处理已被取消,此中间件将指示 Laravel 不处理该作业:

php
    use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

    /**
     * Get the middleware the job should pass through.
     */
    public function middleware(): array
    {
        return [new SkipIfBatchCancelled];
    }

失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅针对批处理中失败的第一个作业调用。

允许失败

当批处理中的作业失败时,Laravel 会自动将该批处理标记为 “cancelled”。如果需要,您可以禁用此行为,以便作业失败不会自动将批处理标记为已取消。这可以通过在分派批处理时调用 allowFailures 方法来实现:

php
    $batch = Bus::batch([
        // ...
    ])->then(function (Batch $batch) {
        // All jobs completed successfully...
    })->allowFailures()->dispatch();

重试失败的 Batch 作业

为方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批处理的所有失败作业。queue:retry-batch 命令接受应重试其失败作业的批处理的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不进行修剪,job_batches 表可以非常快速地累积记录。为了缓解这种情况,您应该安排 queue:prune-batches Artisan 命令每天运行:

php
    use Illuminate\Support\Facades\Schedule;

    Schedule::command('queue:prune-batches')->daily();

默认情况下,将修剪所有超过 24 小时的已完成批处理。您可以在调用命令时使用 hours 选项来确定保留批处理数据的时间。例如,以下命令将删除 48 小时前完成的所有批次:

php
    use Illuminate\Support\Facades\Schedule;

    Schedule::command('queue:prune-batches --hours=48')->daily();

有时,jobs_batches 表可能会累积从未成功完成的批次的批次记录,例如作业失败且该作业从未成功重试的批次。您可以指示 queue:prune-batches 命令使用 unfinished 选项修剪这些未完成的批处理记录:

php
    use Illuminate\Support\Facades\Schedule;

    Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,jobs_batches 表也可能累积已取消批次的批次记录。您可以指示 queue:prune-batches 命令使用 cancelled 选项修剪这些已取消的批处理记录:

php
    use Illuminate\Support\Facades\Schedule;

    Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批处理

Laravel 还支持在 DynamoDB 而不是关系数据库中存储批量元信息。但是,您需要手动创建一个 DynamoDB 表来存储所有批处理记录。

通常,此表应命名为 job_batches,但您应该根据应用程序的队列配置文件中的 queue.batching.table 配置值的值来命名表。

DynamoDB Batch 表配置

job_batches表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。密钥的 application 部分将包含应用程序的名称,该名称由应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一个表来存储多个 Laravel 应用程序的作业批处理。

此外,如果您想利用自动批量修剪,您可以为表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS 开发工具包,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应该在批处理配置数组中定义 keysecretregion 配置选项。这些选项将用于向 AWS 进行身份验证。使用 dynamodb 驱动程序时,queue.batching.database 配置选项不是必需的:

php
'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批处理

当使用 DynamoDB 存储作业批处理信息时,用于修剪存储在关系数据库中的批处理的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的本机 TTL 功能自动删除旧批处理的记录。

如果您使用 ttl 属性定义了 DynamoDB 表,则可以定义配置参数以指示 Laravel 如何修剪批处理记录。queue.batching.ttl_attribute 配置值定义保存 TTL 的属性的名称,而 queue.batching.ttl 配置值定义相对于上次更新记录的秒数,在此秒数之后,可以从 DynamoDB 表中删除批处理记录:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

排队闭包

除了将 job 类分派给队列之外,您还可以分派一个闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。当将闭包分派到队列时,闭包的代码内容是加密签名的,因此在传输过程中无法修改:

php
    $podcast = App\Podcast::find(1);

    dispatch(function () use ($podcast) {
        $podcast->publish();
    });

使用 catch 方法,您可以提供一个闭包,如果在用尽队列的所有已配置重试尝试后排队闭包未能成功完成,则应执行该闭包:

php
    use Throwable;

    dispatch(function () use ($podcast) {
        $podcast->publish();
    })->catch(function (Throwable $e) {
        // This job has failed...
    });

WARNING

由于 catch 回调是由 Laravel 队列稍后序列化和执行的,因此您不应在 catch 回调中使用 $this 变量。

运行 Queue Worker

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动队列工作程序并在新作业被推送到队列时对其进行处理。您可以使用 queue:work Artisan 命令运行 worker。请注意,一旦 queue:work 命令启动,它将继续运行,直到它被手动停止或您关闭终端:

shell
php artisan queue:work

NOTE

要使 queue:work 进程在后台永久运行,您应该使用进程监视器(如 Supervisor)来确保队列工作程序不会停止运行。

如果您希望将已处理的作业 ID 包含在命令的输出中,则可以在调用 queue:work 命令时包含 -v 标志:

shell
php artisan queue:work -v

请记住,queue worker 是长期存在的进程,并将启动的应用程序状态存储在内存中。因此,在启动后,他们不会注意到代码库中的更改。因此,在部署过程中,请务必重新启动队列工作程序。此外,请记住,您的应用程序创建或修改的任何静态状态都不会在作业之间自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,当您想要重新加载更新的代码或重置应用程序状态时,您不必手动重新启动 worker;但是,此命令的效率明显低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作程序

要将多个 worker 分配给一个队列并同时处理作业,您只需启动多个 queue:work 进程。这可以通过终端中的多个选项卡在本地完成,也可以在生产环境中使用工艺管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定 worker 应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上默认队列的作业。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作程序。例如,如果所有电子邮件都在 redis 队列连接上的电子邮件队列中处理,则可以发出以下命令来启动仅处理该队列的工作程序:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

--once 选项可用于指示 worker 仅处理队列中的单个作业:

shell
php artisan queue:work --once

--max-jobs 选项可用于指示 worker 处理给定数量的作业,然后退出。此选项与 Supervisor 结合使用时可能很有用,这样您的 worker 在处理给定数量的作业后会自动重新启动,释放它们可能已经积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队的作业,然后退出

--stop-when-empty 选项可用于指示 worker 处理所有作业,然后正常退出。如果您希望在队列为空后关闭容器,则在 Docker 容器中处理 Laravel 队列时,此选项非常有用:

shell
php artisan queue:work --stop-when-empty

在给定的秒数内处理作业

--max-time 选项可用于指示 worker 在给定的秒数内处理作业,然后退出。此选项与 Supervisor 结合使用时可能很有用,这样您的 worker 在处理作业给定时间后会自动重新启动,释放它们可能已经积累的任何内存:

shell
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

休眠持续时间

当队列中有可用的作业时,worker 将继续处理作业,而不会在作业之间出现延迟。但是,sleep 选项确定在没有可用作业的情况下,worker 将 “休眠” 的秒数。当然,在休眠时,worker 不会处理任何新的作业:

shell
php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于维护模式时,不会处理任何排队的作业。一旦应用程序退出维护模式,这些作业将继续正常处理。

要在启用维护模式的情况下强制队列 worker 处理作业,您可以使用 --force 选项:

shell
php artisan queue:work --force

资源注意事项

守护程序队列工作程序在处理每个作业之前不会 “重启” 框架。因此,您应该在每个作业完成后释放所有繁重的资源。例如,如果您正在使用 GD 库进行图像处理,则应在处理完图像后使用 imagedestroy 释放内存。

队列优先级

有时,您可能希望确定队列的处理方式的优先级。例如,在 config/queue.php 配置文件中,您可以将 redis 连接的默认队列设置为 low。但是,有时您可能希望将作业推送到优先级队列,如下所示:

php
    dispatch((new Job)->onQueue('high'));

要启动一个工作程序,在继续处理队列上的任何作业之前验证所有队列作业都已处理,请将以逗号分隔的队列名称列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列 Worker 和部署

由于队列工作程序是长期进程,因此如果不重新启动,它们不会注意到代码的更改。因此,使用队列工作程序部署应用程序的最简单方法是在部署过程中重新启动工作程序。您可以通过发出 queue:restart 命令正常重新启动所有工作程序:

shell
php artisan queue:restart

此命令将指示所有队列工作程序在处理完当前作业后正常退出,以便不会丢失任何现有作业。由于队列工作程序将在执行 queue:restart 命令时退出,因此您应该运行进程管理器(如 Supervisor)来自动重新启动队列工作程序。

NOTE

队列使用缓存来存储重启信号,因此在使用此功能之前,您应该验证是否已为您的应用程序正确配置了缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被释放或删除,则该作业将被释放回队列中。通常,您应该将 retry_after 值设置为作业完成处理应合理花费的最大秒数。

WARNING

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据默认可见性超时重试作业,该超时在 AWS 控制台中进行管理。

Worker 超时

queue:work Artisan 命令公开了 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业的处理时间超过超时值指定的秒数,则处理该作业的工作程序将退出并显示错误。通常,worker 将由服务器上配置的 process manager 自动重新启动:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项不同,但协同工作以确保作业不会丢失,并且作业只成功处理一次。

WARNING

--timeout 值应始终比 retry_after 配置值至少短几秒钟。这将确保处理冻结作业的工作程序在重试作业之前始终终止。如果您的 --timeout 选项长于 retry_after 配置值,则您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,您需要一种方法来保持 queue:work 流程运行。queue:work 进程可能会因各种原因而停止运行,例如超出工作线程超时或执行 queue:restart 命令。

因此,您需要配置一个进程监视器,该监视器可以检测 queue:work 进程何时退出并自动重新启动它们。此外,进程监视器还允许您指定要并发运行的 queue:work 进程数。Supervisor 是 Linux 环境中常用的进程监控器,我们将在以下文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监控器,如果它们失败,它将自动重新启动 queue:work 进程。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

shell
sudo apt-get install supervisor

NOTE

如果自己配置和管理 Supervisor 听起来让人不知所措,请考虑使用 Laravel Forge,它将自动为您的生产 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,以指示 supervisor 如何监控您的进程。例如,让我们创建一个 laravel-worker.conf 文件来启动和监控 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行 8 个 queue:work 进程并监控所有进程,如果失败,则自动重新启动它们。您应该更改配置的 command 指令以反映所需的队列连接和工作人员选项。

WARNING

您应确保 stopwaitsecs 的值大于运行时间最长的作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前终止该作业。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,请参阅 Supervisor 文档

处理失败的作业

有时,排队的作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定应尝试作业的最大次数。异步作业超过此尝试次数后,它将入到failed_jobs数据库表中。同步调度的失败作业不会存储在此表中,其异常将立即由应用程序处理。

用于创建 failed_jobs 表的迁移通常已经存在于新的 Laravel 应用程序中。但是,如果您的应用程序不包含此表的迁移,则可以使用 make:queue-failed-table 命令创建迁移:

shell
php artisan make:queue-failed-table

php artisan migrate

在运行队列工作进程时,您可以使用 queue:work 命令上的 --tries 开关指定应尝试作业的最大次数。如果您没有为 --tries 选项指定值,则作业将仅尝试一次或由 job 类的 $tries 属性指定的多次:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,您可以指定 Laravel 在重试遇到异常的作业之前应等待多少秒。默认情况下,作业会立即释放回队列中,以便可以再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果您想配置每个作业在重试遇到异常的作业之前应等待多少秒,您可以通过在 job 类上定义 backoff 属性来实现:

php
    /**
     * The number of seconds to wait before retrying the job.
     *
     * @var int
     */
    public $backoff = 3;

如果您需要更复杂的逻辑来确定 Job 的退避时间,您可以在 Job 类中定义一个 backoff 方法:

php
    /**
    * Calculate the number of seconds to wait before retrying the job.
    */
    public function backoff(): int
    {
        return 3;
    }

您可以通过从 backoff 方法返回 backoff 值数组来轻松配置 “exponential” backoffs。在此示例中,第一次重试的重试延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,如果剩余尝试次数更多,则后续每次重试的延迟为 10 秒:

php
    /**
    * Calculate the number of seconds to wait before retrying the job.
    *
    * @return array<int, int>
    */
    public function backoff(): array
    {
        return [1, 5, 10];
    }

作业失败后清理

当特定作业失败时,您可能希望向用户发送警报或恢复该作业部分完成的任何操作。为此,您可以在 job 类上定义一个失败的方法。导致作业失败的 Throwable 实例将被传递给 failed 方法:

php
    <?php

    namespace App\Jobs;

    use App\Models\Podcast;
    use App\Services\AudioProcessor;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Queue\Queueable;
    use Throwable;

    class ProcessPodcast implements ShouldQueue
    {
        use Queueable;

        /**
         * Create a new job instance.
         */
        public function __construct(
            public Podcast $podcast,
        ) {}

        /**
         * Execute the job.
         */
        public function handle(AudioProcessor $processor): void
        {
            // Process uploaded podcast...
        }

        /**
         * Handle a job failure.
         */
        public function failed(?Throwable $exception): void
        {
            // Send user notification of failure, etc...
        }
    }

WARNING

在调用失败的方法之前实例化作业的新实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的作业

要查看已插入 failed_jobs 数据库表的所有失败作业,您可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列、失败时间以及有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,您可以将多个 ID 传递给命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

您还可以重试特定队列的所有失败作业:

shell
php artisan queue:retry --queue=name

要重试所有失败的作业,请执行 queue:retry 命令并将 all 作为 ID 传递:

shell
php artisan queue:retry all

如果您想删除失败的作业,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

NOTE

使用 Horizon 时,您应该使用 horizon:forget 命令而不是 queue:forget 命令来删除失败的作业。

要从 failed_jobs 表中删除所有失败的作业,您可以使用 queue:flush 命令:

shell
php artisan queue:flush

忽略缺失的模型

将 Eloquent 模型注入作业时,该模型会在放入队列之前自动序列化,并在处理作业时从数据库中重新检索。但是,如果在作业等待工作程序处理时删除了模型,则您的作业可能会失败并出现 ModelNotFoundException

为方便起见,您可以通过将作业的 deleteWhenMissingModels 属性设置为 true 来选择自动删除缺少模型的作业。当此属性设置为 true 时,Laravel 将悄悄地丢弃作业而不会引发异常:

php
    /**
     * Delete the job if its models no longer exist.
     *
     * @var bool
     */
    public $deleteWhenMissingModels = true;

修剪失败的作业

您可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败作业记录都将被修剪。如果为命令提供 --hours 选项,则只会保留在过去 N 小时内插入的失败作业记录。例如,以下命令将删除超过 48 小时前插入的所有失败作业记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还支持将失败的作业记录存储在 DynamoDB 中,而不是关系数据库表中。但是,您必须手动创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为 failed_jobs,但您应该根据应用程序的队列配置文件中的 queue.failed.table 配置值的值来命名表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。密钥的 application 部分将包含应用程序的名称,该名称由应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此您可以使用同一个表来存储多个 Laravel 应用程序的失败作业。

此外,请确保安装 AWS 开发工具包,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,您应该在失败的作业配置数组中定义 keysecretregion 配置选项。这些选项将用于向 AWS 进行身份验证。使用 dynamodb 驱动程序时,queue.failed.database 配置选项是不必要的:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败的作业存储

您可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的作业而不存储它们。通常,这可以通过 QUEUE_FAILED_DRIVER 环境变量来完成:

ini
QUEUE_FAILED_DRIVER=null

失败的作业事件

如果您想注册一个将在作业失败时调用的事件侦听器,则可以使用 Queue Facade 的 failing 方法。例如,我们可以从 Laravel 附带的 AppServiceProviderboot 方法中将闭包附加到此事件:

php
    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobFailed;

    class AppServiceProvider extends ServiceProvider
    {
        /**
         * Register any application services.
         */
        public function register(): void
        {
            // ...
        }

        /**
         * Bootstrap any application services.
         */
        public function boot(): void
        {
            Queue::failing(function (JobFailed $event) {
                // $event->connectionName
                // $event->job
                // $event->exception
            });
        }
    }

从队列中清除作业

NOTE

使用 Horizon 时,您应该使用 horizon:clear 命令从队列中清除作业,而不是 queue:clear 命令。

如果您想从默认连接的默认队列中删除所有作业,您可以使用 queue:clear Artisan 命令来实现:

shell
php artisan queue:clear

您还可以提供 connection 参数和 queue 选项来从特定连接和队列中删除作业:

shell
php artisan queue:clear redis --queue=emails

WARNING

从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在您清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。

监控您的队列

如果您的队列收到突然涌入的作业,它可能会变得不堪重负,从而导致作业完成等待时间过长。如果您愿意,Laravel 可以在您的队列作业计数超过指定阈值时提醒您。

要开始使用,您应该安排 queue:monitor 命令每分钟运行一次。该命令接受您要监控的队列的名称以及所需的作业计数阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅计划此命令不足以触发通知,提醒您队列处于不堪重负的状态。当命令遇到作业计数超过阈值的队列时,将调度事件 Illuminate\Queue\Events\QueueBusy 。您可以在应用程序的 AppServiceProvider 中监听此事件,以便向您或您的开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

测试

在测试分派作业的代码时,您可能希望指示 Laravel 不实际执行作业本身,因为可以直接测试作业的代码,并且可以独立于分派它的代码进行测试。当然,要测试 Job 本身,你可以实例化一个 Job 实例,并直接在测试中调用 handle 方法。

你可以使用 Queue Facade的 fake 方法来防止排队的作业实际被推送到队列中。在调用 Queue Facade的 fake 方法后,您可以断言应用程序尝试将作业推送到队列:

php
<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // Perform order shipping...

    // Assert that no jobs were pushed...
    Queue::assertNothingPushed();

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);

    // Assert a job was not pushed...
    Queue::assertNotPushed(AnotherJob::class);

    // Assert that a Closure was pushed to the queue...
    Queue::assertClosurePushed();

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(3);
});
php
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushed(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a Closure was pushed to the queue...
        Queue::assertClosurePushed();

        // Assert the total number of jobs that were pushed...
        Queue::assertCount(3);
    }
}

你可以将一个闭包传递给 assertPushedassertNotPushed 方法,以断言推送的作业通过了给定的 “真值测试”。如果推送了至少一个通过给定真值测试的作业,则断言将成功:

php
    Queue::assertPushed(function (ShipOrder $job) use ($order) {
        return $job->order->id === $order->id;
    });

伪造作业子集

如果你只需要伪造特定的作业,同时允许你的其他作业正常执行,你可以将应该伪造的作业的类名传递给 fake 方法:

php
test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
});
php
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
}

你可以使用 except 方法伪造除一组指定作业之外的所有作业:

php
    Queue::fake()->except([
        ShipOrder::class,
    ]);

测试任务链

要测试任务链,您需要使用 Bus Facade 的伪造功能。Bus Facade的 assertChained 方法可用于断言已调度作业链assertChained 方法接受一个已链接作业的数组作为其第一个参数:

php
    use App\Jobs\RecordShipment;
    use App\Jobs\ShipOrder;
    use App\Jobs\UpdateInventory;
    use Illuminate\Support\Facades\Bus;

    Bus::fake();

    // ...

    Bus::assertChained([
        ShipOrder::class,
        RecordShipment::class,
        UpdateInventory::class
    ]);

正如你在上面的例子中看到的,链接的 Job 数组可能是 Job 的类名数组。但是,您也可以提供一组实际的作业实例。这样做时,Laravel 将确保 job 实例属于同一类,并且具有与应用程序 dispatch 的链式 job 相同的 property 值:

php
    Bus::assertChained([
        new ShipOrder,
        new RecordShipment,
        new UpdateInventory,
    ]);

你可以使用 assertDispatchedWithoutChain 方法来断言一个 Job 是在没有 Job 链的情况下推送的:

php
    Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试链修改

如果链式作业将作业预置或附加到现有链中,则可以使用该作业的 assertHasChain 方法来断言该作业具有预期的剩余作业链:

php
$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain 方法可用于断言作业的剩余链为空:

php
$job->assertDoesntHaveChain();

测试链式批处理

如果你的作业链包含一批作业,你可以通过在链断言中插入 Bus::chainedBatch 定义来断言链式批处理符合你的预期:

php
    use App\Jobs\ShipOrder;
    use App\Jobs\UpdateInventory;
    use Illuminate\Bus\PendingBatch;
    use Illuminate\Support\Facades\Bus;

    Bus::assertChained([
        new ShipOrder,
        Bus::chainedBatch(function (PendingBatch $batch) {
            return $batch->jobs->count() === 3;
        }),
        new UpdateInventory,
    ]);

测试作业批次

Bus Facade的 assertBatched 方法可用于断言已分派一批作业。给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 的实例,该实例可用于检查批处理中的作业:

php
    use Illuminate\Bus\PendingBatch;
    use Illuminate\Support\Facades\Bus;

    Bus::fake();

    // ...

    Bus::assertBatched(function (PendingBatch $batch) {
        return $batch->name == 'import-csv' &&
               $batch->jobs->count() === 10;
    });

您可以使用 assertBatchCount 方法来断言已调度了给定数量的批次:

php
    Bus::assertBatchCount(3);

您可以使用 assertNothingBatched 来断言没有调度任何批次:

php
    Bus::assertNothingBatched();

测试作业/队列交互

此外,您可能偶尔需要测试单个作业与其底层批处理的交互。例如,您可能需要测试作业是否取消了对其批处理的进一步处理。为此,您需要通过 withFakeBatch 方法为作业分配一个虚构批次。withFakeBatch 方法返回一个包含作业实例和虚构批处理的元组:

php
    [$job, $batch] = (new ShipOrder)->withFakeBatch();

    $job->handle();

    $this->assertTrue($batch->cancelled());
    $this->assertEmpty($batch->added);

测试作业/队列交互

有时,您可能需要测试排队的作业是否将自身释放回队列中。或者,您可能需要测试作业是否自行删除。您可以通过实例化作业并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦作业的队列交互被伪造,您就可以在作业上调用 handle 方法。调用作业后,可以使用 assertReleasedassertDeletedassertNotDeletedassertFailedassertNotFailed 方法对作业的队列交互进行断言:

php
use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();

工作事件

使用 QueueFacade上beforeafter 方法,您可以指定要在处理排队作业之前或之后执行的回调。这些回调是为控制面板执行其他日志记录或增量统计数据的绝佳机会。通常,您应该从服务提供商boot 方法调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider:

php
    <?php

    namespace App\Providers;

    use Illuminate\Support\Facades\Queue;
    use Illuminate\Support\ServiceProvider;
    use Illuminate\Queue\Events\JobProcessed;
    use Illuminate\Queue\Events\JobProcessing;

    class AppServiceProvider extends ServiceProvider
    {
        /**
         * Register any application services.
         */
        public function register(): void
        {
            // ...
        }

        /**
         * Bootstrap any application services.
         */
        public function boot(): void
        {
            Queue::before(function (JobProcessing $event) {
                // $event->connectionName
                // $event->job
                // $event->job->payload()
            });

            Queue::after(function (JobProcessed $event) {
                // $event->connectionName
                // $event->job
                // $event->job->payload()
            });
        }
    }

使用 QueueFacade上looping 方法,您可以指定在 worker 尝试从队列中获取作业之前执行的回调。例如,您可以注册一个 closure 来回滚以前失败的作业留下的任何未解决的事务:

php
    use Illuminate\Support\Facades\DB;
    use Illuminate\Support\Facades\Queue;

    Queue::looping(function () {
        while (DB::transactionLevel() > 0) {
            DB::rollBack();
        }
    });