队列
队列
介绍
在构建 Web 应用程序时,你可能需要执行一些任务(例如解析和存储上传的 CSV 文件),但这些任务在典型的 Web 请求中花费的时间太长。幸运的是,Laravel 允许你轻松地创建可在后台排队处理的任务作业。通过将耗时的任务移到队列中,你的应用程序可以以超快的速度响应 Web 请求,并为客户提供更好的用户体验。
Laravel 队列提供了可以跨各种不同队列后台的统一 API,例如 Amazon SQS、Redis 甚至关系数据库。
队列配置文件存储在 config/queue.php
中。 在这个文件中,你可以找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库,Amazon SQS,Redis,Beanstalkd,和一个同步驱动程序(供本地使用)。还包括一个用于丢弃排队任务的 null
队列驱动。
技巧:现在,Laravel 为你的 Redis 队列提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 Horizon 文档 了解更多信息。
连接 Vs. 队列
在开始使用 Laravel 队列之前,理解「连接」和「队列」之间的区别非常重要。 在 config/queue.php
配置文件中,有一个 connections
配置选项。 此选项定义到后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。 然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。
请注意, queue
配置文件中的每个连接配置示例都包含一个 queue
属性。 这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果您没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在连接配置的 queue
属性中定义的队列上:
use App\Jobs\ProcessPodcast;
// 这个任务将被推送到默认队列...
ProcessPodcast::dispatch();
// 这个任务将被推送到 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');
有些应用程序可能不需要将任务推到多个队列中,而是倾向于使用一个简单的队列。然而,如果希望对任务的处理方式进行优先级排序或分段时,将任务推送到多个队列就显得特别有用,因为 Laravel 队列工作程序允许您指定哪些队列应该按优先级处理。例如,如果您将任务推送到一个 high
队列,你可能会运行一个赋予它们更高处理优先级的 worker:
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
要使用 database
队列驱动程序,你需要一个数据库表来保存任务。要生成创建此表的迁移,请运行 queue:table
Artisan 命令。一旦迁移已经创建,你可以使用 migrate
命令迁移你的数据库:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动程序,需要在 config/database.php
配置文件中配置一个 redis 数据库连接。
Redis 集群
如果你的 Redis
队列连接使用一个 Redis
集群,那么你的队列名称就必须包含一个 key hash tag。这是为了确保一个给定队列的所有 Redis
键都被放在同一个哈希插槽:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
** 阻塞 **
在使用 Redis 队列时,您可以使用 block_for
配置选项来指定在遍历 worker 循环和重新轮询 Redis 数据库之前,驱动程序需要等待多长时间才能使任务变得可用。
根据您的队列负载调整此值要比连续轮询 Redis 数据库中的新任务更加有效。例如,您可以将值设置为 5
,以指示驱动程序在等待任务变得可用时应该阻塞 5 秒:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
注意:将
block_for
设置为0
将导致队列workers
一直阻塞,直到某一个任务变得可用。这还能防止在下一个任务被处理之前处理诸如SIGTERM
之类的信号。
其他驱动的先决条件
列出的队列驱动需要如下的依赖,这些依赖可通过 Composer 包管理器进行安装:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
or phpredis PHP 扩展
创建任务
生成任务类
默认情况下,应用程序的所有的可排队任务都被存储在了 app/Jobs
目录中。如果 app/Jobs
目录不存在,当您运行 make:job
Artisan 命令时,将会自动创建该目录。您可以使用 Artisan CLI 来生成一个新的队列任务:
php artisan make:job ProcessPodcast
生成的类将会实现 Illuminate\Contracts\Queue\ShouldQueue
接口,告诉 Laravel ,该任务应该推入队列以异步的方式运行。
技巧:您可以使用 stub 发布 来自定义任务 stub 。
类结构
任务类非常简单,通常只包含一个 handle
方法,在队列处理任务时将会调用它。让我们看一个任务类的示例。在这个例子中,我们假设我们管理一个 podcast 服务,并且需要在上传的 podcast 文件发布之前对其进行处理:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* podcast 实例
*
* @var \App\Models\Podcast
*/
protected $podcast;
/**
* 创建一个新的任务实例
*
* @param App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 运行任务
*
* @param App\Services\AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的 podcast...
}
}
在本示例中,请注意,我们能够将一个 Eloquent model 直接传递到已排队任务的构造函数中。由于任务所使用的 SerializesModels
,在任务处理时,Eloquent 模型及其加载的关系将被优雅地序列化和非序列化。
如果你的队列任务在其构造函数中接受一个 Eloquent 模型,那么只有模型的标识符才会被序列化到队列中。当实际处理任务时,队列系统将自动重新从数据库中获取完整的模型实例及其加载的关系。这种用于模型序列化的方式允许将更小的作业有效负载发送给你的队列驱动程序。
<code>handle</code>方法依赖注入
当任务由队列处理时,将调用 handle
方法。注意,我们可以对任务的 handle
方法进行类型提示依赖。Laravel 服务容器 会自动注入这些依赖项。
如果您想完全控制容器如何将依赖注入 handle
方法,你可以使用容器的 bindMethod
方法。bindMethod
方法接受一个可接收任务和容器的回调。在回调中,你可以随意调用 handle
方法。通常,您应该从 服务提供者 中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function ($job, $app) {
return $job->handle($app->make(AudioProcessor::class));
});
注意:二进制数据,例如原始图像内容,应该在传递到队列任务之前通过
base64_encode
函数传递。否则,在将任务放入队列时,可能无法正确地序列化为 JSON。
处理关系
因为加载的关系也会被序列化,所以处理序列化任务的字符串有时会变得相当大。为了防止该关系被序列化,可以在设置属性值时对模型调用 withoutRelations
方法。此方法将返回没有加载关系的模型实例:
/**
* 创建新的任务实例
*
* @param \App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
唯一的任务
注意:唯一的任务需要支持 locks的缓存驱动程序. 目前,
memcached
,redis
,dynamodb
,database
,file
, 和array
缓存驱动程序支持原子锁。此外,唯一任务约束不适用于批量的任务。
有时,您可能需要确保在任何时间点,队列上都只有一个特定任务的实例。可以通过在任务类上执行 ShouldBeUnique
接口来实现。此接口不要求您在类上定义任何其他方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
在上面的示例中,UpdateSearchIndex
是唯一的任务。因此,如果任务的另一个实例已在队列中并且尚未处理完成,则不会处理该任务。
在某些情况下,您可能需要定义一个使任务唯一的特定 「key」 ,或者您可能需要指定一个超时时间,当超过该时间之后,任务将不再是唯一任务。为此,可以在任务类上定义 uniqueId
和 uniqueFor
属性或方法:
<?php
use App\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* 产品实例
*
* @var \App\Product
*/
public $product;
/**
* 解除任务唯一锁的秒数
*
* @var int
*/
public $uniqueFor = 3600;
/**
* 任务的 unique ID (唯一ID)
*
* @return string
*/
public function uniqueId()
{
return $this->product->id;
}
}
在上面的示例中,UpdateSearchIndex
任务中的 product ID 是唯一的。因此,在现有任务完成处理之前,任何具有相同 product ID 的任务都将被忽略。此外,如果现有任务在一小时内没有得到处理,则释放唯一锁,并将具有相同唯一键的另一个任务分派到该队列。
在任务处理开始前保证唯一
默认情况下,在任务完成处理或所有重试尝试均失败后,唯一任务将被「解锁」。 但是,在某些情况下,您可能希望任务在处理之前立即解锁。 为此,您的任务类可以继承 ShouldBeUniqueUntilProcessing
类,而不是继承 ShouldBeUnique
类:
<?php
use App\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一任务锁
在底层实现中,当分发 ShouldBeUnique
任务时,Laravel 尝试使用 uniqueId
键获取一个 锁。 如果未获取到锁,则不会分派任务。 当任务完成处理或所有重试尝试失败时,将释放此锁。 默认情况下,Laravel 将使用默认的缓存驱动程序来获取此锁。 但是,如果您希望使用其他驱动程序来获取锁,则可以定义一个 uniqueVia
方法,该方法返回一个缓存驱动对象:
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* 获取唯一任务锁的缓存驱动程序。
*
* @return \Illuminate\Contracts\Cache\Repository
*/
public function uniqueVia()
{
return Cache::driver('redis');
}
}
技巧:如果只需要限制作业的并发处理,请改用
WithoutOverlapping
任务中间件.
任务中间件
任务中间件允许你围绕排队任务的执行封装自定义逻辑,从而减少了任务本身的样板代码。例如,看下面的 handle
方法,它利用了 Laravel 的 Redis 速率限制特性,允许每 5 秒只处理一个任务:
use Illuminate\Support\Facades\Redis;
/**
* 执行任务
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// 处理任务...
}, function () {
// 无法获取锁...
return $this->release(5);
});
}
虽然这段代码是有效的, 但是 handle
方法的结构却变得杂乱,因为它掺杂了 Redis 速率限制逻辑。此外,其他任务需要使用速率限制的时候,只能将限制逻辑复制一次。
我们可以定义一个处理速率限制的任务中间件,而不是在 handle 方法中定义速率限制。Laravel 没有任务中间件的默认位置,所以你可以将任务中间件放置在你喜欢的任何位置。在本例中,我们将把中间件放在 app/Jobs/Middleware
目录:
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理队列任务
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 获取锁...
$next($job);
}, function () use ($job) {
// 无法获取锁...
$job->release(5);
});
}
}
正如你看到的,类似于 路由中间件,任务中间件接收正在处理队列任务以及一个回调来继续处理队列任务。
在任务中间件被创建以后,他们可能被关联到通过从任务的 middleware
方法返回的任务。这个方法并不存在于 make:job
Artisan 命令搭建的任务中,所以你需要将它添加到你自己的任务类的定义中:
use App\Jobs\Middleware\RateLimited;
/**
* 获取一个可以被传递通过的中间件任务。
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}
访问限制
尽管我们刚刚演示了如何编写自己的访问限制的任务中间件,但 Laravel 实际上内置了一个访问限制中间件,你可以利用它来限制任务。与 路由限流器 一样,任务访问限制器是使用 RateLimiter
facade的 for
方法定义的。
例如,你可能希望允许用户每小时备份一次数据,但不对高级客户施加此类限制。为此,可以在 AppServiceProvider
的 boot
方法中定义 RateLimiter
:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* 注册应用程序服务。
*
* @return void
*/
public function boot()
{
RateLimiter::for('backups', function ($job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的例子中,我们定义了一个小时访问限制;但是,你可以使用 perMinute
方法轻松定义基于分钟的访问限制。此外,您可以将任何值传递给访问限制的 by
方法,但是,这个值通常用于按客户来区分不同的访问限制:
return Limit::perMinute(50)->by($job->user->id);
一旦定义了访问限制,就可以使用 Illuminate\Queue\Middleware\RateLimited
中间件将访问限制附加到任务中。每次任务超过访问限制所限定的时间时,此中间件都会根据访问限制中限定的时间适当的延迟将任务释放回队列。
use Illuminate\Queue\Middleware\RateLimited;
/**
* 获取任务的中间件。
*
* @return array
*/
public function middleware()
{
return [new RateLimited('backups')];
}
将访问限制的任务释放回队列会增加该任务的 attempts
总数。你可能希望相应地调整任务类上的 tries
和 maxExceptions
属性。或者,你可以使用 retryUntil
方法 来定义定义任务超时的时间。
技巧:如果你使用的是Redis,那么您可以使用
Illuminate\Queue\Middleware\RateLimitedWithRedis
中间件,该中间件针对Redis进行了优化,比基本的访问限制中间件效率更高。
防止重复任务
Laravel 包含一个 illighte\Queue\Middleware\WithoutOverlapping
中间件,可以防止基于任意键的重复任务。它可以让一个队列任务正在修改一个资源,而这个资源一次只能由一个任务修改。
例如,假设你有一个更新用户信用评分的队列任务,并且你希望防止同一用户 ID 的信用评分更新任务重复。 为此,你可以从任务的 middleware
方法返回 WithoutOverlapping
中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* 获取任务的中间件。
*
* @return array
*/
public function middleware()
{
return [new WithoutOverlapping($this->user->id)];
}
任何重复任务都将放回队列。你还可以指定重新尝试执行任务之前必须经过的时间(单位为秒):
/**
* 获取任务的中间件。
*
* @return array
*/
public function middleware()
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果你不希望重试任何重复任务并且立即删除,你可以使用 dontRelease
方法:
/**
* 获取任务的中间件。
*
* @return array
*/
public function middleware()
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
注意:
WithoutOverlapping
中间件需要一个支持 原子锁 的缓存驱动程序。 目前,memcached
、redis
、dynamodb
、database
、file
和array
缓存驱动程序支持原子锁。
分发任务
一旦编写了任务类,就可以使用任务本身的 dispatch
方法来分派它。传递给 dispatch
方法的参数将被传递给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function store(Request $request)
{
$podcast = Podcast::create(...);
// ...
ProcessPodcast::dispatch($podcast);
}
}
如果你希望有条件地分派任务,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
延迟分发
如果你希望延迟执行队列任务,可以在分发任务时使用 delay 方法 。例如,让我们指定调度任务在 10 分钟后他被调度后才执行,在这之前任务不会被执行:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function store(Request $request)
{
$podcast = Podcast::create(...);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
注意:亚马逊 SQS 队列服务最大延时执行时间是 15 分钟
响应发送到浏览器后的调度
另外, dispatchAfterResponse
方法会延迟发送任务,直到将响应发送到用户的浏览器之后。这仍然允许用户开始使用应用程序,即使队列任务仍然在执行。这通常只适用于需要 1 秒钟的任务,比如发送电子邮件。由于是在当前 HTTP 请求中处理的,因此以这种方式分派的任务不需要运行队列来处理:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你可以 dispatch
一个闭包,并将 afterResponse
方法放在 dispatch
后,这样在响应发送到浏览器后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
同步调度
如果您想要立即 (同步地) 调度任务,您可以使用 dispatchSync
方法。当使用此方法时,任务将不会排队,并将立即运行在当前进程:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function store(Request $request)
{
$podcast = Podcast::create(...);
// 创建播客...
ProcessPodcast::dispatchSync($podcast);
}
}
任务 & 数据库事务
虽然在数据库事务中分发任务是完全可以的,但你应该特别小心确保你的任务能够成功执行。在事务中分发任务时,该任务可能会在事务提交之前由进程处理。 发生这种情况时,你在数据库事务期间对模型或数据库记录所做的任何修改可能不会在数据库中生效。 此外,在事务中创建的任何模型或数据库记录也可能不会生效。
所幸,Laravel 提供了几种解决这个问题的方法。 首先,你可以在队列连接的配置选项中设置 after_commit
连接选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
当 after_commit
选项为 true
时,你可以在数据库事务中分发任务; Laravel 会等到所有打开的数据库事务都已提交,然后才会开始分发任务。 当然,如果当前没有打开的数据库事务,任务将被立即分发。
如果事务因事务期间发生异常而回滚,则在该事务期间分发的已分发任务将被丢弃。
技巧:将
after_commit
配置选项设置为true
还会导致所有排队的事件侦听器、邮件、通知和广播事件在所有打开的数据库事务提交后才被调度。
内联指定提交调度
如果你没有将 after_commit
队列连接配置选项设置为 true
,你可能需要在所有打开的数据库事务提交后才调度特定的任务。 为此,你可以将 afterCommit
方法放到你的调度操作上:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果after_commit
配置选项设置为true
,则可以指示应立即调度特定作业,而无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();
任务链
任务链允许您指定一组应在主任务成功执行后按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,你可以使用 Bus
facade 提供的 chain
方法:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接作业任务实例,你还可以链接闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(...);
},
])->dispatch();
注意:使用
$this->delete()
方法删除作业不会阻止已被链接的任务被处理。只有当链中的任务失败时,该链才会停止执行。
链式连接 & 队列
如果你想指定应该用于已连接任务的默认连接和队列,可以使用 onConnection
和 onQueue
方法。这些方法指定了应该使用的队列连接和队列名称,除非队列任务被显式地分配了一个不同的连接 / 队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
链式故障
当链接作业时,可以使用 catch
方法指定一个闭包,如果链中的作业失败,则应调用该该闭包。 给定的回调将接收导致作业失败的 Throwable
实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// 链式中的任务失败...
})->dispatch();
自定义队列 & 连接
调度到特定队列
通过将任务推到不同的队列,你可以对排队的任务进行「分类」,甚至可以对分配给不同队列的任务进行优先排序。请记住,这并不是将任务推到你的队列配置文件定义的不同队列「连接」,而是仅推到单个连接中的特定队列。若要指定队列,请在分发任务时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function store(Request $request)
{
$podcast = Podcast::create(...);
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
或者,你可以通过在任务的构造函数中调用 onQueue
方法来指定任务队列:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建一个新的任务实例。
*
* @return void
*/
public function __construct()
{
$this->onQueue('processing');
}
}
发送到特定连接
如果你正在使用多个队列连接,可以指定将任务推送到哪个连接。要指定连接,在调度作业时使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 创建一个新的 podcast
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function store(Request $request)
{
$podcast = Podcast::create(...);
// 创建 podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
你可以使用 onConnection
和 onQueue
方法来指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
或者,你可以通过在调度作业的构造函数中调用 onConnection
方法来指定调度作业的连接。
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 创建一个新的作业实例
*
* @return void
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定任务最大尝试次数 / 超时值
最大尝试次数
如果你的一个队列任务遇到了错误,你可能不希望无限制的重试. 因此 Laravel 提供了各种方法来指定一个任务可以尝试多少次或多长时间。
指定任务可尝试的最大次数的其中一个方法是,通过 Artisan 命令行上的 --tries
开关。这将适用于调度作业的所有任务,除非正在处理的任务指定了最大尝试次数。
php artisan queue:work --tries=3
如果一个任务超过其最大尝试次数,将被视为「失败」的任务。有关处理失败任务的更多信息,可以参考 处理失败队列。
你可以采取更细化的方法来定义任务类本身的最大尝试次数。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries
开关设定的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可尝试次数
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
除了定义任务失败前尝试的次数之外,还可以定义任务应该超时的时间。这允许在给定的时间范围内尝试任意次数的任务。要定义任务超时的时间,请在任务类中添加 retryUntil
方法。这个方法应返回一个 DateTime
实例:
/**
* 确定任务应该超时的时间
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addMinutes(10);
}
技巧:你也可以在队列事件监听器上定义一个
tries
属性或retryUntil
方法。
最大异常数
有时,你可能希望指定某个任务可尝试很多次,但如果重试次数超过了给定数量,导致异常触发(而不是由 release
方法直接释放),则该任务应该失败。为了实现这一点,你可以在你的任务类中定义一个 maxExceptions
属性:
<?php
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可尝试的次数
*
* @var int
*/
public $tries = 25;
/**
* 任务失败前允许的最大异常数
*
* @var int
*/
public $maxExceptions = 3;
/**
* 执行任务
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
在此示例中,如果应用程序无法获得 Redis 锁,则任务将释放十秒钟,并将继续重试 25 次。但是,如果任务抛出三个未处理的异常,则该任务将失败。
超时
注意:必须安装 PHP 扩展
pcntl
才能指定任务超时。
通常情况下,你可以大致的评估出排队的任务需要多长时间。因此,Laravel 允许你指定「超时」值。如果任务处理的时间超过超时值指定的秒数,则处理任务的 worker 将退出并显示错误。通常,worker 会被服务器上配置的 进程管理器 自动重启。
同样,任务可以运行的最大秒数可以使用 Artisan 命令行上的 –timeout 开关来指定:
php artisan queue:work --timeout=30
如果任务因不断超时而超过其最大尝试次数,则它将被标记为失败。
你也可以定义允许任务在任务类本身上运行的最大秒数。如果在任务上指定了超时,它将优先于在命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 在超时之前任务可以运行的秒数
*
* @var int
*/
public $timeout = 120;
}
有些时候,诸如 socket 或在 HTTP 连接之类的 IO 阻止进程可能不会遵守你指定的超时。 因此,在使用这些功能时,也应始终尝试使用其API指定超时。 例如,在使用 Guzzle 时,应始终指定连接并请求的超时时间。
错误处理
如果在处理任务时抛出异常,任务将自动释放回队列,以便再次尝试。 任务将继续发布,直到尝试达到你的应用程序允许的最大次数为止。 最大尝试次数由 queue:work
Artisan 命令中使用的 --tries
开关定义。 或者,可以在任务类本身上定义最大尝试次数。 有关运行队列处理器的更多信息可以在下面找到.
手动发布任务
有时你可能希望手动将任务发布回队列,以便稍后再次尝试。 你可以通过调用 release
方法来完成此操作:
/**
* 执行任务。
*
* @return void
*/
public function handle()
{
// ...
$this->release();
}
默认情况下,release
方法会将任务发布回队列以供立即处理。 但是,通过向 release
方法传递一个整数,你可以指示队列在给定的秒数过去之前不使任务可用于处理:
$this->release(10)
手动使任务失败
有时,你可能需要手动将任务标记为“failed”。 为此,您可以调用 fail
方法:
/**
* 执行任务。
*
* @return void
*/
public function handle()
{
// ...
$this->fail();
}
如果你想将你的任务标记为由于你捕获的异常而失败,你可以将异常传递给 fail
方法:
$this->fail($exception);
技巧:有关失败任务的更多信息,请查看处理任务失败的文档 .
错误处理
如果在处理 job 时抛出异常,则 job 将自动释放回队列,以便再次尝试。 该 job 将继续被重试,直到其尝试次数达到应用程序允许的最大次数为止。最大的尝试次数可以在队列 queue:work
中使用的 --tries
定义。 或者, 可以在 job 类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到.
手动发布 Job
有时,您可能希望手动将 job 释放回队列,以便可以在以后再次尝试。你可以通过调用 release
方法完成这一点:
/**
* 执行 job.
*
* @return void
*/
public function handle()
{
// ...
$this->release();
}
一般情况下, release
方法将 将 job 释放回队列以进行立即处理。 然而, 通过将整数传递给 release
方法,您可以指示队列不会使作业可用于处理,直到已经过了设置的秒数:
$this->release(10)
手动标错一个 job
有时,您可能需要手动标记 job 为 fail
。为此,你可以调用 fail
方法:
/**
* 执行 job.
*
* @return void
*/
public function handle()
{
// ...
$this->fail();
}
如果您想将 job 标记为由你捕获的异常而失败,您可以将异常传递给 fail
方法:
$this->fail($exception);
技巧:有关失败的 job 的更多信息, 查看 关于处理 job 失败的文件.
任务批处理
Laravel 的任务批处理功能允许你轻松地执行一批任务,然后在这批任务执行完毕后执行一些操作。 在开始之前,你应该创建一个数据库迁移来构建一个表来包含有关你的任务批次的元信息,例如它们的完成百分比。 这个迁移可以使用 queue:batches-table
Artisan 命令生成:
php artisan queue:batches-table
php artisan migrate
定义可批处理任务
要定义可批处理作业,你应该像往常一样创建可排队任务; 不过,你应当将 Illuminate\Bus\Batchable
特性添加到任务类。 此特征提供对 batch
方法的访问,该方法可用于检索任务正在执行的当前批处理:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ImportCsv implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 执行任务。
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// 导入 CSV 文件的一部分...
}
}
调度批次
要分发一批任务,你应该使用 Bus
facade 的 batch
方法。 当然,你可以和批处理的回调结合使用。因此,你可以使用 then
、catch
和 finally
方法来定义批处理的完成回调。 这些回调中的每一个在被调用时都会收到一个 Illuminate\Bus\Batch
实例。 在这个例子中,我们假设我们正在排队一批任务,每个任务处理来自 CSV 文件的给定数量的行:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->then(function (Batch $batch) {
// 所有任务都成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到第一批任务失败...
})->finally(function (Batch $batch) {
// 批处理已完成执行...
})->dispatch();
return $batch->id;
批处理的 ID 可以通过 $batch->id
属性访问,可用于在分派批处理后查询 Laravel 命令总线以获取有关批处理的信息。
命名批次
如果对批次进行命名,一些工具如 Laravel Horizon 和 Laravel Telescope 可能会为批次提供更多用户友好的调试信息。 要为批处理指定任意名称,你可以在定义批处理时调用 name
方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有作业都成功完成...
})->name('Import CSV')->dispatch();
批量连接和队列
如果你想指定用于批处理任务的连接和队列,你可以使用 onConnection
和 onQueue
方法。 所有批处理任务必须在相同的连接和队列中执行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// 所有任务都成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();
批次内的链
你可以通过将链接任务放置在一个数组中来定义一组链接任务。 例如,我们可以并行执行两个任务链,并在两个任务链完成处理时执行回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
批量添加任务
有时,从批处理任务中向批处理添加其他任务可能很有用。 当你需要批处理数千个任务时,这种模式非常有用,而这些任务在 Web 请求期间可能需要很长时间才能调度。 因此,相反,你可能希望分派初始批次的“加载器”任务,这些任务与更多任务相结合:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// 所有任务都成功完成...
})->name('Import Contacts')->dispatch();
在这个示例中,我们将使用 LoadImportBatch
实例将其他任务添加到批处理,要实现这个功能,我们还需要在任务当中调用批处理的 add
来完成添加:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* 执行任务。
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
注意:你只能将任务添加到当前任务所属的批处理中。
校验批处理
提供给批完成回调的 Illuminate\Bus\Batch
实例有多种属性和方法来帮助您与指定的批处理任务进行交互和检查:
// 批处理的UUID...
$batch->id;
// 批处理的名称(如果已经设置的话)...
$batch->name;
// 分配给批处理的任务数量...
$batch->totalJobs;
// 队列还没处理的任务数量...
$batch->pendingJobs;
// 失败的任务数量...
$batch->failedJobs;
// 到目前为止已经处理的任务数量...
$batch->processedJobs();
// 批处理已经完成的百分比(0-100)...
$batch->progress();
// 批处理是否已经完成执行...
$batch->finished();
// 取消批处理的运行...
$batch->cancel();
// 批处理是否已经取消...
$batch->cancelled();
从路由返回批处理
所有的 Illuminate\Bus\Batch
实例都是 JSON 可序列化的,这意味着你可以直接从应用程序的某个路由返回它们,以检索包含批处理信息的 JSON 有效负载,包括它的完成进度。这样可以方便地在应用程序的 UI 中显示批处理完成进度的信息。
要通过批处理 ID 来获取对应的批处理,可以使用 Bus
facade 的 findBatch
方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批处理
有时候你可能需要取消指定的批处理的执行,可以通过 Illuminate\Bus\Batch
实例调用 cancel
方法来完成:
/**
* 执行任务
*
* @return void
*/
public function handle()
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
通过前面的示例代码,那你可能已经注意到,批量处理任务时通常应该从它的handle
方法开始检查批量处理是否已经被取消:
/**
* 执行任务
*
* @return void
*/
public function handle()
{
if ($this->batch()->cancelled()) {
return;
}
// 继续处理...
}
批处理失败
当一个批处理的任务失败时,会调用 catch
回调(如果已定义),该回调只有在批处理中的任务运行失败才会调用。
允许失败
当批处理中的任务失败时,Laravel 会自动将该批处理标记为「已取消」,如果需要的话,你可以禁用该行为,可以通过分发批处理时调用 allowFailures
方法来实现:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
//所有任务已成功完成...
})->allowFailures()->dispatch();
重启失败的批处理任务
为了方便操作,Laravel提供了一个 Artisan 命令 queue:retry-batch
,该命令可以让你轻松重试批处理中所有失败的任务。queue:retry-batch
命令接收需要重试失败任务的批处理的 UUID :
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
批量清理
如果不进行清理, job_batches
表中将会快速累积执行记录。为了缓解这种情况,你可以在 任务调度 中每天运行 queue:prune-batches
Artisan 命令:
$schedule->command('queue:prune-batches')->daily();
默认情况下,超过24小时的记录都会被清理掉。你可以在调用命令时使用 hours
选项来确定保留批量处理数据的时间。例如,下面的命令将批量清理超过48小时的数据记录:
$schedule->command('queue:prune-batches --hours=48')->daily();
队列闭包
除了将作业类推送到队列之外,你还可以推送闭包到队列。这对于需要在当前请求周期之外执行简单快捷的任务时非常方便。在将闭包推送到队列时,闭包的代码内容是经过加密签名的,因为在传输过程中没有办法进行修改:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
当队列的闭包方法失败重试次数达到最大尝试次数:后仍没有成功运行时,会执行 catch
方法中的闭包:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 任务失败了...
});
运行队列处理器
<code>queue:work</code> 命令
Laravel 有一个队列处理器对新推入队列的任务进行处理。通过 Artisan 命令 queue:work
来启动队列处理器。需要注意的是,一旦 queue:work
命令启动,将一直保持运行,直到它被手动停止或你关闭你的终端:
php artisan queue:work
技巧:为了让
queue:work
进程永久地在后台运行,您应该使用一个进程监视器,如Supervisor,以确保队列worker不会停止运行。
请记住,队列处理器是长生命周期的进程,并将启动的应用程序状态存储在内存中。因此,在启动它们之后,代码库中的更改对其不起作用。因此,在部署过程中,一定要重新启动你的队列处理器。此外,请记住,应用程序创建或修改的任何静态状态不会在任务之间自动重置。
或者,你可以运行 queue:listen
命令。在使用 queue:listen
命令时,当你想要重新加载更新的代码或重置应用程序状态时,你不必手动重新启动worker;但是,这个命令的效率不如 queue:work
:
php artisan queue:listen
运行多个队列任务
如果要将多个 worker 分配给一个队列来同时处理作业,你也可以很简单的启动多个queue:work
进程。它可以通过终端中的多个选项卡在本地完成,也可以通过配置流程管理器在生产环境中完成。当使用Supervisor时,你可以通过numprocs
配置值。
指定连接 & 队列
你可以指定任务处理器使用哪个连接。传递给 work
命令的连接名应该与 config/queue.php
配置文件中定义的一个连接相对应:
php artisan queue:work redis
你甚至可以通过仅处理特定连接的特定队列来进一步定制你的队列任务处理器。例如,如果你所有的电子邮件都在你的 redis
队列连接的 emails
队列中处理,你可以发出以下命令来启动一个只处理该队列的任务处理器:
php artisan queue:work redis --queue=emails
处理给定数量的任务
--once
参数可以用来指定任务处理器只处理一个队列中的任务:
php artisan queue:work --once
--max-jobs
参数可以指定任务处理器处理了多少个任务后关闭。这个参数可以用来结合 Supervisor 设置任务处理器执行多少个任务后重启:
php artisan queue:work --max-jobs=1000
处理所有队列中的任务 & 然后退出
--stop-when-empty
参数可以指定任务处理器处理所有任务后关闭。如果您希望在队列为空后关闭该容器,请在 Docker 容器中处理 Laravel 队列时使用此选项:
php artisan queue:work --stop-when-empty
处理给定时间的任务
--max-time
参数可以指定任务处理器处理了多少秒后关闭。这个参数可以用来结合 Supervisor 设置任务处理器执行多少秒后重启:
// 一小时后关闭...
php artisan queue:work --max-time=3600
Worker 睡眠时间
当队列中有可用的作业时,worker 将继续处理作业,它们之间没有延迟。但是,sleep
选项决定了在没有新工作可用的情况下 worker 将“休眠”多少秒。休眠时,worker 不会处理任何新作业 - 作业将在 worker 再次醒来后处理。
php artisan queue:work --sleep=3
资源方面的考虑
守护进程队列 worker 程序在处理每个任务之前不会「重新启动」框架。因此,你应该在每个任务完成后释放所有繁重的资源。例如,如果你正在使用 GD 库进行图像处理,那么在完成之后,应该使用 imagedestroy
来释放内存。
队列优先级
有时,你可能希望优先考虑如何处理队列。例如,在 config/queue.php
中,你可以将你的 redis
连接的默认 queue
设置为 low
。然而,有时你可能希望将一个任务推到一个 high
优先级队列,就像这样:
dispatch((new Job)->onQueue('high'));
要启动一个 worker,它在继续执行 low
队列上的任何作业之前,验证所有的 high
队列任务都被处理了,请将一个以逗号分隔的队列名称列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列 worker & 部署
因为队列 worker 是长生命周期的进程,所以在重启之前,任何的代码更改都不会生效。因此,使用队列 worker 部署应用程序的最简单方法是在部署过程中重新启动 worker。你可以通过执行 queue:restart
命令来优雅地重新启动所有的 worker:
php artisan queue:restart
该命令将指示所有队列 worker 在完成当前任务后优雅地 “死亡”,这样就不会丢失现有的任务。由于在执行 queue:restart
命令时,队列 worker 将被杀掉,因此你应该运行一个进程管理器 (如 Supervisor) 来自动重新启动队列 worker。
技巧:队列使用 缓存 来存储重启信号,因此在使用该特性之前,你应该检查应用程序的缓存驱动程序是否正确配置。
任务到期 & 超时
任务到期
在你的 config/queue.php
配置文件,每个队列连接定义一个 retry_after
选项。此选项指定在重试正在处理的任务之前,队列连接应等待多少秒。例如,如果 retry_after
的值被设置为 90
,那么如果任务已经处理了 90 秒而没有被删除,那么它将被释放回队列。通常,您应该将 retry_after
值设置为你的任务完成处理所需的最大秒数。
注意:唯一不包含
retry_after
值的队列连接是 Amazon SQS。SQS 将基于在 AWS 控制台中管理的默认可见性超时重试任务。
Worker 超时
queue:work
Artisan 命令暴露一个 --timeout
选项。如果作业的处理时间超过超时值指定的秒数,则处理作业的工作人员将退出并显示错误。 通常情况下, worker 会被 你的服务器上配置的进程管理器自动重启:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们共同确保不会丢失任务,并且任务只被成功处理一次。
注意:
--timeout
值应该总是比retry_after
配置值至少短几秒。这将确保处理给定任务的 worker 总是在重试作业之前被杀死。如果你的--timeout
选项比你的retry_after
配置值长,你的任务可能会被处理两次。
Supervisor 配置
在生产环境中,你通常需要保证 queue:work
进程时刻处于运行状态。但该进程有可能因为各种各样的原因而意外终止,例如 Worker 的超时溢出 或者 queue:restart
命令的执行等。
因此,我们需要配置一个进程监控器来监控 queue:work
进程的状态并在需要时进行重启。另外,进程监控器也可以用来指定同时运行的 queue:work
进程数目。Supervisor 是一个常见于 Linux 环境的进程监控器,我们将在下文中用其进行讲解并说明如何正确配置它。
安装 Supervisor
Supervisor 是一个用于 Linux 操作系统的进程监视器,可以在 queue:work
进程意外终止时自动进行重启。你可以使用以下命令在 Ubuntu 上安装 Supervisor:
sudo apt-get install supervisor
技巧:如果你觉得自己配置 Supervisor 很困难,可以考虑使用 Laravel Forge,它将自动为你的 Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常储存在 /etc/supervisor/conf.d
目录。在此目录中,你可以创建任意数量的配置文件,这些配置文件将指示 supervisor 如何监视你的进程。例如,让我们创建一个 laravel-worker.conf
文件,启动并监视 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在这个例子中,numprocs
指令将指示 Supervisor 运行 8 个 queue:work
进程并监视所有进程,如果它们失败会自动重新启动它们。你应更改配置的 command
指令以反映你所需的队列连接和工人选项。
注意:你应该确保
stopwaitsecs
的值大于运行时间最长的任务所消耗的秒数。 否则,Supervisor 可能会在任务完成处理之前终止该任务。
启动 Supervisor
创建配置文件后,你可以更新 Supervisor 配置并使用以下命令启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请参阅 Supervisor 文档.
处理失败的任务
有时你排队的任务会失败。 别担心,事情并不总是按计划进行! Laravel 提供了一种方便的方法来指定应尝试任务的最大次数。任务超过此尝试次数后,将插入到 failed_jobs
数据库表中。 当然,如果该表尚不存在,我们将需要创建该表。要为 failed_jobs
表创建迁移,您可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
运行队列工作进程时,你可以使用 queue:work
命令上的 --tries
开关指定尝试任务的最大次数。 如果你没有为 --tries
选项指定值,则任务将仅尝试一次或与任务类的 $tries
属性指定的次数相同:
php artisan queue:work redis --tries=3
使用 --backoff
选项,你可以指定 Laravel 在重试遇到异常的任务之前应该等待的秒数。 默认情况下,任务会立即释放回队列,以便可以再次尝试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想配置 Laravel 在重试每个任务遇到异常的任务之前应该等待的秒数,你可以通过在任务类上定义一个 backoff
属性来实现:
/**
* 重试任务前等待的秒数。
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来确定任务的退避时间,你可以在你的任务类中定义一个 backoff
方法:
/**
* 计算重试任务前等待的秒数。
*
* @return int
*/
public function backoff()
{
return 3;
}
你可以通过从 backoff
方法返回一组退避值来轻松配置“指数”退避。 在此示例中,第一次重试的重试延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒:
/**
* 计算重试任务前等待的秒数。
*
* @return array
*/
public function backoff()
{
return [1, 5, 10];
}
任务失败后的清理
当特定任务失败时,你可能希望向用户发送警报或恢复任务部分完成的任何操作。 为此,你可以在任务类上定义一个 failed
方法。 导致任务失败的 Throwable
实例将被传递给 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/**
* 播客实例。
*
* @var \App\Podcast
*/
protected $podcast;
/**
* 创建一个新的任务实例。
*
* @param \App\Models\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务。
*
* @param \App\Services\AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客......
}
/**
* 处理任务失败。
*
* @param \Throwable $exception
* @return void
*/
public function failed(Throwable $exception)
{
// 向用户发送失败通知等......
}
}
重试失败的任务
要查看已插入到你的 failed_jobs
数据库表中的所有失败任务,你可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
此 queue:failed
命令将列出任务 ID、连接、队列、失败时间和其它有关任务的信息。 任务 ID 可用于重试失败的任务。 例如,要重试 ID 为 5
的失败任务,请发出以下命令:
php artisan queue:retry 5
如有必要,你可以将多个 ID 或 ID 范围(当使用数字 ID 时)传递给命令:
php artisan queue:retry 5 6 7 8 9 10
php artisan queue:retry --range=5-10
要重试所有失败的任务,请执行 queue:retry
命令并传递 all
作为 ID:
php artisan queue:retry all
如果你想删除一个失败的任务,你可以使用 queue:forget
命令:
php artisan queue:forget 5
技巧:使用 Horizon 时,你应该使用
horizon:forget
命令来删除失败的任务,而不是使用queue:forget
命令。
要从 failed_jobs
表中删除所有失败的任务,你可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
将 Eloquent 模型注入任务时,模型会在放入队列之前自动序列化,并在处理任务时从数据库中重新检索。但是,如果在任务等待工作人员处理时模型已被删除,你的任务可能会因 ModelNotFoundException
而失败。
为方便起见,你可以通过将任务的 deleteWhenMissingModels
属性设置为 true
来选择自动删除缺少模型的任务。 当此属性设置为 true
时,Laravel 将悄悄丢弃任务而不引发异常:
/**
* 如果其模型不再存在,则删除该任务。
*
* @var bool
*/
public $deleteWhenMissingModels = true;
失败任务事件
如果你想注册一个在任务失败时将被调用的事件侦听器,您可以使用 Queue
门面的 failing
方法。 例如,我们可以从 Laravel 中包含的 AppServiceProvider
的 boot
方法为这个事件附加一个闭包:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务。
*
* @return void
*/
public function register()
{
//
}
/**
* 引导任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
从队列中清除任务
技巧:使用 Horizon 时,您应该使用
horizon:clear
命令来清除队列中的任务,而不是使用queue:clear
命令。
如果你想从默认连接的默认队列中删除所有任务,你可以使用 queue:clear
Artisan 命令来执行此操作:
php artisan queue:clear
你还可以提供 connection
参数和 queue
选项以从特定连接和队列中删除任务:
php artisan queue:clear redis --queue=emails
注意:从队列中清除任务仅适用于 SQS、Redis 和数据库队列驱动程序。 此外,SQS 消息删除过程最多需要 60 秒,因此在你清除队列后 60 秒内发送到 SQS 队列的任务也可能会被删除。
任务事件
通过使用 Queue
facade 的 before
和 after
方法,可以指定在处理排队任务之前或之后执行的回调。如果要为控制面板执行附加日志记录或者增量统计,这些回调会是最佳的机会。 通常,你应该从 服务提供者 的 boot
方法调用。例如,我们可以使用 Laravel 的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册应用程序服务
*
* @return void
*/
public function register()
{
//
}
/**
* 启动应用程序服务
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue
facade 的 looping
方法,你可以在 worker 尝试从队列获取任务之前执行指定的回调。例如, 你可以注册一个闭包,用以回滚之前失败任务打开的任何事务:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});