From 94111ccba6e0ba26907ffdece9ac3bd05392c677 Mon Sep 17 00:00:00 2001 From: whwyy Date: Sun, 28 Jun 2026 17:06:12 +0800 Subject: [PATCH] ea --- .gitignore | 4 + DESIGN.md | 321 ++++++++++++++ README.md | 119 +++++ bin/crontab | 132 ++++++ composer.json | 30 ++ config/crontab.php | 54 +++ src/Annotate/Crontab.php | 224 ++++++++++ src/CronExpression.php | 244 +++++++++++ src/CrontabCommand.php | 153 +++++++ src/CrontabProcess.php | 198 +++++++++ src/CrontabProviders.php | 42 ++ src/CrontabScheduler.php | 683 +++++++++++++++++++++++++++++ src/Events/OnTaskBeforeExecute.php | 24 + src/Events/OnTaskExecuted.php | 28 ++ src/Events/OnTaskFailed.php | 30 ++ src/TaskConfig.php | 105 +++++ src/TaskInterface.php | 23 + src/TaskRegistry.php | 125 ++++++ src/functions.php | 51 +++ tests/CronExpressionTest.php | 199 +++++++++ 20 files changed, 2789 insertions(+) create mode 100644 .gitignore create mode 100644 DESIGN.md create mode 100644 bin/crontab create mode 100644 composer.json create mode 100644 config/crontab.php create mode 100644 src/Annotate/Crontab.php create mode 100644 src/CronExpression.php create mode 100644 src/CrontabCommand.php create mode 100644 src/CrontabProcess.php create mode 100644 src/CrontabProviders.php create mode 100644 src/CrontabScheduler.php create mode 100644 src/Events/OnTaskBeforeExecute.php create mode 100644 src/Events/OnTaskExecuted.php create mode 100644 src/Events/OnTaskFailed.php create mode 100644 src/TaskConfig.php create mode 100644 src/TaskInterface.php create mode 100644 src/TaskRegistry.php create mode 100644 src/functions.php create mode 100644 tests/CronExpressionTest.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b181b33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/vendor/ +.gstack/ +*.pid +storage/ diff --git a/DESIGN.md b/DESIGN.md new file mode 100644 index 0000000..20e1bf6 --- /dev/null +++ b/DESIGN.md @@ -0,0 +1,321 @@ +# kiri-crontab 任务调度系统 架构设计文档 + +## 一、功能概述 + +基于 Redis + Swoole 的定时任务调度系统,支持: +- 定时执行:指定具体时间点执行任务 +- 间隔执行:每隔 N 秒/min/hour 重复执行 +- cron 表达式:标准 5 字段 cron 表达式调度 +- 一次性任务:执行后自动移除 +- 任务暂停/恢复:通过 Redis 标记控制任务启停 +- **注解驱动**: 使用 `#[Crontab]` 注解声明任务,支持自动扫描发现 + +## 二、核心组件 + +``` +┌─────────────────────────────────────────────────────────┐ +│ kiri-crontab │ +├─────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────────┐ │ +│ │CrontabCommand │───▶│ CrontabScheduler │ │ +│ │ (控制台命令) │ │ (调度引擎) │ │ +│ └──────────────┘ └────────┬─────────┘ │ +│ │ │ +│ ┌─────────▼─────────┐ │ +│ │ TaskRegistry │ │ +│ │ (任务注册中心) │ │ +│ └─────────┬─────────┘ │ +│ │ │ +│ ┌─────────▼─────────┐ │ +│ │ TaskInterface │ │ +│ │ (任务接口) │ │ +│ └──────────────────┘ │ +│ │ │ +│ ┌────────────────────────────▼──────────────────────┐ │ +│ │ Redis │ │ +│ │ ┌──────────────┐ ┌──────────────────────────┐ │ │ +│ │ │ Sorted Set │ │ Hash (任务元数据) │ │ │ +│ │ │ (调度队列) │ │ crontab:task:{key} │ │ │ +│ │ │ crontab:queue │ └──────────────────────────┘ │ │ +│ │ └──────────────┘ │ │ +│ └────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────┘ +``` + +## 三、Redis 数据结构设计 + +### 3.1 调度队列 (Sorted Set) + +``` +Key: crontab:queue +Type: ZSET +Score: 下次执行时间戳 (Unix timestamp) +Member: 任务标识符 (task key) +``` + +ZSET 按时间戳排序,调度器只需 `ZRANGEBYSCORE` 获取到期任务。 + +### 3.2 任务元数据 (Hash) + +``` +Key: crontab:task:{taskKey} +Type: Hash +Fields: + class - 任务处理类完整路径 (如 App\Task\CleanLogTask) + name - 任务显示名称 + expression - 调度表达式 (every:60 | cron:*\/5 * * * * | daily:03:00 | at:1234567890) + next_run - 下次执行时间戳 (秒) + last_run - 上次执行时间戳 (秒) + status - 状态: active / paused / disabled + interval - 执行间隔描述 (可读) + created_at - 创建时间戳 +``` + +### 3.3 任务执行锁 + +``` +Key: crontab:lock:task:{taskKey} +Type: String +Value: Swoole Worker ID + Timestamp +TTL: 任务超时时间 (默认 300s) +``` + +防止同一任务重复执行(前次未完成则跳过本次)。 + +### 3.4 调度器主锁 + +``` +Key: crontab:lock:master +Type: String +Value: Worker PID +TTL: 60s (定期续期) +``` + +防止多实例同时调度,用 SET NX EX + Lua 脚本实现。 + +### 3.5 任务执行状态集合 + +``` +Key: crontab:running +Type: SET +Member: 当前正在执行的任务 key 列表 +``` + +用于监控哪些任务正在运行中。 + +## 四、调度表达式 + +| 格式 | 示例 | 含义 | +|------|------|------| +| `every:{秒}` | `every:60` | 每 60 秒执行 | +| `every:{秒}s` | `every:30s` | 每 30 秒执行 | +| `every:{分}m` | `every:5m` | 每 5 分钟执行 | +| `every:{时}h` | `every:1h` | 每 1 小时执行 | +| `daily:{HH:MM}` | `daily:03:00` | 每天凌晨 3 点 | +| `hourly:{MM}` | `hourly:30` | 每小时第 30 分 | +| `cron:{表达式}` | `cron:*\/5 * * * *` | 标准 5 字段 cron | +| `at:{时间戳}` | `at:1719590400` | 指定时间戳一次性 | + +## 五、调度流程 + +``` +┌──────────────────────────────────────┐ +│ 调度器启动 (process) │ +└────────────────┬─────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 1. 扫描 TaskRegistry 注册的所有任务 │ +│ 将未注册到 Redis 的任务写入 Hash │ +│ 将任务加入 ZSET 调度队列 │ +└────────────────┬─────────────────────┘ + │ + ▼ + ┌──────────────────────┐ + │ 主循环 (每 1 秒 tick) │◀────────────────────┐ + └──────────┬───────────┘ │ + │ │ + ▼ │ + ┌──────────────────────┐ │ + │ 2. 获取主锁 │ │ + │ SET NX EX 60s │── 失败 ──────────────┘ + └──────────┬───────────┘ + │ 成功 + ▼ + ┌──────────────────────┐ + │ 3. ZRANGEBYSCORE │ + │ score <= now │── 空 ───────────────┐ + │ 获取到期任务列表 │ │ + └──────────┬───────────┘ │ + │ 有任务 │ + ▼ │ + ┌──────────────────────┐ │ + │ 4. 遍历到期任务 │ │ + │ - 获取任务锁 │ │ + │ - 执行任务 handle() │ │ + │ - 记录日志/更新状态 │◀─────────────────────┘ + │ - 计算下次执行时间 │ + │ - 更新 ZSET 分数 │ + └──────────┬───────────┘ + │ + ▼ + ┌──────────────────────┐ + │ 5. sleep(1) 等待下个 │ + │ tick │ + └──────────────────────┘ +``` + +## 六、任务生命周期 + +``` + [注册] ──▶ [待调度] ──▶ [获取锁] ──▶ [执行中] ──▶ [完成] + ▲ │ │ + │ │ 锁获取失败 │ + │ ▼ │ + │ [跳过本轮] │ + │ │ + └───────────────────────────────────────┘ + (等待下次调度) + + 状态流转: + active ──▶ paused ──▶ active (暂停/恢复) + active ──▶ disabled (禁用,从 ZSET 移除) + 一次性任务执行完成后: 从 ZSET 和 Redis 中移除 +``` + +## 七、与 kiri-core 集成方式 + +### 7.1 作为 Composer 包引入 +```json +{ + "require": { + "game-worker/kiri-crontab": "^v1.0" + } +} +``` + +### 7.2 通过 Provider 注册到框架 +```php +// config/servers.php 中添加 +'process' => [ + \Kiri\Crontab\CrontabProcess::class, +], +``` + +### 7.3 独立运行模式 +```bash +php bin/crontab start +php bin/crontab stop +php bin/crontab restart +``` + +### 7.4 注解驱动注册方式 +```php +use Kiri\Crontab\Annotate\Crontab; +use Kiri\Crontab\TaskInterface; + +#[Crontab(name: '清理日志', expression: 'daily:03:00')] +class CleanLogTask implements TaskInterface +{ + public function handle(): void + { + // 清理逻辑 + } +} +``` + +在配置中指定扫描路径: +```php +// config/crontab.php +'scan_paths' => [ + 'app/Task', + 'app/Crontab', +], +``` + +## 八、注解扫描流程 + +``` +┌──────────────────────────────────────┐ +│ CrontabScanner::scan($directory) │ +└────────────────┬─────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 1. 递归遍历目录下所有 PHP 文件 │ +│ 跳过 vendor/tests/cache/storage │ +└────────────────┬─────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 2. require_once 加载文件 │ +│ get_declared_classes() 获取新类 │ +└────────────────┬─────────────────────┘ + │ + ▼ +┌──────────────────────────────────────┐ +│ 3. 检查类是否 implements TaskInterface│ +│ ──否──▶ 跳过 │ +└────────────────┬─────────────────────┘ + │ 是 + ▼ +┌──────────────────────────────────────┐ +│ 4. ReflectionClass::getAttributes( │ +│ Crontab::class) 读取注解 │ +│ ──无──▶ 跳过 │ +└────────────────┬─────────────────────┘ + │ 有 + ▼ +┌──────────────────────────────────────┐ +│ 5. $registry->register([ │ +│ 'class' => $className, │ +│ 'name' => $crontab->name, │ +│ 'expression' => $crontab->expr, │ +│ 'status' => $crontab->status, │ +│ ]); │ +└──────────────────────────────────────┘ +``` + +## 九、依赖关系 + +``` +kiri-crontab +├── PHP >= 8.5 +├── ext-swoole (协程/进程) +├── ext-redis (Redis 客户端) +├── psr/log (PSR-3 日志) +└── symfony/console (可选,如集成 kiri-core 则复用项目已有的) +``` + +## 十、目录结构 + +``` +kiri-crontab/ +├── composer.json +├── DESIGN.md # 本文档 +├── README.md +├── bin/ +│ └── crontab # 独立运行入口脚本 +├── config/ +│ └── crontab.php # 默认配置 +├── src/ +│ ├── TaskInterface.php # 任务接口 +│ ├── TaskConfig.php # 任务配置值对象 +│ ├── TaskRegistry.php # 任务注册中心 +│ ├── CrontabScheduler.php # 核心调度引擎 +│ ├── CrontabProcess.php # Swoole 进程适配器 +│ ├── CrontabCommand.php # 控制台命令 +│ ├── CrontabScanner.php # 注解任务扫描器 +│ ├── CrontabProviders.php # kiri-core Provider 集成 +│ ├── CronExpression.php # Cron 表达式解析器 +│ ├── Annotate/ +│ │ └── Crontab.php # #[Crontab] 注解类 +│ └── Events/ +│ ├── OnTaskBeforeExecute.php +│ ├── OnTaskExecuted.php +│ └── OnTaskFailed.php +└── tests/ + └── CronExpressionTest.php +``` diff --git a/README.md b/README.md index e69de29..829f05a 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,119 @@ +# kiri-crontab + +基于 Redis + Swoole 的 PHP 定时任务调度系统。 + +## 功能特性 + +- **闹钟模式**: 像设置闹钟一样声明任务 `hour: 3, minute: 0, loop: true` +- **间隔执行**: `tick: 30` 每30秒、`tickMinute: 5` 每5分钟、`tickHour: 1` 每1小时 +- **一次性任务**: `year: 2026, month: 12, day: 22, hour: 11` 指定时刻执行一次 +- **Cron 兜底**: 复杂场景用 `cron: '*/5 * * * *'` +- **动态投递**: 运行时通过 `submitToCrontab()` 随时提交、`cancelCrontabTask()` 取消 +- **类注解**: `#[Crontab]` 在类上声明调度参数,CrontabProcess 启动时自动发现 +- **协程并发**: 到期任务通过 Swoole 协程并发执行 +- **分布式锁**: Redis 主锁 + 任务锁防止重复执行 + +## 快速开始 + +### 安装 + +```bash +composer require game-worker/kiri-crontab +``` + +### 注解模式 + +```php +isMatched()) { + // 执行完后自动移除,不再调度 + CrontabScheduler::getInstance()?->cancelCurrentTask(); + return; + } + // 未匹配,继续检查 + } +} + +// 业务代码中动态投递 +$taskKey = submitToCrontab(MatchCheckTask::class, 'every:1', '匹配检查 #123'); + +// 条件满足时也可以外部取消 +cancelCrontabTask($taskKey); +``` + +### 配置文件 + +```php +// config/crontab.php +return [ + 'redis' => ['host' => '127.0.0.1', 'port' => 6379], + 'scheduler' => ['tick_interval' => 1], + 'tasks' => [ + ['class' => App\Task\CleanLogTask::class, 'name' => '清理日志', 'expression' => 'daily:03:00'], + ], +]; +``` + +### 启动 + +```bash +php bin/crontab start / stop / restart / status +``` + +### 集成到 kiri-core + +```php +// config/servers.php +'process' => [\Kiri\Crontab\CrontabProcess::class], +``` + +## 调度参数一览 + +| 参数 | 示例 | 含义 | +|------|------|------| +| `tick` | `tick: 30` | 每 30 秒循环 | +| `tickMinute` | `tickMinute: 5` | 每 5 分钟循环 | +| `tickHour` | `tickHour: 1` | 每 1 小时循环 | +| `hour` `minute` `second` | `hour: 3, minute: 0, loop: true` | 每天 03:00 | +| `minute` | `minute: 30, loop: true` | 每小时 :30 | +| `year` `month` `day` ... | `year: 2026, month: 12, day: 22` | 指定时刻一次性 | +| `cron` | `cron: '*\/5 * * * *'` | cron 表达式 | + +## 架构 + +详见 [DESIGN.md](./DESIGN.md) diff --git a/bin/crontab b/bin/crontab new file mode 100644 index 0000000..eef96bd --- /dev/null +++ b/bin/crontab @@ -0,0 +1,132 @@ +#!/usr/bin/env php +getMessage()}" . PHP_EOL); + } +} + +// 2. 发现注解任务 (检查类上的 #[Crontab] 注解) +foreach (get_declared_classes() as $className) { + try { + $reflect = new ReflectionClass($className); + if ($reflect->isAbstract()) continue; + if (!in_array(Kiri\Crontab\TaskInterface::class, class_implements($className), true)) continue; + + $attributes = $reflect->getAttributes(Kiri\Crontab\Annotate\Crontab::class); + if (empty($attributes)) continue; + + $instance = $attributes[0]->newInstance(); + $expr = $instance->buildExpression(); + if ($expr === '') continue; + + Kiri\Crontab\TaskRegistry::register([ + 'class' => $className, + 'name' => $instance->name !== '' ? $instance->name : $className, + 'expression' => $expr, + 'status' => $instance->status, + ]); + } catch (\Throwable) {} +} + +echo "[Crontab] 任务总数: " . TaskRegistry::count() . PHP_EOL; + +// PID 文件和日志文件 +$pidFile = $config['scheduler']['pid_file'] ?? (getcwd() . '/storage/crontab.pid'); +$logFile = $config['scheduler']['log_file'] ?? ''; + +// 创建命令实例 +$command = new CrontabCommand($pidFile, $logFile); + +// 路由到对应操作 +try { + match ($action) { + 'start' => $command->start($config), + 'stop' => $command->stop(), + 'restart' => $command->restart($config), + 'status' => $command->status(), + default => fwrite(STDERR, "未知操作: {$action} (支持: start|stop|restart|status)" . PHP_EOL), + }; +} catch (\Throwable $throwable) { + fwrite(STDERR, "执行失败: {$throwable->getMessage()}" . PHP_EOL); + fwrite(STDERR, "{$throwable->getTraceAsString()}" . PHP_EOL); + exit(1); +} diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..6389c7b --- /dev/null +++ b/composer.json @@ -0,0 +1,30 @@ +{ + "name": "game-worker/kiri-crontab", + "description": "基于 Redis + Swoole 的定时任务调度系统,支持定时执行、间隔重复、cron 表达式", + "type": "library", + "license": "MIT", + "authors": [ + { + "name": "XiangLin", + "email": "as2252258@163.com" + } + ], + "require": { + "php": ">=8.5", + "ext-swoole": "*", + "ext-redis": "*", + "psr/log": "^1.0" + }, + "suggest": { + "symfony/console": "如需集成 kiri-core 或使用命令行管理,建议安装 ^v8.0", + "game-worker/kiri-core": "如需集成到 kiri-core 框架,建议安装 kiri-core" + }, + "autoload": { + "psr-4": { + "Kiri\\Crontab\\": "src/" + } + }, + "bin": [ + "bin/crontab" + ] +} diff --git a/config/crontab.php b/config/crontab.php new file mode 100644 index 0000000..1a715af --- /dev/null +++ b/config/crontab.php @@ -0,0 +1,54 @@ + [ + 'host' => env('CRONTAB_REDIS_HOST', '127.0.0.1'), + 'port' => (int)env('CRONTAB_REDIS_PORT', 6379), + 'auth' => env('CRONTAB_REDIS_AUTH', ''), + 'prefix' => env('CRONTAB_REDIS_PREFIX', ''), + 'databases' => (int)env('CRONTAB_REDIS_DB', 0), + 'timeout' => (int)env('CRONTAB_REDIS_TIMEOUT', 30), + ], + + // 调度器配置 + 'scheduler' => [ + // 调度器 tick 间隔 (秒) + 'tick_interval' => (int)env('CRONTAB_TICK_INTERVAL', 1), + // 任务执行超时时间 (秒) + 'task_timeout' => (int)env('CRONTAB_TASK_TIMEOUT', 300), + // 主锁 TTL (秒) + 'lock_ttl' => (int)env('CRONTAB_LOCK_TTL', 60), + // 主锁续期间隔 (秒) + 'lock_renew_interval' => (int)env('CRONTAB_LOCK_RENEW_INTERVAL', 15), + // 是否启用协程并发执行多个到期任务 + 'concurrent_tasks' => (bool)env('CRONTAB_CONCURRENT', true), + // 并发任务最大数量 + 'max_concurrent' => (int)env('CRONTAB_MAX_CONCURRENT', 10), + // PID 文件路径 (独立模式) + 'pid_file' => env('CRONTAB_PID_FILE', ''), + // 日志文件路径 (独立模式) + 'log_file' => env('CRONTAB_LOG_FILE', ''), + ], + + // 注册的任务列表 (配置模式,expression 字符串格式) + // 注解模式通过 #[Crontab] 在 handle() 方法上声明,kiri-core Scanner 自动发现 + // 表达式: every:60 | every:5m | every:1h | daily:03:00 | hourly:30 | cron:*\/5 * * * * | at:时间戳 + // 每个任务需实现 TaskInterface 接口 + 'tasks' => [ + // 示例: + // [ + // 'class' => App\Task\CleanLogTask::class, + // 'name' => '清理日志', + // 'expression' => 'daily:03:00', // 每天 03:00 + // ], + // [ + // 'class' => App\Task\HeartbeatTask::class, + // 'name' => '心跳检测', + // 'expression' => 'every:60', // 每 60 秒 + // ], + ], +]; diff --git a/src/Annotate/Crontab.php b/src/Annotate/Crontab.php new file mode 100644 index 0000000..aa670ba --- /dev/null +++ b/src/Annotate/Crontab.php @@ -0,0 +1,224 @@ +expression !== null && $this->expression !== '') { + return $this->expression; + } + + if ($this->cron !== null && $this->cron !== '') { + return 'cron:' . $this->cron; + } + + $everyExpr = $this->buildTickExpression(); + if ($everyExpr !== '') { + return $everyExpr; + } + + if ($this->hasAnyTimeField()) { + return $this->buildClockExpression(); + } + + if ($this->every !== null) { + return is_int($this->every) ? 'every:' . $this->every : 'every:' . $this->every; + } + + if ($this->dailyAt !== null) { + return 'daily:' . $this->dailyAt; + } + + if ($this->hourlyAt !== null) { + return 'hourly:' . $this->hourlyAt; + } + + if ($this->at !== null) { + return 'at:' . $this->at; + } + + return ''; + } + + /** + * 构建 tick 间隔表达式 + */ + private function buildTickExpression(): string + { + if ($this->tick !== null) { + return 'every:' . $this->tick; + } + if ($this->tickMinute !== null) { + return 'every:' . $this->tickMinute . 'm'; + } + if ($this->tickHour !== null) { + return 'every:' . $this->tickHour . 'h'; + } + return ''; + } + + /** + * 构建闹钟模式表达式 + */ + private function buildClockExpression(): string + { + $h = $this->hour ?? 0; + $m = $this->minute ?? 0; + $s = $this->second ?? 0; + + if ($this->loop) { + if ($this->isDailyPattern()) { + return sprintf('daily:%02d:%02d:%02d', $h, $m, $s); + } + if ($this->isHourlyPattern()) { + return sprintf('hourly:%02d', $m); + } + return $this->buildCronFromClock(); + } + + return $this->buildAtFromClock(); + } + + /** + * 是否匹配 daily 模式 (只有 hour/minute/second,无 year/month/day) + */ + private function isDailyPattern(): bool + { + return $this->year === null + && $this->month === null + && $this->day === null + && $this->hour !== null; + } + + /** + * 是否匹配 hourly 模式 (只有 minute/second,无 year/month/day/hour) + */ + private function isHourlyPattern(): bool + { + return $this->year === null + && $this->month === null + && $this->day === null + && $this->hour === null + && $this->minute !== null; + } + + /** + * 将时间字段转为标准 5 字段 cron 表达式 + */ + private function buildCronFromClock(): string + { + $min = $this->minute !== null ? (string)$this->minute : '*'; + $h = $this->hour !== null ? (string)$this->hour : '*'; + $d = $this->day !== null ? (string)$this->day : '*'; + $mon = $this->month !== null ? (string)$this->month : '*'; + + return "cron:{$min} {$h} {$d} {$mon} *"; + } + + /** + * 计算一次性执行的 Unix 时间戳 + */ + private function buildAtFromClock(): string + { + $now = time(); + $timeInfo = getdate($now); + + $y = $this->year ?? (int)$timeInfo['year']; + $mon = $this->month ?? (int)$timeInfo['mon']; + $d = $this->day ?? (int)$timeInfo['mday']; + $h = $this->hour ?? (int)$timeInfo['hours']; + $m = $this->minute ?? (int)$timeInfo['minutes']; + $s = $this->second ?? (int)$timeInfo['seconds']; + + $timestamp = mktime($h, $m, $s, $mon, $d, $y); + + if ($timestamp <= $now) { + if ($this->year === null && $this->month === null && $this->day === null) { + $timestamp = mktime($h, $m, $s, $mon, $d + 1, $y); + } elseif ($this->day === null) { + $timestamp = mktime($h, $m, $s, $mon + 1, $d, $y); + } + } + + return 'at:' . $timestamp; + } + + /** + * 检查是否设置了任意时间字段 + */ + private function hasAnyTimeField(): bool + { + return $this->year !== null + || $this->month !== null + || $this->day !== null + || $this->hour !== null + || $this->minute !== null + || $this->second !== null; + } + +} diff --git a/src/CronExpression.php b/src/CronExpression.php new file mode 100644 index 0000000..9b1bd20 --- /dev/null +++ b/src/CronExpression.php @@ -0,0 +1,244 @@ + $this->parseEvery($expression, $afterTimestamp), + str_starts_with($expression, 'daily:') => $this->parseDaily($expression, $afterTimestamp), + str_starts_with($expression, 'hourly:') => $this->parseHourly($expression, $afterTimestamp), + str_starts_with($expression, 'at:') => $this->parseAt($expression, $afterTimestamp), + str_starts_with($expression, 'cron:') => $this->parseCron(substr($expression, 5), $afterTimestamp), + default => $this->parseCron($expression, $afterTimestamp), + }; + } + + /** + * 获取表达式的可读间隔描述 + */ + public function getIntervalDescription(string $expression): string + { + return match (true) { + str_starts_with($expression, 'every:') => $this->describeEvery($expression), + str_starts_with($expression, 'daily:') => '每天 ' . substr($expression, 6), + str_starts_with($expression, 'hourly:') => '每小时第 ' . substr($expression, 7) . ' 分', + str_starts_with($expression, 'at:') => '一次性任务', + str_starts_with($expression, 'cron:') => 'Cron: ' . substr($expression, 5), + default => 'Cron: ' . $expression, + }; + } + + /** + * 是否为一次性任务 + */ + public function isOneShot(string $expression): bool + { + return str_starts_with($expression, 'at:'); + } + + /** + * 解析 every:{N}[s|m|h] 表达式 + */ + private function parseEvery(string $expression, int $afterTimestamp): int + { + $value = substr($expression, 6); + + $seconds = match (true) { + str_ends_with($value, 's') => (int)$value, + str_ends_with($value, 'm') => (int)$value * 60, + str_ends_with($value, 'h') => (int)$value * 3600, + default => (int)$value, + }; + + // 从上次调度时间累加,避免时间漂移和积压 + // 如果 afterTimestamp 距离上次调度超过 n 个周期,只加一个周期(按第一次调度时间对齐) + return $afterTimestamp + $seconds; + } + + /** + * 描述 every 表达式 + */ + private function describeEvery(string $expression): string + { + $value = substr($expression, 6); + return match (true) { + str_ends_with($value, 's') => '每 ' . ((int)$value) . ' 秒', + str_ends_with($value, 'm') => '每 ' . ((int)$value) . ' 分钟', + str_ends_with($value, 'h') => '每 ' . ((int)$value) . ' 小时', + default => '每 ' . ((int)$value) . ' 秒', + }; + } + + /** + * 解析 daily:{HH:MM} 表达式 + */ + private function parseDaily(string $expression, int $afterTimestamp): int + { + $timeStr = substr($expression, 6); + $parts = explode(':', $timeStr); + $hour = (int)($parts[0] ?? 0); + $minute = (int)($parts[1] ?? 0); + $second = (int)($parts[2] ?? 0); + + $currentDate = getdate($afterTimestamp); + $targetTime = mktime($hour, $minute, $second, $currentDate['mon'], $currentDate['mday'], $currentDate['year']); + + if ($targetTime <= $afterTimestamp) { + // 今天的时间已过,推到明天 + $targetTime = $targetTime + 86400; + } + + return $targetTime; + } + + /** + * 解析 hourly:{MM} 表达式 + */ + private function parseHourly(string $expression, int $afterTimestamp): int + { + $minute = (int)substr($expression, 7); + $currentDate = getdate($afterTimestamp); + $targetTime = mktime($currentDate['hours'], $minute, 0, $currentDate['mon'], $currentDate['mday'], $currentDate['year']); + + if ($targetTime <= $afterTimestamp) { + $targetTime = $targetTime + 3600; + } + + return $targetTime; + } + + /** + * 解析 at:{时间戳} 表达式 — 一次性任务 + */ + private function parseAt(string $expression, int $afterTimestamp): int + { + $timestamp = (int)substr($expression, 3); + if ($timestamp <= $afterTimestamp) { + // 已过期,返回 0 表示不再调度 + return 0; + } + return $timestamp; + } + + /** + * 解析标准 5 字段 cron 表达式: 分 时 日 月 周 + */ + private function parseCron(string $cronExpression, int $afterTimestamp): int + { + $fields = preg_split('/\s+/', trim($cronExpression)); + if (count($fields) !== 5) { + // 格式无效,返回 0 + return 0; + } + + $minute = $fields[0]; + $hour = $fields[1]; + $day = $fields[2]; + $month = $fields[3]; + $weekday = $fields[4]; + + // 从 afterTimestamp 下一秒开始逐分钟搜索,最多搜索 2 年 + $searchStart = $afterTimestamp + 60; + $searchEnd = $afterTimestamp + 365 * 2 * 86400; + + for ($ts = $searchStart; $ts <= $searchEnd; $ts += 60) { + $t = getdate($ts); + $matched = true; + + $matched = $matched && $this->matchCronField($minute, $t['minutes'], 0, 59); + $matched = $matched && $this->matchCronField($hour, $t['hours'], 0, 23); + $matched = $matched && $this->matchCronField($day, $t['mday'], 1, 31); + $matched = $matched && $this->matchCronField($month, $t['mon'], 1, 12); + $matched = $matched && $this->matchCronField($weekday, $t['wday'], 0, 6); + + if ($matched) { + return $ts; + } + } + + return 0; + } + + /** + * 匹配单个 cron 字段值,支持 *、*\/N、N、N,M、N-M + * + * @param string $fieldValue cron 字段原始值 + * @param int $current 当前时间单位的值 + * @param int $min 该字段的最小值 + * @param int $max 该字段的最大值 + */ + private function matchCronField(string $fieldValue, int $current, int $min, int $max): bool + { + // * 匹配所有值 + if ($fieldValue === '*') { + return true; + } + + // *\/N 每隔 N 步进 + if (str_starts_with($fieldValue, '*/')) { + $step = (int)substr($fieldValue, 2); + if ($step <= 0) { + return false; + } + return ($current - $min) % $step === 0; + } + + // 逗号分隔的枚举值 + if (str_contains($fieldValue, ',')) { + $values = explode(',', $fieldValue); + foreach ($values as $val) { + if ($this->matchCronField(trim($val), $current, $min, $max)) { + return true; + } + } + return false; + } + + // N-M 范围值 + if (str_contains($fieldValue, '-')) { + $parts = explode('-', $fieldValue); + $rangeStart = (int)$parts[0]; + $rangeEnd = (int)$parts[1]; + return $current >= $rangeStart && $current <= $rangeEnd; + } + + // 精确值 + return (int)$fieldValue === $current; + } + +} diff --git a/src/CrontabCommand.php b/src/CrontabCommand.php new file mode 100644 index 0000000..cde32b8 --- /dev/null +++ b/src/CrontabCommand.php @@ -0,0 +1,153 @@ +pidFile = $pidFile !== '' ? $pidFile : sys_get_temp_dir() . '/crontab.pid'; + $this->logFile = $logFile; + } + + /** + * 启动调度器进程 + */ + public function start(array $config): void + { + if ($this->isRunning()) { + echo "[Crontab] 调度器已在运行中, PID: " . $this->getPid() . PHP_EOL; + return; + } + + $count = TaskRegistry::count(); + if ($count === 0) { + echo "[Crontab] 警告: 没有注册任何任务" . PHP_EOL; + } + + echo "[Crontab] 启动调度器..." . PHP_EOL; + echo "[Crontab] 已注册 {$count} 个任务" . PHP_EOL; + + $process = new \Swoole\Process(function (\Swoole\Process $worker) use ($config) { + file_put_contents($this->pidFile, (string)$worker->pid); + + $crontabProcess = new CrontabProcess($config); + $crontabProcess->run(); + }, false, 0, true); + + $pid = $process->start(); + + echo "[Crontab] 调度器已启动, PID: {$pid}" . PHP_EOL; + echo "[Crontab] PID 文件: {$this->pidFile}" . PHP_EOL; + + \Swoole\Process::wait(); + } + + /** + * 停止调度器进程 + */ + public function stop(): void + { + if (!$this->isRunning()) { + echo "[Crontab] 调度器未运行" . PHP_EOL; + return; + } + + $pid = $this->getPid(); + + if ($pid > 0 && \Swoole\Process::kill($pid, 0)) { + \Swoole\Process::kill($pid, SIGTERM); + echo "[Crontab] 已发送停止信号, PID: {$pid}" . PHP_EOL; + + $timeout = 10; + while ($timeout > 0 && \Swoole\Process::kill($pid, 0)) { + usleep(200000); + $timeout--; + } + + if (\Swoole\Process::kill($pid, 0)) { + \Swoole\Process::kill($pid, SIGKILL); + echo "[Crontab] 进程未响应, 已强制终止" . PHP_EOL; + } + } + + if (file_exists($this->pidFile)) { + unlink($this->pidFile); + } + + echo "[Crontab] 调度器已停止" . PHP_EOL; + } + + /** + * 重启调度器进程 + */ + public function restart(array $config): void + { + echo "[Crontab] 重启调度器..." . PHP_EOL; + $this->stop(); + usleep(500000); + $this->start($config); + } + + /** + * 查看调度器状态 + */ + public function status(): void + { + if ($this->isRunning()) { + $pid = $this->getPid(); + echo "[Crontab] 状态: 运行中" . PHP_EOL; + echo "[Crontab] PID: {$pid}" . PHP_EOL; + } else { + echo "[Crontab] 状态: 未运行" . PHP_EOL; + } + } + + /** + * 检查调度器是否在运行 + */ + private function isRunning(): bool + { + $pid = $this->getPid(); + if ($pid <= 0) { + return false; + } + return \Swoole\Process::kill($pid, 0); + } + + /** + * 从 PID 文件读取进程 ID + */ + private function getPid(): int + { + if (!file_exists($this->pidFile)) { + return 0; + } + $pid = (int)file_get_contents($this->pidFile); + return $pid > 0 ? $pid : 0; + } + +} diff --git a/src/CrontabProcess.php b/src/CrontabProcess.php new file mode 100644 index 0000000..65900f1 --- /dev/null +++ b/src/CrontabProcess.php @@ -0,0 +1,198 @@ +run(); + * + * kiri-core 集成模式: + * 在 config/servers.php 的 process 中添加 CrontabProcess::class 即可 + */ +class CrontabProcess +{ + + private ?CrontabScheduler $scheduler = null; + + /** + * @param array $config 配置数组 (crontab.php 的完整内容) + * @param LoggerInterface|null $logger 日志记录器 + */ + public function __construct( + private array $config, + private ?LoggerInterface $logger = null, + ) { + if ($this->logger === null) { + $this->logger = new NullLogger(); + } + } + + /** + * 扫描注解任务并启动进程 (独立模式) + * + * @throws \Throwable + */ + public function run(): void + { + $this->logger->info('[CrontabProcess] 进程启动中...'); + + $this->discoverAnnotationTasks(); + + Coroutine::run(function () { + $this->startScheduler(); + }); + + $this->logger->info('[CrontabProcess] 进程已退出'); + } + + /** + * 在 kiri-core 自定义进程中启动调度器 + * 由框架的 AbstractProcess 生命周期调用 + * + * @throws \Throwable + */ + public function boot(): void + { + $this->logger->info('[CrontabProcess] 在 kiri-core 进程内启动...'); + + $this->discoverAnnotationTasks(); + + $this->startScheduler(); + } + + /** + * 获取当前调度器实例 + */ + public function getScheduler(): ?CrontabScheduler + { + return $this->scheduler; + } + + /** + * 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry + * CrontabProcess 启动时自动调用,检查所有已加载的类 + */ + private function discoverAnnotationTasks(): void + { + $beforeCount = TaskRegistry::count(); + + foreach (get_declared_classes() as $className) { + if (!in_array(TaskInterface::class, class_implements($className), true)) { + continue; + } + + try { + $reflect = new \ReflectionClass($className); + if ($reflect->isAbstract()) { + continue; + } + + // 读取类上的 #[Crontab] 注解 + $attributes = $reflect->getAttributes(Annotate\Crontab::class); + if (empty($attributes)) { + continue; + } + + /** @var Annotate\Crontab $instance */ + $instance = $attributes[0]->newInstance(); + + $scheduleExpression = $instance->buildExpression(); + if ($scheduleExpression === '') { + continue; + } + + TaskRegistry::register([ + 'class' => $className, + 'name' => $instance->name !== '' ? $instance->name : $className, + 'expression' => $scheduleExpression, + 'status' => $instance->status, + ]); + } catch (\Throwable) { + // 跳过无法反射或注册失败的类 + } + } + + $afterCount = TaskRegistry::count(); + if ($afterCount > $beforeCount) { + $this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个"); + } + } + + /** + * 初始化并启动调度器 + */ + private function startScheduler(): void + { + $redis = $this->createRedisConnection(); + $cronExpression = new CronExpression(); + + $schedulerConfig = $this->config['scheduler'] ?? []; + + $this->scheduler = new CrontabScheduler( + redis: $redis, + cronExpression: $cronExpression, + logger: $this->logger, + tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), + taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), + lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), + lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), + concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), + maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10), + ); + + $this->scheduler->start(); + } + + /** + * 创建 Redis 连接 + * 在独立模式下直接创建连接,不通过 kiri-core 的连接池 + */ + private function createRedisConnection(): \Redis + { + $redisConfig = $this->config['redis'] ?? []; + + $host = $redisConfig['host'] ?? '127.0.0.1'; + $port = (int)($redisConfig['port'] ?? 6379); + $auth = $redisConfig['auth'] ?? ''; + $databases = (int)($redisConfig['databases'] ?? 0); + $timeout = (int)($redisConfig['timeout'] ?? 30); + $prefix = $redisConfig['prefix'] ?? ''; + + $redis = new \Redis(); + if (!$redis->connect($host, $port, $timeout)) { + throw new \RuntimeException("Redis 连接失败: {$host}:{$port}"); + } + + if (!empty($auth) && !$redis->auth($auth)) { + throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}"); + } + + $redis->select($databases); + + if (!empty($prefix)) { + $redis->setOption(\Redis::OPT_PREFIX, $prefix); + } + + $this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}"); + + return $redis; + } + +} diff --git a/src/CrontabProviders.php b/src/CrontabProviders.php new file mode 100644 index 0000000..9e2bad5 --- /dev/null +++ b/src/CrontabProviders.php @@ -0,0 +1,42 @@ + [ + * \Kiri\Crontab\CrontabProcess::class, + * ], + * ``` + * + * 在应用 Kernel 的 getCommands() 中添加: + * ```php + * public function getCommands(): array + * { + * return [ + * \Kiri\Crontab\CrontabCommand::class, + * ]; + * } + * ``` + */ +class CrontabProviders extends Providers +{ + + /** + * 注册 CrontabCommand 到 Console Application + */ + public function onImport(): void + { + $command = $this->container->get(CrontabCommand::class); + $console = $this->container->get(Application::class); + $console->addCommand($command); + } + +} diff --git a/src/CrontabScheduler.php b/src/CrontabScheduler.php new file mode 100644 index 0000000..126a3a2 --- /dev/null +++ b/src/CrontabScheduler.php @@ -0,0 +1,683 @@ +running = true; + + $this->logger->info('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务'); + + $this->syncTasks(); + + $nextTickTime = $this->calculateNextTickTime(); + + while ($this->running) { + $now = microtime(true); + $sleepSeconds = max(0, $nextTickTime - $now); + + if ($sleepSeconds > 0) { + usleep((int)($sleepSeconds * 1000000)); + } + + $nextTickTime = $this->calculateNextTickTime(); + + if (!$this->running) { + break; + } + + try { + $this->tick(); + } catch (\Throwable $throwable) { + $this->logger->error('[CrontabScheduler] tick 异常: ' . $throwable->getMessage()); + } + } + + self::$instance = null; + $this->logger->info('[CrontabScheduler] 调度器已停止'); + } + + /** + * 停止调度器 + */ + public function stop(): void + { + $this->running = false; + } + + /** + * 是否运行中 + */ + public function isRunning(): bool + { + return $this->running; + } + + // ────────────────────────────────────────────── + // 动态任务投递 API + // ────────────────────────────────────────────── + + /** + * 动态投递任务 — 运行时向调度系统提交一个即时任务 + * + * @param string $className 实现了 TaskInterface 的任务类 + * @param string $expression 调度表达式 (every:1, at:时间戳, daily:03:00 等) + * @param string $name 任务显示名称 (可选) + * @return string 返回生成的 taskKey,可用于后续 cancel + * + * 使用示例: + * $scheduler = CrontabScheduler::getInstance(); + * $taskKey = $scheduler->submit(MyTask::class, 'every:1', '匹配检查'); + * // 任务每秒执行,匹配成功后在 handle() 中调用 cancel 停止 + */ + public function submit(string $className, string $expression, string $name = ''): string + { + if (!class_exists($className)) { + throw new \InvalidArgumentException("任务类不存在: {$className}"); + } + if (!in_array(TaskInterface::class, class_implements($className), true)) { + throw new \InvalidArgumentException("{$className} 必须实现 TaskInterface 接口"); + } + + $taskKey = $this->generateDynamicTaskKey($className); + + $taskConfig = new TaskConfig( + taskKey: $taskKey, + className: $className, + name: $name !== '' ? $name : $className, + expression: $expression, + status: 'active', + createdAt: time(), + ); + + $this->persistNewTask($taskConfig); + + $this->logger->info("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'"); + + return $taskKey; + } + + /** + * 取消/移除指定任务 — 从 Redis 队列和元数据中彻底删除 + * + * @param string $taskKey 任务标识,由 submit() 返回 + * @return bool 是否成功取消 + */ + public function cancelTask(string $taskKey): bool + { + $hashKey = $this->getTaskHashKey($taskKey); + + if (!$this->redis->exists($hashKey)) { + return false; + } + + $this->removeFromQueue($taskKey); + $this->redis->del($hashKey); + $this->redis->del($this->getTaskLockKey($taskKey)); + + $this->logger->info("[CrontabScheduler] 任务已取消: {$taskKey}"); + + return true; + } + + /** + * 取消当前正在执行的任务 + * 在 TaskInterface::handle() 内部调用,执行完后任务将被移除不再调度 + * + * 使用示例: + * class MatchTask implements TaskInterface { + * public function handle(): void { + * if ($this->isMatched()) { + * CrontabScheduler::getInstance()?->cancelCurrentTask(); + * } + * } + * } + */ + public function cancelCurrentTask(): void + { + if ($this->currentTaskKey !== null) { + $this->markTaskForRemoval($this->currentTaskKey); + } + } + + /** + * 获取当前正在执行的任务 key (供任务内部使用) + */ + public function getCurrentTaskKey(): ?string + { + return $this->currentTaskKey; + } + + // ────────────────────────────────────────────── + // 任务管理 API + // ────────────────────────────────────────────── + + /** + * 同步任务到 Redis + */ + public function syncTasks(): void + { + foreach (TaskRegistry::all() as $taskKey => $taskConfig) { + $hashKey = $this->getTaskHashKey($taskKey); + + if (!$this->redis->exists($hashKey)) { + $this->persistNewTask($taskConfig); + continue; + } + + $this->mergeExistingTask($taskConfig); + } + } + + /** + * 刷新所有任务的表达式和下次运行时间 + */ + public function refreshTasks(): void + { + foreach (TaskRegistry::all() as $taskKey => $taskConfig) { + $hashKey = $this->getTaskHashKey($taskKey); + + $nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1); + $interval = $this->cronExpression->getIntervalDescription($taskConfig->expression); + + $taskConfig->nextRun = $nextRun; + $taskConfig->interval = $interval; + + $hashData = $taskConfig->toHash(); + $this->redis->hMSet($hashKey, $hashData); + + if ($taskConfig->status === 'active' && $nextRun > 0) { + $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey); + } + } + } + + /** + * 暂停指定任务 + */ + public function pauseTask(string $taskKey): bool + { + $hashKey = $this->getTaskHashKey($taskKey); + if (!$this->redis->exists($hashKey)) { + return false; + } + + $this->redis->hSet($hashKey, 'status', 'paused'); + $this->redis->zRem(self::QUEUE_KEY, $taskKey); + + $this->logger->info("[CrontabScheduler] 任务已暂停: {$taskKey}"); + return true; + } + + /** + * 恢复指定任务 + */ + public function resumeTask(string $taskKey): bool + { + $hashKey = $this->getTaskHashKey($taskKey); + $hash = $this->redis->hGetAll($hashKey); + if (empty($hash)) { + return false; + } + + $config = TaskConfig::fromHash($taskKey, $hash); + $nextRun = $this->cronExpression->getNextRunTime($config->expression, time() - 1); + + $this->redis->hMSet($hashKey, [ + 'status' => 'active', + 'next_run' => $nextRun, + ]); + + if ($nextRun > 0) { + $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey); + } + + $this->logger->info("[CrontabScheduler] 任务已恢复: {$taskKey}"); + return true; + } + + // ────────────────────────────────────────────── + // 内部调度 + // ────────────────────────────────────────────── + + /** + * 单次 tick — 检查并执行到期的任务 + */ + private function tick(): void + { + if (!$this->acquireMasterLock()) { + return; + } + + try { + $this->processDueTasks(); + } finally { + $this->releaseMasterLock(); + } + } + + /** + * 获取主锁,防止多实例同时调度 + */ + private function acquireMasterLock(): bool + { + return $this->redis->set( + self::MASTER_LOCK_KEY, + (string)getmypid(), + ['nx', 'ex' => $this->lockTtl] + ); + } + + /** + * 释放主锁 + */ + private function releaseMasterLock(): void + { + $script = <<<'LUA' + if redis.call('get', KEYS[1]) == ARGV[1] then + return redis.call('del', KEYS[1]) + end + return 0 + LUA; + $this->redis->eval($script, [self::MASTER_LOCK_KEY, (string)getmypid()], 1); + } + + /** + * 续期主锁 + */ + private function renewMasterLock(): void + { + $this->redis->expire(self::MASTER_LOCK_KEY, $this->lockTtl); + } + + /** + * 处理所有到期任务 + */ + private function processDueTasks(): void + { + $now = time(); + + $dueTaskKeys = $this->redis->zRangeByScore(self::QUEUE_KEY, '-inf', $now, ['limit' => [0, 100]]); + + if (empty($dueTaskKeys)) { + $this->renewMasterLock(); + return; + } + + $this->renewMasterLock(); + + if ($this->concurrentTasks) { + $this->executeTasksConcurrently($dueTaskKeys, $now); + } else { + foreach ($dueTaskKeys as $taskKey) { + $this->executeSingleTask($taskKey, $now); + } + } + } + + /** + * 并发执行到期任务 (通过协程) + */ + private function executeTasksConcurrently(array $taskKeys, int $now): void + { + $batchSize = min(count($taskKeys), $this->maxConcurrent); + + $channel = new Coroutine\Channel($batchSize); + + foreach ($taskKeys as $taskKey) { + $channel->push(true); + + Coroutine::create(function () use ($taskKey, $now, $channel) { + try { + $this->executeSingleTask($taskKey, $now); + } catch (\Throwable $throwable) { + $this->logger->error("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}"); + } finally { + $channel->pop(); + } + }); + } + + for ($i = 0; $i < $batchSize; $i++) { + $channel->push(true); + } + $channel->close(); + } + + /** + * 执行单个任务 + */ + private function executeSingleTask(string $taskKey, int $now): void + { + $hashKey = $this->getTaskHashKey($taskKey); + $hash = $this->redis->hGetAll($hashKey); + + if (empty($hash)) { + $this->removeFromQueue($taskKey); + return; + } + + $config = TaskConfig::fromHash($taskKey, $hash); + + if ($config->status !== 'active') { + $this->removeFromQueue($taskKey); + return; + } + + if (!$this->acquireTaskLock($taskKey)) { + $this->logger->warning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}"); + return; + } + + $startTime = microtime(true); + + // 记录当前任务 key,供任务内部通过 cancelCurrentTask() 取消自身 + $this->currentTaskKey = $taskKey; + + try { + $this->redis->sAdd(self::RUNNING_SET_KEY, $taskKey); + + $this->logger->info("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})"); + + $className = $config->className; + if (!class_exists($className)) { + throw new \RuntimeException("任务类不存在: {$className}"); + } + + /** @var TaskInterface $taskInstance */ + $taskInstance = new $className(); + $taskInstance->handle(); + + $duration = round(microtime(true) - $startTime, 4); + + $this->logger->info("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s"); + } catch (\Throwable $throwable) { + $duration = round(microtime(true) - $startTime, 4); + + $this->logger->error("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}"); + } finally { + $this->redis->sRem(self::RUNNING_SET_KEY, $taskKey); + + // 检查是否被标记为"执行后移除" + $shouldRemove = $this->isTaskMarkedForRemoval($taskKey); + + if ($shouldRemove) { + $this->removeFromQueue($taskKey); + $this->redis->del($hashKey); + $this->redis->del($this->getTaskLockKey($taskKey)); + $this->clearRemovalFlag($taskKey); + $this->logger->info("[CrontabScheduler] 任务已自毁: {$taskKey}"); + } else { + $this->finalizeTaskExecution($config, $now); + } + + $this->releaseTaskLock($taskKey); + $this->currentTaskKey = null; + } + } + + /** + * 完成执行后的任务状态更新 + */ + private function finalizeTaskExecution(TaskConfig $config, int $now): void + { + $taskKey = $config->taskKey; + $hashKey = $this->getTaskHashKey($taskKey); + + $isOneShot = $this->cronExpression->isOneShot($config->expression); + + if ($isOneShot) { + $this->removeFromQueue($taskKey); + $this->redis->del($hashKey); + $this->logger->info("[CrontabScheduler] 一次性任务已移除: {$taskKey}"); + return; + } + + $nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); + $interval = $this->cronExpression->getIntervalDescription($config->expression); + + $updateData = [ + 'last_run' => $now, + 'next_run' => $nextRun, + 'interval' => $interval, + ]; + + $this->redis->hMSet($hashKey, $updateData); + + if ($nextRun > 0) { + $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey); + } else { + $this->removeFromQueue($taskKey); + } + } + + /** + * 标记任务为"下次执行后移除" + * 当任务内部调用 cancelCurrentTask() 时,不立即删除(执行中操作安全),仅标记 + * 等当前执行完成后,executeSingleTask 的 finally 块会检查此标记并清理 + */ + private function markTaskForRemoval(string $taskKey): void + { + $this->redis->set('crontab:removal:' . $taskKey, '1', 60); + } + + /** + * 检查任务是否被标记为待移除 + */ + private function isTaskMarkedForRemoval(string $taskKey): bool + { + return (bool)$this->redis->exists('crontab:removal:' . $taskKey); + } + + /** + * 清除移除标记 + */ + private function clearRemovalFlag(string $taskKey): void + { + $this->redis->del('crontab:removal:' . $taskKey); + } + + // ────────────────────────────────────────────── + // Redis 操作辅助 + // ────────────────────────────────────────────── + + /** + * 获取任务执行锁 + */ + private function acquireTaskLock(string $taskKey): bool + { + $lockKey = $this->getTaskLockKey($taskKey); + return $this->redis->set($lockKey, (string)time(), ['nx', 'ex' => $this->taskTimeout]); + } + + /** + * 释放任务执行锁 + */ + private function releaseTaskLock(string $taskKey): void + { + $this->redis->del($this->getTaskLockKey($taskKey)); + } + + /** + * 从调度队列移除任务 + */ + private function removeFromQueue(string $taskKey): void + { + $this->redis->zRem(self::QUEUE_KEY, $taskKey); + } + + /** + * 持久化新任务到 Redis + */ + private function persistNewTask(TaskConfig $taskConfig): void + { + $nextRun = $this->cronExpression->getNextRunTime($taskConfig->expression, time() - 1); + $interval = $this->cronExpression->getIntervalDescription($taskConfig->expression); + + $taskConfig->nextRun = $nextRun; + $taskConfig->interval = $interval; + $taskConfig->createdAt = time(); + + $hashKey = $this->getTaskHashKey($taskConfig->taskKey); + $this->redis->hMSet($hashKey, $taskConfig->toHash()); + + if ($taskConfig->status === 'active' && $nextRun > 0) { + $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey); + } + + $this->logger->info("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun)); + } + + /** + * 合并更新已有任务配置 (保留运行时状态) + */ + private function mergeExistingTask(TaskConfig $taskConfig): void + { + $hashKey = $this->getTaskHashKey($taskConfig->taskKey); + + $updateData = [ + 'class' => $taskConfig->className, + 'name' => $taskConfig->name, + 'expression' => $taskConfig->expression, + ]; + $this->redis->hMSet($hashKey, $updateData); + } + + /** + * 生成动态任务的唯一标识 + */ + private function generateDynamicTaskKey(string $className): string + { + $baseKey = strtolower(str_replace('\\', '.', ltrim($className, '\\'))); + $suffix = substr(md5(uniqid((string)mt_rand(), true)), 0, 8); + + return 'dynamic.' . $baseKey . '.' . $suffix; + } + + // ────────────────────────────────────────────── + // 辅助方法 + // ────────────────────────────────────────────── + + /** + * 计算下一个 tick 的精确时间点 (对齐到秒边界) + */ + private function calculateNextTickTime(): float + { + $now = microtime(true); + $current = floor($now); + return $current + $this->tickInterval; + } + + /** + * 获取任务 Hash key + */ + private function getTaskHashKey(string $taskKey): string + { + return self::KEY_PREFIX . ':task:' . $taskKey; + } + + /** + * 获取任务执行锁 key + */ + private function getTaskLockKey(string $taskKey): string + { + return self::KEY_PREFIX . ':lock:task:' . $taskKey; + } + +} diff --git a/src/Events/OnTaskBeforeExecute.php b/src/Events/OnTaskBeforeExecute.php new file mode 100644 index 0000000..f46bf3c --- /dev/null +++ b/src/Events/OnTaskBeforeExecute.php @@ -0,0 +1,24 @@ +createdAt === 0) { + $this->createdAt = time(); + } + if ($this->name === '') { + $this->name = $this->taskKey; + } + } + + /** + * 从数组创建配置对象 + */ + public static function fromArray(array $data): static + { + return new static( + taskKey: $data['taskKey'] ?? $data['class'], + className: $data['class'], + name: $data['name'] ?? '', + expression: $data['expression'] ?? '', + status: $data['status'] ?? 'active', + nextRun: (int)($data['next_run'] ?? 0), + lastRun: (int)($data['last_run'] ?? 0), + interval: $data['interval'] ?? '', + createdAt: (int)($data['created_at'] ?? 0), + ); + } + + /** + * 从 Hash 数据生成配置对象 (从 Redis 读取) + */ + public static function fromHash(string $taskKey, array $hash): static + { + return new static( + taskKey: $taskKey, + className: $hash['class'] ?? '', + name: $hash['name'] ?? '', + expression: $hash['expression'] ?? '', + status: $hash['status'] ?? 'active', + nextRun: (int)($hash['next_run'] ?? 0), + lastRun: (int)($hash['last_run'] ?? 0), + interval: $hash['interval'] ?? '', + createdAt: (int)($hash['created_at'] ?? 0), + ); + } + + /** + * 转为 Hash 存储数组 + */ + public function toHash(): array + { + return [ + 'class' => $this->className, + 'name' => $this->name, + 'expression' => $this->expression, + 'status' => $this->status, + 'next_run' => $this->nextRun, + 'last_run' => $this->lastRun, + 'interval' => $this->interval, + 'created_at' => $this->createdAt, + ]; + } + +} diff --git a/src/TaskInterface.php b/src/TaskInterface.php new file mode 100644 index 0000000..1b76b9e --- /dev/null +++ b/src/TaskInterface.php @@ -0,0 +1,23 @@ + ..., 'expression' => 'daily:03:00']); + * $all = TaskRegistry::all(); + * $count = TaskRegistry::count(); + */ +class TaskRegistry +{ + + /** @var array 已注册任务,key 为任务标识 */ + private static array $tasks = []; + + /** + * 注册单个任务 + * + * @param array $config 任务配置数组,必须包含 class 和 expression 字段 + * @return TaskConfig + * @throws \InvalidArgumentException + */ + public static function register(array $config): TaskConfig + { + if (empty($config['class'])) { + throw new \InvalidArgumentException('任务配置必须包含 class 字段'); + } + if (empty($config['expression'])) { + throw new \InvalidArgumentException('任务配置必须包含 expression 字段'); + } + + $className = $config['class']; + if (!class_exists($className)) { + throw new \InvalidArgumentException("任务类不存在: {$className}"); + } + if (!in_array(TaskInterface::class, class_implements($className), true)) { + throw new \InvalidArgumentException("{$className} 必须实现 TaskInterface 接口"); + } + + $taskKey = self::generateTaskKey($config['class']); + $taskConfig = TaskConfig::fromArray(array_merge($config, ['taskKey' => $taskKey])); + + self::$tasks[$taskKey] = $taskConfig; + + return $taskConfig; + } + + /** + * 批量注册任务 + * + * @param array $configs 任务配置数组列表 + * @return array + */ + public static function registerMany(array $configs): array + { + $results = []; + foreach ($configs as $config) { + $taskConfig = self::register($config); + $results[$taskConfig->taskKey] = $taskConfig; + } + return $results; + } + + /** + * 获取所有已注册任务 + * + * @return array + */ + public static function all(): array + { + return self::$tasks; + } + + /** + * 获取指定任务配置 + */ + public static function get(string $taskKey): ?TaskConfig + { + return self::$tasks[$taskKey] ?? null; + } + + /** + * 获取所有活跃任务 (status=active) + */ + public static function getActiveTasks(): array + { + return array_filter(self::$tasks, fn(TaskConfig $config) => $config->status === 'active'); + } + + /** + * 任务总数 + */ + public static function count(): int + { + return count(self::$tasks); + } + + /** + * 清空注册表 (用于测试) + */ + public static function clear(): void + { + self::$tasks = []; + } + + /** + * 根据类名生成任务唯一标识 + * 将反斜杠替换为点号,转为小写可读 key + */ + private static function generateTaskKey(string $className): string + { + return strtolower( + str_replace('\\', '.', ltrim($className, '\\')) + ); + } + +} diff --git a/src/functions.php b/src/functions.php new file mode 100644 index 0000000..e424fc9 --- /dev/null +++ b/src/functions.php @@ -0,0 +1,51 @@ +submit($className, $expression, $name); +} + +/** + * 取消指定任务 + * + * @param string $taskKey 由 submit() 或 submitToCrontab() 返回的任务标识 + * @return bool 是否成功取消 + */ +function cancelCrontabTask(string $taskKey): bool +{ + $scheduler = CrontabScheduler::getInstance(); + + if ($scheduler === null) { + return false; + } + + return $scheduler->cancelTask($taskKey); +} diff --git a/tests/CronExpressionTest.php b/tests/CronExpressionTest.php new file mode 100644 index 0000000..1fdcda7 --- /dev/null +++ b/tests/CronExpressionTest.php @@ -0,0 +1,199 @@ +parser = new CronExpression(); + } + + /** + * 运行所有测试 + */ + public function run(): void + { + $methods = get_class_methods($this); + $passed = 0; + $failed = 0; + + foreach ($methods as $method) { + if (!str_starts_with($method, 'test')) { + continue; + } + + echo " 运行: {$method}..." . PHP_EOL; + try { + $this->{$method}(); + $passed++; + echo " ✓ 通过" . PHP_EOL; + } catch (\Throwable $throwable) { + $failed++; + echo " ✗ 失败: {$throwable->getMessage()}" . PHP_EOL; + } + } + + echo PHP_EOL; + echo "总计: " . ($passed + $failed) . " 个测试, {$passed} 通过, {$failed} 失败" . PHP_EOL; + } + + /** + * 基础断言 + */ + private function assertTrue(bool $condition, string $message = ''): void + { + if (!$condition) { + throw new \RuntimeException($message ?: '断言失败'); + } + } + + /** + * 测试 every:60 表达式 + */ + public function testEverySeconds(): void + { + $now = time(); + $next = $this->parser->getNextRunTime('every:60', $now); + $this->assertTrue($next === $now + 60, "期望 " . ($now + 60) . " 实际 $next"); + } + + /** + * 测试 every:5m 表达式 + */ + public function testEveryMinutes(): void + { + $now = time(); + $next = $this->parser->getNextRunTime('every:5m', $now); + $this->assertTrue($next === $now + 300, "期望 " . ($now + 300) . " 实际 $next"); + } + + /** + * 测试 every:1h 表达式 + */ + public function testEveryHours(): void + { + $now = time(); + $next = $this->parser->getNextRunTime('every:1h', $now); + $this->assertTrue($next === $now + 3600, "期望 " . ($now + 3600) . " 实际 $next"); + } + + /** + * 测试 daily 表达式 + */ + public function testDaily(): void + { + $now = mktime(10, 0, 0, 1, 15, 2026); // 2026-01-15 10:00:00 + $next = $this->parser->getNextRunTime('daily:03:00', $now); + $expectedToday = mktime(3, 0, 0, 1, 15, 2026); + + if ($now >= $expectedToday) { + $expected = mktime(3, 0, 0, 1, 16, 2026); + } else { + $expected = $expectedToday; + } + + $this->assertTrue($next === $expected, "期望 $expected 实际 $next"); + } + + /** + * 测试 hourly 表达式 + */ + public function testHourly(): void + { + $now = mktime(10, 15, 0, 1, 15, 2026); // 10:15 + $next = $this->parser->getNextRunTime('hourly:30', $now); + // 10:15 之后的下一个 10:30 + $expected = mktime(10, 30, 0, 1, 15, 2026); + $this->assertTrue($next === $expected, "期望 $expected 实际 $next"); + } + + /** + * 测试 at 一次性任务 + */ + public function testAt(): void + { + $now = time(); + $future = $now + 3600; + $next = $this->parser->getNextRunTime("at:{$future}", $now); + $this->assertTrue($next === $future, "期望 $future 实际 $next"); + + // 已过期的 at 任务返回 0 + $past = $now - 3600; + $expired = $this->parser->getNextRunTime("at:{$past}", $now); + $this->assertTrue($expired === 0, "期望 0 实际 $expired"); + } + + /** + * 测试 isOneShot + */ + public function testIsOneShot(): void + { + $this->assertTrue($this->parser->isOneShot('at:1234567890')); + $this->assertTrue(!$this->parser->isOneShot('every:60')); + $this->assertTrue(!$this->parser->isOneShot('daily:03:00')); + $this->assertTrue(!$this->parser->isOneShot('cron:* * * * *')); + } + + /** + * 测试 cron 表达式 每5分钟 + */ + public function testCronEvery5Minutes(): void + { + $now = mktime(10, 2, 0, 1, 15, 2026); // 10:02 + $next = $this->parser->getNextRunTime('cron:*/5 * * * *', $now); + // 下一个 */5 是 10:05 + $expected = mktime(10, 5, 0, 1, 15, 2026); + $this->assertTrue($next === $expected, "期望 $expected 实际 $next"); + } + + /** + * 测试 cron 表达式精确时间 + */ + public function testCronExactTime(): void + { + $now = mktime(10, 0, 0, 1, 15, 2026); + $next = $this->parser->getNextRunTime('cron:30 8 * * *', $now); + // 下一个 8:30 是明天 + $expected = mktime(8, 30, 0, 1, 16, 2026); + $this->assertTrue($next === $expected, "期望 $expected 实际 $next"); + } + + /** + * 测试 cron 无效表达式 + */ + public function testInvalidCronExpression(): void + { + $next = $this->parser->getNextRunTime('cron:invalid', time()); + $this->assertTrue($next === 0, "期望 0 实际 $next"); + } + + /** + * 测试间隔描述 + */ + public function testIntervalDescription(): void + { + $this->assertTrue($this->parser->getIntervalDescription('every:60') === '每 60 秒'); + $this->assertTrue($this->parser->getIntervalDescription('every:5m') === '每 5 分钟'); + $this->assertTrue($this->parser->getIntervalDescription('every:1h') === '每 1 小时'); + $this->assertTrue($this->parser->getIntervalDescription('daily:03:00') === '每天 03:00'); + $this->assertTrue($this->parser->getIntervalDescription('at:1234567890') === '一次性任务'); + } +} + +// 运行测试 +$test = new CronExpressionTest(); +$test->run();