队列
介绍
Laravel 现在提供了 Horizon,一个美观的仪表板和配置系统,用于您的 Redis 驱动队列。查看完整的 Horizon 文档 以获取更多信息。
Laravel 队列为各种不同的队列后端提供了统一的 API,例如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您推迟处理耗时的任务,例如发送电子邮件,直到稍后的时间。推迟这些耗时的任务可以显著加快应用程序的 Web 请求速度。
队列配置文件存储在 config/queue.php
中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、Beanstalkd、Amazon SQS、Redis 和一个同步驱动程序,该驱动程序将立即执行作业(用于本地使用)。还包括一个 null
队列驱动程序,它会丢弃排队的作业。
连接与队列
在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php
配置文件中,有一个 connections
配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。
请注意,queue
配置文件中的每个连接配置示例都包含一个 queue
属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue
属性中定义的队列中:
// 这个作业被发送到默认队列...
Job::dispatch();
// 这个作业被发送到“emails”队列...
Job::dispatch()->onQueue('emails');
某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您按优先级指定应处理哪些队列。例如,如果您将作业推送到 high
队列,您可以运行一个工作者,给予它们更高的处理优先级:
php artisan queue:work --queue=high,default
驱动程序先决条件
数据库
要使用 database
队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table
Artisan 命令。创建迁移后,您可以使用 migrate
命令迁移数据库:
php artisan queue:table
php artisan migrate
Redis
要使用 redis
队列驱动程序,您应在 config/database.php
配置文件中配置 Redis 数据库连接。
如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
其他驱动程序先决条件
以下依赖项是列出的队列驱动程序所需的:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
创建作业
生成作业类
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,当您运行 make:job
Artisan 命令时,它将被创建。您可以使用 Artisan CLI 生成一个新的排队作业:
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 该作业应被推送到队列以异步运行。
类结构
作业类非常简单,通常只包含一个 handle
方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
}
在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels
trait,Eloquent 模型将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。
handle
方法在作业被队列处理时调用。请注意,我们能够在作业的 handle
方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。
二进制数据,例如原始图像内容,应在传递给排队作业之前通过 base64_encode
函数传递。否则,作业可能无法在放入队列时正确序列化为 JSON。
调度作业
编写作业类后,您可以使用作业本身的 dispatch
方法调度它。传递给 dispatch
方法的参数将传递给作业的构造函数:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast);
}
}
延迟调度
如果您希望延迟执行排队作业,可以在调度作业时使用 delay
方法。例如,让我们指定一个作业在调度后 10 分钟内不可用于处理:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
作业链
作业链允许您指定一系列应按顺序运行的排队作业。如果序列中的一个作业失败,其余作业将不会运行。要执行排队作业链,您可以在任何可调度作业上使用 withChain
方法:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
自定义队列和连接
调度到特定队列
通过将作业推送到不同的队列,您可以“分类”排队作业,甚至可以优先考虑您分配给各种队列的工作者数量。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而仅推送到单个连接内的特定队列。要指定队列,请在调度作业时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
调度到特定连接
如果您正在使用多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在调度作业时使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
当然,您可以链接 onConnection
和 onQueue
方法,以指定作业的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
指定最大作业尝试次数/超时值
最大尝试次数
指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries
开关:
php artisan queue:work --tries=3
但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 作业可以尝试的次数。
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
作为定义作业可以尝试多少次的替代方法,您可以定义作业应超时的时间。这允许作业在给定时间范围内尝试任意次数。要定义作业应超时的时间,请在作业类中添加 retryUntil
方法:
/**
* 确定作业应超时的时间。
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
您还可以在排队的事件监听器上定义 retryUntil
方法。
超时
timeout
功能针对 PHP 7.1+ 和 pcntl
PHP 扩展进行了优化。
同样,可以使用 Artisan 命令行上的 --timeout
开关指定作业可以运行的最大秒数:
php artisan queue:work --timeout=30
但是,您也可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 作业可以运行的最大秒数。
*
* @var int
*/
public $timeout = 120;
}
速率限制
此功能要求您的应用程序可以与 Redis 服务器 交互。
如果您的应用程序与 Redis 交互,您可以按时间或并发限制排队作业的速率。当您的排队作业与同样受速率限制的 API 交互时,此功能可以提供帮助。例如,使用 throttle
方法,您可以限制给定类型的作业每 60 秒仅运行 10 次。如果无法获得锁,您通常应将作业释放回队列,以便稍后重试:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 作业逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
在上面的示例中,key
可以是任何唯一标识您希望限制速率的作业类型的字符串。例如,您可能希望根据作业的类名和它操作的 Eloquent 模型的 ID 构建键。
或者,您可以指定可以同时处理给定作业的最大工作者数量。当排队作业正在修改一个应该只由一个作业同时修改的资源时,这可能会有所帮助。例如,使用 funnel
方法,您可以限制给定类型的作业仅由一个工作者同时处理:
Redis::funnel('key')->limit(1)->then(function () {
// 作业逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
使用速率限制时,您的作业需要成功运行的尝试次数可能很难确定。因此,将速率限制与 基于时间的尝试 结合使用是有用的。
错误处理
如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到它已被尝试到应用程序允许的最大次数。最大尝试次数由 queue:work
Artisan 命令上使用的 --tries
开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到。
运行队列工作者
Laravel 包含一个队列工作者,它将在作业推送到队列时处理新作业。您可以使用 queue:work
Artisan 命令运行工作者。请注意,一旦 queue:work
命令启动,它将继续运行,直到手动停止或关闭终端:
php artisan queue:work
要使 queue:work
进程永久在后台运行,您应使用进程监视器,例如 Supervisor,以确保队列工作者不会停止运行。
请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重新启动队列工作者。
处理单个作业
可以使用 --once
选项指示工作者仅从队列中处理一个作业:
php artisan queue:work --once
指定连接和队列
您还可以指定工作者应使用哪个队列连接。传递给 work
命令的连接名称应对应于 config/queue.php
配置文件中定义的连接之一:
php artisan queue:work redis
您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis
队列连接上的 emails
队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:
php artisan queue:work redis --queue=emails
资源考虑
守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在 config/queue.php
中,您可以将 redis
连接的默认 queue
设置为 low
。然而,偶尔您可能希望将作业推送到 high
优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个工作者,确保所有 high
队列作业在继续处理 low
队列上的任何作业之前都已处理完毕,请将逗号分隔的队列名称列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列工作者与部署
由于队列工作者是长时间运行的进程,因此在不重新启动的情况下不会拾取代码更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart
命令优雅地重新启动所有工作者:
php artisan queue:restart
此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以确保没有现有作业丢失。由于在执行 queue:restart
命令时队列工作者将死亡,您应运行一个进程管理器,例如 Supervisor,以自动重新启动队列工作者。
队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证应用程序已正确配置缓存驱动程序。
作业过期与超时
作业过期
在 config/queue.php
配置文件中,每个队列连接定义了一个 retry_after
选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after
的值设置为 90
,则如果作业已处理 90 秒而未被删除,它将被释放回队列。通常,您应将 retry_after
值设置为作业应合理完成处理的最大秒数。
唯一不包含 retry_after
值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。
工作者超时
queue:work
Artisan 命令公开了一个 --timeout
选项。--timeout
选项指定 Laravel 队列主进程在杀死正在处理作业的子队列工作者之前将等待的时间。有时,子队列进程可能会因各种原因而“冻结”,例如外部 HTTP 调用未响应。--timeout
选项会删除超过指定时间限制的冻结进程:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。
--timeout
值应始终比 retry_after
配置值短几秒钟。这将确保在重试作业之前,处理给定作业的工作者始终被杀死。如果您的 --timeout
选项长于 retry_after
配置值,您的作业可能会被处理两次。
工作者休眠时间
当队列上有作业可用时,工作者将继续处理作业,而不会在它们之间延迟。然而,sleep
选项决定了如果没有新作业可用,工作者将“休眠”多长时间。在休眠期间,工作者将不会处理任何新作业 - 作业将在工作者再次醒来后处理。
php artisan queue:work --sleep=3
Supervisor 配置
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work
进程失败,它将自动重新启动。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:
sudo apt-get install supervisor
如果自己配置 Supervisor 听起来很复杂,请考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf
文件,启动并监视一个 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
在此示例中,numprocs
指令将指示 Supervisor 运行 8 个 queue:work
进程并监视所有进程,如果它们失败,将自动重新启动。当然,您应更改 command
指令中的 queue:work sqs
部分,以反映您所需的队列连接。
启动 Supervisor
创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的作业
有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs
数据库表中。要为 failed_jobs
表创建迁移,您可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
然后,在运行 队列工作者 时,您应使用 queue:work
命令上的 --tries
开关指定作业应尝试的最大次数。如果您未为 --tries
选项指定值,作业将无限期尝试:
php artisan queue:work redis --tries=3
清理失败的作业
您可以直接在作业类上定义一个 failed
方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或撤销作业执行的任何操作的理想位置。导致作业失败的 Exception
将传递给 failed
方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
/**
* 作业处理失败。
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 发送用户失败通知等...
}
}
失败作业事件
如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing
方法。此事件是通过电子邮件或 HipChat 通知您的团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider
中附加一个回调到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
重试失败的作业
要查看已插入 failed_jobs
数据库表中的所有失败作业,您可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出作业 ID、连接、队列和失败时间。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5
的失败作业,请发出以下命令:
php artisan queue:retry 5
要重试所有失败的作业,请执行 queue:retry
命令并传递 all
作为 ID:
php artisan queue:retry all
如果您希望删除失败的作业,可以使用 queue:forget
命令:
php artisan queue:forget 5
要删除所有失败的作业,可以使用 queue:flush
命令:
php artisan queue:flush
作业事件
使用 Queue
facade 上的 before
和 after
方法,您可以指定在排队作业处理之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应从 服务提供者 调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 注册服务提供者。
*
* @return void
*/
public function register()
{
//
}
}
使用 Queue
facade 上的 looping
方法,您可以指定在工作者尝试从队列中获取作业之前执行的回调。例如,您可能会注册一个闭包,以回滚任何由先前失败的作业留下的未完成事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});