队列
Queues
介绍
在构建 Web 应用程序时,你可能需要执行一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间需要很长时间才能执行。 值得庆幸的是,Laravel 允许你轻松创建可以在后台处理的队列任务。 通过将时间密集型任务移至队列,你的应用程序可以以极快的速度响应 Web 请求,并为你的客户提供更好的用户体验。
Laravel 队列为各种不同的队列驱动提供统一的队列 API,例如 Amazon SQS, Redis,甚至是关系型数据库。
Laravel 队列的配置选项存储在 config/queue.php
文件中。 在这个文件中,你可以找到框架中包含的每个队列驱动的连接配置,包括数据库, Amazon SQS, Redis, 和 Beanstalkd 驱动,以及一个会立即执行任务的同步驱动(用于本地开发)。还包括一个用于丢弃排队任务的 null
队列驱动。
注意
Laravel 提供了 Horizon ,适用于 Redis 驱动队列。 Horizon 是一个拥有漂亮仪表盘的配置系统。如需了解更多信息请查看完整的 Horizon 文档。
连接 vs 驱动
在开始使用 Laravel 队列之前,理解「连接」和「队列」之间的区别非常重要。 在 config/queue.php
配置文件中,有一个 connections
连接选项。 此选项定义连接某个驱动(如 Amazon SQS、Beanstalk 或 Redis)。然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。
请注意, queue
配置文件中的每个连接配置示例都包含一个 queue
属性。这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果你没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在连接配置的 queue
属性中定义的队列上:
use App\Jobs\ProcessPodcast;
// 这个任务将被推送到默认队列...
ProcessPodcast::dispatch();
// 这个任务将被推送到「emails」队列...
ProcessPodcast::dispatch()->onQueue('emails');
有些应用程序可能不需要将任务推到多个队列中,而是倾向于使用一个简单的队列。然而,如果希望对任务的处理方式进行优先级排序或分段时,将任务推送到多个队列就显得特别有用,因为 Laravel 队列工作程序允许你指定哪些队列应该按优先级处理。例如,如果你将任务推送到一个 high
队列,你可能会运行一个赋予它们更高处理优先级的 worker:
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
要使用 database
队列驱动,你需要一个数据库表来保存任务。通常,这包含在 Laravel 默认的 0001_01_01_000002_create_jobs_table.php
数据库迁移 中; 然而,如果你的应用程序不包含此迁移,可以使用 make:queue-table
Artisan 命令创建它:
php artisan make:queue-table
php artisan migrate
Redis
要使用 redis
队列驱动程序,需要在 config/database.php
配置文件中配置一个 redis 数据库连接。
注意
serializer
和compression
选项不被redis
队列驱动支持。
Redis 集群
如果你的 Redis 队列当中使用了 Redis 集群,那么你的队列名称就必须包含一个 key hash tag。这是为了确保一个给定队列的所有 Redis 键都被放在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
阻塞
在使用 Redis 队列时,您可以使用 block_for
配置选项来指定在遍历 worker 循环和重新轮询 Redis 数据库之前,驱动程序需要等待多长时间才能使任务变得可用。
根据你的队列负载调整此值要比连续轮询 Redis 数据库中的新任务更加有效。例如,你可以将值设置为 5
以指示驱动程序在等待任务变得可用时应该阻塞 5 秒:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
注意
将block_for
设置成0
将导致队列任务一直阻塞,直到某一个任务变得可用。这也将防止在下一个任务被处理之前处理诸如 SIGTERM 之类的信号。
其他驱动所需的先决条件
列出的队列驱动需要如下的依赖,这些依赖可通过 Composer 包管理器进行安装:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~5.0
- Redis:
predis/predis ~2.0
或 phpredis PHP 扩展
创建任务
生成任务类
默认情况下,应用程序的所有的可排队任务都被存储在了app/Jobs
目录中。如果 app/Jobs
目录不存在,当你运行 make:job
Artisan 命令时,将会自动创建该目录:
php artisan make:job ProcessPodcast
生成的类将会实现 Illuminate\Contracts\Queue\ShouldQueue
接口, 告诉 Laravel ,该任务应该推入队列以异步的方式运行。
技巧
任务模板可以使用 模板发布 来自定义任务 stub 。
任务类结构
任务类非常简单,通常只包含一个 handle
方法,在队列处理任务时将会调用它。让我们看一个任务类的示例。在这个例子中,我们假设我们管理一个 podcast 服务,并且需要在上传的 podcast 文件发布之前对其进行处理:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建一个新的任务实例.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 运行任务.
*/
public function handle(AudioProcessor $processor): void
{
// 处理上传的 podcast...
}
}
在此示例中,请注意我们能够直接将 Eloquent模型 传递给队列任务的构造函数。由于任务使用了SerializesModels
特性,Eloquent模型及其加载的关系将会在任务处理时优雅地进行序列化和反序列化。
如果您的队列任务在其构造函数中接受了一个Eloquent模型,只有模型的标识符会被序列化到队列中。当任务实际被处理时,队列系统会自动从数据库中重新获取完整的模型实例及其加载的关系。这种模型序列化的方法使得发送到队列驱动程序的任务负载要小得多。
<code>handle</code> 方法依赖注入
当任务由队列处理时,会调用 handle
方法。请注意,我们能够在任务的 handle
方法上类型提示依赖项。Laravel的 服务容器 会自动注入这些依赖。
如果您希望完全控制服务容器如何将依赖项注入到 handle
方法中,可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调,该回调接收任务实例和容器本身。在回调内,您可以自由地以任何方式调用 handle
方法。通常,您应该在 App\Providers\AppServiceProvider
服务提供者 的 boot
方法中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
警告
二进制数据,如原始图像内容,在传递给队列任务之前应通过base64_encode
函数进行编码。否则,任务在放入队列时可能无法正确地序列化为JSON。
队列关系
因为加载的关系也会被序列化,所以处理序列化任务的字符串有时会变得相当大。此外,当任务反序列化并且从数据库重新检索模型关系时,它们将被完整检索。在任务队列过程中模型被序列化之前应用的任何关系关联,在作业反序列化时不会应用。因此,如果你希望在队列任务中处理给定关系的一个子集,你应该在队列作业中重新限定该关系。
或者为了防止该关系被序列化,可以在设置属性值时对模型调用 withoutRelations
方法。此方法将返回没有加载关系的模型实例:
/**
* 创建一个新的任务实例.
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
如果使用 PHP 构造函数,并希望指定一个 Eloquent 模型不应该序列化它的关系,你可以使用 WithoutRelations
属性 :
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* 创建一个新的任务实例.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast
) {
}
如果任务接收到的是 Eloquent 模型的集合或数组而不是单个模型,则集合内的模型在作业反序列化和执行时不会恢复它们的关系。这样做是为了防止处理大量模型的作业产生过多的资源使用。
唯一任务
注意
唯一任务需要支持 locks 的缓存驱动程序。 目前,memcached
,redis
,dynamodb
,database
,file
, 和array
缓存驱动支持原子锁。 此外,独特的任务约束不适用于批次内的任务。
有时,你可能希望确保在任何时间点队列中只有一个特定任务的实例。你可以通过在你的工作类上实现 ShouldBeUnique
接口来做到这一点。这个接口不需要你在你的类上定义任何额外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
以上示例中, UpdateSearchIndex
任务是唯一的。因此,如果任务的另一个实例已经在队列中并且尚未完成处理,则不会分派该任务。
在某些情况下,你可能想要定义一个使任务唯一的特定「键」,或者你可能想要指定一个超时时间,超过该时间任务不再保持唯一。为此,你可以在任务类上定义 uniqueId
和 uniqueFor
属性或方法:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* 产品实例.
*
* @var \App\Product
*/
public $product;
/**
* 任务的唯一锁将在多少秒后被释放。
*
* @var int
*/
public $uniqueFor = 3600;
/**
* 任务的唯一锁将被释放的秒数。
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
以上示例中, UpdateSearchIndex
任务中的 product ID 是唯一的。因此,在现有任务完成处理之前,任何具有相同 product ID 的任务都将被忽略。此外,如果现有任务在一小时内没有得到处理,则释放唯一锁,并将具有相同唯一键的另一个任务分派到该队列。
注意
如果你的应用程序从多个 web 服务器或容器分派任务,你应该确保你的所有服务器都与同一个中央缓存服务器通信,以便 Laravel 能够准确确定任务是否唯一。
在任务处理开始前保证唯一
默认情况下,一个唯一的任务在处理完成后或者在所有重试尝试失败后会「解锁」。然而,在某些情况下,你可能希望你的任务在处理之前立即解锁。要做到这一点,你的任务应该实现 ShouldBeUniqueUntilProcessing
契约而不是 ShouldBeUnique
契约:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一任务锁
在底层实现中,当一个 ShouldBeUnique
任务被派发时,Laravel 会尝试使用 uniqueId
键获得一个 锁。如果没有获得锁,任务就不会被派发。这个锁在任务处理完成或所有重试尝试失败后会被释放。默认情况下,Laravel 将使用默认缓存驱动来获取这个锁。然而,如果你希望使用另一个驱动来获取锁,你可以定义一个 uniqueVia
方法,该方法返回应该使用的缓存驱动:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* 获取唯一任务锁的缓存驱动。
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
技巧
如果只需要限制任务的并发处理,请改用WithoutOverlapping
任务中间件。
加密任务
Laravel 允许你通过加密来确保任务数据的隐私和完整性。要开始使用,只需向作业类添加 ShouldBeEncrypted
接口。一旦在类上添加了这个接口,Laravel 将自动加密你的任务后再推送到队列:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
任务中间件
任务中间件允许你围绕排队任务的执行封装自定义逻辑,从而减少了任务本身的样板代码。例如,看下面的 handle
方法,它利用了 Laravel 的 Redis 速率限制特性,允许每 5 秒只处理一个任务:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('取得了锁...');
// 处理任务...
}, function () {
// 无法获取锁...
return $this->release(5);
});
}
虽然这段代码是有效的, 但是 handle
方法的结构却变得杂乱,因为它掺杂了 Redis 速率限制逻辑。此外,其他任务需要使用速率限制的时候,只能将限制逻辑复制一次。
我们可以定义一个处理速率限制的任务中间件,而不是在 handle 方法中定义速率限制。Laravel 没有任务中间件的默认位置,所以你可以将任务中间件放置在你喜欢的任何位置。在本例中,我们将把中间件放在 app/Jobs/Middleware
目录:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理队列任务
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 已获得锁...
$next($job);
}, function () use ($job) {
// 没有获取到锁...
$job->release(5);
});
}
}
正如你看到的,类似于 路由中间件,任务中间件接收正在处理队列任务以及一个回调来继续处理队列任务。
在任务中间件被创建以后,他们可能被关联到通过从任务的 middleware
方法返回的任务。这个方法并不存在于 make:job
Artisan 命令搭建的任务中,所以你需要将它添加到你自己的任务类的定义中:
use App\Jobs\Middleware\RateLimited;
/**
* 获取一个可以被传递通过的中间件任务
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
技巧
任务中间件也可以分配其他可队列处理的监听事件当中,比如邮件,通知等。
访问限制
尽管我们刚刚演示了如何编写自己的访问限制的任务中间件,但 Laravel 实际上内置了一个访问限制中间件,你可以利用它来限制任务。与 路由限流器 一样,任务访问限制器是使用 RateLimiter
facade 的 for
方法定义的。
例如,你可能希望允许用户每小时备份一次数据,但不对高级客户施加此类限制。为此,可以在 AppServiceProvider
的 boot
方法中定义 RateLimiter
:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* 注册应用程序服务
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的例子中,我们定义了一个小时访问限制;但是,你可以使用 perMinute
方法轻松定义基于分钟的访问限制。此外,你可以将任何值传递给访问限制的 by
方法,但是,这个值通常用于按客户来区分不同的访问限制:
return Limit::perMinute(50)->by($job->user->id);
定义速率限制后,你可以使用 Illuminate\Queue\Middleware\RateLimited
中间件将速率限制器附加到备份任务。 每次任务超过速率限制时,此中间件都会根据速率限制持续时间以适当的延迟将任务释放回队列。
use Illuminate\Queue\Middleware\RateLimited;
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
将速率受限的任务释放回队列仍然会增加任务的 「尝试」总数。你可能希望相应地调整你的任务类上的 tries
和 maxExceptions
属性。或者,你可能希望使用 retryUntil
方法 来定义不再尝试任务之前的时间量。
如果你不想在速率限制时重试任务,你可以使用 dontRelease
方法:
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
技巧
如果你使用 Redis,你可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,它针对 Redis 进行了微调,比基本的限速中间件更高效。
防止任务重叠
Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping
中间件,允许你根据任意键防止任务重叠。当排队的任务正在修改一次只能由一个任务修改的资源时,这会很有帮助。
例如,假设你有一个更新用户信用评分的排队任务,并且你希望防止同一用户 ID 的信用评分更新任务重叠。为此,你可以从任务的 middleware
方法返回 WithoutOverlapping
中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* 获取任务时,应该通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
任何重叠的任务都将被释放回队列。你还可以指定再次尝试释放的任务之前必须经过的秒数:
/**
* 获取任务时,应该通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果你想立即删除任何重叠的任务,你可以使用 dontRelease
方法,这样它们就不会被重试:
/**
* 获取任务时,应该通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping
中间件由 Laravel 的原子锁特性提供支持。有时,你的任务可能会以未释放锁的方式意外失败或超时。因此,你可以使用 expireAfter
方法显式定义锁定过期时间。例如,下面的示例将指示 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping
锁:
/**
* 获取任务时,应该通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
注意
WithoutOverlapping
中间件需要支持 locks 的缓存驱动程序。目前,memcached
、redis
、dynamodb
、database
、file
和array
缓存驱动支持原子锁。
跨任务类别共享锁
默认情况下,WithoutOverlapping
中间件只会阻止同一类的重叠任务。 因此,尽管两个不同的任务类可能使用相同的锁,但不会阻止它们重叠。 但是,你可以使用 shared
方法指示 Laravel 跨任务类应用锁:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
节流限制异常
Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions
中间件,允许你限制异常。一旦任务抛出给定数量的异常,所有进一步执行该任务的尝试都会延迟,直到经过指定的时间间隔。该中间件对于与不稳定的第三方服务交互的任务特别有用。
例如,让我们想象一个队列任务与开始抛出异常的第三方 API 交互。要限制异常,你可以从任务的 middleware
方法返回 ThrottlesExceptions
中间件。通常,此中间件应与实现 基于时间的尝试的任务配对:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务时,应该通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5)];
}
/**
* 确定任务应该超时的时间。
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(5);
}
中间件接受的第一个构造函数参数是任务在被限制之前可以抛出的异常数,而第二个构造函数参数是在任务被限制后再次尝试之前应该经过的分钟数。在上面的代码示例中,如果任务在 5 分钟内抛出 10 个异常,我们将等待 5 分钟,然后再次尝试该任务。
当任务抛出异常但尚未达到异常阈值时,通常会立即重试该任务。但是,你可以通过在将中间件附加到任务时调用 backoff
方法来指定此类任务应延迟的分钟数:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}
在内部,这个中间件使用 Laravel 的缓存系统来实现速率限制,并利用任务的类名作为缓存 「键」。 在将中间件附加到任务时,你可以通过调用 by
法来覆盖此键。 如果你有多个任务与同一个第三方服务交互并且你希望它们共享一个共同的节流 「桶」,这可能会很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->by('key')];
}
默认情况下,这个中间件将限制每个异常。你可以通过在附加中间件到任务时调用 when
方法来修改这种行为。只有当提供给 when
方法的闭包返回 true
时,异常才会被节流:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
如果你希望将被节流的异常报告给你的应用程序的异常处理程序,你可以在附加中间件到任务时调用 report
方法来做到这一点。你也可以提供一个闭包给 report
方法,并且如果给定闭包返回 true
,异常才会被报告:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务时,应该通过的中间件
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
技巧
如果你使用的是 Redis,你可以使用Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
中间件,它为 Redis 进行了优化,比基本的异常节流中间件更高效。
调度任务
一旦你写好了你的任务类,你可以使用任务本身的 dispatch
方法来调度它。传递给 dispatch
方法的参数将被提供给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
如果你想有条件地分派任务,你可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 应用程序中,默认的队列驱动是 sync
驱动。这个驱动在当前请求的前台同步执行任务,通常在本地开发中很方便。如果你想在后台处理队列任务,你可以在应用程序的 config/queue.php
配置文件中指定不同的队列驱动。
延迟调度
如果你想指定一个任务不应该立即可供队列工作器处理,当调度任务时,你可以使用 delay
方法。例如,让我们指定一个任务应该在调度后 10 分钟才能被处理:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 储存一个新的播客
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
注意
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
响应发送到浏览器后调度
另外,如果你的 web 服务器使用 FastCGI,dispatchAfterResponse
方法延迟调度作业,直到 HTTP 响应发送到用户浏览器之后。这将仍然允许用户开始使用应用程序,即使一个排队的任务仍然在执行。这通常只应该用于执行时间大约一秒钟的任务,例如发送电子邮件。由于它们是在当前 HTTP 请求中处理的,以这种方式调度的任务不需要运行队列工作器就能被处理:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你也可以 dispatch
一个闭包并将 afterResponse
方法链式调用到 dispatch
帮助器上,以在发送 HTTP 响应后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('[email protected]')->send(new WelcomeMessage);
})->afterResponse();
同步调度
如果你想立即(同步)调度任务,你可以使用 dispatchSync
方法。使用此方法时,任务不会排队,会在当前进程内立即执行:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
作业和数据库事务
在数据库事务中调度作业是完全可以的,但你应该特别注意确保你的作业实际上能够成功执行。在事务中调度作业时,有可能作业在父事务提交之前就被工作器处理了。当这种情况发生时,你在数据库事务期间对模型或数据库记录所做的任何更新可能还没有反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能还不存在于数据库中。
幸运的是,Laravel 提供了几种方法来解决这个问题。首先,你可以在队列连接的配置数组中设置 after_commit
连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
当 after_commit
选项为 true
, 你可以在数据库事务中调度作业;但是,Laravel 会等到打开的父数据库事务提交后再实际调度作业。当然,如果目前没有开启的数据库事务,作业将被立即调度。
如果事务因在事务期间发生的异常而回滚,那么在该事务期间调度的作业将被丢弃。
[!NOTE]
将after_commit
配置选项设置为true
还会导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。
内联指定提交调度行为
如果你没有将 after_commit
队列连接配置选项设置为 true
,你仍然可以指定特定作业应在所有打开的数据库事务提交后被调度。为此,你可以将 afterCommit
方法链接到你的调度操作:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
类似地,如果 after_commit
配置选项被设置为 true
,你可以指定特定作业应立即调度,无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();
任务链
任务链允许你指定一组应在主任务成功执行后按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,你可以使用 Bus
facade 提供的 chain
方法。Laravel 的命令总线是建立在排队作业调度之上的底层组件::
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链式调用作业类实例,你还可以链式闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
注意
在任务中使用$this->delete()
方法删除任务不会阻止链式任务的处理。只有当链中的任务失败时,链才会停止执行。
链式连接 & 队列
如果要指定链式任务应使用的连接和队列,可以使用 onConnection
和 onQueue
方法。这些方法指定应使用的队列连接和队列名称,除非为排队任务显式分配了不同的连接 / 队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
向链中添加任务
有时,你可能需要从链中的另一个任务内向现有的任务链前置或后置添加任务。你可以使用 prependToChain
和 appendToChain
方法来实现这一点:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
// 前置到当前链,在当前任务之后立即运行任务...
$this->prependToChain(new TranscribePodcast);
// 后置到当前链,在链末尾运行任务...
$this->appendToChain(new TranscribePodcast);
}
链式失败
在链式任务中,你可以使用 catch
方法来指定一个闭包,该闭包在链中的任务失败时被调用。给定的回调将接收导致任务失败的 Throwable
实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// 链中的某个任务失败了...
})->dispatch();
警告
由于链回调被序列化并在稍后由 Laravel 队列执行,因此你不应在链回调中使用$this
变量。
自定义队列连接
分发到特定队列
通过将任务推送到不同的队列,你可以「分类」你的队列任务,甚至可以优先分配多少个 worker 到各种队列。请记住,这并不会将任务推送到队列配置文件中定义的不同队列「连接」,而只是推送到单个连接内的特定队列。要指定队列,在分发任务时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
或者,你可以在任务的构造函数中调用 onQueue
方法来指定任务的队列:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建一个新的任务实例。
*/
public function __construct()
{
$this->onQueue('processing');
}
}
调度到特定连接
如果你的应用与多个队列连接交互,你可以使用 onConnection
方法来指定任务应推送到的连接:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
你可以链式调用 onConnection
和 onQueue
方法来指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
或者,你可以在任务的构造函数中调用 onConnection
方法来指定任务的连接:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建一个新的作业实例。
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定最大任务尝试 次数/ 超时值
最大尝试次数
如果你的队列任务遇到错误,你可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定任务可以尝试的次数或持续时间。
指定作业可能尝试的最大次数的一种方法是通过 Artisan 命令行的 --tries
开关。这将适用于调度作业的所有任务,除非正在处理的任务指定了它最大尝试次数:
php artisan queue:work --tries=3
如果一个任务超过其最大尝试次数,它将被视为「失败」的任务。有关处理失败任务的更多信息,请查看处理失败队列。如果给 queue:work
命令提供了 --tries=0
,任务将无限次重试。
你可以采取更细粒度的方法,通过在任务类本身定义任务可以尝试的最大次数。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries
值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可尝试的次数
*
* @var int
*/
public $tries = 5;
}
如果你需要对特定任务的最大尝试次数进行动态控制,你可以在作业上定义一个 tries
方法:
/**
* 确定任务可以尝试的次数。
*/
public function tries(): int
{
return 5;
}
基于时间的尝试
作为定义任务在失败前可尝试次数的替代方式,你可以定义一个任务应该超时的时间。这允许在给定时间范围内任意次数地尝试任务。要定义任务超时的时间,请在任务类中添加一个 retryUntil
方法。此方法应该返回一个 DateTime
实例:
use DateTime;
/**
* 确定任务应当超时的时间。
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
注意
你也可以在队列事件监听器上定义tries
属性或retryUntil
方法。
最大异常数
有时你可能希望指定一个任务可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接被 release
方法释放的),应该失败。为此,你可以在任务类中定义一个 maxExceptions
属性:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* 可以尝试任务的次数
*
* @var int
*/
public $tries = 25;
/**
* 失败前允许的最大未处理异常数
*
* @var int
*/
public $maxExceptions = 3;
/**
* 执行
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 获得锁,处理播客...
}, function () {
// 无法获取锁...
return $this->release(10);
});
}
}
在此示例中,如果应用程序无法获得 Redis 锁,则该任务将在 10 秒后被释放,并将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,则任务将失败。
超时
通常,你大致知道你的排队任务需要多长时间。因此,Laravel 允许你指定一个「超时」值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,处理任务的工作进程将退出并报错。通常,工作进程将由服务器上配置的进程管理器自动重启。
任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout
开关指定:
php artisan queue:work --timeout=30
如果任务因不断超时超过其最大尝试次数,它将被标记为失败。
你还可以在任务类本身定义任务允许运行的最大秒数。如果在任务上指定了超时时间,它将优先于命令行上指定的任何超时时间:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 在超时之前任务可以运行的秒数.
*
* @var int
*/
public $timeout = 120;
}
有时,诸如 socket 或外部 HTTP 连接之类的 IO 阻塞过程可能不会遵守你指定的超时。因此,在使用这些功能时,你应该始终尝试使用他们的 API 来指定超时时间。例如,当使用 Guzzle 时,你应该总是指定连接和请求超时值。
注意
必须安装 PHPpcntl
扩展才能指定任务超时。此外,任务的「超时」值应始终小于其「任务到期」值。否则,可能在任务实际完成执行或超时前,任务就会被重试。
超时失败
如果你希望在超时时将任务标记为 failed,可以在任务类上定义 $failOnTimeout
属性:
/**
* 标示是否应在超时时标记为失败
*
* @var bool
*/
public $failOnTimeout = true;
错误处理
如果在处理任务时抛出异常,任务将自动被释放回队列,以便稍后再尝试。只要应用程序允许,任务将继续发布,直到尝试达到你的应用程序允许的最大次数为止。最大尝试次数由 queue:work
Artisan 命令上使用的 --tries
开关定义。或者,最大尝试次数可以在任务类本身上定义。有关运行队列工作器的更多信息可以在下面找到。
手动释放任务
有时你可能希望手动将任务释放回队列,以便稍后再次尝试。你可以通过调用 release
方法来实现这一点:
/**
* 执行任务
*/
public function handle(): void
{
// ...
$this->release();
}
默认情况下,release
方法会将任务立即释放回队列。但是,你可以通过向 release
方法传递一个整数或日期实例来指示队列,使任务在指定的秒数之后才能进行处理:
$this->release(10);
$this->release(now()->addSeconds(10));
手动标记任务失败
有时你可能需要手动将任务标记为「失败」。为此,你可以调用 fail
方法:
/**
* 执行作业
*/
public function handle(): void
{
// ...
$this->fail();
}
如果你捕获了一个异常,你想直接将你的任务标记为失败,你可以将异常传递给 fail
方法。 或者,为方便起见,你可以传递一个字符串来表示错误异常信息:
$this->fail($exception);
$this->fail('Something went wrong.');
技巧
有关失败任务的更多信息,请查看 处理任务失败的文档。
任务批处理
Laravel 的任务批处理功能允许你轻松执行一批作业,然后在任务批次完成执行后执行某些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含有关任务批次的元信息,例如它们的完成百分比。可以使用 make:queue-batches-table
Artisan 命令来生成这个迁移:
php artisan make:queue-batches-table
php artisan migrate
定义可批处理任务
要定义可批处理的任务,你应该像平常一样创建一个可排队的任务 ;但是,你应该将 Illuminate\Bus\Batchable
trait 添加到任务类。这个 trait 提供了一个 batch
方法,可用于检索任务正在执行的当前批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 执行任务
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// 确定批次是否已取消...
return;
}
// 导入 CSV 文件的一部分...
}
}
调度批处理任务
要调度一批作业,你应该使用 Bus
facade 的 batch
方法。当然,批处理主要在与完成回调结合使用时才有用。因此,你可以使用 then
、catch
和 finally
方法为批处理定义完成回调。这些回调在被调用时都将接收到一个 Illuminate\Bus\Batch
实例。在这个例子中,我们假设我们正在排队一批任务,每个任务处理 CSV 文件中给定数量的行:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// 批处理已创建但尚未添加任何作业...
})->progress(function (Batch $batch) {
// 单个作业已成功完成..
})->then(function (Batch $batch) {
// 所有作业都已成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到第一个批处理作业失败...
})->finally(function (Batch $batch) {
// 批处理执行完毕...
})->dispatch();
return $batch->id;
批处理的 ID 可以通过 $batch->id
属性访问,可用于 查询 Laravel 命令总线 以获取有关批次分派后的信息。
注意
由于批处理回调会被序列化并在稍后由 Laravel 队列执行,所以你不应该在回调中使用$this
变量。此外,由于批处理作业是封装在数据库事务中的,所以不应该在作业中执行触发隐式提交的数据库语句。
命名批处理
部分工具,如 Laravel Horizon 和 Laravel Telescope 可能会为命名的批处理提供更友好的调试信息。为了给批处理分配一个任意名称,你可以在定义批处理时调用 name
方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务都已成功完成...
})->name('Import CSV')->dispatch();
批处理连接 & 队列
如果你想指定应用于批处理任务的连接和队列,你可以使用 onConnection
和 onQueue
方法。 所有批处理任务必须在相同的连接和队列中执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务均已成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();
链式 & 批处理
你可以将一组链式任务定义在一个批处理中,方法是将链接的任务放在一个数组内。例如,我们可以并行执行两个任务链,并在两个任务链都完成处理后执行一个回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
相反,你可以通过在链中定义批处理来在一个链 中运行一批任务。例如,你可以先运行一批任务来发布多个播客,然后运行一批任务来发送发布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
批量添加任务
有些时候,批量向批处理中添加任务可能很有用。当你需要批量处理数千个任务时,这种模式非常好用,而这些任务在 Web 请求期间可能需要很长时间才能调度。因此,你可能希望调度初始批次的「加载器」任务,这些任务与更多任务相结合:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
在这个例子中,我们将使用 LoadImportBatch
任务为批处理添加其他任务。为此,我们可以对批处理实例使用 add
方法,该方法可以通过任务的 batch
方法访问:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* 执行任务
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
注意
你只能将任务添加到当前任务所属的批处理中。
校验批处理
为批处理完成后提供回调的 Illuminate\Bus\Batch
实例中具有多种属性和方法,可以帮助你与指定的批处理业务进行交互和检查:
// 批处理的UUID..
$batch->id;
// 批处理的名称(如果已经设置的话)...
$batch->name;
// 分配给批处理的任务数量...
$batch->totalJobs;
// 队列还没处理的任务数量...
$batch->pendingJobs;
// 失败的任务数量...
$batch->failedJobs;
// 到目前为止已经处理的任务数量...
$batch->processedJobs();
// 批处理已经完成的百分比(0-100)...
$batch->progress();
// 批处理是否已经完成执行...
$batch->finished();
// 取消批处理的运行...
$batch->cancel();
// 批处理是否已经取消...
$batch->cancelled();
从路由返回批次
所有 Illuminate\Bus\Batch
实例都是 JSON 可序列化的,这意味着你可以直接从应用程序的一个路由返回它们,以检索包含有关批处理的信息的 JSON 有效负载,包括其完成进度。这样可以方便地在应用程序的 UI 中显示有关批处理完成进度的信息。
要通过 ID 检索批次,你可以使用 Bus
facade 的 findBatch
方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批次
有时你可能需要取消给定批处理的执行。这可以通过调用 Illuminate\Bus\Batch
实例的 cancel
方法来完成:
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
正如你在前面的示例中可能已经注意到的那样,批处理任务通常应在继续执行之前确定其对应的批处理是否已被取消。但为了方便起见,你也可以为任务分配 SkipIfBatchCancelled
中间件。顾名思义,这个中间件会指示 Laravel,如果其对应的批处理已被取消,则不处理任务:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
批处理失败
当批处理任务失败时,将调用 catch
回调(如果已分配)。此回调仅针对批处理中失败的第一个任务调用。
允许失败
当批处理中的任务失败时,Laravel 会自动将批处理标记为「已取消」。如果你愿意,你可以禁用此行为,以便任务失败不会自动将批处理标记为已取消。这可以通过在调度批处理时调用 allowFailures
方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务均已成功完成...
})->allowFailures()->dispatch();
重试失败的批处理任务
为了方便起见,Laravel 提供了一个 queue:retry-batch
Artisan 命令,允许你轻松地重试给定批处理的所有失败作业。queue:retry-batch
命令接受要重试其失败任务的批处理的 UUID:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
修剪批处理
如果不进行修剪,job_batches
表可以非常快速地积累记录。为了缓解这种情况,你应该 schedule queue:prune-batches
Artisan 命令每天运行:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
默认情况下,所有已完成且超过 24 小时的批处理都会被修剪。在调用命令时你可以使用 hours
选项来决定保留批处理数据的时间长短。例如,以下命令将删除所有 48 小时前完成的批处理:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
有时,你的 jobs_batches
表可能会积累从未成功完成的批处理记录,例如未成功重试的失败任务的批处理。你可以指示 queue:prune-batches
命令使用 unfinished
选项来修剪这些未完成的批处理记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,你的 jobs_batches
表也可能积累已取消批处理的批处理记录。你可以指示 queue:prune-batches
命令使用 cancelled
选项来修剪这些已取消批处理记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中存储批处理
Laravel 还支持在 DynamoDB 中存储批处理元信息,而不是在关系数据库中。不过,你需要手动创建一个 DynamoDB 表来存储所有的批处理记录。
通常情况下,此表应命名为 job_batches
,但你应基于应用程序 queue
配置文件中的 queue.batching.table
配置值来命名此表。
DynamoDB 批处理表配置
job_batches
表应具有一个名为 application
的字符串主分区键和一个名为 id
的字符串主排序键。键中的 application
部分将包含你的应用程序名称,该名称由应用程序 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,因此可以使用同一张表来存储多个 Laravel 应用程序的任务批处理。
此外,如果你希望利用 自动批处理修剪 功能,你可以为表定义 ttl
属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便你的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php
然后,将 queue.batching.driver
配置选项的值设置为 dynamodb
。此外,你应在 batching
配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于认证 AWS。使用 dynamodb
驱动时不需要 queue.batching.database
配置选项:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
在 DynamoDB 中修剪批次
在使用 DynamoDB 存储作业批次信息时,常规用于修剪关系数据库中存储批次的命令将无法适用。相反,你以利用 DynamoDB 内置的 TTL 功能 来自动删除旧批次的记录。
如果你在 DynamoDB 表中定义了 ttl
属性,你可以定义配置参数指导 Laravel 如何修剪批次记录。配置值 queue.batching.ttl_attribute
定义持有 TTL 的属性名称,而 queue.batching.ttl
配置值定义批次记录可以从 DynamoDB 表中删除的秒数,相对于记录最后一次更新的时间:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
队列闭包
除了发送任务类到队列外,你还可以发送闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。当将闭包发送到队列时,闭包的代码内容是加密签名的,防止在传输中被修改:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
使用 catch
方法,你可以提供一个闭包,该闭包在队列闭包在用完队列的配置重试次数后未能成功完成时执行:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
注意
由于catch
回调由 Laravel 队列稍后序列化并执行,因此你不应在catch
回调中使用$this
变量
Running the Queue Worker
运行队列工作者
<code>queue:work</code> 命令
Laravel 包含了一个 Artisan 命令,可以启动一个队列进程并处理新推送到队列的作业。你可以使用 queue:work
Artisan 命令运行任务进程。请注意一旦 queue:work
命令启动,它将持续运行直到被手动停止或关闭终端:
php artisan queue:work
技巧
为了让queue:work
进程在后台永久运行,你应该使用进程监控工具如 Supervisor来确保队列任务进程不会停止运行。
如果你希望处理的任务 ID 包含在命令的输出中,则可以在调用 queue:work
命令时包含 -v
标志:
php artisan queue:work -v
请记住,队列任务工作者是长期存在的进程,并并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库的变化。因此,在部署过程中,记得重启你的任务队列进程。此外,记住应用程序创建或修改的任何静态状态都不会自动在任务之间重置。
或者,你可以运行 queue:listen
命令。使用 queue:listen
命令时,当你想重新加载已更新的代码或重置应用状态时,不需要手动重启工作进程;但是,与 queue:work
命令相比,该命令效率要低得多:
php artisan queue:listen
运行多个队列进程
要将多个 worker 分配到一个队列并同时处理任务,你应该简单地启动多个 queue:work 进程。 这可以通过终端中的多个选项卡在本地完成,也可以使用流程管理器的配置设置在生产环境中完成。 使用 Supervisor 时,你可以使用 numprocs 配置值。
要给队列分配多个工作进程并同时处理任务,你只需启动多个 queue:work
进程即可。在本地可以通过在终端中打开多个标签页来完成,在生产环境中可以使用你的进程管理器的配置设置完成。在使用 Supervisor 时,你可以使用 numprocs
配置值。
指定连接 & 队列
你还可以指定工作进程应使用的队列连接。传递给 work
命令的连接名称应对应于你的 config/queue.php
配置文件中定义的连接之一:
php artisan queue:work redis
默认情况下,queue:work
命令仅处理给定连接上的默认队列的任务。但是,你可以通过仅处理给定连接上的特定队列来进一步自定义队列工作者。例如,如果你的所有电子邮件都在你的 redis
队列连接的 emails
队列中处理,那么你可以发出以下命令来启动一个只处理该队列的工作进程:
php artisan queue:work redis --queue=emails
处理指定数量的任务
--once
选项可用于指定进程仅处理队列中的单个任务
php artisan queue:work --once
--max-jobs
选项可以用于指示工作进程处理给定数量的任务然后退出。结合 Supervisor使用时,这个选项可能很有用,因为你的工作进程会在处理给定数量的作业后自动重启,释放可能积累的任何内存:
php artisan queue:work --max-jobs=1000
处理所有排队的任务然后退出
--stop-when-empty
选项可以用于指示进程处理所有任务然后优雅地退出。当在 Docker 容器中处理 Laravel 队列时,如果你希望在队列为空后关闭容器,这个选项可能会很有用:
php artisan queue:work --stop-when-empty
在给定的秒数内处理任务
--max-time
选项可用于指示进程给定的秒数内处理任务。结合 Supervisor 使用时,这个选项可能很有用,因为你的进程会在处理作业指定时间后自动重启,释放可能积累的任何内存:
# 处理进程一小时,然后退出...
php artisan queue:work --max-time=3600
进程睡眠时间
当队列中有任务可用时,程将继续处理任务,而不会在它们之间产生延迟。然而,如果队列上没有可用的任务,sleep
选项将决定进程「休眠」 多少秒。当然,在休眠时,进程不会处理任何新任务:
php artisan queue:work --sleep=3
维护模式 & 队列
当应用程序处于 维护模式 时,不会处理任何队列任务。一旦应用程序退出维护模式,任务将继续像往常一样处理。
如果你希望在维护模式启用时强制队列进程处理任务,你可以使用 --force
选项:
php artisan queue:work --force
资源注意事项
守护进程队列在处理每个任务前并不会「重启」框架。因此,在每个任务完成后,应当释放任何占用的大量资源。例如,如果你使用 GD 库进行图像处理,完成图像处理后,应通过调用 imagedestroy
来释放内存,
队列优先级
有时你可能希望优先处理队列的处理方式。例如,在 config/queue.php
配置文件中,你可能会将 redis
连接的默认 queue
设置为 low
。但是,偶尔你可能希望将任务推送到 high
优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个进程以确保所有 high
队列任务在继续执行 low
队列上的任何任务之前都得到处理,请请将队列名称的逗号分隔列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列进程 & 部署
由于队列任务是长期存在的进程,如果不重新启动,他们不会注意到代码的更改。因此,使用队列任务部署应用程序的最简单方法是在部署过程中重新启动任务。你可以通过发出 queue:restart
命令优雅地重新启动所有进程:
php artisan queue:restart
此命令将指示所有队列进程在处理完当前任务后正常退出,以免丢失现有任务。由于队列任务将在执行queue:restart
命令时退出,你应该运行诸如 Supervisor之类的进程管理器来自动重新启动队列任务。
注意
队列使用 cache 来存储重启信号,因此你应该在使用此功能之前验证是否为你的应用程序正确配置了缓存驱动程序
任务到期 & 超时
任务到期
在 config/queue.php
配置文件中,每个队列连接都定义了一个 retry_after
选项。该选项指定队列连接在重试正在处理的作业之前应该等待多少秒。例如,如果 retry_after
的值设置为 90
,如果作业已经处理了 90 秒而没有被释放或删除,则该作业将被释放回队列。通常,你应该将 retry_after
值设置为作业完成处理所需的最大秒数。
警告
唯一不包含retry_after
值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台内管理的 默认可见性超时 重试作业。
进程超时
queue:work
Artisan 命令暴露了一个 --timeout
选项。默认情况下,--timeout
值为 60 秒。如果一个任务的处理时间超过了超时值指定的秒数,处理该任务的进程将会出错退出。通常,进程会被配置在你服务器上的 进程管理器 自动重启:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保任务不会丢失并且任务仅成功处理一次。
警告
--timeout
值应始终至少比retry_after
配置值短几秒钟。这将确保处理冻结任务的进程在任务被重试之前总是被终止。如果你的--timeout
选项比retry_after
配置值长,你的任务可能会被处理两次。
Supervisor 配置
在生产中,你需要一种方法来保持 queue:work
进程运行。 queue:work
进程可能会因多种原因停止运行,例如超过 worker 超时或执行 queue:restart
命令。
在生产环境中,你需要一种方法来保持你的 queue:work
进程运行。queue:work
进程可能会因各种原因停止运行,例如超过进程超时或执行 queue:restart
命令。
因此,你需要配置一个进程监视器,可以检测你的 queue:work
进程何时退出并自动重启它们。此外,进程监视器可以让你指定希望同时运行多少个 queue:work
进程。Supervisor 是 Linux 环境中常用的进程监视器,我们将在以下文档中讨论如何配置它。
Worker 超时
The queue:work
Artisan 命令提供了一个 --timeout
选项。默认情况下, --timeout
v值为 60 秒。如果一个任务处理时间超过了超时值指定的秒数,处理该任务的 worker 将以错误退出。 通常,worker 会被 在你服务器上配置的进程管理器自动重启:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保任务不会丢失,并且任务只被成功处理一次。
警告
--timeout
值应该始终比你的retry_after
配置值至少短几秒。这将确保处理冻结任务的 worker 在任务重试之前总是被终止。如果你的--timeout
选项长于你的retry_after
配置值,你的任务可能会被处理两次。
Supervisor 配置
在生产环境中,你需要一种方法来保持你的 queue:work
进程持续运行。 queue:work
进程可能由于各种原因停止运行,比如超过了 worker 超时时间或执行了 queue:restart
命令。
因此,你需要配置一个进程监视器,它能够检测到 queue:work
进程何时退出并自动重启它们。此外,进程监视器还可以让你指定想要同时运行多少个 queue:work
进程。Supervisor 是一个在 Linux 环境中常用的进程监视器,我们将在接下来的文档中讨论如何配置它。
安装 Supervisor
Supervisor 是一个用于 Linux 操作系统的进程监视器,它会在你的 queue:work
进程失败时自动重启它们。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:
sudo apt-get install supervisor
提示
如果自己配置和管理 Supervisor 感觉有点力不从心,可以考虑使用 Laravel Forge,它会为你的生产环境 Laravel 项目自动安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在这个目录中,你可以创建任意数量的配置文件,指示 supervisor 如何监控你的进程。例如,让我们创建一个 laravel-worker.conf
文件,用于启动和监控 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在这个例子中, numprocs
指令将指示 Supervisor 运行 8 个 queue:work
进程并监控所有这些进程,如果它们失败则自动重启。你应该修改配置的 command
指令以反映你想要的队列连接和 worker 选项。
警告
你应该确保stopwaitsecs
的值大于你最长运行的任务所消耗的秒数。否则,Supervisor 可能会在任务处理完成之前终止它。
启动 Supervisor
创建配置文件后,你可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的任务
有时候你的队列任务可能会失败。别担心,事情不总是按计划进行!Laravel 提供了一种方便的方式来指定任务尝试的最大次数。在一个异步任务尝试次数超过这个数量后,它将被插入到 failed_jobs
数据库表中。同步调度任务 如果失败了,则不会存储在这个表中,它们的异常会立即被应用程序处理。
在新的 Laravel 应用程序中通常会预设一个用于创建 failed_jobs
表的迁移。然而,如果你的应用程序没有包含这个表的迁移,你可以使用 make:queue-failed-table
命令来创建迁移:
php artisan make:queue-failed-table
php artisan migrate
当运行一个 queue worker 进程时,你可以使用 queue:work
命令的 --tries
开关来指定任务尝试的最大次数。如果你没有为 --tries
选项指定一个值,则任务将仅尝试一次或与任务类的 $tries
属性指定的次数相同:
php artisan queue:work redis --tries=3
使用 --backoff
选项,你可以指定 Laravel 在重试遇到异常的任务之前应该等待多少秒。默认情况下,任务会立即释放回队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想配置 Laravel 在重试每个任务遇到异常的任务之前应该等待多少秒,你可以通过在你的任务类上定义一个 backoff
属性来实现:
/**
* 重试任务前等待的秒数
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来确定任务的退避时间,你可以在你的任务类上定义一个 backoff
方法:
/**
* 计算重试任务之前要等待的秒数
*/
public function backoff(): int
{
return 3;
}
你可以通过从 backoff
方法返回一组退避值来轻松配置 「exponential」 退避。在此示例中,第一次重试的重试延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,如果后续有更多重试,则重试延迟为10秒:
/**
* 计算重试任务之前要等待的秒数
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
任务失败后清理
当特定任务失败时,你可能希望向用户发送警报或恢复该任务部分完成的任何操作。为此,你可以在任务类上定义一个 failed
方法。导致作业失败的 Throwable
实例将被传递给 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建新任务实例
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 执行任务
*/
public function handle(AudioProcessor $processor): void
{
// 理上传的播客...
}
/**
* 处理失败作业
*/
public function failed(?Throwable $exception): void
{
// 向用户发送失败通知等...
}
}
注意
调用failed
方法之前前会实例化任务的新实例;因此,在handle
方法中可能发生的任何类属性修改都将丢失。
重试失败的任务
要查看已插入到你的 failed_jobs
数据库表中的所有失败任务,你可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令会列出任务 ID、连接、队列、失败时间以及关于任务的其他信息。可以使用任务 ID 来重试失败的任务。例如,要重试一个 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失败任务,请执行以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如有必要,可以向命令传递多个 ID:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
还可以重试指定队列的所有失败任务:
php artisan queue:retry --queue=name
重试所有失败任务,可以执行 queue:retry
命令,并将 all
作为 ID 传递:
php artisan queue:retry all
如果要删除指定的失败任务,可以使用 queue:forget
命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
技巧
使用 Horizon 时,应该使用Horizon:forget
命令来删除失败任务,而不是queue:forget
命令。
删除 failed_jobs
表中所有失败任务,可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
在将 Eloquent 模型注入到任务中时,这个模型会在放置到队列之前自动序列化,并在任务处理时重新从数据库获取。然而,如果模型在任务等待消费时被删除了,则任务可能会失败,抛出 ModelNotFoundException
异常。
为了方便起见,你可以通过将任务的 deleteWhenMissingModels
属性设置为 true
来选择自动删除缺失模型的任务。当这个属性设置为 true
时,Laravel 会安静地丢弃任务,而不会引发异常:
/**
* 如果任务的模型不存在,则删除该任务
*
* @var bool
*/
public $deleteWhenMissingModels = true;
删除失败的任务
你可以通过调用 queue:prune-failed
Artisan 命令删除应用程序的 failed_jobs
表中的所有记录:
php artisan queue:prune-failed
默认情况下,将删除所有超过 24 小时的失败任务记录,如果为命令提供 --hours
选项,则仅保留在过去 N 小时内插入的失败任务记录。例如,以下命令将删除超过 48 小时前插入的所有失败任务记录:
php artisan queue:prune-failed --hours=48
在 DynamoDB 中存储失败的任务
Laravel 也支持在 DynamoDB 中而非关系型数据库表中存储失败作业的记录。然而,你必须手动创建一个 DynamoDB 表来存储所有的失败作业记录。通常,这个表应该被命名为 failed_jobs
,但你应该根据应用程序的 queue
配置文件中 queue.failed.table
配置值来命名表。
failed_jobs
表应该有一个名为 application
的字符串主分区键,以及一个名为 uuid
的字符串主排序键。键的 application
部分将包含你的应用程序名称,该名称由应用程序的 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,你可以使用同一个表来存储多个 Laravel 应用程序的失败任务。
此外,请确保你安装了 AWS SDK 以便你的 Laravel 应用程序可以与 Amazon DynamoDB 通信:
composer require aws/aws-sdk-php
接下来,将 queue.failed.driver
配置选项的值设置为 dynamodb
。此外,你应该在失败任务配置数组中定义 key
、secret
和 region
配置选项。这些选项将被用于与 AWS 进行认证。使用 dynamodb
驱动时,queue.failed.database
配置选项不是必须的:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
禁用失败任务的存储
你可以通过将 queue.failed.driver
配置选项的值设置为 null
来指示 Laravel 不存储失败的任务。通常,这通过 QUEUE_FAILED_DRIVER
环境变量来完成:
QUEUE_FAILED_DRIVER=null
失败任务事件
如果你想注册一个在任务失败时调用的事件监听器,你可以使用 Queue
facade 的 failing
方法。例如,我们可以在 Laravel 中包含的 AppServiceProvider
的 boot
方法中附加一个闭包到这个事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务
*/
public function register(): void
{
// ...
}
/**
* 引导任何应用程序服务
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
从队列中清除任务
技巧
使用 Horizon 时,应使用horizon:clear
命令从队列中清除作业,而不是使用queue:clear
命令。
如果你想从默认连接的默认队列中删除所有任务,你可以使用 queue:clear
Artisan 命令来执行此操作:
php artisan queue:clear
你还可以提供 connection
参数和 queue
选项以从特定连接和队列中删除任务:
php artisan queue:clear redis --queue=emails
注意
从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动程序。 此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后 60 秒内发送到 SQS 队列的任务也可能会被删除。
监控你的队列
如果你的队列突然涌入了大量的任务,它会导致队列任务繁重,从而增加了任务的完成时间,想你所想, Laravel 可以在队列执行超过设定的阈值时候提醒你。
首先, 你应该每分钟运行一次 queue:monitor
命令。这个命令可以设定任务的名称,以及你想要设定的任务数量阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100
仅仅调度此命令并不足以触发通知提醒你队列状态不堪重负。当命令遇到任务数量超过你的阈值的队列时,将会触发 Illuminate\Queue\Events\QueueBusy
事件。你可以在应用程序的 AppServiceProvider
中监听这个事件来向你或你的开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* 启动任何应用程序服务
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', '[email protected]')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
测试
在测试调度任务的代码时,你可能希望指示 Laravel 实际上不执行任务本身,因为任务的代码可以直接单独地测试,而不是与调度它的代码一起测试。当然,为了测试任务本身,你可以在测试中实例化一个任务实例并直接调用 handle
方法。
你可以使用 Queue
facade 的 fake
方法来阻止排队的任务被实际推送到队列。在调用了 Queue
facade 的 fake
方法之后,你可以断言应用程序试图推送任务到队列:
<?php
//译者注:Pest 示例
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// 执行订单发货...
// 断言没有任务被推送......
Queue::assertNothingPushed();
// 断言一个任务被推送到一个给定的队列...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// 断言任务被推了两次...
Queue::assertPushed(ShipOrder::class, 2);
// 断言任务没有被推送...
Queue::assertNotPushed(AnotherJob::class);
// 断言闭包被推送到队列中...
Queue::assertClosurePushed();
// 断言推送的作业总数…
Queue::assertCount(3);
});
<?php
//译者注:PHPUnit 示例
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// 执行订单发货...
// 断言没有任务被推送......
Queue::assertNothingPushed();
// 断言一个任务被推送到一个给定的队列...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// 断言任务被推了两次...
Queue::assertPushed(ShipOrder::class, 2);
// 断言任务没有被推送...
Queue::assertNotPushed(AnotherJob::class);
// 断言闭包被推送到队列中...
Queue::assertClosurePushed();
// 断言推送的作业总数…
Queue::assertCount(3);
}
}
你可以将一个闭包传递给 assertPushed
或 assertNotPushed
方法来断言推送了一个通过特定「真实性测试」的任务。如果至少有一个推送的任务通过了给定的真实测试,则断言将成功:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
伪造任务的一个子集
如果你只需要伪造特定的任务,同时允许你的其他任务正常执行,你可以将应该伪造的任务的类名传递给 fake
方法:
//译者注:Pest 示例
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// 执行订单发货...
// 断言任务被推了两次......
Queue::assertPushed(ShipOrder::class, 2);
});
//译者注:PHPUnit 示例
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// 执行订单发货...
// 断言任务被推了两次......
Queue::assertPushed(ShipOrder::class, 2);
}
你可以使用 except
方法伪造除一组指定任务之外的所有任务:
Queue::fake()->except([
ShipOrder::class,
]);
测试任务链
要测试任务链,你需要使用 Bus
facade 的伪造功能。可以使用 Bus
facade 的 assertChained
方法来断言任务链 已经被分派。assertChained
方法接受一个链式任务数组作为其第一个参数:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
正如你在上面的示例中看到的,链式任务数组可能是任务类名称的数组。 但是,你也可以提供一组实际的任务实例。 这样做时,Laravel 将确保任务实例属于同一类,并且与你的应用程序调度的链式任务具有相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
你可以使用 assertDispatchedWithoutChain
方法来断言一个任务是在没有任务链的情况下被推送的:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试任务链更改
如果链式任务将任务添加到现有链中,你可以使用任务的 assertHasChain
方法来断言任务具有预期的剩余任务链:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
可以使用 assertDoesntHaveChain
方法来断言任务的剩余链为空:
$job->assertDoesntHaveChain();
测试链式批处理
如果你的任务链包含一个任务批处理,你可以通过在链式断言中插入一个 Bus::chainedBatch
定义来断言链式批处理符合你的预期:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
测试任务批处理
Bus
facade 的 assertBatched
方法可以用来断言已经派发了一个任务批处理 。提供给 assertBatched
方法的闭包接收到一个 Illuminate\Bus\PendingBatch
实例,该实例可用于检查批处理中的任务:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
你可以使用 assertBatchCount
方法来断言派发了特定数量的批处理:
Bus::assertBatchCount(3);
你可以使用 assertNothingBatched
来断言没有批处理被派发:
Bus::assertNothingBatched();
测试任务 / 批处理交互
此外,你可能偶尔需要测试单个任务与其基础批处理的交互。例如,你可能需要测试任务否取消了其批处理的进一步处理。要做到这一点,你需要通过 withFakeBatch
方法为任务分配一个假批处理。withFakeBatch
方法返回一个包含任务实例和假批处理的元组:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
测试任务 / 队列交互
有时候,你可能需要测试一个排队的任务是否将自己释放回队列。或者,你可能需要测试任务是否已经删除了自己。你可以通过实例化任务并调用withFakeQueueInteractions
方法来测试这些队列交互。
一旦任务的队列交互被模拟,你就可以调用作业的handle
方法。调用任务之后,assertReleased
、assertDeleted
和assertFailed
方法可用来对任务的队列交互进行断言:
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertFailed();
任务事件
使用 Queue facade 上的 before 和 after 方法,你可以指定要在处理排队任务之前或之后执行的回调。 这些回调是为仪表板执行额外日志记录或增量统计的绝佳机会。 通常,你应该从 服务提供者 的 boot 方法中调用这些方法。 例如,我们可以使用 Laravel 自带的 AppServiceProvider:
使用 Queue
facade 上的 before
和 after
方法,你可以指定在处理排队任务之前或之后执行的回调函数。这些回调函数是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,你应该从服务提供者的boot
方法中调用这些方法。例如,我们可以使用 Laravel 自带的AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用服务
*/
public function register(): void
{
// ...
}
/**
* 引导任何应用服务
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue
facade 上的looping
方法, 你可以在 worker 尝试从队列获取任务之前执行指定的回调。例如,你可以注册一个闭包,用以回滚之前失败任务打开的任何事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});
原文地址:cndocs/11.x/qu...
译文地址:cndocs/11.x/qu...