From ad7a95866228e964fb2e2d240307322738b5788c Mon Sep 17 00:00:00 2001 From: whwyy Date: Sun, 28 Jun 2026 17:31:59 +0800 Subject: [PATCH] eee --- DESIGN.md | 180 +++++++++++++++------- bin/crontab | 76 +++------ composer.json | 8 +- src/CrontabCommand.php | 170 ++++++++++++++++----- src/CrontabProcess.php | 322 +++++++++++++++++++++------------------ src/CrontabProviders.php | 23 ++- src/CrontabScheduler.php | 264 +++++++++++++++++++++++++++----- 7 files changed, 687 insertions(+), 356 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 20e1bf6..64e572d 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -8,7 +8,9 @@ - cron 表达式:标准 5 字段 cron 表达式调度 - 一次性任务:执行后自动移除 - 任务暂停/恢复:通过 Redis 标记控制任务启停 -- **注解驱动**: 使用 `#[Crontab]` 注解声明任务,支持自动扫描发现 +- **注解驱动**: 使用 `#[Crontab]` 注解声明任务,启动时自动扫描发现 +- **事件驱动**: 通过 kiri-core PSR-14 事件系统分发任务生命周期事件 +- **幽灵任务清理**: 启动时自动清理 Redis 中已从配置移除的任务 ## 二、核心组件 @@ -18,10 +20,16 @@ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────────┐ │ -│ │CrontabCommand │───▶│ CrontabScheduler │ │ -│ │ (控制台命令) │ │ (调度引擎) │ │ +│ │CrontabCommand │───▶│ CrontabProcess │ │ +│ │ (Symfony命令) │ │ (继承AbstractProcess) │ │ └──────────────┘ └────────┬─────────┘ │ │ │ │ +│ ▼ │ +│ ┌──────────────────┐ │ +│ │ CrontabScheduler │ │ +│ │ (继承Component) │ │ +│ └────────┬─────────┘ │ +│ │ │ │ ┌─────────▼─────────┐ │ │ │ TaskRegistry │ │ │ │ (任务注册中心) │ │ @@ -41,6 +49,11 @@ │ │ └──────────────┘ │ │ │ └────────────────────────────────────────────────────┘ │ │ │ +│ ┌────────────────────────────────────────────────────┐ │ +│ │ kiri-core 事件系统 (PSR-14) │ │ +│ │ OnTaskBeforeExecute → OnTaskExecuted → OnTaskFailed│ │ +│ └────────────────────────────────────────────────────┘ │ +│ │ └─────────────────────────────────────────────────────────┘ ``` @@ -65,7 +78,7 @@ Type: Hash Fields: class - 任务处理类完整路径 (如 App\Task\CleanLogTask) name - 任务显示名称 - expression - 调度表达式 (every:60 | cron:*\/5 * * * * | daily:03:00 | at:1234567890) + expression - 调度表达式 (every:60 | cron:*/5 * * * * | daily:03:00 | at:1234567890) next_run - 下次执行时间戳 (秒) last_run - 上次执行时间戳 (秒) status - 状态: active / paused / disabled @@ -78,7 +91,7 @@ Fields: ``` Key: crontab:lock:task:{taskKey} Type: String -Value: Swoole Worker ID + Timestamp +Value: Timestamp TTL: 任务超时时间 (默认 300s) ``` @@ -90,7 +103,7 @@ TTL: 任务超时时间 (默认 300s) Key: crontab:lock:master Type: String Value: Worker PID -TTL: 60s (定期续期) +TTL: 60s (并发执行期间由独立协程定期续期) ``` 防止多实例同时调度,用 SET NX EX + Lua 脚本实现。 @@ -105,6 +118,17 @@ Member: 当前正在执行的任务 key 列表 用于监控哪些任务正在运行中。 +### 3.6 任务移除标记 + +``` +Key: crontab:removal:{taskKey} +Type: String +Value: "1" +TTL: 60s +``` + +当任务内部调用 cancelCurrentTask() 时设置此标记,执行完成后检查并清理。 + ## 四、调度表达式 | 格式 | 示例 | 含义 | @@ -115,7 +139,7 @@ Member: 当前正在执行的任务 key 列表 | `every:{时}h` | `every:1h` | 每 1 小时执行 | | `daily:{HH:MM}` | `daily:03:00` | 每天凌晨 3 点 | | `hourly:{MM}` | `hourly:30` | 每小时第 30 分 | -| `cron:{表达式}` | `cron:*\/5 * * * *` | 标准 5 字段 cron | +| `cron:{表达式}` | `cron:*/5 * * * *` | 标准 5 字段 cron | | `at:{时间戳}` | `at:1719590400` | 指定时间戳一次性 | ## 五、调度流程 @@ -129,6 +153,7 @@ Member: 当前正在执行的任务 key 列表 ┌──────────────────────────────────────┐ │ 1. 扫描 TaskRegistry 注册的所有任务 │ │ 将未注册到 Redis 的任务写入 Hash │ +│ 清理 Redis 中已移除的幽灵任务 │ │ 将任务加入 ZSET 调度队列 │ └────────────────┬─────────────────────┘ │ @@ -151,18 +176,22 @@ Member: 当前正在执行的任务 key 列表 └──────────┬───────────┘ │ │ 有任务 │ ▼ │ + ┌──────────────────────────────────────┐ │ + │ 4. 遍历到期任务 (协程并发) │ │ + │ - 启动主锁续期协程 │ │ + │ - 通过 Channel 控制最大并发数 │ │ + │ - 获取任务锁 │ │ + │ - 分发 OnTaskBeforeExecute 事件 │ │ + │ - 执行任务 handle() │ │ + │ - 分发 OnTaskExecuted/OnTaskFailed │ │ + │ - 计算下次执行时间 │ │ + │ - 更新 ZSET 分数 │ │ + │ - 停止锁续期协程 │ │ + └──────────────────────────────────────┘ │ + │ │ + ▼ │ ┌──────────────────────┐ │ - │ 4. 遍历到期任务 │ │ - │ - 获取任务锁 │ │ - │ - 执行任务 handle() │ │ - │ - 记录日志/更新状态 │◀─────────────────────┘ - │ - 计算下次执行时间 │ - │ - 更新 ZSET 分数 │ - └──────────┬───────────┘ - │ - ▼ - ┌──────────────────────┐ - │ 5. sleep(1) 等待下个 │ + │ 5. sleep(1) 等待下个 │◀─────────────────────┘ │ tick │ └──────────────────────┘ ``` @@ -183,11 +212,31 @@ Member: 当前正在执行的任务 key 列表 active ──▶ paused ──▶ active (暂停/恢复) active ──▶ disabled (禁用,从 ZSET 移除) 一次性任务执行完成后: 从 ZSET 和 Redis 中移除 + 任务从配置中移除: 启动时自动清理 (幽灵任务清理) ``` -## 七、与 kiri-core 集成方式 +## 七、事件系统 -### 7.1 作为 Composer 包引入 +调度器通过 kiri-core PSR-14 事件系统在任务执行的各个阶段分发事件: + +| 事件 | 触发时机 | 参数 | +|------|---------|------| +| `OnTaskBeforeExecute` | 任务执行前 | taskKey, className, taskName | +| `OnTaskExecuted` | 任务执行成功 | taskKey, className, taskName, duration, nextRun | +| `OnTaskFailed` | 任务执行失败 | taskKey, className, taskName, error, duration, nextRun | + +监听器注册方式(config/events.php): +```php +return [ + \Kiri\Crontab\Events\OnTaskFailed::class => [ + [App\Listener\CrontabAlertListener::class, 'process'], + ], +]; +``` + +## 八、与 kiri-core 集成方式 + +### 8.1 作为 Composer 包引入 ```json { "require": { @@ -196,22 +245,32 @@ Member: 当前正在执行的任务 key 列表 } ``` -### 7.2 通过 Provider 注册到框架 +### 8.2 通过 Provider 注册命令 +CrontabProviders 自动将 `sw:crontab` 命令注册到 Console Application。 + +### 8.3 通过 Process 集成 ```php // config/servers.php 中添加 'process' => [ \Kiri\Crontab\CrontabProcess::class, ], ``` +CrontabProcess 继承 AbstractProcess,支持框架的生命周期管理(信号处理、优雅退出)。 -### 7.3 独立运行模式 +### 8.4 独立运行模式 ```bash php bin/crontab start php bin/crontab stop php bin/crontab restart +php bin/crontab status ``` -### 7.4 注解驱动注册方式 +### 8.5 kiri-core 控制台模式 +```bash +php kiri.php sw:crontab start|stop|restart|status +``` + +### 8.6 注解驱动注册方式 ```php use Kiri\Crontab\Annotate\Crontab; use Kiri\Crontab\TaskInterface; @@ -226,70 +285,75 @@ class CleanLogTask implements TaskInterface } ``` -在配置中指定扫描路径: -```php -// config/crontab.php -'scan_paths' => [ - 'app/Task', - 'app/Crontab', -], -``` +任务类实现 TaskInterface 并使用 `#[Crontab]` 注解后,CrontabProcess 启动时自动通过 `get_declared_classes()` 发现并注册。 -## 八、注解扫描流程 +## 九、注解扫描流程 ``` ┌──────────────────────────────────────┐ -│ CrontabScanner::scan($directory) │ +│ CrontabProcess::discoverAnnotationTasks() │ └────────────────┬─────────────────────┘ │ ▼ ┌──────────────────────────────────────┐ -│ 1. 递归遍历目录下所有 PHP 文件 │ -│ 跳过 vendor/tests/cache/storage │ +│ 1. 遍历 get_declared_classes() │ +│ 遍历所有已加载的 PHP 类 │ └────────────────┬─────────────────────┘ │ ▼ ┌──────────────────────────────────────┐ -│ 2. require_once 加载文件 │ -│ get_declared_classes() 获取新类 │ -└────────────────┬─────────────────────┘ - │ - ▼ -┌──────────────────────────────────────┐ -│ 3. 检查类是否 implements TaskInterface│ +│ 2. 检查类是否 implements TaskInterface│ │ ──否──▶ 跳过 │ └────────────────┬─────────────────────┘ │ 是 ▼ ┌──────────────────────────────────────┐ -│ 4. ReflectionClass::getAttributes( │ +│ 3. ReflectionClass::getAttributes( │ │ Crontab::class) 读取注解 │ │ ──无──▶ 跳过 │ └────────────────┬─────────────────────┘ │ 有 ▼ ┌──────────────────────────────────────┐ -│ 5. $registry->register([ │ -│ 'class' => $className, │ -│ 'name' => $crontab->name, │ -│ 'expression' => $crontab->expr, │ -│ 'status' => $crontab->status, │ +│ 4. 调用 buildExpression() 生成表达式 │ +│ 验证表达式非空 │ +│ ──空──▶ 跳过 │ +└────────────────┬─────────────────────┘ + │ 有效 + ▼ +┌──────────────────────────────────────┐ +│ 5. TaskRegistry::register([ │ +│ 'class' => $className, │ +│ 'name' => $crontab->name, │ +│ 'expression' => $expr, │ +│ 'status' => $crontab->status│ │ ]); │ └──────────────────────────────────────┘ ``` -## 九、依赖关系 +## 十、依赖关系 ``` kiri-crontab ├── PHP >= 8.5 -├── ext-swoole (协程/进程) -├── ext-redis (Redis 客户端) -├── psr/log (PSR-3 日志) -└── symfony/console (可选,如集成 kiri-core 则复用项目已有的) +├── ext-swoole (协程/进程) +├── ext-redis (Redis 客户端) +├── psr/log (PSR-3 日志) +├── psr/event-dispatcher (PSR-14 事件) +└── symfony/console (命令行集成) ``` -## 十、目录结构 +与 kiri-core 框架的关系: +``` +kiri-crontab (外挂包) +├── 继承 Kiri\Abstracts\Component (CrontabScheduler) +├── 继承 Kiri\Abstracts\Providers (CrontabProviders) +├── 继承 Kiri\Server\Processes\AbstractProcess (CrontabProcess) +├── 使用 Kiri\Events\EventDispatch (事件分发) +└── 集成 Symfony\Component\Console\Command\Command +``` + +## 十一、目录结构 ``` kiri-crontab/ @@ -304,12 +368,12 @@ kiri-crontab/ │ ├── TaskInterface.php # 任务接口 │ ├── TaskConfig.php # 任务配置值对象 │ ├── TaskRegistry.php # 任务注册中心 -│ ├── CrontabScheduler.php # 核心调度引擎 -│ ├── CrontabProcess.php # Swoole 进程适配器 -│ ├── CrontabCommand.php # 控制台命令 -│ ├── CrontabScanner.php # 注解任务扫描器 +│ ├── CrontabScheduler.php # 核心调度引擎 (继承 Component) +│ ├── CrontabProcess.php # Swoole 进程适配器 (继承 AbstractProcess) +│ ├── CrontabCommand.php # Symfony 控制台命令 │ ├── CrontabProviders.php # kiri-core Provider 集成 │ ├── CronExpression.php # Cron 表达式解析器 +│ ├── functions.php # 全局辅助函数 │ ├── Annotate/ │ │ └── Crontab.php # #[Crontab] 注解类 │ └── Events/ diff --git a/bin/crontab b/bin/crontab index eef96bd..7f84faa 100644 --- a/bin/crontab +++ b/bin/crontab @@ -11,10 +11,9 @@ * * 任务定义方式: * 1. 配置文件: config/crontab.php 的 tasks 中声明 - * 2. 注解自动扫描: 在任务类上使用 #[Crontab] 注解,配置 scan_paths 目录 + * 2. 注解自动扫描: 在任务类上使用 #[Crontab] 注解 * - * 确保项目根目录有 config/crontab.php 配置文件 - * 并在其中定义 Redis 连接 (redis)、调度参数 (scheduler) 和扫描路径 (scan_paths) + * CrontabProcess 启动时会自动完成配置任务注册和注解扫描 */ declare(strict_types=1); @@ -41,14 +40,16 @@ if ($autoloadPath === null) { require $autoloadPath; use Kiri\Crontab\CrontabCommand; -use Kiri\Crontab\TaskRegistry; +use Symfony\Component\Console\Application; +use Symfony\Component\Console\Input\ArgvInput; +use Symfony\Component\Console\Output\ConsoleOutput; // 解析命令行参数 $args = $argv; $script = array_shift($args); $action = array_shift($args) ?: 'status'; -// 查找配置文件 +// 查找配置文件,获取 PID 文件路径 $configFiles = [ getcwd() . '/config/crontab.php', __DIR__ . '/../config/crontab.php', @@ -73,60 +74,19 @@ if (!is_array($config)) { exit(1); } -// 初始化任务注册中心 (静态注册表) -// 1. 注册配置文件中的任务 -$taskList = $config['tasks'] ?? []; -foreach ($taskList as $taskConfig) { - try { - TaskRegistry::register($taskConfig); - } catch (\Throwable $throwable) { - fwrite(STDERR, "任务注册失败: {$throwable->getMessage()}" . PHP_EOL); - } -} +// PID 文件路径 +$pidFile = $config['scheduler']['pid_file'] ?? (getcwd() . '/storage/crontab.pid'); -// 2. 发现注解任务 (检查类上的 #[Crontab] 注解) -foreach (get_declared_classes() as $className) { - try { - $reflect = new ReflectionClass($className); - if ($reflect->isAbstract()) continue; - if (!in_array(Kiri\Crontab\TaskInterface::class, class_implements($className), true)) continue; +// 创建 Console Application 并执行 +$application = new Application('kiri-crontab', '1.0.0'); +$application->setAutoExit(false); - $attributes = $reflect->getAttributes(Kiri\Crontab\Annotate\Crontab::class); - if (empty($attributes)) continue; +$command = new CrontabCommand($pidFile); +$command->setName('crontab'); +$application->add($command); - $instance = $attributes[0]->newInstance(); - $expr = $instance->buildExpression(); - if ($expr === '') continue; +$input = new ArgvInput(['crontab', $action]); +$output = new ConsoleOutput(); - Kiri\Crontab\TaskRegistry::register([ - 'class' => $className, - 'name' => $instance->name !== '' ? $instance->name : $className, - 'expression' => $expr, - 'status' => $instance->status, - ]); - } catch (\Throwable) {} -} - -echo "[Crontab] 任务总数: " . TaskRegistry::count() . PHP_EOL; - -// PID 文件和日志文件 -$pidFile = $config['scheduler']['pid_file'] ?? (getcwd() . '/storage/crontab.pid'); -$logFile = $config['scheduler']['log_file'] ?? ''; - -// 创建命令实例 -$command = new CrontabCommand($pidFile, $logFile); - -// 路由到对应操作 -try { - match ($action) { - 'start' => $command->start($config), - 'stop' => $command->stop(), - 'restart' => $command->restart($config), - 'status' => $command->status(), - default => fwrite(STDERR, "未知操作: {$action} (支持: start|stop|restart|status)" . PHP_EOL), - }; -} catch (\Throwable $throwable) { - fwrite(STDERR, "执行失败: {$throwable->getMessage()}" . PHP_EOL); - fwrite(STDERR, "{$throwable->getTraceAsString()}" . PHP_EOL); - exit(1); -} +$exitCode = $application->run($input, $output); +exit($exitCode); diff --git a/composer.json b/composer.json index 6389c7b..c24cbfc 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,8 @@ "php": ">=8.5", "ext-swoole": "*", "ext-redis": "*", - "psr/log": "^1.0" + "psr/log": "^1.0", + "psr/event-dispatcher": "^1.0" }, "suggest": { "symfony/console": "如需集成 kiri-core 或使用命令行管理,建议安装 ^v8.0", @@ -22,7 +23,10 @@ "autoload": { "psr-4": { "Kiri\\Crontab\\": "src/" - } + }, + "files": [ + "src/functions.php" + ] }, "bin": [ "bin/crontab" diff --git a/src/CrontabCommand.php b/src/CrontabCommand.php index cde32b8..a05bc55 100644 --- a/src/CrontabCommand.php +++ b/src/CrontabCommand.php @@ -3,24 +3,36 @@ declare(strict_types=1); namespace Kiri\Crontab; +use Swoole\Process; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; + /** - * Crontab 控制台命令 — 独立模式下的启动/停止/管理命令 + * Crontab 控制台命令 — 管理调度器的启动/停止/重启/状态 * - * 支持的命令: - * php bin/crontab start 启动调度器 - * php bin/crontab stop 停止调度器 - * php bin/crontab restart 重启调度器 - * php bin/crontab status 查看调度状态 + * 使用方式: + * php kiri.php sw:crontab start + * php kiri.php sw:crontab stop + * php kiri.php sw:crontab restart + * php kiri.php sw:crontab status + * + * 独立模式: + * php bin/crontab start|stop|restart|status */ -class CrontabCommand +class CrontabCommand extends Command { + /** @var string 命令名称 */ + public string $command = 'sw:crontab'; + + /** @var string 命令描述 */ + public string $description = '管理 crontab 调度器: start|stop|restart|status'; + /** @var string PID 文件路径 */ private string $pidFile; - /** @var string 日志文件路径 */ - private string $logFile; - /** * @param string $pidFile PID 文件路径 * @param string $logFile 日志文件路径 @@ -29,68 +41,120 @@ class CrontabCommand string $pidFile = '', string $logFile = '', ) { + parent::__construct(); $this->pidFile = $pidFile !== '' ? $pidFile : sys_get_temp_dir() . '/crontab.pid'; - $this->logFile = $logFile; + } + + /** + * 配置命令参数 + */ + protected function configure(): void + { + $this->setName('sw:crontab') + ->addArgument('action', InputArgument::OPTIONAL, 'start|stop|restart|status', 'status') + ->setDescription('./snowflake sw:crontab start|stop|restart|status'); + } + + /** + * 执行命令 + * + * @param InputInterface $input 输入 + * @param OutputInterface $output 输出 + * @return int 退出码 + */ + public function execute(InputInterface $input, OutputInterface $output): int + { + $action = $input->getArgument('action'); + + try { + match ($action) { + 'start' => $this->start($output), + 'stop' => $this->stop($output), + 'restart' => $this->restart($output), + 'status' => $this->status($output), + default => $output->writeln("未知操作: {$action} (支持: start|stop|restart|status)"), + }; + } catch (\Throwable $throwable) { + $output->writeln("执行失败: {$throwable->getMessage()}"); + return Command::FAILURE; + } + + return Command::SUCCESS; } /** * 启动调度器进程 */ - public function start(array $config): void + private function start(OutputInterface $output): void { if ($this->isRunning()) { - echo "[Crontab] 调度器已在运行中, PID: " . $this->getPid() . PHP_EOL; + $output->writeln("Crontab 调度器已在运行中, PID: " . $this->getPid() . ""); return; } $count = TaskRegistry::count(); if ($count === 0) { - echo "[Crontab] 警告: 没有注册任何任务" . PHP_EOL; + $output->writeln("Crontab 警告: 没有注册任何任务"); } - echo "[Crontab] 启动调度器..." . PHP_EOL; - echo "[Crontab] 已注册 {$count} 个任务" . PHP_EOL; + $output->writeln("Crontab 启动调度器..."); + $output->writeln("Crontab 已注册 {$count} 个任务"); - $process = new \Swoole\Process(function (\Swoole\Process $worker) use ($config) { - file_put_contents($this->pidFile, (string)$worker->pid); + $config = $this->loadConfig(); + + $pidFile = $this->pidFile; + + $process = new Process(function (Process $worker) use ($config, $pidFile) { + file_put_contents($pidFile, (string)$worker->pid); + + // 注册配置文件中声明的任务 + $taskList = $config['tasks'] ?? []; + foreach ($taskList as $taskConfig) { + try { + TaskRegistry::register($taskConfig); + } catch (\Throwable $throwable) { + fwrite(STDERR, "任务注册失败: {$throwable->getMessage()}" . PHP_EOL); + } + } $crontabProcess = new CrontabProcess($config); - $crontabProcess->run(); + $crontabProcess->onShutdown($worker)->process($worker); }, false, 0, true); $pid = $process->start(); - echo "[Crontab] 调度器已启动, PID: {$pid}" . PHP_EOL; - echo "[Crontab] PID 文件: {$this->pidFile}" . PHP_EOL; + $output->writeln("Crontab 调度器已启动, PID: {$pid}"); + $output->writeln("Crontab PID 文件: {$pidFile}"); - \Swoole\Process::wait(); + Process::wait(); } /** * 停止调度器进程 */ - public function stop(): void + private function stop(OutputInterface $output): void { if (!$this->isRunning()) { - echo "[Crontab] 调度器未运行" . PHP_EOL; + $output->writeln("Crontab 调度器未运行"); return; } $pid = $this->getPid(); - if ($pid > 0 && \Swoole\Process::kill($pid, 0)) { - \Swoole\Process::kill($pid, SIGTERM); - echo "[Crontab] 已发送停止信号, PID: {$pid}" . PHP_EOL; + if ($pid > 0 && Process::kill($pid, 0)) { + Process::kill($pid, SIGTERM); + $output->writeln("Crontab 已发送停止信号, PID: {$pid}"); + // 等待进程优雅退出,最多等待 10 秒 $timeout = 10; - while ($timeout > 0 && \Swoole\Process::kill($pid, 0)) { + while ($timeout > 0 && Process::kill($pid, 0)) { usleep(200000); $timeout--; } - if (\Swoole\Process::kill($pid, 0)) { - \Swoole\Process::kill($pid, SIGKILL); - echo "[Crontab] 进程未响应, 已强制终止" . PHP_EOL; + if (Process::kill($pid, 0)) { + Process::kill($pid, SIGKILL); + $output->writeln("Crontab 进程未响应, 已强制终止"); } } @@ -98,34 +162,58 @@ class CrontabCommand unlink($this->pidFile); } - echo "[Crontab] 调度器已停止" . PHP_EOL; + $output->writeln("Crontab 调度器已停止"); } /** * 重启调度器进程 */ - public function restart(array $config): void + private function restart(OutputInterface $output): void { - echo "[Crontab] 重启调度器..." . PHP_EOL; - $this->stop(); + $output->writeln("Crontab 重启调度器..."); + $this->stop($output); usleep(500000); - $this->start($config); + $this->start($output); } /** * 查看调度器状态 */ - public function status(): void + private function status(OutputInterface $output): void { if ($this->isRunning()) { $pid = $this->getPid(); - echo "[Crontab] 状态: 运行中" . PHP_EOL; - echo "[Crontab] PID: {$pid}" . PHP_EOL; + $output->writeln("Crontab 状态: 运行中"); + $output->writeln("Crontab PID: {$pid}"); } else { - echo "[Crontab] 状态: 未运行" . PHP_EOL; + $output->writeln("Crontab 状态: 未运行"); } } + /** + * 加载 crontab 配置文件 + * + * @return array 配置数组 + */ + private function loadConfig(): array + { + $configFiles = [ + getcwd() . '/config/crontab.php', + dirname(__DIR__) . '/config/crontab.php', + ]; + + foreach ($configFiles as $file) { + if (file_exists($file)) { + $config = require $file; + if (is_array($config)) { + return $config; + } + } + } + + throw new \RuntimeException('未找到配置文件 config/crontab.php'); + } + /** * 检查调度器是否在运行 */ @@ -135,7 +223,7 @@ class CrontabCommand if ($pid <= 0) { return false; } - return \Swoole\Process::kill($pid, 0); + return Process::kill($pid, 0); } /** diff --git a/src/CrontabProcess.php b/src/CrontabProcess.php index 65900f1..4db485e 100644 --- a/src/CrontabProcess.php +++ b/src/CrontabProcess.php @@ -3,196 +3,224 @@ declare(strict_types=1); namespace Kiri\Crontab; +use Kiri\Server\Processes\AbstractProcess; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Swoole\Coroutine; +use Swoole\Process; /** - * Crontab Swoole 进程 — 作为独立进程运行任务调度器 + * Crontab 自定义进程 — 继承框架统一 Process 基类 * * 两种运行模式: - * 1. 独立模式: 直接调用 run() 启动,自建 Swoole 事件循环 - * 2. kiri-core 集成模式: 注册为自定义 Process,由 kiri-http-server 管理生命周期 + * 1. kiri-core 集成模式: 通过 config/servers.php 的 process 数组注册 + * → TraitProcess::genProcess 创建 Swoole\Process → onShutdown → process() + * 2. 独立模式: bin/crontab 直接 fork 子进程,调用 run() → process() * * 任务注册方式: - * - 配置模式: config/crontab.php 的 tasks 中声明 - * - 注解模式: 任务类上使用 #[Crontab] 注解 + CrontabScanner 自动扫描 + * - 配置模式: config/crontab.php 的 tasks 中声明 (自动注册) + * - 注解模式: 任务类上使用 #[Crontab] 注解 (启动时自动发现) * 两种方式可同时使用 * - * 独立模式示例: - * $process = new CrontabProcess($config, $registry); - * $process->run(); + * kiri-core 集成示例: + * // config/servers.php + * 'process' => [ + * \Kiri\Crontab\CrontabProcess::class, + * ], * - * kiri-core 集成模式: - * 在 config/servers.php 的 process 中添加 CrontabProcess::class 即可 + * 独立模式示例: + * php bin/crontab start */ -class CrontabProcess +class CrontabProcess extends AbstractProcess { - private ?CrontabScheduler $scheduler = null; + /** @var bool 启用协程信号等待 */ + protected bool $enable_coroutine = true; - /** - * @param array $config 配置数组 (crontab.php 的完整内容) - * @param LoggerInterface|null $logger 日志记录器 - */ - public function __construct( - private array $config, - private ?LoggerInterface $logger = null, - ) { - if ($this->logger === null) { - $this->logger = new NullLogger(); - } - } + private ?CrontabScheduler $scheduler = null; - /** - * 扫描注解任务并启动进程 (独立模式) - * - * @throws \Throwable - */ - public function run(): void - { - $this->logger->info('[CrontabProcess] 进程启动中...'); + /** @var LoggerInterface|null 日志记录器 (独立模式兜底) */ + private ?LoggerInterface $logger; - $this->discoverAnnotationTasks(); + /** + * @param array $config 配置数组 (crontab.php 的完整内容) + * 空数组时尝试从 kiri-core 配置系统加载 + * @param LoggerInterface|null $logger 日志记录器 + */ + public function __construct(private array $config = [], ?LoggerInterface $logger = null) + { + parent::__construct(); + $this->logger = $logger ?? new NullLogger; - Coroutine::run(function () { - $this->startScheduler(); - }); + // kiri-core 集成模式下,通过配置系统加载 crontab 配置 + if (empty($this->config) && function_exists('config')) { + $this->config = config('crontab', []); + } + } - $this->logger->info('[CrontabProcess] 进程已退出'); - } + /** + * 进程名称 + */ + public function getName(): string + { + return "Crontab Scheduler"; + } - /** - * 在 kiri-core 自定义进程中启动调度器 - * 由框架的 AbstractProcess 生命周期调用 - * - * @throws \Throwable - */ - public function boot(): void - { - $this->logger->info('[CrontabProcess] 在 kiri-core 进程内启动...'); + /** + * kiri-core 集成入口 — 由 AbstractProcess::onShutdown 调用 + * 在独立的 Swoole Process 中启动任务调度器 + * + * @param Process|null $process Swoole 进程对象 + */ + public function process(?Process $process): void + { + $this->logger->info('[CrontabProcess] 进程启动中...'); - $this->discoverAnnotationTasks(); + $this->registerConfigTasks(); + $this->discoverAnnotationTasks(); + $this->startScheduler(); - $this->startScheduler(); - } + $this->logger->info('[CrontabProcess] 进程已退出'); + } - /** - * 获取当前调度器实例 - */ - public function getScheduler(): ?CrontabScheduler - { - return $this->scheduler; - } + /** + * 收到 SIGTERM 信号时优雅停止调度器 + */ + public function onSigterm(): void + { + $this->logger->info('[CrontabProcess] 收到停止信号,正在关闭调度器...'); + $this->scheduler?->stop(); + } - /** - * 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry - * CrontabProcess 启动时自动调用,检查所有已加载的类 - */ - private function discoverAnnotationTasks(): void - { - $beforeCount = TaskRegistry::count(); + /** + * 独立模式入口 — 在 Coroutine::run 中启动调度器 + * 供 bin/crontab 独立运行模式使用 + * + * @throws \Throwable + */ + public function run(): void + { + Coroutine\run(fn() => $this->process(null)); + } - foreach (get_declared_classes() as $className) { - if (!in_array(TaskInterface::class, class_implements($className), true)) { - continue; - } + /** + * 获取当前调度器实例 + */ + public function getScheduler(): ?CrontabScheduler + { + return $this->scheduler; + } - try { - $reflect = new \ReflectionClass($className); - if ($reflect->isAbstract()) { - continue; - } + /** + * 注册配置文件 (config/crontab.php) 中声明的任务 + */ + private function registerConfigTasks(): void + { + $taskList = $this->config['tasks'] ?? []; + foreach ($taskList as $taskConfig) { + try { + TaskRegistry::register($taskConfig); + } catch (\Throwable $throwable) { + $this->logger->warning("[CrontabProcess] 配置任务注册失败: {$throwable->getMessage()}"); + } + } + } - // 读取类上的 #[Crontab] 注解 - $attributes = $reflect->getAttributes(Annotate\Crontab::class); - if (empty($attributes)) { - continue; - } + /** + * 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry + * 启动时自动调用,检查所有已加载的类 + */ + private function discoverAnnotationTasks(): void + { + $beforeCount = TaskRegistry::count(); - /** @var Annotate\Crontab $instance */ - $instance = $attributes[0]->newInstance(); + foreach (get_declared_classes() as $className) { + if (!in_array(TaskInterface::class, class_implements($className), true)) { + continue; + } - $scheduleExpression = $instance->buildExpression(); - if ($scheduleExpression === '') { - continue; - } + try { + $reflect = new \ReflectionClass($className); + if ($reflect->isAbstract()) { + continue; + } - TaskRegistry::register([ - 'class' => $className, - 'name' => $instance->name !== '' ? $instance->name : $className, - 'expression' => $scheduleExpression, - 'status' => $instance->status, - ]); - } catch (\Throwable) { - // 跳过无法反射或注册失败的类 - } - } + $attributes = $reflect->getAttributes(Annotate\Crontab::class); + if (empty($attributes)) { + continue; + } - $afterCount = TaskRegistry::count(); - if ($afterCount > $beforeCount) { - $this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个"); - } - } + /** @var Annotate\Crontab $instance */ + $instance = $attributes[0]->newInstance(); - /** - * 初始化并启动调度器 - */ - private function startScheduler(): void - { - $redis = $this->createRedisConnection(); - $cronExpression = new CronExpression(); + $scheduleExpression = $instance->buildExpression(); + if ($scheduleExpression === '') { + continue; + } - $schedulerConfig = $this->config['scheduler'] ?? []; + TaskRegistry::register([ + 'class' => $className, + 'name' => $instance->name !== '' ? $instance->name : $className, + 'expression' => $scheduleExpression, + 'status' => $instance->status, + ]); + } catch (\Throwable $throwable) { + $this->logger->warning("[CrontabProcess] 注解扫描失败: {$className} - {$throwable->getMessage()}"); + } + } - $this->scheduler = new CrontabScheduler( - redis: $redis, - cronExpression: $cronExpression, - logger: $this->logger, - tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), - taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), - lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), - lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), - concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), - maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10), - ); + $afterCount = TaskRegistry::count(); + if ($afterCount > $beforeCount) { + $this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个"); + } + } - $this->scheduler->start(); - } + /** + * 初始化并启动调度器 + */ + private function startScheduler(): void + { + $redis = $this->createRedisConnection(); + $cronExpression = new CronExpression; + $schedulerConfig = $this->config['scheduler'] ?? []; + $this->scheduler = new CrontabScheduler(redis: $redis, cronExpression: $cronExpression, logger: $this->logger, tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),); - /** - * 创建 Redis 连接 - * 在独立模式下直接创建连接,不通过 kiri-core 的连接池 - */ - private function createRedisConnection(): \Redis - { - $redisConfig = $this->config['redis'] ?? []; + $this->scheduler->start(); + } - $host = $redisConfig['host'] ?? '127.0.0.1'; - $port = (int)($redisConfig['port'] ?? 6379); - $auth = $redisConfig['auth'] ?? ''; - $databases = (int)($redisConfig['databases'] ?? 0); - $timeout = (int)($redisConfig['timeout'] ?? 30); - $prefix = $redisConfig['prefix'] ?? ''; + /** + * 创建 Redis 连接 + * 在独立模式下直接创建连接,不通过 kiri-core 的连接池 + */ + private function createRedisConnection(): \Redis + { + $redisConfig = $this->config['redis'] ?? []; - $redis = new \Redis(); - if (!$redis->connect($host, $port, $timeout)) { - throw new \RuntimeException("Redis 连接失败: {$host}:{$port}"); - } + $host = $redisConfig['host'] ?? '127.0.0.1'; + $port = (int)($redisConfig['port'] ?? 6379); + $auth = $redisConfig['auth'] ?? ''; + $databases = (int)($redisConfig['databases'] ?? 0); + $timeout = (int)($redisConfig['timeout'] ?? 30); + $prefix = $redisConfig['prefix'] ?? ''; - if (!empty($auth) && !$redis->auth($auth)) { - throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}"); - } + $redis = new \Redis; + if (!$redis->connect($host, $port, $timeout)) { + throw new \RuntimeException("Redis 连接失败: {$host}:{$port}"); + } - $redis->select($databases); + if (!empty($auth) && !$redis->auth($auth)) { + throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}"); + } - if (!empty($prefix)) { - $redis->setOption(\Redis::OPT_PREFIX, $prefix); - } + $redis->select($databases); - $this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}"); + if (!empty($prefix)) { + $redis->setOption(\Redis::OPT_PREFIX, $prefix); + } - return $redis; - } + $this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}"); + + return $redis; + } } diff --git a/src/CrontabProviders.php b/src/CrontabProviders.php index 9e2bad5..de139c3 100644 --- a/src/CrontabProviders.php +++ b/src/CrontabProviders.php @@ -4,10 +4,12 @@ declare(strict_types=1); namespace Kiri\Crontab; use Kiri\Abstracts\Providers; -use Symfony\Component\Console\Application; +use Kiri\Application; +use Psr\Container\ContainerExceptionInterface; +use Psr\Container\NotFoundExceptionInterface; /** - * kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Symfony Console + * kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Console Application * * 在 kiri-core 项目的 config/servers.php 中配置: * ```php @@ -15,28 +17,21 @@ use Symfony\Component\Console\Application; * \Kiri\Crontab\CrontabProcess::class, * ], * ``` - * - * 在应用 Kernel 的 getCommands() 中添加: - * ```php - * public function getCommands(): array - * { - * return [ - * \Kiri\Crontab\CrontabCommand::class, - * ]; - * } - * ``` */ class CrontabProviders extends Providers { /** * 注册 CrontabCommand 到 Console Application + * + * @return void + * @throws ContainerExceptionInterface + * @throws NotFoundExceptionInterface */ public function onImport(): void { - $command = $this->container->get(CrontabCommand::class); $console = $this->container->get(Application::class); - $console->addCommand($command); + $console->command(CrontabCommand::class); } } diff --git a/src/CrontabScheduler.php b/src/CrontabScheduler.php index 126a3a2..89a0235 100644 --- a/src/CrontabScheduler.php +++ b/src/CrontabScheduler.php @@ -3,6 +3,10 @@ declare(strict_types=1); namespace Kiri\Crontab; +use Kiri\Abstracts\Component; +use Kiri\Crontab\Events\OnTaskBeforeExecute; +use Kiri\Crontab\Events\OnTaskExecuted; +use Kiri\Crontab\Events\OnTaskFailed; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Swoole\Coroutine; @@ -17,6 +21,7 @@ use Swoole\Coroutine; * 4. 执行任务并更新下次调度时间 * 5. 支持协程并发执行多个到期任务 * 6. 支持运行时动态投递和取消任务 + * 7. 通过 kiri-core 事件系统分发任务生命周期事件 * * Redis 数据结构: * crontab:queue — ZSET, score=下次执行时间戳, member=taskKey @@ -25,7 +30,7 @@ use Swoole\Coroutine; * crontab:lock:task:{key} — String, 任务执行锁 * crontab:running — SET, 当前执行中的任务 */ -class CrontabScheduler +class CrontabScheduler extends Component { /** @var string Redis key 前缀 */ @@ -49,19 +54,24 @@ class CrontabScheduler /** @var int 默认主锁 TTL (秒) */ private const DEFAULT_LOCK_TTL = 60; - private bool $running = false; + /** @var int 每次拉取到期任务的最大数量 */ + private const MAX_DUE_TASKS_PER_TICK = 100; - /** @var string|null 当前正在执行的任务 key (协程安全) */ - private ?string $currentTaskKey = null; + private bool $running = false; /** @var self|null 全局实例引用,供任务内部访问 */ private static ?self $instance = null; + /** @var LoggerInterface|null 兜底日志,当容器未初始化时使用 */ + private ?LoggerInterface $fallbackLogger; + + /** @var string|null 非协程环境下当前任务 key 回退存储 */ + private ?string $fallbackCurrentTaskKey = null; + /** * @param \Redis $redis Redis 客户端 - * @param TaskRegistry $registry 任务注册中心 * @param CronExpression $cronExpression Cron 表达式解析器 - * @param LoggerInterface $logger 日志记录器 + * @param LoggerInterface|null $logger 日志记录器(容器不可用时的兜底) * @param int $tickInterval tick 间隔 (秒) * @param int $taskTimeout 任务执行超时 (秒) * @param int $lockTtl 主锁 TTL (秒) @@ -72,7 +82,7 @@ class CrontabScheduler public function __construct( private \Redis $redis, private CronExpression $cronExpression, - private LoggerInterface $logger = new NullLogger(), + ?LoggerInterface $logger = null, private int $tickInterval = self::DEFAULT_TICK_INTERVAL, private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT, private int $lockTtl = self::DEFAULT_LOCK_TTL, @@ -80,6 +90,8 @@ class CrontabScheduler private bool $concurrentTasks = true, private int $maxConcurrent = 10, ) { + parent::__construct(); + $this->fallbackLogger = $logger; self::$instance = $this; } @@ -102,7 +114,7 @@ class CrontabScheduler { $this->running = true; - $this->logger->info('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务'); + $this->logInfo('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务'); $this->syncTasks(); @@ -125,12 +137,12 @@ class CrontabScheduler try { $this->tick(); } catch (\Throwable $throwable) { - $this->logger->error('[CrontabScheduler] tick 异常: ' . $throwable->getMessage()); + $this->logError('[CrontabScheduler] tick 异常: ' . $throwable->getMessage()); } } self::$instance = null; - $this->logger->info('[CrontabScheduler] 调度器已停止'); + $this->logInfo('[CrontabScheduler] 调度器已停止'); } /** @@ -188,7 +200,7 @@ class CrontabScheduler $this->persistNewTask($taskConfig); - $this->logger->info("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'"); + $this->logInfo("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'"); return $taskKey; } @@ -211,7 +223,7 @@ class CrontabScheduler $this->redis->del($hashKey); $this->redis->del($this->getTaskLockKey($taskKey)); - $this->logger->info("[CrontabScheduler] 任务已取消: {$taskKey}"); + $this->logInfo("[CrontabScheduler] 任务已取消: {$taskKey}"); return true; } @@ -231,17 +243,37 @@ class CrontabScheduler */ public function cancelCurrentTask(): void { - if ($this->currentTaskKey !== null) { - $this->markTaskForRemoval($this->currentTaskKey); + $taskKey = $this->getCurrentTaskKey(); + if ($taskKey !== null) { + $this->markTaskForRemoval($taskKey); } } /** * 获取当前正在执行的任务 key (供任务内部使用) + * 协程环境通过 Swoole Coroutine Context 实现协程安全 + * 非协程环境回退到实例属性 */ public function getCurrentTaskKey(): ?string { - return $this->currentTaskKey; + $context = Coroutine::getContext(); + if ($context === null) { + return $this->fallbackCurrentTaskKey; + } + return $context['crontab_current_task_key'] ?? null; + } + + /** + * 设置当前协程正在执行的任务 key + */ + private function setCurrentTaskKey(?string $taskKey): void + { + $context = Coroutine::getContext(); + if ($context === null) { + $this->fallbackCurrentTaskKey = $taskKey; + return; + } + $context['crontab_current_task_key'] = $taskKey; } // ────────────────────────────────────────────── @@ -250,11 +282,15 @@ class CrontabScheduler /** * 同步任务到 Redis + * 将 Registry 中的任务写入 Redis,同时清理 Redis 中已不在 Registry 的幽灵任务 */ public function syncTasks(): void { + $registryTaskKeys = []; + foreach (TaskRegistry::all() as $taskKey => $taskConfig) { $hashKey = $this->getTaskHashKey($taskKey); + $registryTaskKeys[] = $hashKey; if (!$this->redis->exists($hashKey)) { $this->persistNewTask($taskConfig); @@ -263,6 +299,41 @@ class CrontabScheduler $this->mergeExistingTask($taskConfig); } + + $this->cleanOrphanedTasks($registryTaskKeys); + } + + /** + * 清理 Redis 中已不在 Registry 的幽灵任务 + * + * @param array $activeHashKeys 当前 Registry 中存在的 hash key 列表 + */ + private function cleanOrphanedTasks(array $activeHashKeys): void + { + $activeHashKeyMap = array_flip($activeHashKeys); + + $redisTaskKeys = $this->redis->keys(self::KEY_PREFIX . ':task:*'); + if (empty($redisTaskKeys)) { + return; + } + + $removedCount = 0; + foreach ($redisTaskKeys as $hashKey) { + if (isset($activeHashKeyMap[$hashKey])) { + continue; + } + + $taskKey = substr($hashKey, strlen(self::KEY_PREFIX . ':task:')); + $this->removeFromQueue($taskKey); + $this->redis->del($hashKey); + $this->redis->del($this->getTaskLockKey($taskKey)); + $this->redis->del('crontab:removal:' . $taskKey); + $removedCount++; + } + + if ($removedCount > 0) { + $this->logInfo("[CrontabScheduler] 清理幽灵任务 {$removedCount} 个"); + } } /** @@ -301,7 +372,7 @@ class CrontabScheduler $this->redis->hSet($hashKey, 'status', 'paused'); $this->redis->zRem(self::QUEUE_KEY, $taskKey); - $this->logger->info("[CrontabScheduler] 任务已暂停: {$taskKey}"); + $this->logInfo("[CrontabScheduler] 任务已暂停: {$taskKey}"); return true; } @@ -328,7 +399,7 @@ class CrontabScheduler $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey); } - $this->logger->info("[CrontabScheduler] 任务已恢复: {$taskKey}"); + $this->logInfo("[CrontabScheduler] 任务已恢复: {$taskKey}"); return true; } @@ -393,15 +464,17 @@ class CrontabScheduler { $now = time(); - $dueTaskKeys = $this->redis->zRangeByScore(self::QUEUE_KEY, '-inf', $now, ['limit' => [0, 100]]); + $dueTaskKeys = $this->redis->zRangeByScore( + self::QUEUE_KEY, + '-inf', + $now, + ['limit' => [0, self::MAX_DUE_TASKS_PER_TICK]] + ); if (empty($dueTaskKeys)) { - $this->renewMasterLock(); return; } - $this->renewMasterLock(); - if ($this->concurrentTasks) { $this->executeTasksConcurrently($dueTaskKeys, $now); } else { @@ -413,13 +486,34 @@ class CrontabScheduler /** * 并发执行到期任务 (通过协程) + * 在执行期间持续续期主锁,防止并发任务耗时超过锁 TTL + * 非协程环境下自动回退到顺序执行 */ private function executeTasksConcurrently(array $taskKeys, int $now): void { + // 非协程环境回退到顺序执行,避免死锁 + if (Coroutine::getCid() <= 0) { + foreach ($taskKeys as $taskKey) { + $this->executeSingleTask($taskKey, $now); + } + return; + } + $batchSize = min(count($taskKeys), $this->maxConcurrent); $channel = new Coroutine\Channel($batchSize); + // 启动主锁续期协程,在并发任务执行期间持续续期 + $stopRenewal = false; + Coroutine::create(function () use (&$stopRenewal) { + while (!$stopRenewal) { + Coroutine::sleep($this->lockRenewInterval); + if (!$stopRenewal) { + $this->renewMasterLock(); + } + } + }); + foreach ($taskKeys as $taskKey) { $channel->push(true); @@ -427,17 +521,21 @@ class CrontabScheduler try { $this->executeSingleTask($taskKey, $now); } catch (\Throwable $throwable) { - $this->logger->error("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}"); + $this->logError("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}"); } finally { $channel->pop(); } }); } + // 等待所有协程完成 for ($i = 0; $i < $batchSize; $i++) { $channel->push(true); } $channel->close(); + + // 停止锁续期协程 + $stopRenewal = true; } /** @@ -461,19 +559,25 @@ class CrontabScheduler } if (!$this->acquireTaskLock($taskKey)) { - $this->logger->warning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}"); + $this->logWarning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}"); return; } $startTime = microtime(true); - // 记录当前任务 key,供任务内部通过 cancelCurrentTask() 取消自身 - $this->currentTaskKey = $taskKey; + // 通过 Swoole Coroutine Context 记录当前任务 key,确保协程安全 + $this->setCurrentTaskKey($taskKey); try { $this->redis->sAdd(self::RUNNING_SET_KEY, $taskKey); - $this->logger->info("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})"); + $this->logInfo("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})"); + + $this->dispatchEvent(new OnTaskBeforeExecute( + taskKey: $taskKey, + className: $config->className, + taskName: $config->name, + )); $className = $config->className; if (!class_exists($className)) { @@ -486,11 +590,15 @@ class CrontabScheduler $duration = round(microtime(true) - $startTime, 4); - $this->logger->info("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s"); + $this->logInfo("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s"); + + $this->finalizeTaskSuccess($config, $now, $duration); } catch (\Throwable $throwable) { $duration = round(microtime(true) - $startTime, 4); - $this->logger->error("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}"); + $this->logError("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}"); + + $this->finalizeTaskFailure($config, $now, $duration, $throwable); } finally { $this->redis->sRem(self::RUNNING_SET_KEY, $taskKey); @@ -502,20 +610,54 @@ class CrontabScheduler $this->redis->del($hashKey); $this->redis->del($this->getTaskLockKey($taskKey)); $this->clearRemovalFlag($taskKey); - $this->logger->info("[CrontabScheduler] 任务已自毁: {$taskKey}"); + $this->logInfo("[CrontabScheduler] 任务已自毁: {$taskKey}"); } else { - $this->finalizeTaskExecution($config, $now); + $this->finalizeTaskScheduling($config, $now); } $this->releaseTaskLock($taskKey); - $this->currentTaskKey = null; + $this->setCurrentTaskKey(null); } } /** - * 完成执行后的任务状态更新 + * 任务执行成功后的生命周期处理 */ - private function finalizeTaskExecution(TaskConfig $config, int $now): void + private function finalizeTaskSuccess(TaskConfig $config, int $now, float $duration): void + { + $nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); + $isOneShot = $this->cronExpression->isOneShot($config->expression); + + $this->dispatchEvent(new OnTaskExecuted( + taskKey: $config->taskKey, + className: $config->className, + taskName: $config->name, + duration: $duration, + nextRun: $isOneShot ? 0 : $nextRun, + )); + } + + /** + * 任务执行失败后的生命周期处理 + */ + private function finalizeTaskFailure(TaskConfig $config, int $now, float $duration, \Throwable $error): void + { + $nextRun = $this->cronExpression->getNextRunTime($config->expression, $now); + + $this->dispatchEvent(new OnTaskFailed( + taskKey: $config->taskKey, + className: $config->className, + taskName: $config->name, + error: $error, + duration: $duration, + nextRun: $nextRun, + )); + } + + /** + * 完成执行后的任务调度状态更新 + */ + private function finalizeTaskScheduling(TaskConfig $config, int $now): void { $taskKey = $config->taskKey; $hashKey = $this->getTaskHashKey($taskKey); @@ -525,7 +667,7 @@ class CrontabScheduler if ($isOneShot) { $this->removeFromQueue($taskKey); $this->redis->del($hashKey); - $this->logger->info("[CrontabScheduler] 一次性任务已移除: {$taskKey}"); + $this->logInfo("[CrontabScheduler] 一次性任务已移除: {$taskKey}"); return; } @@ -547,10 +689,23 @@ class CrontabScheduler } } + /** + * 通过 kiri-core 事件系统分发事件 + * 容器不可用时静默跳过 + */ + private function dispatchEvent(object $event): void + { + try { + $this->getDispatch()->dispatch($event); + } catch (\Throwable) { + // 容器未初始化或事件系统不可用时跳过 + } + } + /** * 标记任务为"下次执行后移除" * 当任务内部调用 cancelCurrentTask() 时,不立即删除(执行中操作安全),仅标记 - * 等当前执行完成后,executeSingleTask 的 finally 块会检查此标记并清理 + * 等当前执行完成后,finalize 块会检查此标记并清理 */ private function markTaskForRemoval(string $taskKey): void { @@ -621,7 +776,7 @@ class CrontabScheduler $this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey); } - $this->logger->info("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun)); + $this->logInfo("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun)); } /** @@ -680,4 +835,41 @@ class CrontabScheduler return self::KEY_PREFIX . ':lock:task:' . $taskKey; } + // ────────────────────────────────────────────── + // 日志辅助 — 兼容容器未初始化的场景 + // ────────────────────────────────────────────── + + private function logInfo(string $message): void + { + $this->log('info', $message); + } + + private function logError(string $message): void + { + $this->log('error', $message); + } + + private function logWarning(string $message): void + { + $this->log('warning', $message); + } + + /** + * 统一日志输出入口 + * 优先使用 Component::getLogger(),容器不可用时回退到注入的 fallbackLogger + */ + private function log(string $level, string $message): void + { + try { + $logger = $this->getLogger(); + $logger->{$level}($message); + return; + } catch (\Throwable) { + } + + if ($this->fallbackLogger instanceof LoggerInterface) { + $this->fallbackLogger->{$level}($message); + } + } + }