This commit is contained in:
2026-06-28 17:06:12 +08:00
parent ece31c2830
commit 94111ccba6
20 changed files with 2789 additions and 0 deletions
+224
View File
@@ -0,0 +1,224 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab\Annotate;
/**
* 定时任务注解 — 标记一个类为定时任务
*
* 放在实现了 TaskInterface 的类上,由 CrontabProcess 启动时自动发现并注册
*
* 使用示例:
* #[Crontab(name: '清理日志', hour: 3, minute: 0, loop: true)]
* class CleanLogTask implements TaskInterface { ... }
*
* #[Crontab(name: '心跳', tick: 30, loop: true)]
* class HeartbeatTask implements TaskInterface { ... }
*/
#[\Attribute(\Attribute::TARGET_CLASS)]
class Crontab
{
/**
* @param string $name 任务显示名称(为空则使用类名)
* @param bool $loop 是否循环执行,默认 false (一次性)
* @param int|null $tick 循环间隔: 秒
* @param int|null $tickMinute 循环间隔: 分钟
* @param int|null $tickHour 循环间隔: 小时
* @param int|null $year 执行时间: 年
* @param int|null $month 执行时间: 月 (1-12)
* @param int|null $day 执行时间: 日 (1-31)
* @param int|null $hour 执行时间: 时 (0-23)
* @param int|null $minute 执行时间: 分 (0-59)
* @param int|null $second 执行时间: 秒 (0-59)
* @param string|null $cron 标准 5 字段 cron 表达式
* @param int|string|null $every 旧版兼容: 每N秒(int) 或 '5m'/'1h'
* @param string|null $dailyAt 旧版兼容: 每天 HH:MM
* @param int|null $hourlyAt 旧版兼容: 每小时第N分钟
* @param int|null $at 旧版兼容: Unix时间戳一次性
* @param string|null $expression 旧版兼容: 完整表达式字符串
* @param string $status 任务状态: active / paused / disabled
*/
public function __construct(
public string $name = '',
public bool $loop = false,
public ?int $tick = null,
public ?int $tickMinute = null,
public ?int $tickHour = null,
public ?int $year = null,
public ?int $month = null,
public ?int $day = null,
public ?int $hour = null,
public ?int $minute = null,
public ?int $second = null,
public ?string $cron = null,
public int|string|null $every = null,
public ?string $dailyAt = null,
public ?int $hourlyAt = null,
public ?int $at = null,
public ?string $expression = null,
public string $status = 'active',
) {
}
/**
* 将调度参数转换为标准 expression 字符串供底层引擎使用
*
* 转换优先级:
* expression → cron → tick* → 时间字段(loop) → 时间字段(once) → 旧版快捷参数
*/
public function buildExpression(): string
{
if ($this->expression !== null && $this->expression !== '') {
return $this->expression;
}
if ($this->cron !== null && $this->cron !== '') {
return 'cron:' . $this->cron;
}
$everyExpr = $this->buildTickExpression();
if ($everyExpr !== '') {
return $everyExpr;
}
if ($this->hasAnyTimeField()) {
return $this->buildClockExpression();
}
if ($this->every !== null) {
return is_int($this->every) ? 'every:' . $this->every : 'every:' . $this->every;
}
if ($this->dailyAt !== null) {
return 'daily:' . $this->dailyAt;
}
if ($this->hourlyAt !== null) {
return 'hourly:' . $this->hourlyAt;
}
if ($this->at !== null) {
return 'at:' . $this->at;
}
return '';
}
/**
* 构建 tick 间隔表达式
*/
private function buildTickExpression(): string
{
if ($this->tick !== null) {
return 'every:' . $this->tick;
}
if ($this->tickMinute !== null) {
return 'every:' . $this->tickMinute . 'm';
}
if ($this->tickHour !== null) {
return 'every:' . $this->tickHour . 'h';
}
return '';
}
/**
* 构建闹钟模式表达式
*/
private function buildClockExpression(): string
{
$h = $this->hour ?? 0;
$m = $this->minute ?? 0;
$s = $this->second ?? 0;
if ($this->loop) {
if ($this->isDailyPattern()) {
return sprintf('daily:%02d:%02d:%02d', $h, $m, $s);
}
if ($this->isHourlyPattern()) {
return sprintf('hourly:%02d', $m);
}
return $this->buildCronFromClock();
}
return $this->buildAtFromClock();
}
/**
* 是否匹配 daily 模式 (只有 hour/minute/second,无 year/month/day)
*/
private function isDailyPattern(): bool
{
return $this->year === null
&& $this->month === null
&& $this->day === null
&& $this->hour !== null;
}
/**
* 是否匹配 hourly 模式 (只有 minute/second,无 year/month/day/hour)
*/
private function isHourlyPattern(): bool
{
return $this->year === null
&& $this->month === null
&& $this->day === null
&& $this->hour === null
&& $this->minute !== null;
}
/**
* 将时间字段转为标准 5 字段 cron 表达式
*/
private function buildCronFromClock(): string
{
$min = $this->minute !== null ? (string)$this->minute : '*';
$h = $this->hour !== null ? (string)$this->hour : '*';
$d = $this->day !== null ? (string)$this->day : '*';
$mon = $this->month !== null ? (string)$this->month : '*';
return "cron:{$min} {$h} {$d} {$mon} *";
}
/**
* 计算一次性执行的 Unix 时间戳
*/
private function buildAtFromClock(): string
{
$now = time();
$timeInfo = getdate($now);
$y = $this->year ?? (int)$timeInfo['year'];
$mon = $this->month ?? (int)$timeInfo['mon'];
$d = $this->day ?? (int)$timeInfo['mday'];
$h = $this->hour ?? (int)$timeInfo['hours'];
$m = $this->minute ?? (int)$timeInfo['minutes'];
$s = $this->second ?? (int)$timeInfo['seconds'];
$timestamp = mktime($h, $m, $s, $mon, $d, $y);
if ($timestamp <= $now) {
if ($this->year === null && $this->month === null && $this->day === null) {
$timestamp = mktime($h, $m, $s, $mon, $d + 1, $y);
} elseif ($this->day === null) {
$timestamp = mktime($h, $m, $s, $mon + 1, $d, $y);
}
}
return 'at:' . $timestamp;
}
/**
* 检查是否设置了任意时间字段
*/
private function hasAnyTimeField(): bool
{
return $this->year !== null
|| $this->month !== null
|| $this->day !== null
|| $this->hour !== null
|| $this->minute !== null
|| $this->second !== null;
}
}
+244
View File
@@ -0,0 +1,244 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* 简易 Cron 表达式解析器 — 支持标准 5 字段 cron + 自定义表达式
*
* 标准 cron 格式: 分 时 日 月 周
* * 匹配任意值
* *\/N 每 N 单位执行一次
* N 精确匹配
* N,M 枚举匹配
* N-M 范围匹配
*
* 自定义表达式:
* every:{秒}
* every:{秒}s
* every:{分}m
* every:{时}h
* daily:{HH:MM}
* hourly:{MM}
* at:{时间戳}
*/
class CronExpression
{
/**
* 计算给定时间戳之后的下次执行时间
*
* @param string $expression 调度表达式
* @param int $afterTimestamp 起始时间戳 (不含)
* @return int 下次执行时间戳,一次性任务过期返回 0
*/
public function getNextRunTime(string $expression, int $afterTimestamp = 0): int
{
if ($afterTimestamp <= 0) {
$afterTimestamp = time();
}
return match (true) {
str_starts_with($expression, 'every:') => $this->parseEvery($expression, $afterTimestamp),
str_starts_with($expression, 'daily:') => $this->parseDaily($expression, $afterTimestamp),
str_starts_with($expression, 'hourly:') => $this->parseHourly($expression, $afterTimestamp),
str_starts_with($expression, 'at:') => $this->parseAt($expression, $afterTimestamp),
str_starts_with($expression, 'cron:') => $this->parseCron(substr($expression, 5), $afterTimestamp),
default => $this->parseCron($expression, $afterTimestamp),
};
}
/**
* 获取表达式的可读间隔描述
*/
public function getIntervalDescription(string $expression): string
{
return match (true) {
str_starts_with($expression, 'every:') => $this->describeEvery($expression),
str_starts_with($expression, 'daily:') => '每天 ' . substr($expression, 6),
str_starts_with($expression, 'hourly:') => '每小时第 ' . substr($expression, 7) . ' 分',
str_starts_with($expression, 'at:') => '一次性任务',
str_starts_with($expression, 'cron:') => 'Cron: ' . substr($expression, 5),
default => 'Cron: ' . $expression,
};
}
/**
* 是否为一次性任务
*/
public function isOneShot(string $expression): bool
{
return str_starts_with($expression, 'at:');
}
/**
* 解析 every:{N}[s|m|h] 表达式
*/
private function parseEvery(string $expression, int $afterTimestamp): int
{
$value = substr($expression, 6);
$seconds = match (true) {
str_ends_with($value, 's') => (int)$value,
str_ends_with($value, 'm') => (int)$value * 60,
str_ends_with($value, 'h') => (int)$value * 3600,
default => (int)$value,
};
// 从上次调度时间累加,避免时间漂移和积压
// 如果 afterTimestamp 距离上次调度超过 n 个周期,只加一个周期(按第一次调度时间对齐)
return $afterTimestamp + $seconds;
}
/**
* 描述 every 表达式
*/
private function describeEvery(string $expression): string
{
$value = substr($expression, 6);
return match (true) {
str_ends_with($value, 's') => '每 ' . ((int)$value) . ' 秒',
str_ends_with($value, 'm') => '每 ' . ((int)$value) . ' 分钟',
str_ends_with($value, 'h') => '每 ' . ((int)$value) . ' 小时',
default => '每 ' . ((int)$value) . ' 秒',
};
}
/**
* 解析 daily:{HH:MM} 表达式
*/
private function parseDaily(string $expression, int $afterTimestamp): int
{
$timeStr = substr($expression, 6);
$parts = explode(':', $timeStr);
$hour = (int)($parts[0] ?? 0);
$minute = (int)($parts[1] ?? 0);
$second = (int)($parts[2] ?? 0);
$currentDate = getdate($afterTimestamp);
$targetTime = mktime($hour, $minute, $second, $currentDate['mon'], $currentDate['mday'], $currentDate['year']);
if ($targetTime <= $afterTimestamp) {
// 今天的时间已过,推到明天
$targetTime = $targetTime + 86400;
}
return $targetTime;
}
/**
* 解析 hourly:{MM} 表达式
*/
private function parseHourly(string $expression, int $afterTimestamp): int
{
$minute = (int)substr($expression, 7);
$currentDate = getdate($afterTimestamp);
$targetTime = mktime($currentDate['hours'], $minute, 0, $currentDate['mon'], $currentDate['mday'], $currentDate['year']);
if ($targetTime <= $afterTimestamp) {
$targetTime = $targetTime + 3600;
}
return $targetTime;
}
/**
* 解析 at:{时间戳} 表达式 — 一次性任务
*/
private function parseAt(string $expression, int $afterTimestamp): int
{
$timestamp = (int)substr($expression, 3);
if ($timestamp <= $afterTimestamp) {
// 已过期,返回 0 表示不再调度
return 0;
}
return $timestamp;
}
/**
* 解析标准 5 字段 cron 表达式: 分 时 日 月 周
*/
private function parseCron(string $cronExpression, int $afterTimestamp): int
{
$fields = preg_split('/\s+/', trim($cronExpression));
if (count($fields) !== 5) {
// 格式无效,返回 0
return 0;
}
$minute = $fields[0];
$hour = $fields[1];
$day = $fields[2];
$month = $fields[3];
$weekday = $fields[4];
// 从 afterTimestamp 下一秒开始逐分钟搜索,最多搜索 2 年
$searchStart = $afterTimestamp + 60;
$searchEnd = $afterTimestamp + 365 * 2 * 86400;
for ($ts = $searchStart; $ts <= $searchEnd; $ts += 60) {
$t = getdate($ts);
$matched = true;
$matched = $matched && $this->matchCronField($minute, $t['minutes'], 0, 59);
$matched = $matched && $this->matchCronField($hour, $t['hours'], 0, 23);
$matched = $matched && $this->matchCronField($day, $t['mday'], 1, 31);
$matched = $matched && $this->matchCronField($month, $t['mon'], 1, 12);
$matched = $matched && $this->matchCronField($weekday, $t['wday'], 0, 6);
if ($matched) {
return $ts;
}
}
return 0;
}
/**
* 匹配单个 cron 字段值,支持 *、*\/N、N、N,M、N-M
*
* @param string $fieldValue cron 字段原始值
* @param int $current 当前时间单位的值
* @param int $min 该字段的最小值
* @param int $max 该字段的最大值
*/
private function matchCronField(string $fieldValue, int $current, int $min, int $max): bool
{
// * 匹配所有值
if ($fieldValue === '*') {
return true;
}
// *\/N 每隔 N 步进
if (str_starts_with($fieldValue, '*/')) {
$step = (int)substr($fieldValue, 2);
if ($step <= 0) {
return false;
}
return ($current - $min) % $step === 0;
}
// 逗号分隔的枚举值
if (str_contains($fieldValue, ',')) {
$values = explode(',', $fieldValue);
foreach ($values as $val) {
if ($this->matchCronField(trim($val), $current, $min, $max)) {
return true;
}
}
return false;
}
// N-M 范围值
if (str_contains($fieldValue, '-')) {
$parts = explode('-', $fieldValue);
$rangeStart = (int)$parts[0];
$rangeEnd = (int)$parts[1];
return $current >= $rangeStart && $current <= $rangeEnd;
}
// 精确值
return (int)$fieldValue === $current;
}
}
+153
View File
@@ -0,0 +1,153 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* Crontab 控制台命令 — 独立模式下的启动/停止/管理命令
*
* 支持的命令:
* php bin/crontab start 启动调度器
* php bin/crontab stop 停止调度器
* php bin/crontab restart 重启调度器
* php bin/crontab status 查看调度状态
*/
class CrontabCommand
{
/** @var string PID 文件路径 */
private string $pidFile;
/** @var string 日志文件路径 */
private string $logFile;
/**
* @param string $pidFile PID 文件路径
* @param string $logFile 日志文件路径
*/
public function __construct(
string $pidFile = '',
string $logFile = '',
) {
$this->pidFile = $pidFile !== '' ? $pidFile : sys_get_temp_dir() . '/crontab.pid';
$this->logFile = $logFile;
}
/**
* 启动调度器进程
*/
public function start(array $config): void
{
if ($this->isRunning()) {
echo "[Crontab] 调度器已在运行中, PID: " . $this->getPid() . PHP_EOL;
return;
}
$count = TaskRegistry::count();
if ($count === 0) {
echo "[Crontab] 警告: 没有注册任何任务" . PHP_EOL;
}
echo "[Crontab] 启动调度器..." . PHP_EOL;
echo "[Crontab] 已注册 {$count} 个任务" . PHP_EOL;
$process = new \Swoole\Process(function (\Swoole\Process $worker) use ($config) {
file_put_contents($this->pidFile, (string)$worker->pid);
$crontabProcess = new CrontabProcess($config);
$crontabProcess->run();
}, false, 0, true);
$pid = $process->start();
echo "[Crontab] 调度器已启动, PID: {$pid}" . PHP_EOL;
echo "[Crontab] PID 文件: {$this->pidFile}" . PHP_EOL;
\Swoole\Process::wait();
}
/**
* 停止调度器进程
*/
public function stop(): void
{
if (!$this->isRunning()) {
echo "[Crontab] 调度器未运行" . PHP_EOL;
return;
}
$pid = $this->getPid();
if ($pid > 0 && \Swoole\Process::kill($pid, 0)) {
\Swoole\Process::kill($pid, SIGTERM);
echo "[Crontab] 已发送停止信号, PID: {$pid}" . PHP_EOL;
$timeout = 10;
while ($timeout > 0 && \Swoole\Process::kill($pid, 0)) {
usleep(200000);
$timeout--;
}
if (\Swoole\Process::kill($pid, 0)) {
\Swoole\Process::kill($pid, SIGKILL);
echo "[Crontab] 进程未响应, 已强制终止" . PHP_EOL;
}
}
if (file_exists($this->pidFile)) {
unlink($this->pidFile);
}
echo "[Crontab] 调度器已停止" . PHP_EOL;
}
/**
* 重启调度器进程
*/
public function restart(array $config): void
{
echo "[Crontab] 重启调度器..." . PHP_EOL;
$this->stop();
usleep(500000);
$this->start($config);
}
/**
* 查看调度器状态
*/
public function status(): void
{
if ($this->isRunning()) {
$pid = $this->getPid();
echo "[Crontab] 状态: 运行中" . PHP_EOL;
echo "[Crontab] PID: {$pid}" . PHP_EOL;
} else {
echo "[Crontab] 状态: 未运行" . PHP_EOL;
}
}
/**
* 检查调度器是否在运行
*/
private function isRunning(): bool
{
$pid = $this->getPid();
if ($pid <= 0) {
return false;
}
return \Swoole\Process::kill($pid, 0);
}
/**
* 从 PID 文件读取进程 ID
*/
private function getPid(): int
{
if (!file_exists($this->pidFile)) {
return 0;
}
$pid = (int)file_get_contents($this->pidFile);
return $pid > 0 ? $pid : 0;
}
}
+198
View File
@@ -0,0 +1,198 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swoole\Coroutine;
/**
* Crontab Swoole 进程 — 作为独立进程运行任务调度器
*
* 两种运行模式:
* 1. 独立模式: 直接调用 run() 启动,自建 Swoole 事件循环
* 2. kiri-core 集成模式: 注册为自定义 Process,由 kiri-http-server 管理生命周期
*
* 任务注册方式:
* - 配置模式: config/crontab.php 的 tasks 中声明
* - 注解模式: 任务类上使用 #[Crontab] 注解 + CrontabScanner 自动扫描
* 两种方式可同时使用
*
* 独立模式示例:
* $process = new CrontabProcess($config, $registry);
* $process->run();
*
* kiri-core 集成模式:
* 在 config/servers.php 的 process 中添加 CrontabProcess::class 即可
*/
class CrontabProcess
{
private ?CrontabScheduler $scheduler = null;
/**
* @param array $config 配置数组 (crontab.php 的完整内容)
* @param LoggerInterface|null $logger 日志记录器
*/
public function __construct(
private array $config,
private ?LoggerInterface $logger = null,
) {
if ($this->logger === null) {
$this->logger = new NullLogger();
}
}
/**
* 扫描注解任务并启动进程 (独立模式)
*
* @throws \Throwable
*/
public function run(): void
{
$this->logger->info('[CrontabProcess] 进程启动中...');
$this->discoverAnnotationTasks();
Coroutine::run(function () {
$this->startScheduler();
});
$this->logger->info('[CrontabProcess] 进程已退出');
}
/**
* 在 kiri-core 自定义进程中启动调度器
* 由框架的 AbstractProcess 生命周期调用
*
* @throws \Throwable
*/
public function boot(): void
{
$this->logger->info('[CrontabProcess] 在 kiri-core 进程内启动...');
$this->discoverAnnotationTasks();
$this->startScheduler();
}
/**
* 获取当前调度器实例
*/
public function getScheduler(): ?CrontabScheduler
{
return $this->scheduler;
}
/**
* 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry
* CrontabProcess 启动时自动调用,检查所有已加载的类
*/
private function discoverAnnotationTasks(): void
{
$beforeCount = TaskRegistry::count();
foreach (get_declared_classes() as $className) {
if (!in_array(TaskInterface::class, class_implements($className), true)) {
continue;
}
try {
$reflect = new \ReflectionClass($className);
if ($reflect->isAbstract()) {
continue;
}
// 读取类上的 #[Crontab] 注解
$attributes = $reflect->getAttributes(Annotate\Crontab::class);
if (empty($attributes)) {
continue;
}
/** @var Annotate\Crontab $instance */
$instance = $attributes[0]->newInstance();
$scheduleExpression = $instance->buildExpression();
if ($scheduleExpression === '') {
continue;
}
TaskRegistry::register([
'class' => $className,
'name' => $instance->name !== '' ? $instance->name : $className,
'expression' => $scheduleExpression,
'status' => $instance->status,
]);
} catch (\Throwable) {
// 跳过无法反射或注册失败的类
}
}
$afterCount = TaskRegistry::count();
if ($afterCount > $beforeCount) {
$this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . "");
}
}
/**
* 初始化并启动调度器
*/
private function startScheduler(): void
{
$redis = $this->createRedisConnection();
$cronExpression = new CronExpression();
$schedulerConfig = $this->config['scheduler'] ?? [];
$this->scheduler = new CrontabScheduler(
redis: $redis,
cronExpression: $cronExpression,
logger: $this->logger,
tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1),
taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300),
lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60),
lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15),
concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true),
maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),
);
$this->scheduler->start();
}
/**
* 创建 Redis 连接
* 在独立模式下直接创建连接,不通过 kiri-core 的连接池
*/
private function createRedisConnection(): \Redis
{
$redisConfig = $this->config['redis'] ?? [];
$host = $redisConfig['host'] ?? '127.0.0.1';
$port = (int)($redisConfig['port'] ?? 6379);
$auth = $redisConfig['auth'] ?? '';
$databases = (int)($redisConfig['databases'] ?? 0);
$timeout = (int)($redisConfig['timeout'] ?? 30);
$prefix = $redisConfig['prefix'] ?? '';
$redis = new \Redis();
if (!$redis->connect($host, $port, $timeout)) {
throw new \RuntimeException("Redis 连接失败: {$host}:{$port}");
}
if (!empty($auth) && !$redis->auth($auth)) {
throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}");
}
$redis->select($databases);
if (!empty($prefix)) {
$redis->setOption(\Redis::OPT_PREFIX, $prefix);
}
$this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}");
return $redis;
}
}
+42
View File
@@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
use Kiri\Abstracts\Providers;
use Symfony\Component\Console\Application;
/**
* kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Symfony Console
*
* 在 kiri-core 项目的 config/servers.php 中配置:
* ```php
* 'process' => [
* \Kiri\Crontab\CrontabProcess::class,
* ],
* ```
*
* 在应用 Kernel 的 getCommands() 中添加:
* ```php
* public function getCommands(): array
* {
* return [
* \Kiri\Crontab\CrontabCommand::class,
* ];
* }
* ```
*/
class CrontabProviders extends Providers
{
/**
* 注册 CrontabCommand 到 Console Application
*/
public function onImport(): void
{
$command = $this->container->get(CrontabCommand::class);
$console = $this->container->get(Application::class);
$console->addCommand($command);
}
}
+683
View File
@@ -0,0 +1,683 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swoole\Coroutine;
/**
* 核心调度引擎 — 基于 Redis ZSET 的定时任务调度器
*
* 职责:
* 1. 将 TaskRegistry 中的任务同步到 Redis (Hash + ZSET)
* 2. 主循环轮询 ZSET 获取到期任务
* 3. 通过任务锁防止并发重复执行
* 4. 执行任务并更新下次调度时间
* 5. 支持协程并发执行多个到期任务
* 6. 支持运行时动态投递和取消任务
*
* Redis 数据结构:
* crontab:queue — ZSET, score=下次执行时间戳, member=taskKey
* crontab:task:{key} — Hash, 任务元数据
* crontab:lock:master — String, 调度器主锁
* crontab:lock:task:{key} — String, 任务执行锁
* crontab:running — SET, 当前执行中的任务
*/
class CrontabScheduler
{
/** @var string Redis key 前缀 */
private const KEY_PREFIX = 'crontab';
/** @var string ZSET 调度队列 key */
private const QUEUE_KEY = 'crontab:queue';
/** @var string 主锁 key */
private const MASTER_LOCK_KEY = 'crontab:lock:master';
/** @var string 运行中任务 SET key */
private const RUNNING_SET_KEY = 'crontab:running';
/** @var int 默认 tick 间隔 (秒) */
private const DEFAULT_TICK_INTERVAL = 1;
/** @var int 默认任务执行超时 (秒) */
private const DEFAULT_TASK_TIMEOUT = 300;
/** @var int 默认主锁 TTL (秒) */
private const DEFAULT_LOCK_TTL = 60;
private bool $running = false;
/** @var string|null 当前正在执行的任务 key (协程安全) */
private ?string $currentTaskKey = null;
/** @var self|null 全局实例引用,供任务内部访问 */
private static ?self $instance = null;
/**
* @param \Redis $redis Redis 客户端
* @param TaskRegistry $registry 任务注册中心
* @param CronExpression $cronExpression Cron 表达式解析器
* @param LoggerInterface $logger 日志记录器
* @param int $tickInterval tick 间隔 (秒)
* @param int $taskTimeout 任务执行超时 (秒)
* @param int $lockTtl 主锁 TTL (秒)
* @param int $lockRenewInterval 主锁续期间隔 (秒)
* @param bool $concurrentTasks 是否协程并发执行
* @param int $maxConcurrent 最大并发数
*/
public function __construct(
private \Redis $redis,
private CronExpression $cronExpression,
private LoggerInterface $logger = new NullLogger(),
private int $tickInterval = self::DEFAULT_TICK_INTERVAL,
private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT,
private int $lockTtl = self::DEFAULT_LOCK_TTL,
private int $lockRenewInterval = 15,
private bool $concurrentTasks = true,
private int $maxConcurrent = 10,
) {
self::$instance = $this;
}
/**
* 获取全局调度器实例
* 在任务 handle() 内部可通过此方法获取调度器引用,用于取消当前任务等操作
*/
public static function getInstance(): ?self
{
return self::$instance;
}
/**
* 启动调度器主循环
* 阻塞运行,直到 stop() 被调用或收到退出信号
*
* @throws \Throwable
*/
public function start(): void
{
$this->running = true;
$this->logger->info('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务');
$this->syncTasks();
$nextTickTime = $this->calculateNextTickTime();
while ($this->running) {
$now = microtime(true);
$sleepSeconds = max(0, $nextTickTime - $now);
if ($sleepSeconds > 0) {
usleep((int)($sleepSeconds * 1000000));
}
$nextTickTime = $this->calculateNextTickTime();
if (!$this->running) {
break;
}
try {
$this->tick();
} catch (\Throwable $throwable) {
$this->logger->error('[CrontabScheduler] tick 异常: ' . $throwable->getMessage());
}
}
self::$instance = null;
$this->logger->info('[CrontabScheduler] 调度器已停止');
}
/**
* 停止调度器
*/
public function stop(): void
{
$this->running = false;
}
/**
* 是否运行中
*/
public function isRunning(): bool
{
return $this->running;
}
// ──────────────────────────────────────────────
// 动态任务投递 API
// ──────────────────────────────────────────────
/**
* 动态投递任务 — 运行时向调度系统提交一个即时任务
*
* @param string $className 实现了 TaskInterface 的任务类
* @param string $expression 调度表达式 (every:1, at:时间戳, daily:03:00 等)
* @param string $name 任务显示名称 (可选)
* @return string 返回生成的 taskKey,可用于后续 cancel
*
* 使用示例:
* $scheduler = CrontabScheduler::getInstance();
* $taskKey = $scheduler->submit(MyTask::class, 'every:1', '匹配检查');
* // 任务每秒执行,匹配成功后在 handle() 中调用 cancel 停止
*/
public function submit(string $className, string $expression, string $name = ''): string
{
if (!class_exists($className)) {
throw new \InvalidArgumentException("任务类不存在: {$className}");
}
if (!in_array(TaskInterface::class, class_implements($className), true)) {
throw new \InvalidArgumentException("{$className} 必须实现 TaskInterface 接口");
}
$taskKey = $this->generateDynamicTaskKey($className);
$taskConfig = new TaskConfig(
taskKey: $taskKey,
className: $className,
name: $name !== '' ? $name : $className,
expression: $expression,
status: 'active',
createdAt: time(),
);
$this->persistNewTask($taskConfig);
$this->logger->info("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'");
return $taskKey;
}
/**
* 取消/移除指定任务 — 从 Redis 队列和元数据中彻底删除
*
* @param string $taskKey 任务标识,由 submit() 返回
* @return bool 是否成功取消
*/
public function cancelTask(string $taskKey): bool
{
$hashKey = $this->getTaskHashKey($taskKey);
if (!$this->redis->exists($hashKey)) {
return false;
}
$this->removeFromQueue($taskKey);
$this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey));
$this->logger->info("[CrontabScheduler] 任务已取消: {$taskKey}");
return true;
}
/**
* 取消当前正在执行的任务
* 在 TaskInterface::handle() 内部调用,执行完后任务将被移除不再调度
*
* 使用示例:
* class MatchTask implements TaskInterface {
* public function handle(): void {
* if ($this->isMatched()) {
* CrontabScheduler::getInstance()?->cancelCurrentTask();
* }
* }
* }
*/
public function cancelCurrentTask(): void
{
if ($this->currentTaskKey !== null) {
$this->markTaskForRemoval($this->currentTaskKey);
}
}
/**
* 获取当前正在执行的任务 key (供任务内部使用)
*/
public function getCurrentTaskKey(): ?string
{
return $this->currentTaskKey;
}
// ──────────────────────────────────────────────
// 任务管理 API
// ──────────────────────────────────────────────
/**
* 同步任务到 Redis
*/
public function syncTasks(): void
{
foreach (TaskRegistry::all() as $taskKey => $taskConfig) {
$hashKey = $this->getTaskHashKey($taskKey);
if (!$this->redis->exists($hashKey)) {
$this->persistNewTask($taskConfig);
continue;
}
$this->mergeExistingTask($taskConfig);
}
}
/**
* 刷新所有任务的表达式和下次运行时间
*/
public function refreshTasks(): void
{
foreach (TaskRegistry::all() as $taskKey => $taskConfig) {
$hashKey = $this->getTaskHashKey($taskKey);
$nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1);
$interval = $this->cronExpression->getIntervalDescription($taskConfig->expression);
$taskConfig->nextRun = $nextRun;
$taskConfig->interval = $interval;
$hashData = $taskConfig->toHash();
$this->redis->hMSet($hashKey, $hashData);
if ($taskConfig->status === 'active' && $nextRun > 0) {
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey);
}
}
}
/**
* 暂停指定任务
*/
public function pauseTask(string $taskKey): bool
{
$hashKey = $this->getTaskHashKey($taskKey);
if (!$this->redis->exists($hashKey)) {
return false;
}
$this->redis->hSet($hashKey, 'status', 'paused');
$this->redis->zRem(self::QUEUE_KEY, $taskKey);
$this->logger->info("[CrontabScheduler] 任务已暂停: {$taskKey}");
return true;
}
/**
* 恢复指定任务
*/
public function resumeTask(string $taskKey): bool
{
$hashKey = $this->getTaskHashKey($taskKey);
$hash = $this->redis->hGetAll($hashKey);
if (empty($hash)) {
return false;
}
$config = TaskConfig::fromHash($taskKey, $hash);
$nextRun = $this->cronExpression->getNextRunTime($config->expression, time() - 1);
$this->redis->hMSet($hashKey, [
'status' => 'active',
'next_run' => $nextRun,
]);
if ($nextRun > 0) {
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey);
}
$this->logger->info("[CrontabScheduler] 任务已恢复: {$taskKey}");
return true;
}
// ──────────────────────────────────────────────
// 内部调度
// ──────────────────────────────────────────────
/**
* 单次 tick — 检查并执行到期的任务
*/
private function tick(): void
{
if (!$this->acquireMasterLock()) {
return;
}
try {
$this->processDueTasks();
} finally {
$this->releaseMasterLock();
}
}
/**
* 获取主锁,防止多实例同时调度
*/
private function acquireMasterLock(): bool
{
return $this->redis->set(
self::MASTER_LOCK_KEY,
(string)getmypid(),
['nx', 'ex' => $this->lockTtl]
);
}
/**
* 释放主锁
*/
private function releaseMasterLock(): void
{
$script = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
LUA;
$this->redis->eval($script, [self::MASTER_LOCK_KEY, (string)getmypid()], 1);
}
/**
* 续期主锁
*/
private function renewMasterLock(): void
{
$this->redis->expire(self::MASTER_LOCK_KEY, $this->lockTtl);
}
/**
* 处理所有到期任务
*/
private function processDueTasks(): void
{
$now = time();
$dueTaskKeys = $this->redis->zRangeByScore(self::QUEUE_KEY, '-inf', $now, ['limit' => [0, 100]]);
if (empty($dueTaskKeys)) {
$this->renewMasterLock();
return;
}
$this->renewMasterLock();
if ($this->concurrentTasks) {
$this->executeTasksConcurrently($dueTaskKeys, $now);
} else {
foreach ($dueTaskKeys as $taskKey) {
$this->executeSingleTask($taskKey, $now);
}
}
}
/**
* 并发执行到期任务 (通过协程)
*/
private function executeTasksConcurrently(array $taskKeys, int $now): void
{
$batchSize = min(count($taskKeys), $this->maxConcurrent);
$channel = new Coroutine\Channel($batchSize);
foreach ($taskKeys as $taskKey) {
$channel->push(true);
Coroutine::create(function () use ($taskKey, $now, $channel) {
try {
$this->executeSingleTask($taskKey, $now);
} catch (\Throwable $throwable) {
$this->logger->error("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}");
} finally {
$channel->pop();
}
});
}
for ($i = 0; $i < $batchSize; $i++) {
$channel->push(true);
}
$channel->close();
}
/**
* 执行单个任务
*/
private function executeSingleTask(string $taskKey, int $now): void
{
$hashKey = $this->getTaskHashKey($taskKey);
$hash = $this->redis->hGetAll($hashKey);
if (empty($hash)) {
$this->removeFromQueue($taskKey);
return;
}
$config = TaskConfig::fromHash($taskKey, $hash);
if ($config->status !== 'active') {
$this->removeFromQueue($taskKey);
return;
}
if (!$this->acquireTaskLock($taskKey)) {
$this->logger->warning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}");
return;
}
$startTime = microtime(true);
// 记录当前任务 key,供任务内部通过 cancelCurrentTask() 取消自身
$this->currentTaskKey = $taskKey;
try {
$this->redis->sAdd(self::RUNNING_SET_KEY, $taskKey);
$this->logger->info("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})");
$className = $config->className;
if (!class_exists($className)) {
throw new \RuntimeException("任务类不存在: {$className}");
}
/** @var TaskInterface $taskInstance */
$taskInstance = new $className();
$taskInstance->handle();
$duration = round(microtime(true) - $startTime, 4);
$this->logger->info("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s");
} catch (\Throwable $throwable) {
$duration = round(microtime(true) - $startTime, 4);
$this->logger->error("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}");
} finally {
$this->redis->sRem(self::RUNNING_SET_KEY, $taskKey);
// 检查是否被标记为"执行后移除"
$shouldRemove = $this->isTaskMarkedForRemoval($taskKey);
if ($shouldRemove) {
$this->removeFromQueue($taskKey);
$this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey));
$this->clearRemovalFlag($taskKey);
$this->logger->info("[CrontabScheduler] 任务已自毁: {$taskKey}");
} else {
$this->finalizeTaskExecution($config, $now);
}
$this->releaseTaskLock($taskKey);
$this->currentTaskKey = null;
}
}
/**
* 完成执行后的任务状态更新
*/
private function finalizeTaskExecution(TaskConfig $config, int $now): void
{
$taskKey = $config->taskKey;
$hashKey = $this->getTaskHashKey($taskKey);
$isOneShot = $this->cronExpression->isOneShot($config->expression);
if ($isOneShot) {
$this->removeFromQueue($taskKey);
$this->redis->del($hashKey);
$this->logger->info("[CrontabScheduler] 一次性任务已移除: {$taskKey}");
return;
}
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now);
$interval = $this->cronExpression->getIntervalDescription($config->expression);
$updateData = [
'last_run' => $now,
'next_run' => $nextRun,
'interval' => $interval,
];
$this->redis->hMSet($hashKey, $updateData);
if ($nextRun > 0) {
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey);
} else {
$this->removeFromQueue($taskKey);
}
}
/**
* 标记任务为"下次执行后移除"
* 当任务内部调用 cancelCurrentTask() 时,不立即删除(执行中操作安全),仅标记
* 等当前执行完成后,executeSingleTask 的 finally 块会检查此标记并清理
*/
private function markTaskForRemoval(string $taskKey): void
{
$this->redis->set('crontab:removal:' . $taskKey, '1', 60);
}
/**
* 检查任务是否被标记为待移除
*/
private function isTaskMarkedForRemoval(string $taskKey): bool
{
return (bool)$this->redis->exists('crontab:removal:' . $taskKey);
}
/**
* 清除移除标记
*/
private function clearRemovalFlag(string $taskKey): void
{
$this->redis->del('crontab:removal:' . $taskKey);
}
// ──────────────────────────────────────────────
// Redis 操作辅助
// ──────────────────────────────────────────────
/**
* 获取任务执行锁
*/
private function acquireTaskLock(string $taskKey): bool
{
$lockKey = $this->getTaskLockKey($taskKey);
return $this->redis->set($lockKey, (string)time(), ['nx', 'ex' => $this->taskTimeout]);
}
/**
* 释放任务执行锁
*/
private function releaseTaskLock(string $taskKey): void
{
$this->redis->del($this->getTaskLockKey($taskKey));
}
/**
* 从调度队列移除任务
*/
private function removeFromQueue(string $taskKey): void
{
$this->redis->zRem(self::QUEUE_KEY, $taskKey);
}
/**
* 持久化新任务到 Redis
*/
private function persistNewTask(TaskConfig $taskConfig): void
{
$nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1);
$interval = $this->cronExpression->getIntervalDescription($taskConfig->expression);
$taskConfig->nextRun = $nextRun;
$taskConfig->interval = $interval;
$taskConfig->createdAt = time();
$hashKey = $this->getTaskHashKey($taskConfig->taskKey);
$this->redis->hMSet($hashKey, $taskConfig->toHash());
if ($taskConfig->status === 'active' && $nextRun > 0) {
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey);
}
$this->logger->info("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun));
}
/**
* 合并更新已有任务配置 (保留运行时状态)
*/
private function mergeExistingTask(TaskConfig $taskConfig): void
{
$hashKey = $this->getTaskHashKey($taskConfig->taskKey);
$updateData = [
'class' => $taskConfig->className,
'name' => $taskConfig->name,
'expression' => $taskConfig->expression,
];
$this->redis->hMSet($hashKey, $updateData);
}
/**
* 生成动态任务的唯一标识
*/
private function generateDynamicTaskKey(string $className): string
{
$baseKey = strtolower(str_replace('\\', '.', ltrim($className, '\\')));
$suffix = substr(md5(uniqid((string)mt_rand(), true)), 0, 8);
return 'dynamic.' . $baseKey . '.' . $suffix;
}
// ──────────────────────────────────────────────
// 辅助方法
// ──────────────────────────────────────────────
/**
* 计算下一个 tick 的精确时间点 (对齐到秒边界)
*/
private function calculateNextTickTime(): float
{
$now = microtime(true);
$current = floor($now);
return $current + $this->tickInterval;
}
/**
* 获取任务 Hash key
*/
private function getTaskHashKey(string $taskKey): string
{
return self::KEY_PREFIX . ':task:' . $taskKey;
}
/**
* 获取任务执行锁 key
*/
private function getTaskLockKey(string $taskKey): string
{
return self::KEY_PREFIX . ':lock:task:' . $taskKey;
}
}
+24
View File
@@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab\Events;
/**
* 任务执行前事件
*/
class OnTaskBeforeExecute
{
/**
* @param string $taskKey 任务标识
* @param string $className 任务处理类
* @param string $taskName 任务显示名称
*/
public function __construct(
public string $taskKey,
public string $className,
public string $taskName,
) {
}
}
+28
View File
@@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab\Events;
/**
* 任务执行成功事件
*/
class OnTaskExecuted
{
/**
* @param string $taskKey 任务标识
* @param string $className 任务处理类
* @param string $taskName 任务显示名称
* @param float $duration 执行耗时 (秒)
* @param int $nextRun 下次执行时间戳
*/
public function __construct(
public string $taskKey,
public string $className,
public string $taskName,
public float $duration,
public int $nextRun,
) {
}
}
+30
View File
@@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab\Events;
/**
* 任务执行失败事件
*/
class OnTaskFailed
{
/**
* @param string $taskKey 任务标识
* @param string $className 任务处理类
* @param string $taskName 任务显示名称
* @param \Throwable $error 异常信息
* @param float $duration 执行耗时 (秒)
* @param int $nextRun 下次执行时间戳 (失败仍会调度下次)
*/
public function __construct(
public string $taskKey,
public string $className,
public string $taskName,
public \Throwable $error,
public float $duration,
public int $nextRun,
) {
}
}
+105
View File
@@ -0,0 +1,105 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* 任务配置值对象 — 描述一个定时任务的完整元数据
*
* 调度表达式支持以下格式:
* every:{秒} 每 N 秒执行
* every:{秒}s 每 N 秒执行
* every:{分}m 每 N 分钟执行
* every:{时}h 每 N 小时执行
* daily:{HH:MM} 每天指定时间
* hourly:{MM} 每小时指定分钟
* cron:{表达式} 标准 5 字段 cron
* at:{时间戳} 一次性执行
*/
class TaskConfig
{
/**
* @param string $taskKey 任务唯一标识,用于 Redis key
* @param string $className 任务处理类完整路径
* @param string $name 任务显示名称
* @param string $expression 调度表达式
* @param string $status 状态: active / paused / disabled
* @param int $nextRun 下次执行时间戳 (0 表示立即)
* @param int $lastRun 上次执行时间戳
* @param string $interval 可读的执行间隔描述
* @param int $createdAt 创建时间戳
*/
public function __construct(
public string $taskKey,
public string $className,
public string $name = '',
public string $expression = '',
public string $status = 'active',
public int $nextRun = 0,
public int $lastRun = 0,
public string $interval = '',
public int $createdAt = 0,
) {
if ($this->createdAt === 0) {
$this->createdAt = time();
}
if ($this->name === '') {
$this->name = $this->taskKey;
}
}
/**
* 从数组创建配置对象
*/
public static function fromArray(array $data): static
{
return new static(
taskKey: $data['taskKey'] ?? $data['class'],
className: $data['class'],
name: $data['name'] ?? '',
expression: $data['expression'] ?? '',
status: $data['status'] ?? 'active',
nextRun: (int)($data['next_run'] ?? 0),
lastRun: (int)($data['last_run'] ?? 0),
interval: $data['interval'] ?? '',
createdAt: (int)($data['created_at'] ?? 0),
);
}
/**
* 从 Hash 数据生成配置对象 (从 Redis 读取)
*/
public static function fromHash(string $taskKey, array $hash): static
{
return new static(
taskKey: $taskKey,
className: $hash['class'] ?? '',
name: $hash['name'] ?? '',
expression: $hash['expression'] ?? '',
status: $hash['status'] ?? 'active',
nextRun: (int)($hash['next_run'] ?? 0),
lastRun: (int)($hash['last_run'] ?? 0),
interval: $hash['interval'] ?? '',
createdAt: (int)($hash['created_at'] ?? 0),
);
}
/**
* 转为 Hash 存储数组
*/
public function toHash(): array
{
return [
'class' => $this->className,
'name' => $this->name,
'expression' => $this->expression,
'status' => $this->status,
'next_run' => $this->nextRun,
'last_run' => $this->lastRun,
'interval' => $this->interval,
'created_at' => $this->createdAt,
];
}
}
+23
View File
@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* 任务接口 — 所有定时任务必须实现此接口
*
* 任务类通过 TaskRegistry 注册后,由 CrontabScheduler 按调度表达式定时执行
* 任务类构造函数可接受 DI 注入的参数(独立模式下需要自行管理依赖)
*/
interface TaskInterface
{
/**
* 执行任务的核心逻辑
*
* @return void
* @throws \Throwable 异常会被调度器捕获记录,不影响后续任务调度
*/
public function handle(): void;
}
+125
View File
@@ -0,0 +1,125 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* 任务注册中心 — 静态注册表,管理所有定时任务配置
*
* 支持两种注册方式:
* 1. Scanner 驱动: #[Crontab] 注解在方法上,Scanner 自动调用 register()
* 2. 配置驱动: config/crontab.php 的 tasks 列表中声明
*
* 使用方式:
* TaskRegistry::register(['class' => ..., 'expression' => 'daily:03:00']);
* $all = TaskRegistry::all();
* $count = TaskRegistry::count();
*/
class TaskRegistry
{
/** @var array<string, TaskConfig> 已注册任务,key 为任务标识 */
private static array $tasks = [];
/**
* 注册单个任务
*
* @param array $config 任务配置数组,必须包含 class 和 expression 字段
* @return TaskConfig
* @throws \InvalidArgumentException
*/
public static function register(array $config): TaskConfig
{
if (empty($config['class'])) {
throw new \InvalidArgumentException('任务配置必须包含 class 字段');
}
if (empty($config['expression'])) {
throw new \InvalidArgumentException('任务配置必须包含 expression 字段');
}
$className = $config['class'];
if (!class_exists($className)) {
throw new \InvalidArgumentException("任务类不存在: {$className}");
}
if (!in_array(TaskInterface::class, class_implements($className), true)) {
throw new \InvalidArgumentException("{$className} 必须实现 TaskInterface 接口");
}
$taskKey = self::generateTaskKey($config['class']);
$taskConfig = TaskConfig::fromArray(array_merge($config, ['taskKey' => $taskKey]));
self::$tasks[$taskKey] = $taskConfig;
return $taskConfig;
}
/**
* 批量注册任务
*
* @param array<array> $configs 任务配置数组列表
* @return array<string, TaskConfig>
*/
public static function registerMany(array $configs): array
{
$results = [];
foreach ($configs as $config) {
$taskConfig = self::register($config);
$results[$taskConfig->taskKey] = $taskConfig;
}
return $results;
}
/**
* 获取所有已注册任务
*
* @return array<string, TaskConfig>
*/
public static function all(): array
{
return self::$tasks;
}
/**
* 获取指定任务配置
*/
public static function get(string $taskKey): ?TaskConfig
{
return self::$tasks[$taskKey] ?? null;
}
/**
* 获取所有活跃任务 (status=active)
*/
public static function getActiveTasks(): array
{
return array_filter(self::$tasks, fn(TaskConfig $config) => $config->status === 'active');
}
/**
* 任务总数
*/
public static function count(): int
{
return count(self::$tasks);
}
/**
* 清空注册表 (用于测试)
*/
public static function clear(): void
{
self::$tasks = [];
}
/**
* 根据类名生成任务唯一标识
* 将反斜杠替换为点号,转为小写可读 key
*/
private static function generateTaskKey(string $className): string
{
return strtolower(
str_replace('\\', '.', ltrim($className, '\\'))
);
}
}
+51
View File
@@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
namespace Kiri\Crontab;
/**
* 向调度系统动态投递一个任务
*
* 任何业务代码都可以直接调用此函数,无需持有 CrontabScheduler 引用
* 适用于运行时突发投递场景(如用户触发、事件回调等)
*
* @param string $className 实现了 TaskInterface 的任务类
* @param string $expression 调度表达式 (every:1, every:5m, at:时间戳, daily:03:00 等)
* @param string $name 任务显示名称 (可选)
* @return string 返回 taskKey,可用于后续取消
* @throws \RuntimeException 调度器未运行时抛出
*
* 使用示例:
* // 每秒检查匹配结果
* $key = submitToCrontab(MatchCheckTask::class, 'every:1', '匹配检查 #123');
*
* // 1 小时后执行一次
* $key = submitToCrontab(DelayNotifyTask::class, 'at:' . (time() + 3600), '延迟通知');
*/
function submitToCrontab(string $className, string $expression, string $name = ''): string
{
$scheduler = CrontabScheduler::getInstance();
if ($scheduler === null) {
throw new \RuntimeException('调度器未运行,无法投递任务。请先启动 CrontabProcess');
}
return $scheduler->submit($className, $expression, $name);
}
/**
* 取消指定任务
*
* @param string $taskKey 由 submit() 或 submitToCrontab() 返回的任务标识
* @return bool 是否成功取消
*/
function cancelCrontabTask(string $taskKey): bool
{
$scheduler = CrontabScheduler::getInstance();
if ($scheduler === null) {
return false;
}
return $scheduler->cancelTask($taskKey);
}