This commit is contained in:
2026-07-01 17:03:59 +08:00
parent ad7a958662
commit 847ffbbef6
9 changed files with 223 additions and 88 deletions
+4 -5
View File
@@ -14,11 +14,10 @@
"ext-swoole": "*", "ext-swoole": "*",
"ext-redis": "*", "ext-redis": "*",
"psr/log": "^1.0", "psr/log": "^1.0",
"psr/event-dispatcher": "^1.0" "psr/event-dispatcher": "^1.0",
}, "symfony/console": "^v8.0",
"suggest": { "game-worker/kiri-core": "^v1.0",
"symfony/console": "如需集成 kiri-core 或使用命令行管理,建议安装 ^v8.0", "game-worker/kiri-http-server": "^v1.0"
"game-worker/kiri-core": "如需集成到 kiri-core 框架,建议安装 kiri-core"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {
+3 -3
View File
@@ -16,8 +16,8 @@ return [
// 调度器配置 // 调度器配置
'scheduler' => [ 'scheduler' => [
// 调度器 tick 间隔 (秒) // 调度器 tick 间隔 (秒,支持小数;毫秒级任务建议 0.05~0.1)
'tick_interval' => (int)env('CRONTAB_TICK_INTERVAL', 1), 'tick_interval' => (float)env('CRONTAB_TICK_INTERVAL', 0.1),
// 任务执行超时时间 (秒) // 任务执行超时时间 (秒)
'task_timeout' => (int)env('CRONTAB_TASK_TIMEOUT', 300), 'task_timeout' => (int)env('CRONTAB_TASK_TIMEOUT', 300),
// 主锁 TTL (秒) // 主锁 TTL (秒)
@@ -36,7 +36,7 @@ return [
// 注册的任务列表 (配置模式,expression 字符串格式) // 注册的任务列表 (配置模式,expression 字符串格式)
// 注解模式通过 #[Crontab] 在 handle() 方法上声明,kiri-core Scanner 自动发现 // 注解模式通过 #[Crontab] 在 handle() 方法上声明,kiri-core Scanner 自动发现
// 表达式: every:60 | every:5m | every:1h | daily:03:00 | hourly:30 | cron:*\/5 * * * * | at:时间戳 // 表达式: every:100ms | every:60 | every:5m | every:1h | daily:03:00 | hourly:30 | cron:*\/5 * * * * | at:时间戳
// 每个任务需实现 TaskInterface 接口 // 每个任务需实现 TaskInterface 接口
'tasks' => [ 'tasks' => [
// 示例: // 示例:
+103 -53
View File
@@ -14,40 +14,70 @@ namespace Kiri\Crontab;
* N-M 范围匹配 * N-M 范围匹配
* *
* 自定义表达式: * 自定义表达式:
* every:{毫秒}ms
* every:{秒} * every:{秒}
* every:{秒}s * every:{秒}s
* every:{分}m * every:{分}m
* every:{时}h * every:{时}h
* daily:{HH:MM} * daily:{HH:MM}
* hourly:{MM} * hourly:{MM}
* at:{时间戳} * at:{时间戳秒|时间戳毫秒|时间戳ms}
*/ */
class CronExpression class CronExpression
{ {
/** /**
* 计算给定时间戳之后的下次执行时间 * 计算给定秒级时间戳之后的下次执行时间
*
* 保留秒级返回值用于兼容旧调用;调度器应使用 getNextRunTimeMs() 获得毫秒级时间。
* *
* @param string $expression 调度表达式 * @param string $expression 调度表达式
* @param int $afterTimestamp 起始时间戳 (不含) * @param int $afterTimestamp 起始秒级时间戳 (不含)
* @return int 下次执行时间戳,一次性任务过期返回 0 * @return int 下次执行秒级时间戳,一次性任务过期返回 0
*/ */
public function getNextRunTime(string $expression, int $afterTimestamp = 0): int public function getNextRunTime(string $expression, int $afterTimestamp = 0): int
{ {
if ($afterTimestamp <= 0) { $afterTimestampMs = $afterTimestamp > 0 ? $afterTimestamp * 1000 : self::currentTimeMs();
$afterTimestamp = time(); $nextRunMs = $this->getNextRunTimeMs($expression, $afterTimestampMs);
if ($nextRunMs <= 0) {
return 0;
}
return intdiv($nextRunMs, 1000);
}
/**
* 计算给定毫秒级时间戳之后的下次执行时间。
*
* @param string $expression 调度表达式
* @param int $afterTimestampMs 起始毫秒级时间戳 (不含)
* @return int 下次执行毫秒级时间戳,一次性任务过期返回 0
*/
public function getNextRunTimeMs(string $expression, int $afterTimestampMs = 0): int
{
if ($afterTimestampMs <= 0) {
$afterTimestampMs = self::currentTimeMs();
} }
return match (true) { return match (true) {
str_starts_with($expression, 'every:') => $this->parseEvery($expression, $afterTimestamp), str_starts_with($expression, 'every:') => $this->parseEveryMs($expression, $afterTimestampMs),
str_starts_with($expression, 'daily:') => $this->parseDaily($expression, $afterTimestamp), str_starts_with($expression, 'daily:') => $this->parseDailyMs($expression, $afterTimestampMs),
str_starts_with($expression, 'hourly:') => $this->parseHourly($expression, $afterTimestamp), str_starts_with($expression, 'hourly:') => $this->parseHourlyMs($expression, $afterTimestampMs),
str_starts_with($expression, 'at:') => $this->parseAt($expression, $afterTimestamp), str_starts_with($expression, 'at:') => $this->parseAtMs($expression, $afterTimestampMs),
str_starts_with($expression, 'cron:') => $this->parseCron(substr($expression, 5), $afterTimestamp), str_starts_with($expression, 'cron:') => $this->parseCronMs(substr($expression, 5), $afterTimestampMs),
default => $this->parseCron($expression, $afterTimestamp), default => $this->parseCronMs($expression, $afterTimestampMs),
}; };
} }
/**
* 获取当前毫秒级 Unix 时间戳。
*/
public static function currentTimeMs(): int
{
return (int)floor(microtime(true) * 1000);
}
/** /**
* 获取表达式的可读间隔描述 * 获取表达式的可读间隔描述
*/ */
@@ -72,22 +102,39 @@ class CronExpression
} }
/** /**
* 解析 every:{N}[s|m|h] 表达式 * 解析 every:{N}[ms|s|m|h] 表达式,返回毫秒级下次执行时间。
*/ */
private function parseEvery(string $expression, int $afterTimestamp): int private function parseEveryMs(string $expression, int $afterTimestampMs): int
{ {
$value = substr($expression, 6); $intervalMs = $this->parseIntervalMs(substr($expression, 6));
if ($intervalMs <= 0) {
return 0;
}
$seconds = match (true) { return $afterTimestampMs + $intervalMs;
str_ends_with($value, 's') => (int)$value, }
str_ends_with($value, 'm') => (int)$value * 60,
str_ends_with($value, 'h') => (int)$value * 3600,
default => (int)$value,
};
// 从上次调度时间累加,避免时间漂移和积压 /**
// 如果 afterTimestamp 距离上次调度超过 n 个周期,只加一个周期(按第一次调度时间对齐) * 将 every 值解析成毫秒;无单位保持兼容,表示秒。
return $afterTimestamp + $seconds; */
private function parseIntervalMs(string $value): int
{
$value = trim($value);
if (str_ends_with($value, 'ms')) {
return max(0, (int)substr($value, 0, -2));
}
if (str_ends_with($value, 's')) {
return max(0, (int)round((float)substr($value, 0, -1) * 1000));
}
if (str_ends_with($value, 'm')) {
return max(0, (int)round((float)substr($value, 0, -1) * 60000));
}
if (str_ends_with($value, 'h')) {
return max(0, (int)round((float)substr($value, 0, -1) * 3600000));
}
return max(0, (int)round((float)$value * 1000));
} }
/** /**
@@ -97,17 +144,18 @@ class CronExpression
{ {
$value = substr($expression, 6); $value = substr($expression, 6);
return match (true) { return match (true) {
str_ends_with($value, 's') => '每 ' . ((int)$value) . ' 秒', str_ends_with($value, 'ms') => '每 ' . ((int)$value) . ' 秒',
str_ends_with($value, 'm') => '每 ' . ((int)$value) . ' 分钟', str_ends_with($value, 's') => '每 ' . ((int)$value) . ' ',
str_ends_with($value, 'h') => '每 ' . ((int)$value) . ' 小时', str_ends_with($value, 'm') => '每 ' . ((int)$value) . ' 分钟',
default => '每 ' . ((int)$value) . ' ', str_ends_with($value, 'h') => '每 ' . ((int)$value) . ' 小时',
default => '每 ' . ((int)$value) . ' 秒',
}; };
} }
/** /**
* 解析 daily:{HH:MM} 表达式 * 解析 daily:{HH:MM} 表达式
*/ */
private function parseDaily(string $expression, int $afterTimestamp): int private function parseDailyMs(string $expression, int $afterTimestampMs): int
{ {
$timeStr = substr($expression, 6); $timeStr = substr($expression, 6);
$parts = explode(':', $timeStr); $parts = explode(':', $timeStr);
@@ -115,54 +163,61 @@ class CronExpression
$minute = (int)($parts[1] ?? 0); $minute = (int)($parts[1] ?? 0);
$second = (int)($parts[2] ?? 0); $second = (int)($parts[2] ?? 0);
$afterTimestamp = intdiv($afterTimestampMs, 1000);
$currentDate = getdate($afterTimestamp); $currentDate = getdate($afterTimestamp);
$targetTime = mktime($hour, $minute, $second, $currentDate['mon'], $currentDate['mday'], $currentDate['year']); $targetTime = mktime($hour, $minute, $second, $currentDate['mon'], $currentDate['mday'], $currentDate['year']);
if ($targetTime <= $afterTimestamp) { if ($targetTime * 1000 <= $afterTimestampMs) {
// 今天的时间已过,推到明天 $targetTime += 86400;
$targetTime = $targetTime + 86400;
} }
return $targetTime; return $targetTime * 1000;
} }
/** /**
* 解析 hourly:{MM} 表达式 * 解析 hourly:{MM} 表达式
*/ */
private function parseHourly(string $expression, int $afterTimestamp): int private function parseHourlyMs(string $expression, int $afterTimestampMs): int
{ {
$minute = (int)substr($expression, 7); $minute = (int)substr($expression, 7);
$afterTimestamp = intdiv($afterTimestampMs, 1000);
$currentDate = getdate($afterTimestamp); $currentDate = getdate($afterTimestamp);
$targetTime = mktime($currentDate['hours'], $minute, 0, $currentDate['mon'], $currentDate['mday'], $currentDate['year']); $targetTime = mktime($currentDate['hours'], $minute, 0, $currentDate['mon'], $currentDate['mday'], $currentDate['year']);
if ($targetTime <= $afterTimestamp) { if ($targetTime * 1000 <= $afterTimestampMs) {
$targetTime = $targetTime + 3600; $targetTime += 3600;
} }
return $targetTime; return $targetTime * 1000;
} }
/** /**
* 解析 at:{时间戳} 表达式 — 一次性任务 * 解析 at:{时间戳} 表达式 — 一次性任务
*/ */
private function parseAt(string $expression, int $afterTimestamp): int private function parseAtMs(string $expression, int $afterTimestampMs): int
{ {
$timestamp = (int)substr($expression, 3); $value = trim(substr($expression, 3));
if ($timestamp <= $afterTimestamp) { $timestampMs = str_ends_with($value, 'ms')
// 已过期,返回 0 表示不再调度 ? (int)substr($value, 0, -2)
: (int)$value;
if ($timestampMs > 0 && $timestampMs < 1000000000000) {
$timestampMs *= 1000;
}
if ($timestampMs <= $afterTimestampMs) {
return 0; return 0;
} }
return $timestamp; return $timestampMs;
} }
/** /**
* 解析标准 5 字段 cron 表达式: 分 时 日 月 周 * 解析标准 5 字段 cron 表达式: 分 时 日 月 周
*/ */
private function parseCron(string $cronExpression, int $afterTimestamp): int private function parseCronMs(string $cronExpression, int $afterTimestampMs): int
{ {
$fields = preg_split('/\s+/', trim($cronExpression)); $fields = preg_split('/\s+/', trim($cronExpression));
if (count($fields) !== 5) { if (count($fields) !== 5) {
// 格式无效,返回 0
return 0; return 0;
} }
@@ -172,8 +227,8 @@ class CronExpression
$month = $fields[3]; $month = $fields[3];
$weekday = $fields[4]; $weekday = $fields[4];
// 从 afterTimestamp 下一秒开始逐分钟搜索,最多搜索 2 年 $afterTimestamp = intdiv($afterTimestampMs, 1000);
$searchStart = $afterTimestamp + 60; $searchStart = ((int)floor($afterTimestamp / 60) + 1) * 60;
$searchEnd = $afterTimestamp + 365 * 2 * 86400; $searchEnd = $afterTimestamp + 365 * 2 * 86400;
for ($ts = $searchStart; $ts <= $searchEnd; $ts += 60) { for ($ts = $searchStart; $ts <= $searchEnd; $ts += 60) {
@@ -187,7 +242,7 @@ class CronExpression
$matched = $matched && $this->matchCronField($weekday, $t['wday'], 0, 6); $matched = $matched && $this->matchCronField($weekday, $t['wday'], 0, 6);
if ($matched) { if ($matched) {
return $ts; return $ts * 1000;
} }
} }
@@ -204,12 +259,10 @@ class CronExpression
*/ */
private function matchCronField(string $fieldValue, int $current, int $min, int $max): bool private function matchCronField(string $fieldValue, int $current, int $min, int $max): bool
{ {
// * 匹配所有值
if ($fieldValue === '*') { if ($fieldValue === '*') {
return true; return true;
} }
// *\/N 每隔 N 步进
if (str_starts_with($fieldValue, '*/')) { if (str_starts_with($fieldValue, '*/')) {
$step = (int)substr($fieldValue, 2); $step = (int)substr($fieldValue, 2);
if ($step <= 0) { if ($step <= 0) {
@@ -218,7 +271,6 @@ class CronExpression
return ($current - $min) % $step === 0; return ($current - $min) % $step === 0;
} }
// 逗号分隔的枚举值
if (str_contains($fieldValue, ',')) { if (str_contains($fieldValue, ',')) {
$values = explode(',', $fieldValue); $values = explode(',', $fieldValue);
foreach ($values as $val) { foreach ($values as $val) {
@@ -229,7 +281,6 @@ class CronExpression
return false; return false;
} }
// N-M 范围值
if (str_contains($fieldValue, '-')) { if (str_contains($fieldValue, '-')) {
$parts = explode('-', $fieldValue); $parts = explode('-', $fieldValue);
$rangeStart = (int)$parts[0]; $rangeStart = (int)$parts[0];
@@ -237,7 +288,6 @@ class CronExpression
return $current >= $rangeStart && $current <= $rangeEnd; return $current >= $rangeStart && $current <= $rangeEnd;
} }
// 精确值
return (int)$fieldValue === $current; return (int)$fieldValue === $current;
} }
+1 -1
View File
@@ -183,7 +183,7 @@ class CrontabProcess extends AbstractProcess
$redis = $this->createRedisConnection(); $redis = $this->createRedisConnection();
$cronExpression = new CronExpression; $cronExpression = new CronExpression;
$schedulerConfig = $this->config['scheduler'] ?? []; $schedulerConfig = $this->config['scheduler'] ?? [];
$this->scheduler = new CrontabScheduler(redis: $redis, cronExpression: $cronExpression, logger: $this->logger, tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),); $this->scheduler = new CrontabScheduler(redis: $redis, cronExpression: $cronExpression, logger: $this->logger, tickInterval: (float)($schedulerConfig['tick_interval'] ?? 0.1), taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),);
$this->scheduler->start(); $this->scheduler->start();
} }
+82 -22
View File
@@ -46,7 +46,7 @@ class CrontabScheduler extends Component
private const RUNNING_SET_KEY = 'crontab:running'; private const RUNNING_SET_KEY = 'crontab:running';
/** @var int 默认 tick 间隔 (秒) */ /** @var int 默认 tick 间隔 (秒) */
private const DEFAULT_TICK_INTERVAL = 1; private const DEFAULT_TICK_INTERVAL = 0.1;
/** @var int 默认任务执行超时 (秒) */ /** @var int 默认任务执行超时 (秒) */
private const DEFAULT_TASK_TIMEOUT = 300; private const DEFAULT_TASK_TIMEOUT = 300;
@@ -68,11 +68,14 @@ class CrontabScheduler extends Component
/** @var string|null 非协程环境下当前任务 key 回退存储 */ /** @var string|null 非协程环境下当前任务 key 回退存储 */
private ?string $fallbackCurrentTaskKey = null; private ?string $fallbackCurrentTaskKey = null;
/** @var array<string, string> 当前进程持有的任务锁 token */
private array $taskLockTokens = [];
/** /**
* @param \Redis $redis Redis 客户端 * @param \Redis $redis Redis 客户端
* @param CronExpression $cronExpression Cron 表达式解析器 * @param CronExpression $cronExpression Cron 表达式解析器
* @param LoggerInterface|null $logger 日志记录器(容器不可用时的兜底) * @param LoggerInterface|null $logger 日志记录器(容器不可用时的兜底)
* @param int $tickInterval tick 间隔 (秒) * @param float $tickInterval tick 间隔 (秒,支持小数)
* @param int $taskTimeout 任务执行超时 (秒) * @param int $taskTimeout 任务执行超时 (秒)
* @param int $lockTtl 主锁 TTL (秒) * @param int $lockTtl 主锁 TTL (秒)
* @param int $lockRenewInterval 主锁续期间隔 (秒) * @param int $lockRenewInterval 主锁续期间隔 (秒)
@@ -83,7 +86,7 @@ class CrontabScheduler extends Component
private \Redis $redis, private \Redis $redis,
private CronExpression $cronExpression, private CronExpression $cronExpression,
?LoggerInterface $logger = null, ?LoggerInterface $logger = null,
private int $tickInterval = self::DEFAULT_TICK_INTERVAL, private float $tickInterval = self::DEFAULT_TICK_INTERVAL,
private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT, private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT,
private int $lockTtl = self::DEFAULT_LOCK_TTL, private int $lockTtl = self::DEFAULT_LOCK_TTL,
private int $lockRenewInterval = 15, private int $lockRenewInterval = 15,
@@ -221,7 +224,11 @@ class CrontabScheduler extends Component
$this->removeFromQueue($taskKey); $this->removeFromQueue($taskKey);
$this->redis->del($hashKey); $this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey)); $this->clearRemovalFlag($taskKey);
if (!$this->isTaskRunning($taskKey)) {
$this->redis->del($this->getTaskLockKey($taskKey));
}
$this->logInfo("[CrontabScheduler] 任务已取消: {$taskKey}"); $this->logInfo("[CrontabScheduler] 任务已取消: {$taskKey}");
@@ -326,7 +333,9 @@ class CrontabScheduler extends Component
$taskKey = substr($hashKey, strlen(self::KEY_PREFIX . ':task:')); $taskKey = substr($hashKey, strlen(self::KEY_PREFIX . ':task:'));
$this->removeFromQueue($taskKey); $this->removeFromQueue($taskKey);
$this->redis->del($hashKey); $this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey)); if (!$this->isTaskRunning($taskKey)) {
$this->redis->del($this->getTaskLockKey($taskKey));
}
$this->redis->del('crontab:removal:' . $taskKey); $this->redis->del('crontab:removal:' . $taskKey);
$removedCount++; $removedCount++;
} }
@@ -344,7 +353,7 @@ class CrontabScheduler extends Component
foreach (TaskRegistry::all() as $taskKey => $taskConfig) { foreach (TaskRegistry::all() as $taskKey => $taskConfig) {
$hashKey = $this->getTaskHashKey($taskKey); $hashKey = $this->getTaskHashKey($taskKey);
$nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1); $nextRun = $this->cronExpression->getNextRunTimeMs($taskConfig->expression, $this->currentTimeMs() - 1);
$interval = $this->cronExpression->getIntervalDescription($taskConfig->expression); $interval = $this->cronExpression->getIntervalDescription($taskConfig->expression);
$taskConfig->nextRun = $nextRun; $taskConfig->nextRun = $nextRun;
@@ -388,7 +397,7 @@ class CrontabScheduler extends Component
} }
$config = TaskConfig::fromHash($taskKey, $hash); $config = TaskConfig::fromHash($taskKey, $hash);
$nextRun = $this->cronExpression->getNextRunTime($config->expression, time() - 1); $nextRun = $this->cronExpression->getNextRunTimeMs($config->expression, $this->currentTimeMs() - 1);
$this->redis->hMSet($hashKey, [ $this->redis->hMSet($hashKey, [
'status' => 'active', 'status' => 'active',
@@ -462,7 +471,7 @@ class CrontabScheduler extends Component
*/ */
private function processDueTasks(): void private function processDueTasks(): void
{ {
$now = time(); $now = $this->currentTimeMs();
$dueTaskKeys = $this->redis->zRangeByScore( $dueTaskKeys = $this->redis->zRangeByScore(
self::QUEUE_KEY, self::QUEUE_KEY,
@@ -602,15 +611,15 @@ class CrontabScheduler extends Component
} finally { } finally {
$this->redis->sRem(self::RUNNING_SET_KEY, $taskKey); $this->redis->sRem(self::RUNNING_SET_KEY, $taskKey);
// 检查是否被标记为"执行后移除" // 任务执行期间可能被外部取消或配置同步删除,不允许执行结束后重新写回。
$shouldRemove = $this->isTaskMarkedForRemoval($taskKey); $taskExists = (bool)$this->redis->exists($hashKey);
$shouldRemove = !$taskExists || $this->isTaskMarkedForRemoval($taskKey);
if ($shouldRemove) { if ($shouldRemove) {
$this->removeFromQueue($taskKey); $this->removeFromQueue($taskKey);
$this->redis->del($hashKey); $this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey));
$this->clearRemovalFlag($taskKey); $this->clearRemovalFlag($taskKey);
$this->logInfo("[CrontabScheduler] 任务已自毁: {$taskKey}"); $this->logInfo($taskExists ? "[CrontabScheduler] 任务已自毁: {$taskKey}" : "[CrontabScheduler] 任务执行期间已被删除: {$taskKey}");
} else { } else {
$this->finalizeTaskScheduling($config, $now); $this->finalizeTaskScheduling($config, $now);
} }
@@ -625,7 +634,7 @@ class CrontabScheduler extends Component
*/ */
private function finalizeTaskSuccess(TaskConfig $config, int $now, float $duration): void private function finalizeTaskSuccess(TaskConfig $config, int $now, float $duration): void
{ {
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); $nextRun = $this->cronExpression->getNextRunTimeMs($config->expression, $now);
$isOneShot = $this->cronExpression->isOneShot($config->expression); $isOneShot = $this->cronExpression->isOneShot($config->expression);
$this->dispatchEvent(new OnTaskExecuted( $this->dispatchEvent(new OnTaskExecuted(
@@ -642,7 +651,7 @@ class CrontabScheduler extends Component
*/ */
private function finalizeTaskFailure(TaskConfig $config, int $now, float $duration, \Throwable $error): void private function finalizeTaskFailure(TaskConfig $config, int $now, float $duration, \Throwable $error): void
{ {
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); $nextRun = $this->cronExpression->getNextRunTimeMs($config->expression, $now);
$this->dispatchEvent(new OnTaskFailed( $this->dispatchEvent(new OnTaskFailed(
taskKey: $config->taskKey, taskKey: $config->taskKey,
@@ -671,7 +680,7 @@ class CrontabScheduler extends Component
return; return;
} }
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); $nextRun = $this->cronExpression->getNextRunTimeMs($config->expression, $now);
$interval = $this->cronExpression->getIntervalDescription($config->expression); $interval = $this->cronExpression->getIntervalDescription($config->expression);
$updateData = [ $updateData = [
@@ -738,7 +747,12 @@ class CrontabScheduler extends Component
private function acquireTaskLock(string $taskKey): bool private function acquireTaskLock(string $taskKey): bool
{ {
$lockKey = $this->getTaskLockKey($taskKey); $lockKey = $this->getTaskLockKey($taskKey);
return $this->redis->set($lockKey, (string)time(), ['nx', 'ex' => $this->taskTimeout]); $token = bin2hex(random_bytes(16));
$locked = $this->redis->set($lockKey, $token, ['nx', 'ex' => $this->taskTimeout]);
if ($locked) {
$this->taskLockTokens[$taskKey] = $token;
}
return (bool)$locked;
} }
/** /**
@@ -746,7 +760,28 @@ class CrontabScheduler extends Component
*/ */
private function releaseTaskLock(string $taskKey): void private function releaseTaskLock(string $taskKey): void
{ {
$this->redis->del($this->getTaskLockKey($taskKey)); $token = $this->taskLockTokens[$taskKey] ?? null;
unset($this->taskLockTokens[$taskKey]);
if ($token === null) {
return;
}
$script = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
LUA;
$this->redis->eval($script, [$this->getTaskLockKey($taskKey), $token], 1);
}
/**
* 判断任务是否正在执行。
*/
private function isTaskRunning(string $taskKey): bool
{
return (bool)$this->redis->sIsMember(self::RUNNING_SET_KEY, $taskKey);
} }
/** /**
@@ -762,7 +797,7 @@ class CrontabScheduler extends Component
*/ */
private function persistNewTask(TaskConfig $taskConfig): void private function persistNewTask(TaskConfig $taskConfig): void
{ {
$nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1); $nextRun = $this->cronExpression->getNextRunTimeMs($taskConfig->expression, $this->currentTimeMs() - 1);
$interval = $this->cronExpression->getIntervalDescription($taskConfig->expression); $interval = $this->cronExpression->getIntervalDescription($taskConfig->expression);
$taskConfig->nextRun = $nextRun; $taskConfig->nextRun = $nextRun;
@@ -776,7 +811,7 @@ class CrontabScheduler extends Component
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey); $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey);
} }
$this->logInfo("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun)); $this->logInfo("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', intdiv($nextRun, 1000)));
} }
/** /**
@@ -785,12 +820,31 @@ class CrontabScheduler extends Component
private function mergeExistingTask(TaskConfig $taskConfig): void private function mergeExistingTask(TaskConfig $taskConfig): void
{ {
$hashKey = $this->getTaskHashKey($taskConfig->taskKey); $hashKey = $this->getTaskHashKey($taskConfig->taskKey);
$hash = $this->redis->hGetAll($hashKey);
$expressionChanged = ($hash['expression'] ?? '') !== $taskConfig->expression;
$statusChanged = ($hash['status'] ?? 'active') !== $taskConfig->status;
$updateData = [ $updateData = [
'class' => $taskConfig->className, 'class' => $taskConfig->className,
'name' => $taskConfig->name, 'name' => $taskConfig->name,
'expression' => $taskConfig->expression, 'expression' => $taskConfig->expression,
'status' => $taskConfig->status,
]; ];
if ($expressionChanged || $statusChanged) {
$nextRun = $this->cronExpression->getNextRunTimeMs($taskConfig->expression, $this->currentTimeMs() - 1);
$interval = $this->cronExpression->getIntervalDescription($taskConfig->expression);
$updateData['next_run'] = $nextRun;
$updateData['interval'] = $interval;
if ($taskConfig->status === 'active' && $nextRun > 0) {
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey);
} else {
$this->removeFromQueue($taskConfig->taskKey);
}
}
$this->redis->hMSet($hashKey, $updateData); $this->redis->hMSet($hashKey, $updateData);
} }
@@ -814,9 +868,15 @@ class CrontabScheduler extends Component
*/ */
private function calculateNextTickTime(): float private function calculateNextTickTime(): float
{ {
$now = microtime(true); return microtime(true) + $this->tickInterval;
$current = floor($now); }
return $current + $this->tickInterval;
/**
* 当前毫秒级 Unix 时间戳。
*/
private function currentTimeMs(): int
{
return CronExpression::currentTimeMs();
} }
/** /**
+1 -1
View File
@@ -14,7 +14,7 @@ class OnTaskExecuted
* @param string $className 任务处理类 * @param string $className 任务处理类
* @param string $taskName 任务显示名称 * @param string $taskName 任务显示名称
* @param float $duration 执行耗时 (秒) * @param float $duration 执行耗时 (秒)
* @param int $nextRun 下次执行时间戳 * @param int $nextRun 下次执行毫秒级时间戳
*/ */
public function __construct( public function __construct(
public string $taskKey, public string $taskKey,
+1 -1
View File
@@ -15,7 +15,7 @@ class OnTaskFailed
* @param string $taskName 任务显示名称 * @param string $taskName 任务显示名称
* @param \Throwable $error 异常信息 * @param \Throwable $error 异常信息
* @param float $duration 执行耗时 (秒) * @param float $duration 执行耗时 (秒)
* @param int $nextRun 下次执行时间戳 (失败仍会调度下次) * @param int $nextRun 下次执行毫秒级时间戳 (失败仍会调度下次)
*/ */
public function __construct( public function __construct(
public string $taskKey, public string $taskKey,
+2 -2
View File
@@ -25,8 +25,8 @@ class TaskConfig
* @param string $name 任务显示名称 * @param string $name 任务显示名称
* @param string $expression 调度表达式 * @param string $expression 调度表达式
* @param string $status 状态: active / paused / disabled * @param string $status 状态: active / paused / disabled
* @param int $nextRun 下次执行时间戳 (0 表示立即) * @param int $nextRun 下次执行毫秒级时间戳 (0 表示立即)
* @param int $lastRun 上次执行时间戳 * @param int $lastRun 上次执行毫秒级时间戳
* @param string $interval 可读的执行间隔描述 * @param string $interval 可读的执行间隔描述
* @param int $createdAt 创建时间戳 * @param int $createdAt 创建时间戳
*/ */
+26
View File
@@ -70,6 +70,25 @@ class CronExpressionTest
$next = $this->parser->getNextRunTime('every:60', $now); $next = $this->parser->getNextRunTime('every:60', $now);
$this->assertTrue($next === $now + 60, "期望 " . ($now + 60) . " 实际 $next"); $this->assertTrue($next === $now + 60, "期望 " . ($now + 60) . " 实际 $next");
} }
/**
* 测试毫秒级 every:100ms 表达式
*/
public function testEveryMilliseconds(): void
{
$nowMs = 1760000000123;
$next = $this->parser->getNextRunTimeMs('every:100ms', $nowMs);
$this->assertTrue($next === $nowMs + 100, "期望 " . ($nowMs + 100) . " 实际 $next");
}
/**
* 测试秒级表达式在毫秒级接口中返回毫秒时间戳
*/
public function testEverySecondsAsMilliseconds(): void
{
$nowMs = 1760000000123;
$next = $this->parser->getNextRunTimeMs('every:1s', $nowMs);
$this->assertTrue($next === $nowMs + 1000, "期望 " . ($nowMs + 1000) . " 实际 $next");
}
/** /**
* 测试 every:5m 表达式 * 测试 every:5m 表达式
@@ -159,6 +178,13 @@ class CronExpressionTest
$expected = mktime(10, 5, 0, 1, 15, 2026); $expected = mktime(10, 5, 0, 1, 15, 2026);
$this->assertTrue($next === $expected, "期望 $expected 实际 $next"); $this->assertTrue($next === $expected, "期望 $expected 实际 $next");
} }
public function testCronAlignsToMinuteBoundary(): void
{
$now = mktime(10, 2, 30, 1, 15, 2026); // 10:02:30
$next = $this->parser->getNextRunTime('cron:*/5 * * * *', $now);
$expected = mktime(10, 5, 0, 1, 15, 2026);
$this->assertTrue($next === $expected, "期望 $expected 实际 $next");
}
/** /**
* 测试 cron 表达式精确时间 * 测试 cron 表达式精确时间