作为Write for DOnations计划的一部分,作者选择了“编码女孩”来接受捐赠。
介绍
在 Linux 中,您可以使用多功能的crontab 工具在特定时间在后台处理长时间运行的任务。虽然守护进程非常适合运行重复性任务,但它有一个限制:您只能以 1 分钟的最小时间间隔执行任务。
但是,在许多应用程序中,为了避免糟糕的用户体验,最好让作业更频繁地执行。例如,如果您使用作业队列模型在您的网站上安排文件处理任务,那么长时间的等待将对最终用户产生负面影响。
另一种情况是应用程序使用作业队列模型在客户完成应用程序中的特定任务(例如,向收件人汇款)后向客户发送文本消息或电子邮件。如果用户在发送确认消息之前必须等待一分钟,他们可能会认为交易失败并尝试重复相同的交易。
为了克服这些挑战,您可以编写一个 PHP 脚本,该脚本在等待 crontab 守护程序在一分钟后再次调用它时,循环并重复处理 60 秒的任务。一旦 PHP 脚本被 crontab 守护程序第一次调用,它就可以在与您的应用程序逻辑匹配的时间段内执行任务,而无需让用户等待。
在本指南中,您将cron_jobs
在 Ubuntu 20.04 服务器上创建示例数据库。然后,您将设置一个tasks
表和一个脚本,该脚本使用 PHPwhile(...){...}
循环和sleep()
函数以 5 秒的间隔执行表中的作业。
先决条件
要完成本教程,您需要具备以下条件:
-
使用非 root 用户设置的 Ubuntu 20.04 服务器。按照我们的Ubuntu 20.04 初始服务器设置指南进行操作。
-
在您的服务器上设置的 LAMP 堆栈。请参阅如何在 Ubuntu 20.04 上安装 Linux、Apache、MySQL、PHP (LAMP) 堆栈指南。对于本教程,您可以跳过步骤 4 — 为您的网站创建虚拟主机。
步骤 1 — 设置数据库
在此步骤中,您将创建示例数据库和表。首先,SSH
到您的服务器并以 root 身份登录 MySQL:
- sudo mysql -u root -p
输入 MySQL 服务器的 root 密码,然后按ENTER
继续。然后,运行以下命令创建cron_jobs
数据库。
- CREATE DATABASE cron_jobs;
为数据库创建一个非 root 用户。您需要此用户的凭据才能cron_jobs
从 PHP连接到数据库。请记住EXAMPLE_PASSWORD
用强值替换:
- CREATE USER 'cron_jobs_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
- GRANT ALL PRIVILEGES ON cron_jobs.* TO 'cron_jobs_user'@'localhost';
- FLUSH PRIVILEGES;
接下来,切换到cron_jobs
数据库:
- USE cron_jobs;
OutputDatabase changed
选择数据库后,创建一个tasks
表。在此表中,您将插入一些将由 cron 作业自动执行的任务。由于运行 cron 作业的最小时间间隔是1
分钟,您稍后将编写一个 PHP 脚本来覆盖此设置,而是以 5 秒的间隔执行作业。
现在,创建您的tasks
表:
- CREATE TABLE tasks (
- task_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
- task_name VARCHAR(50),
- queued_at DATETIME,
- completed_at DATETIME,
- is_processed CHAR(1)
- ) ENGINE = InnoDB;
将三个记录插入到任务表中。使用列中的MySQLNOW()
函数queued_at
记录任务排队时的当前日期和时间。同样对于completed_at
列,使用 MySQLCURDATE()
函数设置默认时间为00:00:00
. 稍后,随着任务完成,您的脚本将更新此列:
- INSERT INTO tasks (task_name, queued_at, completed_at, is_processed) VALUES ('TASK 1', NOW(), CURDATE(), 'N');
- INSERT INTO tasks (task_name, queued_at, completed_at, is_processed) VALUES ('TASK 2', NOW(), CURDATE(), 'N');
- INSERT INTO tasks (task_name, queued_at, completed_at, is_processed) VALUES ('TASK 3', NOW(), CURDATE(), 'N');
运行每个INSERT
命令后确认输出:
OutputQuery OK, 1 row affected (0.00 sec)
...
通过SELECT
对tasks
表运行语句来确保数据到位:
- SELECT task_id, task_name, queued_at, completed_at, is_processed FROM tasks;
您将找到所有任务的列表:
Output+---------+-----------+---------------------+---------------------+--------------+
| task_id | task_name | queued_at | completed_at | is_processed |
+---------+-----------+---------------------+---------------------+--------------+
| 1 | TASK 1 | 2021-03-06 06:27:19 | 2021-03-06 00:00:00 | N |
| 2 | TASK 2 | 2021-03-06 06:27:28 | 2021-03-06 00:00:00 | N |
| 3 | TASK 3 | 2021-03-06 06:27:36 | 2021-03-06 00:00:00 | N |
+---------+-----------+---------------------+---------------------+--------------+
3 rows in set (0.00 sec)
该completed_at
列的时间设置为00:00:00
,一旦您接下来将创建的 PHP 脚本处理任务,该列就会更新。
从 MySQL 命令行界面退出:
- QUIT;
OutputBye
您的cron_jobs
数据库和tasks
表现已就绪,您现在可以创建处理作业的 PHP 脚本。
第 2 步 – 创建一个 PHP 脚本,在 5 秒后运行任务
在此步骤中,您将创建一个脚本,该脚本使用 PHPwhile(...){...}
循环和sleep
函数的组合在每 5 秒后运行一次任务。
/var/www/html/tasks.php
使用 nano 在 Web 服务器的根目录中打开一个新文件:
- sudo nano /var/www/html/tasks.php
接下来,try {
在<?php
标记后创建一个新块并声明您在步骤 1 中创建的数据库变量。 请记住替换EXAMPLE_PASSWORD
为您的数据库用户的实际密码:
<?php
try {
$db_name = 'cron_jobs';
$db_user = 'cron_jobs_user';
$db_password = 'EXAMPLE_PASSWORD';
$db_host = 'localhost';
接下来,声明一个新的 PDO(PHP 数据对象)类并设置属性ERRMODE_EXCEPTION
以捕获任何 PDO 错误。此外,切换ATTR_EMULATE_PREPARES
到false
让本机 MySQL 数据库引擎处理仿真。准备好的语句允许您分别发送 SQL 查询和数据以增强安全性并减少 SQL 注入攻击的机会:
$pdo = new PDO('mysql:host=' . $db_host . '; dbname=' . $db_name, $db_user, $db_password);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
然后,创建一个名为的新变量$loop_expiry_time
并将其设置为当前时间加 60 秒。然后打开一个新的 PHPwhile(time() < $loop_expiry_time) {
语句。这里的想法是创建一个循环,直到当前时间 ( time()
) 与变量匹配$loop_expiry_time
:
$loop_expiry_time = time() + 60;
while (time() < $loop_expiry_time) {
接下来,声明一个准备好的 SQL 语句,从tasks
表中检索未处理的作业:
$data = [];
$sql = "select
task_id
from tasks
where is_processed = :is_processed
";
执行 SQL 命令并从tasks
表中获取列is_processed
设置为 的所有行N
。这意味着不处理行:
$data['is_processed'] = 'N';
$stmt = $pdo->prepare($sql);
$stmt->execute($data);
接下来,使用 PHPwhile ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {...}
语句遍历检索到的行并创建另一个 SQL 语句。这一次,SQL 命令为每个处理的任务更新is_processed
和completed_at
列。这可确保您不会多次处理任务:
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
$data_update = [];
$sql_update = "update tasks set
is_processed = :is_processed,
completed_at = :completed_at
where task_id = :task_id
";
$data_update = [
'is_processed' => 'Y',
'completed_at' => date("Y-m-d H:i:s"),
'task_id' => $row['task_id']
];
$stmt = $pdo->prepare($sql_update);
$stmt->execute($data_update);
}
注意:如果您有一个大队列要处理(例如,每秒 100,000 条记录),您可能会考虑在Redis 服务器中排队作业,因为在实现作业队列模型时,它比 MySQL 快。尽管如此,本指南将处理较小的数据集。
在关闭第一个 PHPwhile (time() < $loop_expiry_time) {
语句之前,包含一个sleep(5);
语句来暂停作业执行 5 秒并释放您的服务器资源。
您可以根据您的业务逻辑以及您希望任务执行的速度来更改 5 秒时间段。例如,如果您希望任务在一分钟内处理 3 次,请将此值设置为 20 秒。
请记住块catch
内的任何 PDO 错误消息} catch (PDOException $ex) { echo $ex->getMessage(); }
:
sleep(5);
}
} catch (PDOException $ex) {
echo $ex->getMessage();
}
您的完整tasks.php
文件如下:
<?php
try {
$db_name = 'cron_jobs';
$db_user = 'cron_jobs_user';
$db_password = 'EXAMPLE_PASSWORD';
$db_host = 'localhost';
$pdo = new PDO('mysql:host=' . $db_host . '; dbname=' . $db_name, $db_user, $db_password);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
$loop_expiry_time = time() + 60;
while (time() < $loop_expiry_time) {
$data = [];
$sql = "select
task_id
from tasks
where is_processed = :is_processed
";
$data['is_processed'] = 'N';
$stmt = $pdo->prepare($sql);
$stmt->execute($data);
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {
$data_update = [];
$sql_update = "update tasks set
is_processed = :is_processed,
completed_at = :completed_at
where task_id = :task_id
";
$data_update = [
'is_processed' => 'Y',
'completed_at' => date("Y-m-d H:i:s"),
'task_id' => $row['task_id']
];
$stmt = $pdo->prepare($sql_update);
$stmt->execute($data_update);
}
sleep(5);
}
} catch (PDOException $ex) {
echo $ex->getMessage();
}
按CTRL
+保存文件X
,Y
然后按ENTER
。
完成/var/www/html/tasks.php
文件中的逻辑编码后,您将在下一步中安排 crontab 守护程序每 1 分钟执行一次文件。
步骤 3 — 安排 PHP 脚本在 1 分钟后运行
在 Linux 中,您可以通过在 crontab 文件中输入命令来安排作业在规定的时间后自动运行。在这一步中,您将指示 crontab 守护程序/var/www/html/tasks.php
每分钟运行一次您的脚本。因此,/etc/crontab
使用 nano打开文件:
- sudo nano /etc/crontab
然后在文件末尾添加以下内容以http://localhost/tasks.php
每1
分钟执行一次:
...
* * * * * root /usr/bin/wget -O - http://localhost/tasks.php
保存并关闭文件。
本指南假设您对 cron 作业的工作原理有基本的了解。考虑阅读我们关于如何在 Ubuntu 上使用 Cron 自动化任务的指南。
如前所述,尽管 cron 守护进程tasks.php
每 1 分钟运行一次文件,但一旦第一次执行该文件,它将在打开的任务中再循环 60 秒。到循环时间到期时,cron 守护程序将再次执行该文件并且该进程将继续。
更新并关闭/etc/crontab
文件后,crontab 守护程序应立即开始执行您插入tasks
表中的 MySQL 任务。为了确认一切是否按预期工作,cron_jobs
接下来您将查询您的数据库。
第 4 步 – 确认作业执行
在此步骤中,您将再次打开数据库以检查tasks.php
文件是否在由 crontab 自动执行时正在处理排队的作业。
以 root 用户身份重新登录 MySQL 服务器:
- sudo mysql -u root -p
然后,输入您的 MySQL 服务器的 root 密码并点击ENTER
继续。然后,切换到数据库:
- USE cron_jobs;
OutputDatabase changed
SELECT
对tasks
表运行一条语句:
- SELECT task_id, task_name, queued_at, completed_at, is_processed FROM tasks;
您将收到类似于以下内容的输出。在completed_at
列中,任务以5
秒为间隔进行了处理。此外,任务已标记为已完成,因为该is_processed
列现在设置为Y
,这意味着YES
。
Output
+---------+-----------+---------------------+---------------------+--------------+
| task_id | task_name | queued_at | completed_at | is_processed |
+---------+-----------+---------------------+---------------------+--------------+
| 1 | TASK 1 | 2021-03-06 06:27:19 | 2021-03-06 06:30:01 | Y |
| 2 | TASK 2 | 2021-03-06 06:27:28 | 2021-03-06 06:30:06 | Y |
| 3 | TASK 3 | 2021-03-06 06:27:36 | 2021-03-06 06:30:11 | Y |
+---------+-----------+---------------------+---------------------+--------------+
3 rows in set (0.00 sec)
这确认您的 PHP 脚本按预期工作;通过覆盖 crontab 守护程序设置的 1 分钟时间段的限制,您可以在更短的时间间隔内运行任务。
结论
在本指南中,您已经在 Ubuntu 20.04 服务器上设置了一个示例数据库。然后,您在表中创建了作业并使用 PHPwhile(...){...}
循环和sleep()
函数以 5 秒的间隔运行它们。当您下次实现基于作业队列的应用程序时,请使用本教程中的逻辑,其中任务需要在 1 分钟的时间段内运行多次。
如需更多 PHP 教程,请查看我们的PHP 主题页面。